
【Flink-Kafka-To-Mongo】Using Flink to Write Kafka Data into MongoDB (insert, update, or delete according to the operation type, with separate handling of time-typed fields on write)

This article explains how to use Flink to write Kafka data into MongoDB: each record is inserted, updated, or deleted according to its operation type, and time-typed fields are handled separately on write. I hope it is useful; if anything is wrong or incomplete, corrections and suggestions are welcome.

Requirements:

1. Data is read from Kafka and written to MongoDB.

2. Related configuration is stored in MySQL and read dynamically from MySQL.

3. The Kafka cluster in this example uses Kerberos authentication; remove that part yourself if you do not need it.

4. Kafka messages are in JSON format; each record is inserted, updated, or deleted according to its operation-type field (a sample message is shown after this list).

5. A custom source is used for reading and a custom sink for writing.

6. A custom deserializer is used when consuming the Kafka data.

7. MongoDB operations are wrapped with Document.

8. The target database and collection are passed in as db.collection arguments.
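
For reference, passing "test.test1,test.test2" as the program argument makes the job consume the topics mongo_test_test1 and mongo_test_test2 (see Kafka2MongoApp below). Based on the fields the sink reads (operationType, documentKey, fullDocument), a message value is assumed to look roughly like the following; the concrete values are illustrative only and the exact shape depends on how the upstream producer serializes MongoDB change events:

{
  "operationType": "update",
  "documentKey": "653a1b2c9d4e5f6a7b8c9d0e",
  "fullDocument": {
    "_id": "653a1b2c9d4e5f6a7b8c9d0e",
    "systemTime": "2023-10-13 11:37:43.238"
  }
}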

1) Dependencies

The dependency list here is somewhat redundant; trim or keep entries according to your own needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gaei.cn.x5l</groupId>
    <artifactId>x8vbusiness</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.10</scala.version>
        <flink.version>1.14.0</flink.version>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>

        <mongo.driver.version>3.12.6</mongo.driver.version>
        <mongo.driver.core.version>4.3.1</mongo.driver.core.version>

    </properties>
    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <exclusions>-->
            <!--                <exclusion>-->
            <!--                    <groupId>mysql</groupId>-->
            <!--                    <artifactId>mysql-connector-java</artifactId>-->
            <!--                </exclusion>-->
            <!--            </exclusions>-->
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- Basic dependencies: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Basic dependencies: end -->
        <!-- Table API: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Comment this out when using Hive SQL; otherwise it can stay enabled -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API: end -->
        <!-- SQL: start -->
        <!-- SQL formats (parsing): start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- SQL formats (parsing): end -->
        <!-- SQL Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- SQL: end -->
        <!-- Checkpoints (state processor API) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>

        <!-- Local web UI for monitoring: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Local web UI for monitoring: end -->
        <!-- Logging: start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <!-- hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- Important: this jar is easy to overlook -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>


        </dependency>
        <!-- rocksdb_2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Miscellaneous -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>


        <!-- Table planner (Blink): start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <!--            <version>${flink.version}</version>-->
            <version>1.13.5</version>
            <scope>provided</scope>
        </dependency>


        <!-- Table planner (Blink): end -->


        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--            <version>5.1.44</version>-->
            <version>8.0.27</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.8</version>
        </dependency>



        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>bson</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>


        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-core</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>

        <!-- mongodb-driver, which is repackaged into custom-mongo-core -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.6</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                                <!-- Required for Flink SQL -->
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>

            </plugins>
        </pluginManagement>

    </build>

</project>

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1

# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo

mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456

#envType=PRE
envType=PRD

# MySQL Druid connection pool configuration (production environment)
druid.driverClassName=com.mysql.jdbc.Driver
# production
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# initial pool size
druid.initialSize=1
# maximum active connections
druid.maxActive=5
# maximum wait time (ms)
druid.maxWait=3000
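
Note that the maxConnectionIdleTime and batchSize keys above are not consumed by the sink code shown in section 4.1. If you want maxConnectionIdleTime applied, a minimal sketch against the 3.x MongoDB driver used in this project would look like the following; where exactly it is placed (for example inside initPool()) is an assumption:

// Hypothetical: build the MongoClient with maxConnectionIdleTime taken from application.properties
Properties prop = KafkaUtils.confFromYaml();
MongoClientOptions.Builder options = MongoClientOptions.builder()
        .maxConnectionIdleTime(Integer.parseInt(prop.getProperty("maxConnectionIdleTime"))); // milliseconds
MongoClient client = new MongoClient(new MongoClientURI(prop.getProperty("url"), options));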

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>

    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>

    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>

3)util

3.1.KafkaUtils

public class KafkaUtils {
    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumerForMongo(List<String> topic) throws IOException {
        Properties prop1 = confFromYaml();
        // authentication environment (PRE / PRD)
        String envType = prop1.getProperty("envType");


        Properties prop = new Properties();

        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false  "
                + "serviceName=\"" + "kafka" + "\" "
                + "useKeyTab=true "
                + "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
                + "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");

//        prop.put("bootstrap.servers", "kfk01.pre.x8v.com:9092");
        prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));
        prop.put("group.id", "Kafka2Mongo_all");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");
        prop.put("max.poll.interval.ms", "60000");
        prop.put("max.poll.records", "3000");
        prop.put("session.timeout.ms", "600000");

//        List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
//                .collect(Collectors.toList());

        prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");


        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);

        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);

        return consumer;
    }

    public static void main(String[] args) throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("缺少druid相關(guān)配置信息,請(qǐng)檢查");
        }

        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        Connection connection = dataSource.getConnection();
        PreparedStatement showDatabases = connection.prepareStatement("\n" +
                "select count(*) from tab_factory");
        ResultSet resultSet = showDatabases.executeQuery();
        while (resultSet.next()) {
            String string = resultSet.getString(1);
            System.out.println(string);
        }
        resultSet.close();
        showDatabases.close();

        connection.close();


    }

    public static Properties getDruidConf() {
        try {
            Properties prop = confFromYaml();
            String driverClassName = prop.get("druid.driverClassName").toString();
            String url = prop.get("druid.url").toString();
            String username = prop.get("druid.username").toString();
            String password = prop.get("druid.password").toString();
            String initialSize = prop.get("druid.initialSize").toString();
            String maxActive = prop.get("druid.maxActive").toString();
            String maxWait = prop.get("druid.maxWait").toString();

            Properties p = new Properties();
            p.put("driverClassName", driverClassName);
            p.put("url", url);
            p.put("username", username);
            p.put("password", password);
            p.put("initialSize", initialSize);
            p.put("maxActive", maxActive);
            p.put("maxWait", maxWait);
//            p.forEach((k, v) -> System.out.println("pool property " + k + "=" + v));
            return p;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }


    // envType     PRE  PRD
    public static Map<String, String> getKafkaKerberos(String envType) {
        Map<String, String> map = new HashMap<>();
        if ("PRD".equalsIgnoreCase(envType)) {
            map.put("principal", "prd@PRD.PRD.COM");
            map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
        } else if ("PRE".equalsIgnoreCase(envType)) {
            map.put("principal", "pre@PRE.PRE.COM");
            map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
        } /*else if ("TEST".equalsIgnoreCase(envType)) {
            map.put("principal","test@TEST.TEST.COM");
            map.put("bootstrap.servers","test@TEST.TEST.COM");
        } */ else {
            System.out.println("沒(méi)有該" + envType + "環(huán)境");
            throw new RuntimeException("沒(méi)有該" + envType + "環(huán)境");
        }

        return map;
    }

    public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
        Properties prop = confFromYaml();
        env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE); // offsets are committed on checkpoints, so this delays offset commits
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));
        env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                Integer.valueOf(prop.getProperty("restartStrategy")), // 嘗試重啟的次數(shù),不宜過(guò)小,分布式任務(wù)很容易出問(wèn)題(正常情況),建議3-5次
                Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延時(shí)
        ));
        // 設(shè)置狀態(tài)后端存儲(chǔ)方式
//        env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
//        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
        return env;

    }

    public static Properties confFromYaml() {
        Properties prop = new Properties();
        InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        try {
            prop.load(resourceStream);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resourceStream != null) {
                    resourceStream.close();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        return prop;
    }
}
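
The main() method above is only a smoke test against the Druid pool; requirement 2 (reading configuration dynamically from MySQL) is not shown end to end in this example. A minimal sketch of loading a topic-to-collection mapping through the same pool, added as a helper to KafkaUtils, might look like this; the table name cfg_topic_mapping and its columns are hypothetical:

// Hypothetical helper: read topic -> "db.collection" pairs from MySQL via the Druid pool.
public static Map<String, String> loadTopicMapping() throws Exception {
    Properties druidConf = KafkaUtils.getDruidConf();
    if (druidConf == null) {
        throw new RuntimeException("Missing Druid configuration, please check application.properties");
    }
    DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
    Map<String, String> mapping = new HashMap<>();
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement("select topic_name, db_collection from cfg_topic_mapping");
         ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            mapping.put(rs.getString("topic_name"), rs.getString("db_collection"));
        }
    }
    return mapping;
}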

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    private static String encoding = "UTF8";

    // whether this element marks the end of the stream; returning false means data keeps arriving
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    // returns a ConsumerRecord<String, String> that carries topic, offset, partition, etc. in addition to the payload
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] key = (record.key() == null ? "".getBytes() : record.key());
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* no null check on the value here; be sure to handle it in production (see the sketch after this class) */
                new String(key, encoding),
                new String(record.value(), encoding));
    }

    // declares the type of the produced data
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}
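
The value of a Kafka record can be null (for example, a tombstone on a compacted topic), in which case new String(record.value(), encoding) above throws a NullPointerException. A minimal null-safe variant of deserialize, under the assumption that downstream code filters out empty values, could look like this:

@Override
public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    // guard both key and value against null before decoding
    byte[] key = (record.key() == null ? "".getBytes() : record.key());
    byte[] value = (record.value() == null ? "".getBytes() : record.value());
    return new ConsumerRecord<String, String>(
            record.topic(),
            record.partition(),
            record.offset(),
            record.timestamp(),
            record.timestampType(),
            record.checksum(),
            record.serializedKeySize(),
            record.serializedValueSize(),
            new String(key, encoding),
            new String(value, encoding));
}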

4)kafkacdc2mongo

4.1.Kafka2MongoApp

public class Kafka2MongoApp {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        String[] split = args[0].split(",");
        // String topic = "mongo_" + database + "_" + collection;
        List<String> topicList = new ArrayList<>();
        Map<String, String> dbAndCol = new HashMap<>();
        for (String s : split) {
            String[] t = s.split("\\.");
            String e = "mongo_" + t[0] + "_" + t[1];
            topicList.add(e);
            dbAndCol.put(e, s);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        KafkaUtils.setupFlinkEnv(env);
        RichSinkFunction<ConsumerRecord<String, String>> sinkFunction = new RichSinkFunction<ConsumerRecord<String, String>>() {

            private Stack<MongoClient> connectionPool = new Stack<>();
            private String url = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                initPool();
            }

            /**
             * Initialize the connection pool.
             */
            private void initPool() {
                Properties prop = KafkaUtils.confFromYaml();
                url = prop.getProperty("url");

                try {
                    for (int i = 0; i < 5; i++) {
                        MongoClient client = new MongoClient(new MongoClientURI(url));
                        connectionPool.push(client);
                    }
                } catch (MongoException e) {
                    e.printStackTrace();
                }
            }


            @Override
            public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
                MongoClient mongoClient = null;
                try {
                    String topic = record.topic();
                    String dbAndColstr = dbAndCol.get(topic);
                    String[] t = dbAndColstr.split("\\.");
                    String databaseStr = t[0];
                    String collectionStr = t[1];
                    Document doc = Document.parse(record.value());
                    String operationType = doc.getString("operationType");
                    String documentKey = doc.getString("documentKey");
                    Object id = doc.get("documentKey");

                    // take a client from the pool
                    mongoClient = connectionPool.pop();

                    MongoCollection<Document> collection = null;
                    try {
                        collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);
                    } catch (Exception e) {
                        try {
                            mongoClient.close();
                        } catch (Exception ignore) {
                        }
                        // the connection has expired; recreate the client
                        mongoClient = new MongoClient(new MongoClientURI(url));
                        collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);
                    }

                    if ("delete".equalsIgnoreCase(operationType)) {
                        collection.deleteOne(new Document("id", id));
                    }

                    if (documentKey != null && !documentKey.isEmpty() && !"delete".equals(operationType)) {
                        Document outputDoc = (Document) doc.get("fullDocument");
                        outputDoc.put("id", id);
                        try {
                            collection.deleteOne(new Document("id", id));
                        } catch (Exception e) {
                            System.out.println("添加更新前先刪除:異常信息====>>>" + e.getMessage() + "插入的數(shù)據(jù)是\n" + outputDoc);
                        }
                        if ("insert".equalsIgnoreCase(operationType) || "update".equalsIgnoreCase(operationType) || "replace".equalsIgnoreCase(operationType)) {
                            insertOne(collection, outputDoc);
                        }
                    }
                } catch (Exception e) {
                    System.out.printf("mongodb 同步異常,原因是%s,topic是%s,value值是\n%s%n", e.getMessage(), record.topic(), record.value());
                } finally {
                    if (mongoClient != null) {
                        // return the client to the pool
                        connectionPool.push(mongoClient);
                    }
                }
            }

            @Override
            public void close() throws Exception {
                for (MongoClient mongoClient : connectionPool) {
                    try {
                        mongoClient.close();
                    } catch (Exception ignore) {
                    }
                }
            }

            private void insertOne(MongoCollection<Document> collection, Document doc) {
                String collectionName = collection.getNamespace().getCollectionName();
                // handle special (time) fields
                handle(collectionName, doc);
                collection.insertOne(doc);
            }
			
            // example of handling time fields, if any
            private void handle(String collectionName, Document doc) {
                if (collectionName.equals("test1")) {
                    // systemTime should be stored as a Date, not a String, e.g. 2023-10-13 11:37:43.238
                    formatStringTime(doc, "systemTime");
                    return;
                }
                if (collectionName.equals("test2")) {
                    formatStringTime(doc, "time");
                    return;
                }
                if (collectionName.equals("test3") || collectionName.equals("timer_record")) {
                    formatStringTime(doc, "createTime");
                    formatStringTime(doc, "updateTime");
                    return;
                }
            }

            // convert a String value to a Date
            private void formatStringTime(Document doc, String key) {
                try {
                    String time = doc.getString(key);
                    if (time == null) {
                        return;
                    }
                    Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(time);
                    doc.put(key, parse);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        env.addSource(KafkaUtils.getKafkaConsumerForMongo(topicList))
                .keyBy(e -> {
                    Document doc = Document.parse(e.value());
                    return doc.getString("documentKey");
                })
                .addSink(sinkFunction);

        env.execute("kafka2mongo synchronization " + topicList);
    }
}

That concludes this walkthrough of 【Flink-Kafka-To-Mongo】: using Flink to write Kafka data into MongoDB, performing insert, update, and delete operations according to the operation type and handling time-typed fields separately on write. I hope it is useful.
