国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink CDC介紹

這篇具有很好參考價值的文章主要介紹了Flink CDC介紹。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1.CDC概述


CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術。它允許實時地監(jiān)視數(shù)據(jù)庫或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動,并將這些變動抽取出來,以便進行進一步的處理和分析。

傳統(tǒng)上,數(shù)據(jù)源的變化通常通過周期性地輪詢整個數(shù)據(jù)集進行檢查來實現(xiàn)。但是,這種輪詢的方式效率低下且不能實時反應變化。而 CDC 技術則通過在數(shù)據(jù)源上設置一種機制,使得變化的數(shù)據(jù)可以被實時捕獲并傳遞給下游處理系統(tǒng),從而實現(xiàn)了實時的數(shù)據(jù)變動監(jiān)控。

Flink 作為一個強大的流式計算引擎,提供了內(nèi)置的 CDC 功能,能夠連接到各種數(shù)據(jù)源(如數(shù)據(jù)庫、消息隊列等),捕獲其中的數(shù)據(jù)變化,并進行靈活的實時處理和分析。

通過使用 Flink CDC,我們可以輕松地構建實時數(shù)據(jù)管道,對數(shù)據(jù)變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。

Flink CDC介紹,flink,數(shù)據(jù)庫,大數(shù)據(jù)

?

2.CDC 的實現(xiàn)原理


通常來講,CDC 分為主動查詢和事件接收兩種技術實現(xiàn)模式。對于主動查詢而言,用戶通常會在數(shù)據(jù)源表的某個字段中,保存上次更新的時間戳或版本號等信息,然后下游通過不斷的查詢和與上次的記錄做對比,來確定數(shù)據(jù)是否有變動,是否需要同步。這種方式優(yōu)點是不涉及數(shù)據(jù)庫底層特性,實現(xiàn)比較通用;缺點是要對業(yè)務表做改造,且實時性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對數(shù)據(jù)庫的壓力較大。事件接收模式可以通過觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來實現(xiàn)。當數(shù)據(jù)源表發(fā)生變動時,會通過附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來。下游可以通過數(shù)據(jù)庫底層的協(xié)議,訂閱并消費這些事件,然后對數(shù)據(jù)庫變動記錄做重放,從而實現(xiàn)同步。這種方式的優(yōu)點是實時性高,可以精確捕捉上游的各種變動;缺點是部署數(shù)據(jù)庫的事件接收和解析器(例如 Debezium、Canal 等),有一定的學習和運維成本,對一些冷門的數(shù)據(jù)庫支持不夠。綜合來看,事件接收模式整體在實時性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫實現(xiàn),建議使用Debezium來實現(xiàn)變更數(shù)據(jù)的捕獲(下圖來自Debezium 官方文檔如果使用的只有 MySQL,則還可以用Canal。
Flink CDC介紹,flink,數(shù)據(jù)庫,大數(shù)據(jù)

?

3.為什么選 Flink
從上圖可以看到,Debezium 官方架構圖中,是通過 Kafka Streams 直接實現(xiàn)的 CDC 功能。而我們這里更建議使用 Flink CDC 模塊,因為 Flink 相對 Kafka Streams 而言,有如下優(yōu)勢:

強大的流處理引擎: Flink 是一個強大的流處理引擎,具備高吞吐量、低延遲、Exactly-Once 語義等特性。它通過基于事件時間的處理模型,支持準確和有序的數(shù)據(jù)處理,適用于實時數(shù)據(jù)處理和分析場景。這使得 Flink 成為實現(xiàn) CDC 的理想選擇。

內(nèi)置的 CDC 功能: Flink 提供了內(nèi)置的 CDC 功能,可以直接連接到各種數(shù)據(jù)源,捕獲數(shù)據(jù)變化,并將其作為數(shù)據(jù)流進行處理。這消除了我們自行開發(fā)或集成 CDC 解決方案的需要,使得實現(xiàn) CDC 變得更加簡單和高效。

多種數(shù)據(jù)源的支持: Flink CDC 支持與各種數(shù)據(jù)源進行集成,如關系型數(shù)據(jù)庫(如MySQL、PostgreSQL)、消息隊列(如Kafka、RabbitMQ)、文件系統(tǒng)等。這意味著無論你的數(shù)據(jù)存儲在哪里,F(xiàn)link 都能夠輕松地捕獲其中的數(shù)據(jù)變化,并進行進一步的實時處理和分析。

靈活的數(shù)據(jù)處理能力: Flink 提供了靈活且強大的數(shù)據(jù)處理能力,可以通過編寫自定義的轉(zhuǎn)換函數(shù)、處理函數(shù)等來對 CDC 數(shù)據(jù)進行各種實時計算和分析。同時,F(xiàn)link 還集成了 SQL 和 Table API,為用戶提供了使用 SQL 查詢語句或 Table API 進行簡單查詢和分析的方式。

完善的生態(tài)系統(tǒng): Flink 擁有活躍的社區(qū)和龐大的生態(tài)系統(tǒng),這意味著你可以輕松地獲取到豐富的文檔、教程、示例代碼和解決方案。此外,F(xiàn)link 還與其他流行的開源項目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和靈活性。

4.支持的連接器
Flink CDC介紹,flink,數(shù)據(jù)庫,大數(shù)據(jù)

?5.支持的 Flink 版本

Flink CDC介紹,flink,數(shù)據(jù)庫,大數(shù)據(jù)

?

6.Flink CDC特性
支持讀取數(shù)據(jù)庫快照,即使出現(xiàn)故障也能繼續(xù)讀取binlog,并進行Exactly-once處理
DataStream API 的 CDC 連接器,用戶可以在單個作業(yè)中使用多個數(shù)據(jù)庫和表的更改,而無需部署 Debezium 和 Kafka
Table/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創(chuàng)建 CDC 源來監(jiān)視單個表上的更改
下表顯示了連接器的當前特性:
Flink CDC介紹,flink,數(shù)據(jù)庫,大數(shù)據(jù)

?

7.用法實例

7.1DataStream API 的用法(推薦)

請嚴格按照上面的《5.支持的 Flink 版本》搭配來使用Flink CDC

<properties>
	<flink.version>1.13.0</flink.version>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency>
	<groupId>com.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>${flinkcdc.version}</version>
</dependency>
<!-- flink核心API -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-clients_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-scala_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-scala_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-common</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-planner-blink_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-api-java-bridge_2.12</artifactId>
	<version>${flink.version}</version>
</dependency>

請?zhí)崆伴_啟MySQL中的binlog,配置my.cnf文件,重啟mysqld服務即可

my.cnf

[client]
default_character_set=utf8
[mysqld]
server-id=1
collation_server=utf8_general_ci
character_set_server=utf8
log-bin=mysql-bin
binlog_format=row
expire_logs_days=30

ddl&dml.sql

create table test_cdc
(
    id   int          not null
        primary key,
    name varchar(100) null,
    age  int          null
);

INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);

FlinkDSCDC.java

package com.daniel.util;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author Daniel
 * @Date: 2023/7/25 10:03
 * @Description DataStream API CDC
 **/
public class FlinkDSCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // 這里一定要是db.table的形式
                .tableList("flink.test_cdc")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print();
        env.execute("FlinkDSCDC");
    }
}

UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;

打印出的日志

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

可以得出的結論:

1、日志中的數(shù)據(jù)變化操作類型(op)可以表示為 ‘u’,表示更新操作。在第一條日志中,發(fā)生了一個更新操作,對應的記錄的 key 是 id=1,更新前的數(shù)據(jù)是 {id=1, name=Daniel, age=25},更新后的數(shù)據(jù)是 {id=1, name=Daniel, age=24}。在第二條日志中,也發(fā)生了一個更新操作,對應的記錄的 key 是 id=3,更新前的數(shù)據(jù)是 {id=3, name=James, age=16},更新后的數(shù)據(jù)是 {id=3, name=Andy, age=16}。
2、每條日志還提供了其他元數(shù)據(jù)信息,如數(shù)據(jù)源(source)、版本號(version)、連接器名稱(connector)、時間戳(ts_ms)等。這些信息可以幫助我們追蹤記錄的來源和處理過程。
3、日志中的 sourceOffset 包含了一些關鍵信息,如事務ID(transaction_id)、文件名(file)、偏移位置(pos)等。這些信息可以用于確保數(shù)據(jù)的準確順序和一致性。

7.2Table/SQL API的用法

FlinkSQLCDC.java

package com.daniel.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Author Daniel
 * @Date: 2023/7/25 15:25
 * @Description
 **/
public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE test_cdc (" +
                " id int primary key," +
                " name STRING," +
                " age int" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = 'localhost'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'flink'," +
                " 'table-name' = 'test_cdc'" +
                ")");

        Table table = tableEnv.sqlQuery("select * from test_cdc");
        DataStream<Tuple2<Boolean, Row>> dataStreamSource = tableEnv.toRetractStream(table, Row.class);
        dataStreamSource.print();
        env.execute("FlinkSQLCDC");
    }
}

UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);

打印出的日志文章來源地址http://www.zghlxwxcb.cn/news/detail-681194.html

(false,-U[2, David, 38])
(true,+U[2, David, 55])
(false,-U[3, Andy, 16])
(true,+U[3, Andy, 22])
(false,-U[4, Robert, 27])
(true,+U[4, Alice, 27])
(false,-U[1, Daniel, 24])
(true,+U[1, Daniel, 18])
(true,+I[5, David, 29])

到了這里,關于Flink CDC介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 使用Flink CDC從數(shù)據(jù)庫采集數(shù)據(jù),保證數(shù)據(jù)不丟失:實現(xiàn)斷點續(xù)傳機制

    大數(shù)據(jù)技術在當前的數(shù)據(jù)分析和處理中扮演著重要的角色。Apache Flink作為一種快速、可靠的流處理引擎,在大規(guī)模數(shù)據(jù)處理中廣受歡迎。本文將介紹如何使用Flink CDC(Change Data Capture)從數(shù)據(jù)庫采集數(shù)據(jù),并通過設置checkpoint來支持數(shù)據(jù)采集中斷恢復,從而保證數(shù)據(jù)不丟失。

    2024年02月04日
    瀏覽(27)
  • 【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    我一開始是flink-cdc,oracle2Mysql,sql 我一開始直接用的oracle【date】類型,mysql【date】類型,sql的校驗通過了,但是真正操作數(shù)據(jù)的時候報錯,告訴我oracle的數(shù)據(jù)格式的日期數(shù)據(jù),不可以直接插入到mysql格式的日期數(shù)據(jù),說白了就是數(shù)據(jù)格式不一致導致的 我想的是既然格式不對

    2024年02月12日
    瀏覽(25)
  • 實戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)

    目錄 前言: 1、springboot引入依賴: 2、yml配置文件 3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器 4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象 5、CDC 數(shù)據(jù)實體類 6、自定義ApplicationContextUtil 7、自定義sink 交由spring管理,處理變更數(shù)據(jù) ? ? ? ? 我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查

    2024年02月10日
    瀏覽(24)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、達夢等數(shù)據(jù)庫開啟日志方法

    目錄 1. 前言 2. 數(shù)據(jù)源安裝與配置 2.1 MySQL 2.1.1 安裝 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安裝 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安裝 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安裝 2.4.2 CDC 配置 2.5達夢 2.4.1安裝 2.4.2CDC配置 3. 驗證 3.1 Flink版本與CDC版本的對應關系 3.2 下載相關包 3.3 添加cdc jar 至lib目錄 3.4 驗

    2024年02月05日
    瀏覽(122)
  • Flink CDC-Oracle CDC配置及DataStream API實現(xiàn)代碼...可實現(xiàn)監(jiān)控采集一個數(shù)據(jù)庫的多個表

    使用sysdba角色登錄到Oracle數(shù)據(jù)庫 確保Oracle歸檔日志(Archive Log)已啟用 若未啟用歸檔日志, 需運行以下命令啟用歸檔日志 設置歸檔日志存儲大小及位置 設置數(shù)據(jù)庫恢復文件存儲區(qū)域的大小(如歸檔重做日志文件、控制文件備份等) 設置恢復文件的實際物理存儲路徑;scope=spfile參數(shù)

    2024年02月05日
    瀏覽(25)
  • 業(yè)務數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    業(yè)務數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介紹 Sqoop : SQ L-to-Had oop ( Apache已經(jīng)終止Sqoop項目 ) 用途:把關系型數(shù)據(jù)庫的數(shù)據(jù)轉(zhuǎn)移到HDFS(Hive、Hbase)(重點使用的場景);Hadoop中的數(shù)據(jù)轉(zhuǎn)移到關系型數(shù)據(jù)庫中。Sqoop是java語言開發(fā)的,底層使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是數(shù)據(jù)塊的轉(zhuǎn)移,沒有使

    2024年02月15日
    瀏覽(44)
  • 【Flink-CDC】Flink CDC 介紹和原理概述

    【Flink-CDC】Flink CDC 介紹和原理概述

    CDC是( Change Data Capture 變更數(shù)據(jù)獲取 )的簡稱。 核心思想是, 監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。 CDC 主要分為基于查詢和基于

    2024年01月20日
    瀏覽(25)
  • Flink CDC介紹

    Flink CDC介紹

    1.CDC概述 CDC(Change Data Capture)是一種用于捕獲和處理數(shù)據(jù)源中的變化的技術。它允許實時地監(jiān)視數(shù)據(jù)庫或數(shù)據(jù)流中發(fā)生的數(shù)據(jù)變動,并將這些變動抽取出來,以便進行進一步的處理和分析。 傳統(tǒng)上,數(shù)據(jù)源的變化通常通過周期性地輪詢整個數(shù)據(jù)集進行檢查來實現(xiàn)。但是,這

    2024年02月11日
    瀏覽(15)
  • Flink CDC介紹及原理

    CDC (Change Data Capture) 是一種用于 捕捉數(shù)據(jù)庫變更數(shù)據(jù) 的技術,F(xiàn)link 從 1.11 版本開始原生支持 CDC 數(shù)據(jù)(changelog)的處理,目前已經(jīng)是非常成熟的變更數(shù)據(jù)處理方案。 Flink CDC Connectors 是 Flink 的一組 Source 連接器,是 Flink CDC 的核心組件,這些連接器負責從? MySQL、PostgreSQL、Ora

    2024年02月11日
    瀏覽(15)
  • Flink CDC介紹和簡單實用

    Flink CDC介紹和簡單實用

    CDC是Change Data Capture(變更數(shù)據(jù)獲取)的簡稱。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。 基于查詢和基于binlog 從 ETL 的角度進行分析,

    2024年02月06日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包