1. Scenario
Log data is consumed from Kafka, then:
1. Join against a dictionary table to enrich each log record.
2. Inspect the record's level and fan the stream out to multiple sinks (a stripped-down sketch of the side-output idea follows this list):
Low level: write to ClickHouse only.
High level: write to ClickHouse and, at the same time, push the record back to Kafka for a second processing pipeline.
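Before the full job, here is a minimal, self-contained sketch of the multi-way output idea. The class name, the sample records, and the startsWith("high") level check are purely illustrative assumptions; the real job in section 2 decides based on a dictionary lookup instead.

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Records flagged as "high level" also go to this side output.
        OutputTag<String> highLevel = new OutputTag<String>("high-level") {};

        SingleOutputStreamOperator<String> mainStream = env
                .fromElements("low:a", "high:b", "low:c", "high:d")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String log, Context ctx, Collector<String> out) {
                        // every record goes to the main stream (ClickHouse in the real job)
                        out.collect(log);
                        // hypothetical level check; the real job keys this off the dictionary lookup
                        if (log.startsWith("high")) {
                            ctx.output(highLevel, log);
                        }
                    }
                });

        mainStream.print("to-clickhouse");                        // main stream
        mainStream.getSideOutput(highLevel).print("to-kafka");    // side output
        env.execute("side-output-sketch");
    }
}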
2. Implementation
package com.ws.kafka2clickhouse;
import cn.hutool.json.JSONUtil;
import com.ws.kafka2clickhouse.bean.CompanyInfo;
import com.ws.kafka2clickhouse.bean.LogEvent;
import com.ws.kafka2clickhouse.sink.MyClickHouseSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
public class Kafka2ClickHouse {
public static void main(String[] args) throws Exception {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("HADOOP_USER_NAME", "hdfs");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
// env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
// env.getCheckpointConfig().setCheckpointStorage("file:///D:/out_test/ck");
// env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp01:8020/tmp/kafka2hdfs/");
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 1. Read the main log stream from Kafka
KafkaSource<String> build = KafkaSource.<String>builder()
.setTopics("dataSource")
.setGroupId("group1")
.setBootstrapServers("hdp01:6667")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafka = env.fromSource(build, WatermarkStrategy.noWatermarks(), "kafka");
// 2. Convert the main-stream JSON into POJOs
SingleOutputStreamOperator<LogEvent> beans = kafka.map((MapFunction<String, LogEvent>) s -> JSONUtil.toBean(s, LogEvent.class));
// 3. Load the dictionary table as a MySQL CDC stream
tenv.executeSql(
"CREATE TABLE dmpt_base_oper_log (\n" +
"id bigInt primary key," +
"title String" +
") WITH (\n" +
"'connector' = 'mysql-cdc',\n" +
"'hostname' = 'localhost',\n" +
"'port' = '3306',\n" +
"'username' = 'root',\n" +
"'password' = 'root',\n" +
"'database-name' = 'test',\n" +
"'table-name' = 'test_recursive'\n" +
")"
);
Table result = tenv.sqlQuery("select * from dmpt_base_oper_log");
DataStream<Row> dict = tenv.toChangelogStream(result);
dict.print();
// 4. Map the dictionary rows into POJOs, attaching the CDC change type
SingleOutputStreamOperator<CompanyInfo> companyDict = dict.map(new RichMapFunction<Row, CompanyInfo>() {
@Override
public CompanyInfo map(Row row) throws Exception {
Long id = (Long) row.getField("id");
String title = (String) row.getField("title");
// carry along the CDC change type (insert / delete / update)
RowKind kind = row.getKind();
return new CompanyInfo(id, title, kind);
}
});
// 5. Broadcast the dictionary data
MapStateDescriptor<Long, CompanyInfo> company_info_desc = new MapStateDescriptor<>("company_info_dict", Long.class, CompanyInfo.class);
BroadcastStream<CompanyInfo> broadcastStream = companyDict.broadcast(company_info_desc);
// 6. Create the side-output tag for records that should also go to Kafka
OutputTag<String> tokafka = new OutputTag<String>("tokafka") {};
SingleOutputStreamOperator<LogEvent> beans_company = beans.connect(broadcastStream).process(new BroadcastProcessFunction<LogEvent, CompanyInfo, LogEvent>() {
@Override
public void processElement(LogEvent logEvent, ReadOnlyContext readOnlyContext, Collector<LogEvent> collector) throws Exception {
// main-stream element: enrich it against the broadcast dictionary state
ReadOnlyBroadcastState<Long, CompanyInfo> broadcastState = readOnlyContext.getBroadcastState(company_info_desc);
CompanyInfo companyInfo = broadcastState.get(logEvent.getMessageId());
// 7. If company info was found, this is "high-level" data: besides the main stream, also emit a copy to the side output that feeds Kafka
if (companyInfo != null) {
logEvent.setCompanyInfo(companyInfo);
readOnlyContext.output(tokafka, JSONUtil.toJsonStr(logEvent));
}
collector.collect(logEvent);
}
@Override
public void processBroadcastElement(CompanyInfo companyInfo, Context context, Collector<LogEvent> collector) throws Exception {
// broadcast-stream element: update the dictionary state according to the CDC change type
BroadcastState<Long, CompanyInfo> broadcastState = context.getBroadcastState(company_info_desc);
// insert
if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.INSERT.name())) {
broadcastState.put(companyInfo.getId(), companyInfo);
} else if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.DELETE.name())) {
// delete
broadcastState.remove(companyInfo.getId());
} else {
// update
broadcastState.remove(companyInfo.getId());
broadcastState.put(companyInfo.getId(), companyInfo);
}
}
});
// SQL for inserting data into ClickHouse
String insertIntoCkSql = "insert into default.dns_logs values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
// build the ClickHouse sink
SinkFunction<LogEvent> sink = JdbcSink.sink(
// insert SQL
insertIntoCkSql,
// statement builder: bind each LogEvent to the insert parameters
new JdbcStatementBuilder<LogEvent>() {
@Override
public void accept(PreparedStatement preparedStatement, LogEvent logEvent) throws SQLException {
preparedStatement.setString(1, logEvent.getMessageType());
preparedStatement.setLong(2, logEvent.getMessageId());
preparedStatement.setString(3, logEvent.getDeviceId());
preparedStatement.setString(4, logEvent.getCol1());
preparedStatement.setString(5, logEvent.getCol2());
preparedStatement.setString(6, logEvent.getCol3());
preparedStatement.setString(7, logEvent.getCol4());
preparedStatement.setString(8, logEvent.getHeaders().getDeviceTime());
preparedStatement.setLong(9, logEvent.getHeaders().get_uid());
preparedStatement.setString(10, logEvent.getHeaders().getProductId());
preparedStatement.setString(11, logEvent.getHeaders().getOrgId());
if (logEvent.getCompanyInfo() != null) {
preparedStatement.setString(12, logEvent.getCompanyInfo().getTitle());
} else {
preparedStatement.setString(12, null);
}
preparedStatement.setString(13, logEvent.getRegion());
}
},
// batch execution options
new JdbcExecutionOptions.Builder()
// batch size (the default is 5000)
.withBatchSize(10000)
// batch flush interval (ms)
.withBatchIntervalMs(5000)
.withMaxRetries(3)
.build(),
// ClickHouse connection options
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://192.16.10.118:1111")
.withUsername("default")
.withPassword("xxxx")
.build()
);
// 8. Every record goes into the base ClickHouse table
beans_company.addSink(sink);
beans_company.print("base clickhouse");
// 9. High-level records are additionally pushed to the analytics Kafka topic
DataStream<String> sideOutput = beans_company.getSideOutput(tokafka);
sideOutput.print("enriched analytics kafka");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hdp01:6667");
// 10. Build the Kafka sink
KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord<>(
"dataZengQiang", // target topic
element.getBytes(StandardCharsets.UTF_8)); // record contents
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"dataZengQiang", // target topic
serializationSchema, // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
// 11. Write the side output to Kafka
sideOutput.addSink(myProducer);
env.execute();
}
}
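The bean classes under com.ws.kafka2clickhouse.bean are not shown in the original post. Below is a minimal sketch of what they might look like, inferred purely from the getters used in the job above; the field names and types are assumptions, and Lombok's @Data is used only to keep the sketch short (Lombok is not in the pom below, so either add it or write the constructors, getters, and setters by hand).

// CompanyInfo.java (sketch; fields inferred from usage above)
package com.ws.kafka2clickhouse.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.types.RowKind;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CompanyInfo {
    private Long id;         // dictionary key, matched against LogEvent.messageId above
    private String title;
    private RowKind rowKind; // CDC change type carried along with the row
}

// LogEvent.java (sketch; fields inferred from the JDBC sink bindings above)
package com.ws.kafka2clickhouse.bean;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class LogEvent {
    private String messageType;
    private Long messageId;
    private String deviceId;
    private String col1;
    private String col2;
    private String col3;
    private String col4;
    private Headers headers;
    private CompanyInfo companyInfo; // set only when the dictionary lookup hits
    private String region;

    @Data
    @NoArgsConstructor
    public static class Headers {
        private String deviceTime;
        private Long _uid;        // Lombok generates get_uid(), matching the sink code above
        private String productId;
        private String orgId;
    }
}

The pom.xml used by the post follows.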
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>test</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink</artifactId>
<properties>
<flink.version>1.13.2</flink.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flinkSql 需要的依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
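<!-- the job above parses JSON with cn.hutool.json.JSONUtil, but the original pom does not list hutool;
     the coordinates and version below are an assumption -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.11</version>
</dependency>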
<!-- clickhouse驅(qū)動 -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<!-- flink-cdc-mysql 連接器-->
<dependency>
<groupId>com.ws</groupId>
<artifactId>mysql-cdc</artifactId>
<version>2.2.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/flink-connector-mysql-cdc-2.3-SNAPSHOT.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- 把依賴打進jar包 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>