国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

實戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)

這篇具有很好參考價值的文章主要介紹了實戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

目錄

前言:

1、springboot引入依賴:

2、yml配置文件

3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器

4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象

5、CDC 數(shù)據(jù)實體類

6、自定義ApplicationContextUtil

7、自定義sink 交由spring管理,處理變更數(shù)據(jù)


前言:

? ? ? ? 我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查詢了很多獲取增量數(shù)據(jù)的方案,最終選擇了Flink的 flink-connector-sqlserver-cdc ,這個需要用到SQL Server 的CDC(變更數(shù)據(jù)捕獲),通過CDC來獲取增量數(shù)據(jù),處理數(shù)據(jù)前需要對數(shù)據(jù)庫進(jìn)行配置,如果不清楚如何配置可以看看我這篇文章:《SQL Server數(shù)據(jù)庫開啟CDC變更數(shù)據(jù)捕獲操作指引》

廢話不多說,直接上干貨,如有不足還請指正

1、springboot引入依賴:

    <properties>
        <flink.version>1.16.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.0.jre8</version>
        </dependency>        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.6</version>
        </dependency>
    </dependencies>

2、yml配置文件

spring:
    datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# 實時同步SQL Server數(shù)據(jù)庫配置
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa

3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器

import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL server CDC變更監(jiān)聽器
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC數(shù)據(jù)源配置
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("開始啟動Flink CDC獲取ERP變更數(shù)據(jù)......");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * 構(gòu)造CDC數(shù)據(jù)源
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database) // monitor sqlserver database
                .tableList(tables) // monitor products table
                .username(username)
                .password(password)
                /*
                 *initial初始化快照,即全量導(dǎo)入后增量導(dǎo)入(檢測更新數(shù)據(jù)寫入)
                 * latest:只進(jìn)行增量導(dǎo)入(不讀取歷史變化) 
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
    }
}

4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * SQLServer消息讀取自定義序列化
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     *
     * 反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // 獲取操作類型  CREATE UPDATE DELETE  1新增 2修改 3刪除
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            //7.輸出數(shù)據(jù)
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("SQLServer消息讀取自定義序列化報錯:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     *
     * 從源數(shù)據(jù)獲取出變更之前或之后的數(shù)據(jù)
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }



    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}

5、CDC 數(shù)據(jù)實體類

import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC 數(shù)據(jù)實體類
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * 數(shù)據(jù)庫名
     */
    private String database;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 變更時間
     */
    private LocalDateTime changeTime;
    /**
     * 變更類型 1新增 2修改 3刪除
     */
    private Integer eventType;
    /**
     * 變更前數(shù)據(jù)
     */
    private String beforeData;
    /**
     * 變更后數(shù)據(jù)
     */
    private String afterData;
}

6、自定義ApplicationContextUtil

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    /**
     * 上下文
     */
    private static ApplicationContext context;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }
    public static ApplicationContext getApplicationContext() {
        return context;
    }
    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

7、自定義sink 交由spring管理,處理變更數(shù)據(jù)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

/**
 * 自定義sink 交由spring管理
 * 處理變更數(shù)據(jù)
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private UserMapper userMapper;

    /**
     * 在open()方法中動態(tài)注入Spring容器的類
     * 在啟動SpringBoot項目是加載了Spring容器,其他地方可以使用@Autowired獲取Spring容器中的類;
     * 但是Flink啟動的項目中,默認(rèn)啟動了多線程執(zhí)行相關(guān)代碼,導(dǎo)致在其他線程無法獲取Spring容器,
     * 只有在Spring所在的線程才能使用@Autowired,故在Flink自定義的Sink的open()方法中初始化Spring容器
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("收到變更原始數(shù)據(jù):{}", dataChangeInfo);
        // TODO 開始處理你的數(shù)據(jù)吧
    }

以上是我親自驗證測試的結(jié)果,已發(fā)布生產(chǎn)環(huán)境,如有不足還請指正。文章來源地址http://www.zghlxwxcb.cn/news/detail-499803.html

到了這里,關(guān)于實戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SQL server開啟變更數(shù)據(jù)捕獲(CDC)

    SQL server開啟變更數(shù)據(jù)捕獲(CDC)

    多多點(diǎn)贊,會變好看! 多多留言,會變有錢! 變更數(shù)據(jù)捕獲(Change Data Capture ,簡稱 CDC):記錄 SQL Server 表的插入、更新和刪除操作。開啟cdc的源表在插入、更新和刪除操作時會插入數(shù)據(jù)到日志表中。cdc通過捕獲進(jìn)程將變更數(shù)據(jù)捕獲到變更表中,通過cdc提供的查詢函數(shù),可

    2024年02月11日
    瀏覽(19)
  • Flink實戰(zhàn)-(6)FlinkSQL實現(xiàn)CDC

    FlinkSQL說明 Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。 自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初

    2023年04月26日
    瀏覽(25)
  • 基于 Flink SQL CDC 數(shù)據(jù)處理的終極武器

    基于 Flink SQL CDC 數(shù)據(jù)處理的終極武器

    來源互聯(lián)網(wǎng)多篇文章總結(jié) 業(yè)務(wù)系統(tǒng)經(jīng)常會遇到需要更新數(shù)據(jù)到多個存儲的需求。例如:一個訂單系統(tǒng)剛剛開始只需要寫入數(shù)據(jù)庫即可完成業(yè)務(wù)使用。某天 BI 團(tuán)隊期望對數(shù)據(jù)庫做全文索引,于是我們同時要寫多一份數(shù)據(jù)到 ES 中,改造后一段時間,又有需求需要寫入到 Redis 緩存

    2024年02月16日
    瀏覽(17)
  • 深入淺出 SQL Server CDC 數(shù)據(jù)同步

    深入淺出 SQL Server CDC 數(shù)據(jù)同步

    SQL Server 是一款老牌關(guān)系型數(shù)據(jù)庫,自 1988 年由 Microsoft、Sybase 和 Ashton-Tate 三家公司共同推出,不斷迭代更新至今,擁有相當(dāng)廣泛的用戶群體。 如今,我們提到 SQL Server 通常指 Microsoft SQL Server 2000 之后的版本。 SQL Server 2008 是一個里程碑版本,加入了大量新特性,包括 新的語法

    2024年02月12日
    瀏覽(39)
  • 【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數(shù)據(jù)庫中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉庫(starrocks), 數(shù)據(jù)倉庫你可以理解為存儲了各種各樣來自不同數(shù)據(jù)庫中表。 數(shù)據(jù)的同步目前對

    2023年04月08日
    瀏覽(93)
  • 【flink報錯】flink cdc無主鍵時的操作

    “org.apache.flink.table.api.validationexception: ‘scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys” 報錯提示當(dāng)表沒有主鍵時,必須設(shè)置 ‘scan.incremental.snapshot.chunk.key-column’。 這里表沒有主鍵,不是flink table中設(shè)置的primary key,而是物理表中沒有主鍵。 如

    2024年04月23日
    瀏覽(15)
  • Doris通過Flink CDC接入MySQL實戰(zhàn)

    1. 創(chuàng)建MySQL庫表,寫入demo數(shù)據(jù) 登錄測試MySQL 創(chuàng)建MySQL庫表,寫入demo數(shù)據(jù) 注意:MySQL需要開通bin-log log_bin=mysql_bin binlog-format=Row server-id=1 2. 創(chuàng)建Doris庫表 創(chuàng)建Doris表 3. 啟動Flink 啟動flink 創(chuàng)建Flink 任務(wù): 輸入如下地址,查看flink任務(wù) http://localhost:8081/#/job/running 數(shù)據(jù)驗證:啟動后可

    2023年04月10日
    瀏覽(22)
  • 基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    Flink CDC有兩種方式同步數(shù)據(jù)庫: 一種是通過FlinkSQL直接輸入兩表數(shù)據(jù)庫映射進(jìn)行數(shù)據(jù)同步,缺點(diǎn)是只能單表進(jìn)行同步; 一種是通過DataStream開發(fā)一個maven項目,打成jar包上傳到服務(wù)器運(yùn)行。 本方案使用FlinkSQL方法,同步兩表中的數(shù)據(jù)。 其中Flink應(yīng)用可以部署在具有公網(wǎng)IP的服務(wù)

    2023年04月11日
    瀏覽(27)
  • flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡單使用

    flink cdc數(shù)據(jù)同步,DataStream方式和SQL方式的簡單使用

    目錄 一、flink cdc介紹 1、什么是flink cdc 2、flink cdc能用來做什么 3、flink cdc的優(yōu)點(diǎn) 二、flink cdc基礎(chǔ)使用 1、使用flink cdc讀取txt文本數(shù)據(jù) 2、DataStream的使用方式 3、SQL的方式 總結(jié) flink cdc是一個由阿里研發(fā)的,一個可以直接從MySQL、PostgreSQL等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)

    2024年02月13日
    瀏覽(25)
  • 實戰(zhàn):大數(shù)據(jù)Flink CDC同步Mysql數(shù)據(jù)到ElasticSearch

    實戰(zhàn):大數(shù)據(jù)Flink CDC同步Mysql數(shù)據(jù)到ElasticSearch

    前面的博文我們分享了大數(shù)據(jù)分布式流處理計算框架Flink和其基礎(chǔ)環(huán)境的搭建,相信各位看官都已經(jīng)搭建好了自己的運(yùn)行環(huán)境。那么,今天就來實戰(zhàn)一把使用Flink CDC同步Mysql數(shù)據(jù)導(dǎo)Elasticsearch。 CDC簡介 CDC 的全稱是 Change Data Capture(變更數(shù)據(jù)捕獲技術(shù)) ,在廣義的概念上,只要

    2024年02月09日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包