簡介
CDC是Change Data Capture(變更數(shù)據(jù)獲?。┑暮喎Q。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。
種類
基于查詢和基于binlog
基于日志的 CDC 方案介紹
從 ETL 的角度進行分析,一般采集的都是業(yè)務(wù)庫數(shù)據(jù),這里使用 MySQL 作為需要采集的數(shù)據(jù)庫,通過 Debezium 把 MySQL Binlog 進行采集后發(fā)送至 Kafka 消息隊列,然后對接一些實時計算引擎或者 APP 進行消費后把數(shù)據(jù)傳輸入 OLAP 系統(tǒng)或者其他存儲介質(zhì)。
Flink 希望打通更多數(shù)據(jù)源,發(fā)揮完整的計算能力。我們生產(chǎn)中主要來源于業(yè)務(wù)日志和數(shù)據(jù)庫日志,F(xiàn)link 在業(yè)務(wù)日志的支持上已經(jīng)非常完善,但是在數(shù)據(jù)庫日志支持方面在 Flink 1.11 前還屬于一片空白,這就是為什么要集成 CDC 的原因之一。
Flink SQL 內(nèi)部支持了完整的 changelog 機制,所以 Flink 對接 CDC 數(shù)據(jù)只需要把CDC 數(shù)據(jù)轉(zhuǎn)換成 Flink 認識的數(shù)據(jù),所以在 Flink 1.11 里面重構(gòu)了 TableSource 接口,以便更好支持和集成 CDC。
重構(gòu)后的 TableSource 輸出的都是 RowData 數(shù)據(jù)結(jié)構(gòu),代表了一行的數(shù)據(jù)。在RowData 上面會有一個元數(shù)據(jù)的信息,我們稱為 RowKind 。RowKind 里面包括了插入、更新前、更新后、刪除,這樣和數(shù)據(jù)庫里面的 binlog 概念十分類似。通過 Debezium 采集的 JSON 格式,包含了舊數(shù)據(jù)和新數(shù)據(jù)行以及原數(shù)據(jù)信息,op 的 u表示是 update 更新操作標識符,ts_ms 表示同步的時間戳。因此,對接 Debezium JSON 的數(shù)據(jù),其實就是將這種原始的 JSON 數(shù)據(jù)轉(zhuǎn)換成 Flink 認識的 RowData。
flink作為etl工具
原工作原理
優(yōu)化后
Flink SQL 采集+計算+傳輸(ETL)一體化優(yōu)點:
? 開箱即用,簡單易上手
? 減少維護的組件,簡化實時鏈路,減輕部署成本
? 減小端到端延遲
? Flink 自身支持 Exactly Once 的讀取和計算
? 數(shù)據(jù)不落地,減少存儲成本
? 支持全量和增量流式讀取
? binlog 采集位點可回溯
應(yīng)用場景
? 實時數(shù)據(jù)同步,數(shù)據(jù)備份,數(shù)據(jù)遷移,數(shù)倉構(gòu)建
優(yōu)勢:豐富的上下游(E & L),強大的計算(T),易用的 API(SQL),流式計算低延遲
? 數(shù)據(jù)庫之上的實時物化視圖、流式數(shù)據(jù)分析
? 索引構(gòu)建和實時維護
? 業(yè)務(wù) cache 刷新
? 審計跟蹤
? 微服務(wù)的解耦,讀寫分離
? 基于 CDC 的維表關(guān)聯(lián)
開源地址
https://github.com/ververica/flink-cdc-connectors
最新flink cdc官方文檔分享
https://flink-learning.org.cn/article/detail/eed4549f80e80cc30c69c406cb08b59a
流程圖
個人理解作圖
1.X痛點
所以設(shè)計目標
設(shè)計實現(xiàn)上:
在對于有主鍵的表做初始化模式,整體的流程主要分為5個階段:
1.Chunk切分;2.Chunk分配;(實現(xiàn)并行讀取數(shù)據(jù)&CheckPoint)
3.Chunk讀取;(實現(xiàn)無鎖讀取)
4.Chunk匯報;
5.Chunk分配。
對于并發(fā)線程
會對比各個讀取切分的最高和最低的位置區(qū)間,超過區(qū)間進行更新
目前支持開發(fā)方式
個人理解作圖
開發(fā)測試大致流程
個人理解作圖
使用
mysql開啟binlog
vi /etc/my.cnf 底部追加
server_id=2
log_bin=mysql-bin
binlog_format=ROW
# 下面這行可寫可不寫 監(jiān)控對應(yīng)的數(shù)據(jù)庫
binlog_do_db=elebap_bak
重啟mysqld服務(wù), 并啟動mysql
systemctl restart mysqld
或者
bin/mysqld --initialize --user=root --basedir=/usr/local/mysql --datadir=/data/mysql
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 | 154 | | | |
+------------------+----------+--------------+------------------+-------------------+
mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)
log_bin顯示ON開啟狀態(tài)。
mysql的建表以及插入數(shù)據(jù):
CREATE TABLE study(
ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT ,
NAME VARCHAR(20) NOT NULL,
AGE INT(10)
);
INSERT INTO study VALUES(1 , 'a' , 10);
INSERT INTO study VALUES(2 , 'b' , 11);
INSERT INTO study VALUES(3 , 'c' , 12);
INSERT INTO study VALUES(4 , 'd' , 13);
INSERT INTO study VALUES(5 , 'e' , 14);
INSERT INTO study VALUES(6 , 'f' , 15);
代碼
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
* @author wyi
* @date 2022/8/18 11:06
* @description
*/
public class flinkcdcTest {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties pops = new Properties();
pops.setProperty("debezium.snapshot.locking.mode", "none");
DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource.<JSONObject>builder()
.hostname("192.168.80.161")
.port(3306)
.username("root")
.password("123456")
.databaseList(BussinessConstant.DATABASE_LIST)
.tableList(BussinessConstant.ABLE_LIST_ALARM_CONFIG_CAP_UNBALANCE)
.deserializer(new TestRuleDeserialization())
.build();
SingleOutputStreamOperator<Object> map = env.addSource(mysqlSource).map(new MapFunction<JSONObject, Object>() {
@Override
public Object map(JSONObject jsonObject) throws Exception {
return jsonObject.toString();
}
});
map.print();
env.execute("CdcMysqlSource");
}
}
自定義的序列化類文章來源:http://www.zghlxwxcb.cn/news/detail-461224.html
package com.cosmosource.da.cdc;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
/**
* @author wyi
* @date 2022/8/18 10:32
* @description 這是一個demo,測試flink-cdc連接mysql的反序列化類
*/
public class TestRuleDeserialization implements DebeziumDeserializationSchema<JSONObject> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) throws Exception {
//獲取主題
String topic = sourceRecord.topic();
String[] arr = topic.split("\\.");
String db = arr[1];
String tableName = arr[2];
System.out.println(arr[1]);
System.out.println(arr[2]);
//獲取操作類型 READ DELETE UPDATE CREATE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
//獲取值信息并轉(zhuǎn)換為Struct類型
Struct value = (Struct) sourceRecord.value();
System.out.println("value:"+value);
//獲取變化后的數(shù)據(jù)
Struct after = value.getStruct("after");
//創(chuàng)建JSON對象用于存儲數(shù)據(jù)信息
JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
Object o = after.get(field);
data.put(field.name(), o);
}
//創(chuàng)建JSON對象用于封裝最終返回值數(shù)據(jù)信息
JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
//發(fā)送數(shù)據(jù)至下游
collector.collect(result);
}
@Override
public TypeInformation<JSONObject> getProducedType() {
return TypeInformation.of(JSONObject.class);
}
}
結(jié)果:文章來源地址http://www.zghlxwxcb.cn/news/detail-461224.html
……………………………………
……………………………………
……………………………………
wy
study
value:Struct{after=Struct{ID=9,NAME=1,AGE=15},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wy,table=study,server_id=0,file=mysql-bin.000001,pos=3128,row=0},op=c,ts_ms=1660793058775}
八月 18, 2022 11:24:19 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.80.161:3306 at mysql-bin.000001/3128 (sid:5501, cid:24)
5> {"database":"wy","data":{"ID":4,"NAME":"d","AGE":13},"operation":"create","table":"study"}
1> {"database":"wy","data":{"ID":8,"NAME":"1","AGE":15},"operation":"create","table":"study"}
8> {"database":"wy","data":{"ID":7,"NAME":"f","AGE":15},"operation":"create","table":"study"}
4> {"database":"wy","data":{"ID":3,"NAME":"c","AGE":12},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":1,"NAME":"a","AGE":10},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":9,"NAME":"1","AGE":15},"operation":"create","table":"study"}
3> {"database":"wy","data":{"ID":2,"NAME":"b","AGE":11},"operation":"create","table":"study"}
6> {"database":"wy","data":{"ID":5,"NAME":"e","AGE":14},"operation":"create","table":"study"}
7> {"database":"wy","data":{"ID":6,"NAME":"f","AGE":15},"operation":"create","table":"study"}
到了這里,關(guān)于Flink CDC介紹和簡單實用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!