Requirements:
1. Data is read from Kafka and written into MongoDB.
2. The related configuration is stored in MySQL and read from MySQL dynamically.
3. The Kafka cluster in this example uses Kerberos authentication; remove or adjust that part if you do not need it.
4. The Kafka messages are JSON; each record is inserted, deleted, or updated according to its operation-type field (see the sample message below).
5. Reading uses a custom source and writing uses a custom sink.
6. A custom deserializer is used when consuming the Kafka data.
7. MongoDB operations are wrapped with Document objects.
8. In this example the target database and collection are passed in as db.collection arguments.
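For reference, a minimal sketch of the JSON shape the sink below expects. The field names (operationType, documentKey, fullDocument) come from the sink code; the concrete values here are made up, and the exact structure depends on your upstream producer:
{
  "operationType": "insert",
  "documentKey": "650f1c2e9b1e8a0001a2b3c4",
  "fullDocument": {
    "name": "example",
    "systemTime": "2023-10-13 11:37:43.238"
  }
}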
1) Import dependencies
The dependency list below is somewhat redundant; trim or keep entries according to your own needs.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>gaei.cn.x5l</groupId>
<artifactId>x8vbusiness</artifactId>
<version>1.0.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<flink.version>1.14.0</flink.version>
<log4j.version>2.17.2</log4j.version>
<hadoop.version>3.1.2</hadoop.version>
<hive.version>3.1.2</hive.version>
<mongo.driver.version>3.12.6</mongo.driver.version>
<mongo.driver.core.version>4.3.1</mongo.driver.core.version>
</properties>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>mysql</groupId>-->
<!-- <artifactId>mysql-connector-java</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- core dependencies: start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- core dependencies: end -->
<!-- Table API: start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- comment this dependency out when using Hive SQL; otherwise keep it -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table API: end -->
<!-- SQL: start -->
<!-- SQL formats: start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- SQL formats: end -->
<!-- SQL connector for Kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- SQL: end -->
<!-- checkpoints -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.5</version>
<scope>compile</scope>
</dependency>
<!-- local web UI for monitoring jobs: start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- local web UI for monitoring jobs: end -->
<!-- DataStream: start -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!-- hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- important: an easily overlooked jar -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- rocksdb_2 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- miscellaneous -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.23</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jyaml</groupId>
<artifactId>jyaml</artifactId>
<version>1.3</version>
</dependency>
<!-- TABLE (Blink planner): start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<!-- <version>${flink.version}</version>-->
<version>1.13.5</version>
<scope>provided</scope>
</dependency>
<!-- TABLE (Blink planner): end -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!-- <version>5.1.44</version>-->
<version>8.0.27</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.8</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>${mongo.driver.core.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>${mongo.driver.core.version}</version>
</dependency>
<!-- custom-mongo-core, repackaged from mongodb-driver -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.12.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.apache.flink:flink-runtime-web_${scala.binary.version}</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
</transformer>
<!-- required for Flink SQL -->
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- ... -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
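With the POM above, the fat jar can be built with Maven and submitted with the Flink CLI roughly as follows. The jar name follows from the artifactId and version, the main-class placeholder in the shade plugin (com.owp.flink.kafka.KafkaSourceDemo) should be replaced with your actual entry point (the Kafka2MongoApp shown later), and the quoted db.collection list is the argument described in requirement 8; adjust all of these to your own setup:
mvn clean package -DskipTests
flink run -c your.package.Kafka2MongoApp target/x8vbusiness-1.0.0.jar "db1.coll1,db2.coll2"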
2)resources
2.1.appconfig.yml
mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"
2.2.application.properties
url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1
# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo
mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456
#envType=PRE
envType=PRD
# MySQL Druid connection pool settings (production)
druid.driverClassName=com.mysql.jdbc.Driver
# production
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# initial number of connections
druid.initialSize=1
# maximum number of connections
druid.maxActive=5
# maximum wait time (ms)
druid.maxWait=3000
2.3.log4j.properties
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2.4.log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
<Properties>
<property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
<property name="LOG_LEVEL" value="ERROR" />
</Properties>
<appenders>
<console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="${LOG_PATTERN}"/>
<ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
</console>
<File name="log" fileName="tmp/log/job.log" append="false">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
</File>
</appenders>
<loggers>
<root level="${LOG_LEVEL}">
<appender-ref ref="console"/>
<appender-ref ref="log"/>
</root>
</loggers>
</configuration>
3)util
3.1.KafkaUtils
public class KafkaUtils {
public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumerForMongo(List<String> topic) throws IOException {
Properties prop1 = confFromYaml();
// authentication environment (PRE / PRD)
String envType = prop1.getProperty("envType");
Properties prop = new Properties();
System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false "
+ "serviceName=\"" + "kafka" + "\" "
+ "useKeyTab=true "
+ "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
+ "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");
// prop.put("bootstrap.servers", "kfk01.pre.x8v.com:9092");
prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));
prop.put("group.id", "Kafka2Mongo_all");
prop.put("auto.offset.reset", "earliest");
prop.put("enable.auto.commit", "false");
prop.put("max.poll.interval.ms", "60000");
prop.put("max.poll.records", "3000");
prop.put("session.timeout.ms", "600000");
// List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
// .collect(Collectors.toList());
prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
return consumer;
}
public static void main(String[] args) throws Exception {
Properties druidConf = KafkaUtils.getDruidConf();
if (druidConf == null) {
throw new RuntimeException("缺少druid相關(guān)配置信息,請(qǐng)檢查");
}
DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
Connection connection = dataSource.getConnection();
PreparedStatement showDatabases = connection.prepareStatement("\n" +
"select count(*) from tab_factory");
ResultSet resultSet = showDatabases.executeQuery();
while (resultSet.next()) {
String string = resultSet.getString(1);
System.out.println(string);
}
resultSet.close();
showDatabases.close();
connection.close();
}
public static Properties getDruidConf() {
try {
Properties prop = confFromYaml();
String driverClassName = prop.get("druid.driverClassName").toString();
String url = prop.get("druid.url").toString();
String username = prop.get("druid.username").toString();
String password = prop.get("druid.password").toString();
String initialSize = prop.get("druid.initialSize").toString();
String maxActive = prop.get("druid.maxActive").toString();
String maxWait = prop.get("druid.maxWait").toString();
Properties p = new Properties();
p.put("driverClassName", driverClassName);
p.put("url", url);
p.put("username", username);
p.put("password", password);
p.put("initialSize", initialSize);
p.put("maxActive", maxActive);
p.put("maxWait", maxWait);
// p.forEach((k,v)-> System.out.println("pool property " + k + "=" + v));
return p;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// envType PRE PRD
public static Map<String, String> getKafkaKerberos(String envType) {
Map<String, String> map = new HashMap<>();
if ("PRD".equalsIgnoreCase(envType)) {
map.put("principal", "prd@PRD.PRD.COM");
map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
} else if ("PRE".equalsIgnoreCase(envType)) {
map.put("principal", "pre@PRE.PRE.COM");
map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
} /*else if ("TEST".equalsIgnoreCase(envType)) {
map.put("principal","test@TEST.TEST.COM");
map.put("bootstrap.servers","test@TEST.TEST.COM");
} */ else {
System.out.println("沒(méi)有該" + envType + "環(huán)境");
throw new RuntimeException("沒(méi)有該" + envType + "環(huán)境");
}
return map;
}
public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
Properties prop = confFromYaml();
env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE); // note: offsets are committed on checkpoints, so this delays offset commits
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));
env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
Integer.valueOf(prop.getProperty("restartStrategy")), // 嘗試重啟的次數(shù),不宜過(guò)小,分布式任務(wù)很容易出問(wèn)題(正常情況),建議3-5次
Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延時(shí)
));
// configure the state backend
// env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
// env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
return env;
}
public static Properties confFromYaml() {
Properties prop = new Properties();
InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
try {
prop.load(resourceStream);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (resourceStream != null) {
resourceStream.close();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
return prop;
}
}
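Requirement 2 (configuration stored in MySQL and read dynamically) is only touched on by getDruidConf() above. A minimal sketch of how a topic-to-db.collection mapping could be loaded through the same Druid pool is shown below; the table and column names (tab_mongo_mapping, topic_name, db_collection) are hypothetical and should be replaced with your own schema:
import com.alibaba.druid.pool.DruidDataSourceFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class MappingConfigLoader {
    /** Loads topic -> "db.collection" pairs from a (hypothetical) MySQL table via the Druid pool. */
    public static Map<String, String> loadMapping() throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("Missing Druid configuration");
        }
        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        Map<String, String> mapping = new HashMap<>();
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "select topic_name, db_collection from tab_mongo_mapping");
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // e.g. "mongo_test_diagnosis" -> "test.diagnosis"
                mapping.put(rs.getString("topic_name"), rs.getString("db_collection"));
            }
        }
        return mapping;
    }
}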
3.2.CustomDeSerializationSchema
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
private static String encoding = "UTF8";
// whether this element marks the end of the stream; returning false means data keeps arriving continuously
@Override
public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
return false;
}
// returns a ConsumerRecord<String, String> that, besides the payload, carries the topic, partition, offset, etc.
@Override
public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
byte[] key = (record.key() == null ? "".getBytes() : record.key());
return new ConsumerRecord<String, String>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
/* no null check on the value here; be sure to handle it in production */
new String(key, encoding),
new String(record.value(), encoding));
}
// declares the type of the records produced by this schema
@Override
public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
});
}
}
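As the comment in deserialize() notes, the value is not null-checked. A minimal defensive variant under the same interface could look like the sketch below; it substitutes empty strings for null keys and values, and whether that is acceptable (or whether tombstone records should instead be filtered out) depends on your data:
@Override
public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    // guard both key and value against null (e.g. Kafka tombstone records)
    byte[] key = record.key() == null ? new byte[0] : record.key();
    byte[] value = record.value() == null ? new byte[0] : record.value();
    return new ConsumerRecord<>(
            record.topic(),
            record.partition(),
            record.offset(),
            record.timestamp(),
            record.timestampType(),
            record.checksum(),
            record.serializedKeySize(),
            record.serializedValueSize(),
            new String(key, encoding),
            new String(value, encoding));
}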
4)kafkacdc2mongo
4.1.Kafka2MongoApp
public class Kafka2MongoApp {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "hadoop");
String[] split = args[0].split(",");
// String topic = "mongo_" + database + "_" + collection;
List<String> topicList = new ArrayList<>();
Map<String, String> dbAndCol = new HashMap<>();
for (String s : split) {
String[] t = s.split("\\.");
String e = "mongo_" + t[0] + "_" + t[1];
topicList.add(e);
dbAndCol.put(e, s);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
KafkaUtils.setupFlinkEnv(env);
RichSinkFunction<ConsumerRecord<String, String>> sinkFunction = new RichSinkFunction<ConsumerRecord<String, String>>() {
private Stack<MongoClient> connectionPool = new Stack<>();
private String url = null;
@Override
public void open(Configuration parameters) throws Exception {
initPool();
}
/**
* Initialize the connection pool and set its parameters.
*/
private void initPool() {
Properties prop = KafkaUtils.confFromYaml();
url = prop.getProperty("url");
try {
for (int i = 0; i < 5; i++) {
MongoClient client = new MongoClient(new MongoClientURI(url));
connectionPool.push(client);
}
} catch (MongoException e) {
e.printStackTrace();
}
}
@Override
public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
MongoClient mongoClient = null;
try {
String topic = record.topic();
String dbAndColstr = dbAndCol.get(topic);
String[] t = dbAndColstr.split("\\.");
String databaseStr = t[0];
String collectionStr = t[1];
Document doc = Document.parse(record.value());
String operationType = doc.getString("operationType");
String documentKey = doc.getString("documentKey");
Object id = doc.get("documentKey");
// take a connection from the pool
mongoClient = connectionPool.pop();
MongoCollection<Document> collection = null;
try {
collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);
} catch (Exception e) {
try {
mongoClient.close();
} catch (Exception ignore) {
}
// the connection has expired; recreate it
mongoClient = new MongoClient(new MongoClientURI(url));
collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);
}
if ("delete".equalsIgnoreCase(operationType)) {
collection.deleteOne(new Document("id", id));
}
if (documentKey != null && !documentKey.isEmpty() && !"delete".equals(operationType)) {
Document outputDoc = (Document) doc.get("fullDocument");
outputDoc.put("id", id);
try {
collection.deleteOne(new Document("id", id));
} catch (Exception e) {
System.out.println("添加更新前先刪除:異常信息====>>>" + e.getMessage() + "插入的數(shù)據(jù)是\n" + outputDoc);
}
if ("insert".equalsIgnoreCase(operationType) || "update".equalsIgnoreCase(operationType) || "replace".equalsIgnoreCase(operationType)) {
insertOne(collection, outputDoc);
}
}
} catch (Exception e) {
System.out.printf("mongodb 同步異常,原因是%s,topic是%s,value值是\n%s%n", e.getMessage(), record.topic(), record.value());
} finally {
if (mongoClient != null) {
// return the connection to the pool
connectionPool.push(mongoClient);
}
}
}
@Override
public void close() throws Exception {
for (MongoClient mongoClient : connectionPool) {
try {
mongoClient.close();
} catch (Exception ignore) {
}
}
}
private void insertOne(MongoCollection<Document> collection, Document doc) {
String collectionName = collection.getNamespace().getCollectionName();
// handle special fields
handle(collectionName, doc);
collection.insertOne(doc);
}
// example of handling time fields, if any are present
private void handle(String collectionName, Document doc) {
if (collectionName.equals("test1")) {
// systemTime is stored as a Date, not as a String like 2023-10-13 11:37:43.238
formatStringTime(doc, "systemTime");
return;
}
if (collectionName.equals("test2")) {
formatStringTime(doc, "time");
return;
}
if (collectionName.equals("test3") || collectionName.equals("timer_record")) {
formatStringTime(doc, "createTime");
formatStringTime(doc, "updateTime");
return;
}
}
// convert a String to a Date
private void formatStringTime(Document doc, String key) {
try {
String time = doc.getString(key);
if (time == null) {
return;
}
Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(time);
doc.put(key, parse);
} catch (Exception e) {
e.printStackTrace();
}
}
};
env.addSource(KafkaUtils.getKafkaConsumerForMongo(topicList))
.keyBy(e -> {
Document doc = Document.parse(e.value());
return doc.getString("documentKey");
})
.addSink(sinkFunction);
env.execute("kafka2mongo synchronization " + topicList);
}
}
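Side note: the sink above emulates an upsert by deleting the document and re-inserting it. If a single round trip is preferred, the MongoDB Java driver also supports replaceOne with the upsert option; a sketch of how insertOne() could be replaced, using the same id field as the sink, is shown below:
private void upsertOne(MongoCollection<Document> collection, Object id, Document doc) {
    // normalize time fields first, as insertOne() does
    handle(collection.getNamespace().getCollectionName(), doc);
    doc.put("id", id);
    collection.replaceOne(
            com.mongodb.client.model.Filters.eq("id", id),
            doc,
            new com.mongodb.client.model.ReplaceOptions().upsert(true));
}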
This concludes the article [Flink-Kafka-To-Mongo]: using Flink to write Kafka data into MongoDB, applying inserts, deletes, and updates according to the operation type and handling time-typed fields separately on write.