來源互聯(lián)網(wǎng)多篇文章總結(jié)
一、傳統(tǒng)的數(shù)據(jù)同步方案與 Flink SQL CDC 解決方案
業(yè)務(wù)系統(tǒng)經(jīng)常會遇到需要更新數(shù)據(jù)到多個存儲的需求。例如:一個訂單系統(tǒng)剛剛開始只需要寫入數(shù)據(jù)庫即可完成業(yè)務(wù)使用。某天 BI 團隊期望對數(shù)據(jù)庫做全文索引,于是我們同時要寫多一份數(shù)據(jù)到 ES 中,改造后一段時間,又有需求需要寫入到 Redis 緩存中。
很明顯這種模式是不可持續(xù)發(fā)展的,這種雙寫到各個數(shù)據(jù)存儲系統(tǒng)中可能導(dǎo)致不可維護(hù)和擴展,數(shù)據(jù)一致性問題等,需要引入分布式事務(wù),成本和復(fù)雜度也隨之增加。
我們可以通過 CDC(Change Data Capture)工具進(jìn)行解除耦合,同步到下游需要同步的存儲系統(tǒng),實現(xiàn)一份變動記錄,實時處理并投遞到多個目的地。通過這種方式提高系統(tǒng)的穩(wěn)健性,也方便后續(xù)的維護(hù)。
1.1 Flink SQL CDC 數(shù)據(jù)同步與原理解析
CDC 是變更數(shù)據(jù)捕獲(Change Data Capture)技術(shù)的縮寫,它可以將源數(shù)據(jù)庫(Source)的增量變動記錄,同步到一個或多個數(shù)據(jù)目的(Sink)。在同步過程中,還可以對數(shù)據(jù)進(jìn)行一定的處理,例如分組(GROUP BY)、多表的關(guān)聯(lián)(JOIN)等。
業(yè)界主要有基于查詢的 CDC 和基于日志的 CDC ,可以從下面表格對比他們功能和差異點。
-
基于查詢的 CDC
用戶通常會在數(shù)據(jù)源表的某個字段中,保存上次更新的時間戳或版本號等信息,然后下游通過不斷的查詢和與上次的記錄做對比,來確定數(shù)據(jù)是否有變動,是否需要同步。這種方式優(yōu)點是不涉及數(shù)據(jù)庫底層特性,實現(xiàn)比較通用;缺點是要對業(yè)務(wù)表做改造,且實時性不高,不能確保跟蹤到所有的變更記錄,且持續(xù)的頻繁查詢對數(shù)據(jù)庫的壓力較大。
特點:基于批處理,不能捕獲到所有數(shù)據(jù)的變化、高延遲、需要查詢數(shù)據(jù)庫,會增加數(shù)據(jù)庫壓力
-
基于日志的 CDC
可以通過觸發(fā)器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)來實現(xiàn)。當(dāng)數(shù)據(jù)源表發(fā)生變動時,會通過附加在表上的觸發(fā)器或者 binlog 等途徑,將操作記錄下來。下游可以通過數(shù)據(jù)庫底層的協(xié)議,訂閱并消費這些事件,然后對數(shù)據(jù)庫變動記錄做重放,從而實現(xiàn)同步。這種方式的優(yōu)點是實時性高,可以精確捕捉上游的各種變動;缺點是部署數(shù)據(jù)庫的事件接收和解析器(例如 Debezium、Canal 等),有一定的學(xué)習(xí)和運維成本,對一些冷門的數(shù)據(jù)庫支持不夠。綜合來看,事件接收模式整體在實時性、吞吐量方面占優(yōu),如果數(shù)據(jù)源是 MySQL、PostgreSQL、MongoDB 等常見的數(shù)據(jù)庫實現(xiàn),建議使用Debezium來實現(xiàn)變更數(shù)據(jù)的捕獲。
特點: 基于streaming模式、能捕捉所有數(shù)據(jù)的變化、低延遲、不會增加數(shù)據(jù)庫壓力。
經(jīng)過以上對比,我們可以發(fā)現(xiàn)基于日志 CDC 有以下這幾種優(yōu)勢:
- 能夠捕獲所有數(shù)據(jù)的變化,捕獲完整的變更記錄。在異地容災(zāi),數(shù)據(jù)備份等場景中得到廣泛應(yīng)用,如果是基于查詢的 CDC 有可能導(dǎo)致兩次查詢的中間一部分?jǐn)?shù)據(jù)丟失
- 每次 DML 操作均有記錄無需像查詢 CDC 這樣發(fā)起全表掃描進(jìn)行過濾,擁有更高的效率和性能,具有低延遲,不增加數(shù)據(jù)庫負(fù)載的優(yōu)勢
- 無需入侵業(yè)務(wù),業(yè)務(wù)解耦,無需更改業(yè)務(wù)模型
- 捕獲刪除事件和捕獲舊記錄的狀態(tài),在查詢 CDC 中,周期的查詢無法感知中間數(shù)據(jù)是否刪除
常見開源CDC方案比較
1.2 基于日志的 CDC 方案介紹
從 ETL 的角度進(jìn)行分析,一般采集的都是業(yè)務(wù)庫數(shù)據(jù),這里使用 MySQL 作為需要采集的數(shù)據(jù)庫,通過 Debezium 把 MySQL Binlog 進(jìn)行采集后發(fā)送至 Kafka 消息隊列,然后對接一些實時計算引擎或者 APP 進(jìn)行消費后把數(shù)據(jù)傳輸入 OLAP 系統(tǒng)或者其他存儲介質(zhì)。
Flink 希望打通更多數(shù)據(jù)源,發(fā)揮完整的計算能力。我們生產(chǎn)中主要來源于業(yè)務(wù)日志和數(shù)據(jù)庫日志,F(xiàn)link 在業(yè)務(wù)日志的支持上已經(jīng)非常完善,但是在數(shù)據(jù)庫日志支持方面在 Flink 1.11 前還屬于一片空白,這就是為什么要集成 CDC 的原因之一。
Flink SQL 內(nèi)部支持了完整的 changelog 機制,所以 Flink 對接 CDC 數(shù)據(jù)只需要把CDC 數(shù)據(jù)轉(zhuǎn)換成 Flink 認(rèn)識的數(shù)據(jù)
1.3 選擇 Flink 作為 ETL 工具
之前的mysql binlog日志處理流程,例如canal監(jiān)聽binlog把日志寫入到kafka中。而Flink實時消費Kakfa的數(shù)據(jù)實現(xiàn)mysql數(shù)據(jù)的同步或其他內(nèi)容等。
拆分來說整體上可以分為以下幾個階段:
- mysql開啟binlog
- canal同步binlog數(shù)據(jù)寫入到kafka
- flink讀取kakfa中的binlog數(shù)據(jù)進(jìn)行相關(guān)的業(yè)務(wù)處理。
- 整體的處理鏈路較長,需要用到的組件也比較多。
Apache Flink CDC可以直接從數(shù)據(jù)庫獲取到binlog供下游進(jìn)行業(yè)務(wù)計算分析。簡單來說鏈路如下圖:
社區(qū)開發(fā)了 flink-cdc-connectors 組件,這是一個可以直接從 MySQL、PostgreSQL 等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。目前也已開源,開源地址:
https://github.com/ververica/flink-cdc-connectors
flink-cdc-connectors 可以用來替換 Debezium+Kafka 的數(shù)據(jù)采集模塊,從而實現(xiàn) Flink SQL 采集+計算+傳輸(ETL)一體化,這樣做的優(yōu)點有以下:
- 開箱即用,簡單易上手
- 減少維護(hù)的組件,簡化實時鏈路,減輕部署成本
- 減小端到端延遲
- Flink 自身支持 Exactly Once 的讀取和計算
- 數(shù)據(jù)不落地,減少存儲成本
- 支持全量和增量流式讀取
- binlog 采集位點可回溯*
二、 基于 Flink SQL CDC 的數(shù)據(jù)同步方案實踐
下面給大家?guī)?個關(guān)于 Flink SQL + CDC 在實際場景中使用較多的案例。在完成實驗時候,你需要 Docker、MySQL、Elasticsearch 等組件,具體請參考每個案例參考文檔。
2.1 CDC Streaming ETL
模擬電商公司的訂單表和物流表,需要對訂單數(shù)據(jù)進(jìn)行統(tǒng)計分析,對于不同的信息需要進(jìn)行關(guān)聯(lián)后續(xù)形成訂單的大寬表后,交給下游的業(yè)務(wù)方使用 ES 做數(shù)據(jù)分析,這個案例演示了如何只依賴 Flink 不依賴其他組件,借助 Flink 強大的計算能力實時把 Binlog 的數(shù)據(jù)流關(guān)聯(lián)一次并同步至 ES 。
例如如下的這段 Flink SQL 代碼就能完成實時同步 MySQL 中 orders 表的全量+增量數(shù)據(jù)的目的。
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
SELECT * FROM orders
2.2 Flink-CDC實踐之mysql案例
- 開啟mysql binlog
查看mysql-binlog狀態(tài)并開啟mysql-binlog
上圖是開始的狀態(tài)。如果沒有開始,則log_bin=off,log_bin_basename和log_bin_index值為空。開啟方式如下:
vim vim /etc/my.cnf
在添加以下信息:
#開啟binglog
server-id=1
log-bin=/var/lib/mysql/mysql-bin
- server-id表示單個結(jié)點的id,這里由于只有一個結(jié)點,所以可以把id隨機指定為一個數(shù),這里將id設(shè)置成1。若集群中有多個結(jié)點,則id不能相同
- 第二句是指定binlog日志文件的名字為mysql-bin,以及其存儲路徑。
添加完成后保存退出。
重啟mysql服務(wù):
service mysqld restart
- 編寫flinksql
- 源表:
create table Flink_source(id bigint, name string, age int,dt string)
with(
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.180',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'Flink_source'
);
可以知道,我們要去實時取Flink_source表,而這張表已經(jīng)存儲于mysql數(shù)據(jù)庫的。文章來源:http://www.zghlxwxcb.cn/news/detail-603891.html
- 目標(biāo)表:
create table Flink_target(id bigint primary key, name string, age int,dt string)
with(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.180:3306/test',
'username'='root',
'password'='123456',
'table-name' = 'Flink_target',
'sink.buffer-flush.max-rows'='1',
'sink.buffer-flush.interval'='0'
);
可以知道,我們到實時存入目標(biāo)表Flink_target,而這張表已經(jīng)存儲于mysql數(shù)據(jù)庫。文章來源地址http://www.zghlxwxcb.cn/news/detail-603891.html
- 插入數(shù)據(jù)
insert into Flink_target select * from Flink_source;
到了這里,關(guān)于基于 Flink SQL CDC 數(shù)據(jù)處理的終極武器的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!