国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink-cdc,clickhouse寫入,多路輸出

這篇具有很好參考價值的文章主要介紹了flink-cdc,clickhouse寫入,多路輸出。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1、場景

kafka日志數(shù)據(jù)從kafka讀取

1、關聯(lián)字典表:完善日志數(shù)據(jù)

2、判斷日志內(nèi)容級別:多路輸出

低級:入clickhouse

高級:入clickhouse的同時推送到kafka供2次數(shù)據(jù)流程處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-703762.html

2、實現(xiàn)

package com.ws.kafka2clickhouse;

import cn.hutool.json.JSONUtil;
import com.ws.kafka2clickhouse.bean.CompanyInfo;
import com.ws.kafka2clickhouse.bean.LogEvent;
import com.ws.kafka2clickhouse.sink.MyClickHouseSink;
import org.apache.avro.data.Json;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class Kafka2ClickHouse {


    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
//        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
//        env.getCheckpointConfig().setCheckpointStorage("file:///D:/out_test/ck");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp01:8020/tmp/kafka2hdfs/");
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 1、讀取主流日志數(shù)據(jù)
        KafkaSource<String> build = KafkaSource.<String>builder()
                .setTopics("dataSource")
                .setGroupId("group1")
                .setBootstrapServers("hdp01:6667")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafka = env.fromSource(build, WatermarkStrategy.noWatermarks(), "kafka");
        // 2、主流數(shù)據(jù)json轉(zhuǎn)換成POJO對象
        SingleOutputStreamOperator<LogEvent> beans = kafka.map((MapFunction<String, LogEvent>) s -> JSONUtil.toBean(s, LogEvent.class));
        // 3、加載字典表cdc流
        tenv.executeSql(
                "CREATE TABLE dmpt_base_oper_log (\n" +
                        "id bigInt primary key," +
                        "title String" +
                        ") WITH (\n" +
                        "'connector' = 'mysql-cdc',\n" +
                        "'hostname' = 'localhost',\n" +
                        "'port' = '3306',\n" +
                        "'username' = 'root',\n" +
                        "'password' = 'root',\n" +
                        "'database-name' = 'test',\n" +
                        "'table-name' = 'test_recursive'\n" +
                        ")"
        );
        Table result = tenv.sqlQuery("select * from dmpt_base_oper_log");
        DataStream<Row> dict = tenv.toChangelogStream(result);
        dict.print();
        // 4、加工字典數(shù)據(jù),并組裝上 字典表更新類型
        SingleOutputStreamOperator<CompanyInfo> companyDict = dict.map(new RichMapFunction<Row, CompanyInfo>() {
            @Override
            public CompanyInfo map(Row row) throws Exception {
                Long id = (Long) row.getField("id");
                String title = (String) row.getField("title");
                // 攜帶上cdc數(shù)據(jù)的數(shù)據(jù)類型,《新增,刪除,修改》
                RowKind kind = row.getKind();
                return new CompanyInfo(id, title, kind);
            }
        });
        // 5、對字典數(shù)據(jù)進行廣播
        MapStateDescriptor<Long, CompanyInfo> company_info_desc = new MapStateDescriptor<>("company_info_dict", Long.class, CompanyInfo.class);
        BroadcastStream<CompanyInfo> broadcastStream = companyDict.broadcast(company_info_desc);
        // 6、創(chuàng)建測流
        OutputTag<String> tokafka = new OutputTag<String>("tokafka") {
        };


        SingleOutputStreamOperator<LogEvent> beans_company = beans.connect(broadcastStream).process(new BroadcastProcessFunction<LogEvent, CompanyInfo, LogEvent>() {
            @Override
            public void processElement(LogEvent logEvent, ReadOnlyContext readOnlyContext, Collector<LogEvent> collector) throws Exception {
                // 新來一條數(shù)據(jù)流,處理方法
                ReadOnlyBroadcastState<Long, CompanyInfo> broadcastState = readOnlyContext.getBroadcastState(company_info_desc);
                CompanyInfo companyInfo = broadcastState.get(logEvent.getMessageId());
                // 7、如果有單位信息,代表為高級用戶數(shù)據(jù),將消息同時吐到kafka,因此再輸出到主流的同時往測流中也輸出一份
                if (companyInfo != null) {
                    logEvent.setCompanyInfo(companyInfo);
                    readOnlyContext.output(tokafka, JSONUtil.toJsonStr(logEvent));
                }
                collector.collect(logEvent);
            }

            @Override
            public void processBroadcastElement(CompanyInfo companyInfo, Context context, Collector<LogEvent> collector) throws Exception {
                // 新來一條廣播流,處理方法
                BroadcastState<Long, CompanyInfo> broadcastState = context.getBroadcastState(company_info_desc);
                // 新增
                if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.INSERT.name())) {
                    broadcastState.put(companyInfo.getId(), companyInfo);
                } else if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.DELETE.name())) {
                    // 刪除
                    broadcastState.remove(companyInfo.getId());
                } else {
                    // 修改
                    broadcastState.remove(companyInfo.getId());
                    broadcastState.put(companyInfo.getId(), companyInfo);
                }
            }
        });

        //準備向ClickHouse中插入數(shù)據(jù)的sql
        String insetIntoCkSql = "insert into default.dns_logs values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //設置ClickHouse Sink
        SinkFunction<LogEvent> sink = JdbcSink.sink(
                //插入數(shù)據(jù)SQL
                insetIntoCkSql,
                //設置插入ClickHouse數(shù)據(jù)的參數(shù)
                new JdbcStatementBuilder<LogEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, LogEvent logEvent) throws SQLException {
                        try {
                            preparedStatement.setString(1, logEvent.getMessageType());
                            preparedStatement.setLong(2, logEvent.getMessageId());
                            preparedStatement.setString(3, logEvent.getDeviceId());
                            preparedStatement.setString(4, logEvent.getCol1());
                            preparedStatement.setString(5, logEvent.getCol2());
                            preparedStatement.setString(6, logEvent.getCol3());
                            preparedStatement.setString(7, logEvent.getCol4());
                            preparedStatement.setString(8, logEvent.getHeaders().getDeviceTime());
                            preparedStatement.setLong(9, logEvent.getHeaders().get_uid());
                            preparedStatement.setString(10, logEvent.getHeaders().getProductId());
                            preparedStatement.setString(11, logEvent.getHeaders().getOrgId());
                            if (logEvent.getCompanyInfo() != null) {
                                preparedStatement.setString(12, logEvent.getCompanyInfo().getTitle());
                            } else {
                                preparedStatement.setString(12, null);
                            }
                            preparedStatement.setString(13, logEvent.getRegion());
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                },
                //設置批次插入數(shù)據(jù)
                new JdbcExecutionOptions.Builder()
                        // 批次大小,默認5000
                        .withBatchSize(10000)
                        // 批次間隔時間
                        .withBatchIntervalMs(5000).
                        withMaxRetries(3).build(),
                //設置連接ClickHouse的配置
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://192.16.10.118:1111")
                        .withUsername("default")
                        .withPassword("xxxx")
                        .build()
        );
        // 8、所有數(shù)據(jù)進入基礎庫
        beans_company.addSink(sink);
        beans_company.print("基礎庫clickhouse");
        // 9、高級用戶同時推送到分析kafka
        DataStream<String> sideOutput = beans_company.getSideOutput(tokafka);
        sideOutput.print("增強分析kafka");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hdp01:6667");
        // 10、構(gòu)建kafka sink
        KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                return new ProducerRecord<>(
                        "dataZengQiang", // target topic
                        element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "dataZengQiang",             // target topic
                serializationSchema,    // serialization schema
                properties,             // producer config
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
        // 11、寫入kafka
        sideOutput.addSink(myProducer);
        env.execute();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>
    <properties>
        <flink.version>1.13.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flinkSql 需要的依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!-- clickhouse驅(qū)動 -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
        <!-- flink-cdc-mysql 連接器-->
        <dependency>
            <groupId>com.ws</groupId>
            <artifactId>mysql-cdc</artifactId>
            <version>2.2.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/flink-connector-mysql-cdc-2.3-SNAPSHOT.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <!-- 把依賴打進jar包 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

到了這里,關于flink-cdc,clickhouse寫入,多路輸出的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    我一開始是flink-cdc,oracle2Mysql,sql 我一開始直接用的oracle【date】類型,mysql【date】類型,sql的校驗通過了,但是真正操作數(shù)據(jù)的時候報錯,告訴我oracle的數(shù)據(jù)格式的日期數(shù)據(jù),不可以直接插入到mysql格式的日期數(shù)據(jù),說白了就是數(shù)據(jù)格式不一致導致的 我想的是既然格式不對

    2024年02月12日
    瀏覽(25)
  • SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    最近做的一個項目,使用的是pg數(shù)據(jù)庫,公司沒有成熟的DCD組件,為了實現(xiàn)數(shù)據(jù)變更消息發(fā)布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka。 監(jiān)聽數(shù)據(jù)變化,進行異步通知,做系統(tǒng)內(nèi)異步任務。 架構(gòu)方案(懶得寫了,看圖吧): -- 創(chuàng)建pg 高線數(shù)據(jù)同步用

    2024年02月02日
    瀏覽(31)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、達夢等數(shù)據(jù)庫開啟日志方法

    目錄 1. 前言 2. 數(shù)據(jù)源安裝與配置 2.1 MySQL 2.1.1 安裝 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安裝 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安裝 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安裝 2.4.2 CDC 配置 2.5達夢 2.4.1安裝 2.4.2CDC配置 3. 驗證 3.1 Flink版本與CDC版本的對應關系 3.2 下載相關包 3.3 添加cdc jar 至lib目錄 3.4 驗

    2024年02月05日
    瀏覽(122)
  • 【Flink-CDC】Flink CDC 介紹和原理概述

    【Flink-CDC】Flink CDC 介紹和原理概述

    CDC是( Change Data Capture 變更數(shù)據(jù)獲取 )的簡稱。 核心思想是, 監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。 CDC 主要分為基于查詢和基于

    2024年01月20日
    瀏覽(25)
  • flink-cdc-學習筆記(一)

    Flink 1.11 引入了 CDC. Flink CDC 是一款基于 Flink 打造一系列數(shù)據(jù)庫的連接器。Flink 是流處理的引擎,其主要消費的數(shù)據(jù)源是類似于一些點擊的日志流、曝光流等數(shù)據(jù),但在業(yè)務場景中,點擊流的日志數(shù)據(jù)只是一部分,具有更大價值的數(shù)據(jù)隱藏在用戶的業(yè)務數(shù)據(jù)庫中。Flink CDC 彌補

    2024年04月10日
    瀏覽(25)
  • ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務同步mysql表的數(shù)據(jù)到es的實踐

    ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務同步mysql表的數(shù)據(jù)到es的實踐

    ApacheStreamPark是流處理極速開發(fā)框架,流批一體 湖倉一體的云原生平臺,一站式流處理計算平臺。 ??特性中的簡單易用和文檔詳盡這兩點我也是深有體會的,部署一點都不簡單,照著官方文檔都不一定能搞出來,下面部署環(huán)節(jié)慢慢來吐槽吧。 ??之前我們寫 Flink SQL 基本上

    2024年02月11日
    瀏覽(28)
  • Flinkx/Datax/Flink-CDC 優(yōu)劣勢對比

    Flinkx/Datax/Flink-CDC 優(yōu)劣勢對比

    Flinkx/Datax/Flink-CDC 優(yōu)劣勢對比_HiBoyljw的博客-CSDN博客 ? ? ? ?FlinkX是一款基于Flink的分布式離線/實時數(shù)據(jù)同步插件,可實現(xiàn)多種異構(gòu)數(shù)據(jù)源高效的數(shù)據(jù)同步,其由袋鼠云于2016年初步研發(fā)完成,目前有穩(wěn)定的研發(fā)團隊持續(xù)維護,已在Github上開源(開源地址詳見文章末尾),并維

    2024年02月07日
    瀏覽(21)
  • 【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    Flink 安裝的教程就不在這里贅敘了,可以看一下以前的文章,這篇文章主要是把流式數(shù)據(jù)寫入的OLAP(ClickHouse)中作查詢分析 Flink 1.13.2, ClickHouse?22.1.3.7 這里直接使用docker安裝,沒有安裝的同學可以使用homebreak來安裝,執(zhí)行下面的命令即可( 已經(jīng)安裝了docker的可以忽略 ) 四指

    2024年02月03日
    瀏覽(26)
  • Flink寫入數(shù)據(jù)到ClickHouse

    Flink寫入數(shù)據(jù)到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依賴 Flink開發(fā)相關依賴 3.Bean實體類 User.java 4.ClickHouse業(yè)務寫入邏輯 ClickHouseSinkFunction.java open():在SinkFunction實例化后調(diào)用,用于初始化連接或資源。這在處理每個并行任務的子任務之前只被調(diào)用一次。 invoke():定義了在每個元素到達Sink操

    2024年02月12日
    瀏覽(13)
  • 用flink cdc sqlserver 將數(shù)據(jù)實時同步到clickhouse

    flink cdc 終于支持 sqlserver 了。 現(xiàn)在互聯(lián)網(wǎng)公司用sqlserver的不多,大部分都是一些國企的老舊系統(tǒng)。我們以前同步數(shù)據(jù),都是用datax,但是不能實時同步數(shù)據(jù)?,F(xiàn)在有了flinkcdc,可以實現(xiàn)實時同步了。 1、首先sqlserver版本:要求sqlserver版本為14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包