1,什么是cdc
CDC是(Change Data Capture 變更數(shù)據(jù)獲取)的簡稱。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。
2,flink的cdc
cdc項目地址:https://github.com/ververica/flink-cdc-connectors
cdc項目文檔:https://ververica.github.io/flink-cdc-connectors/master/
flink-sql項目文檔:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/
?
3,環(huán)境準(zhǔn)備
- mysql
- elasticsearch
- flink on yarn
說明:如果沒有安裝hadoop,那么可以不用yarn,直接用flink standalone環(huán)境吧。
本例使用版本如下:
?下面兩個地址下載flink的依賴包,放在lib目錄下面。
下載地址:
1、https://repo.maven.apache.org/maven2/com/alibaba/ververica/
flink-sql-connector-mysql-cdc-1.4.0.jar
此倉庫提供的最新版本為1.4.0,如需新版本可自行編譯或者去https://mvnrepository.com/下載。
2、https://repo.maven.apache.org/maven2/org/apache/flink/
flink-sql-connector-elasticsearch7_2.11-1.13.5.jar
小坑:此處使用的是es7,由于本地環(huán)境是es8導(dǎo)致無法創(chuàng)建索引,又重新安裝es7測試成功。
?
4,啟動flink
啟動flink集群
./start-cluster.sh
啟動成功的話,可以在 http://localhost:8081/ 訪問到 Flink Web UI,如下所示:
?啟動flink sql-client
./sql-client.sh
不加任何參數(shù)進入交互式界面。
./sql-client.sh -f /tmp/aa.sql
-f:就是接sql文件。即不用進行交互式查詢,這里注意:aa.sql文件里的insert語句會被分開成一個個job。
如果想要在一個job里提交就要注意寫法,即:
在1.15.0以前語法:
BEGIN STATEMENT SET;
-- one or more INSERT INTO statements
{ INSERT INTO|OVERWRITE <select_statement>; }+
END;
自定義job名稱: set pipeline.name = totalTask;
啟動成功后,可以看到如下的頁面:
?
5,數(shù)據(jù)同步初始化
1)mysql數(shù)據(jù)庫原始表
CREATE TABLE `product_view` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) NOT NULL, `product_id` int(11) NOT NULL, `server_id` int(11) NOT NULL, `duration` int(11) NOT NULL, `times` varchar(11) NOT NULL, `time` datetime NOT NULL, PRIMARY KEY (`id`), KEY `time` (`time`), KEY `user_product` (`user_id`,`product_id`) USING BTREE, KEY `times` (`times`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 樣本數(shù)據(jù)
INSERT INTO `product_view` VALUES ('1', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('2', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('3', '1', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('4', '1', '1', '2', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('5', '8', '1', '1', '120', '120', '2020-05-14 13:14:00');
INSERT INTO `product_view` VALUES ('6', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
INSERT INTO `product_view` VALUES ('7', '8', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('8', '8', '1', '3', '120', '120', '2020-04-23 13:14:00');
INSERT INTO `product_view` VALUES ('9', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
2)flink?創(chuàng)建source數(shù)據(jù)庫關(guān)聯(lián)表
CREATE TABLE product_view_source ( `id` int, `user_id` int, `product_id` int, `server_id` int, `duration` int, `times` string, `time` timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '10.34.100.209', 'port' = '3306', 'username' = 'root', 'password' = '123', 'database-name' = 'flinkcdc_test', 'table-name' = 'product_view',
'server-id' = '5401' );
這樣,我們在flink-sql client操作這個表相當(dāng)于操作mysql里面的對應(yīng)表。
3)flink?創(chuàng)建sink,數(shù)據(jù)庫關(guān)聯(lián)表elasticsearch
CREATE TABLE product_view_sink( `id` int, `user_id` int, `product_id` int, `server_id` int, `duration` int, `times` string, `time` timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://10.34.100.156:9200', 'index' = 'product_view_index' );
這樣,es里的product_view_index這個索引在數(shù)據(jù)同步時會被自動創(chuàng)建,如果想指定一些屬性,可以提前手動創(chuàng)建好索引。往product_view_sink里面插入數(shù)據(jù),可以發(fā)現(xiàn)es中已經(jīng)有數(shù)據(jù)了。
查看flink創(chuàng)建的表
?查看flink表數(shù)據(jù)
select * from product_view_source;
?
select * from product_view_sink;
?由此可見,sink不能直接使用sql查詢。
4)建立同步任務(wù)
insert into product_view_sink select * from product_view_source;
這個時候是可以退出flink sql-client的,然后進入flink web-ui,可以看到mysql表數(shù)據(jù)已經(jīng)同步到elasticsearch中了,對mysql進行插入刪除更新,elasticsearch都是同步更新的。
查看任務(wù)
?查看es數(shù)據(jù)
?
6,數(shù)據(jù)實時同步
1)新增記錄
mysql數(shù)據(jù)庫插入一條記錄
INSERT INTO `product_view` VALUES ('10', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
查詢es,新增一條記錄
?
2)刪除記錄
mysql數(shù)據(jù)庫刪除一條記錄
DELETE FROM `product_view` where id=10;
查詢es,減少一條記錄
?3)更新記錄
es原始記錄
?mysql更新一條記錄
UPDATE `product_view` SET user_id=100,product_id=101 WHERE id=2;
變更后es記錄
?
?4)修改表結(jié)構(gòu)
1、flink-sql不支持alter語句,因此flink-sql創(chuàng)建的source,sink表也不支持結(jié)構(gòu)的修改。
2、當(dāng)mysql源表增加列時,flink創(chuàng)建的source,sink表結(jié)構(gòu)都不會發(fā)生改變,job會忽略新增的列仍繼續(xù)執(zhí)行同步任務(wù),不會報錯。
3、當(dāng)mysql源表刪除job中正在同步的列時,job會報如下錯誤:time是手動刪除的字段。并且手動增加time列后job也不能恢復(fù)正常。
?
7,故障恢復(fù)
1)背景
當(dāng)提交的job出錯時會導(dǎo)致同步任務(wù)中斷,重啟job會導(dǎo)致同步任務(wù)從頭開始,如果源表數(shù)據(jù)量巨大會耗費大量時間和資源,如何能以最小的代價快速恢復(fù)呢?這就要用到flink的重啟策略和checkpoint。
2)相關(guān)說明
官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/execution/task_failure_recovery/
Flink 作業(yè)如果沒有定義重啟策略,則會遵循集群啟動時加載的默認重啟策略。 如果提交作業(yè)時設(shè)置了重啟策略,該策略將覆蓋掉集群的默認策略。
通過 Flink 的配置文件?flink-conf.yaml
?來設(shè)置默認的重啟策略。配置參數(shù)?restart-strategy?定義了采取何種策略。 如果沒有啟用 checkpoint,就采用“不重啟”策略。如果啟用了 checkpoint 且沒有配置重啟策略,那么就采用固定延時重啟策略fixed-delay,默認值:重啟次數(shù)Integer.MAX_VALUE
?,重啟間隔1s。
3)重啟場景模擬
不開啟checkpoint就不會開啟重啟策略。
提交job后關(guān)掉mysql,job報如下錯誤:
?開啟checkpoint就會自動開啟默認的重啟策略。
設(shè)置命令:
//每 3 秒做一次 checkpoint,用于測試,生產(chǎn)配置建議5到10分鐘
set execution.checkpointing.interval = 3s;
提交job后關(guān)掉mysql,job一只在重啟,重啟mysql后job自動恢復(fù)running狀態(tài)。
?4)checkpoint模擬
當(dāng)任務(wù)失敗并且重啟也失敗(job徹底掛掉)或任務(wù)被取消掉,此時可以從checkpoint中快速恢復(fù)最近的狀態(tài)。
默認checkpoint的存儲有3種方式
1、內(nèi)存中,當(dāng)job失敗無法從checkpoint中恢復(fù),感覺沒啥鳥用。
2、持久化保存checkpoint,最好使用hdfs,我這里為了方便采用的是本地目錄存儲。
3、rockdb數(shù)據(jù)庫,沒有研究,有興趣的朋友自行研究。
設(shè)置命令:
// 設(shè)置本地存儲目錄
set state.checkpoints.dir = file:///tmp/checkpoint-dir;
//當(dāng)作業(yè)手動取消時,將會保留作業(yè)的 Checkpoint 狀態(tài)信息。注意,這種情況下,需要手動清除該作業(yè)保留的 Checkpoint 狀態(tài)信息,否則這些狀態(tài)信息將永遠保留在外部的持久化存儲中。
//為方便測試,本例采用此種方式
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
//僅當(dāng)作業(yè)失敗時,作業(yè)的 Checkpoint 才會被保留用于任務(wù)恢復(fù)。當(dāng)作業(yè)取消時,Checkpoint 狀態(tài)信息會被刪除,因此取消任務(wù)后,不能從 Checkpoint 位置進行恢復(fù)任務(wù)。
set execution.checkpointing.externalized-checkpoint-retention=DELETE_ON_CANCELLATION;
// 重置指定參數(shù)
reset execution.checkpointing.externalized-checkpoint-retention;
// 設(shè)置并行度
set parallelism.default = 1;
// 重置全部參數(shù)
reset;
提交job,手動取消job后,查看最近的checkpoint。
?從此最近的checkpoint文件中可恢復(fù)job
設(shè)置命令:
//設(shè)置要恢復(fù)的checkpoint
set 'execution.savepoint.path'='file:/tmp/checkpoint-dir/0bdf06a6e699f87b6703a39df6ea8b5d/chk-3';
//執(zhí)行原來的任務(wù)sql,是所有的sql,包括建表語句。最好提交剛開始提交任務(wù)時就用文件的方式,這樣方便恢復(fù)。
insert into product_view_sink select a2.* from product_view_source a1 inner join product_view_source2 a2 on a1.id=a2.id;
?從checkpoint中恢復(fù)時,新提交的任務(wù)checkpoint狀態(tài)變化如下:
表示從要恢復(fù)的job的id=118號開始恢復(fù),新job的id=舊job的結(jié)束id+新job已創(chuàng)建的id,123=118+5
?實操模擬:
1、源mysql中的數(shù)據(jù)如下:
? 2、開啟checkpoint,設(shè)置存儲目錄,保留取消任務(wù)時checkpoint文件,提交job。
? 3、查詢es中同步的數(shù)據(jù),和mysql中記錄完全一樣
? 4、取消job,刪除es中的索引
?
? 4、修改、添加mysql中數(shù)據(jù)
? 5、flink-sql設(shè)置恢復(fù)checkpoint,并重新提交任務(wù)
? 6、觀察es中的數(shù)據(jù)發(fā)現(xiàn)取消job前的數(shù)據(jù)沒有了,只有取消后新增的2條數(shù)據(jù)和修改的那條數(shù)據(jù)。這說明不是從頭開始同步的,是從checkpoint中恢復(fù)出來的。
?
8,任務(wù)文件模板
1)任務(wù)模板
提交任務(wù),最好以文件的方式提交,方便后續(xù)job的恢復(fù)。
-- 設(shè)置job名稱
set pipeline.name = totalTask;
-- 設(shè)置checkpoint時間間隔
set execution.checkpointing.interval = 300s;
-- 設(shè)置checkpoint持久化目錄
set state.checkpoints.dir = file:///tmp/checkpoint-dir;
-- 設(shè)置當(dāng)取消job時仍保留checkpoint文件
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
建表語句source表
建表語句sink表
BEGIN STATEMENT SET;
insert同步語句
END;?
9,斷點續(xù)傳
job失敗后恢復(fù)步驟如下:
1、找到最新的checkpoint;
2、set 'execution.savepoint.path'='xxxxx';
3、執(zhí)行本原來任務(wù)sql文件;
10,遇到的問題
1)slots不足
flink默認taskmanager.numberOfTaskSlots=1即只能運行一個子任務(wù),一般設(shè)置為機器的CPU核心數(shù)。
2)內(nèi)存不足
任務(wù)失敗,可能由于內(nèi)存不足導(dǎo)致,一般可調(diào)節(jié)flink-config.yaml以下配置,如內(nèi)存需求很大可以按照b來配置。
a.最簡單的總內(nèi)存配置,會按照默認比例給具體的內(nèi)存項分配。
b.給具體的內(nèi)存項分配
jobmanager.memory.flink.size: 2048m
taskmanager.memory.task.heap.size : 2048m
taskmanager.memory.managed.size : 2048m
?
?
3)重復(fù)server-id
?
1、前提:單個提交job任務(wù),即每個insert語句形成一個job,就是一個同步任務(wù)。
結(jié)論:通過實踐可知,所有具有相同server-id的source表,只能選擇其中一個且被一個job使用。實用性很差,只是測試時踩的坑記錄一下。
場景:假如source1和source2表具有相同的server-id,如果job1中使用了source1(不能同時使用source1和source2),那其他job就不能在用source1、source2了。
分析:先提交一個job1并且已經(jīng)在同步了,此時如果提交的job2中有source表與job1中source表有相同的server-id,或job2中使用和job1中重復(fù)的source表,那job2也從job1已經(jīng)讀到的binlog位置開始讀就會有問題,直接報如下錯誤。
2、前提:批量提交同步任務(wù),即將多個insert語句放在一起形成一個job,一個insert對應(yīng)一個同步任務(wù),一個job包含多個同步任務(wù)。
結(jié)論:通過實踐可知,不同的同步任務(wù)(即不同的insert語句)可以使用同一個source表,但不建議共享,可能造成數(shù)據(jù)丟失。但一個同步任務(wù)不能使用相同的server-id的source表。
場景:假如source1和source2表具有相同的server-id,如果任務(wù)1使用source1(不能同時使用source1和source2),其他任務(wù)還可以使用source1。
分析:同時提交任務(wù)1和任務(wù)2并且都使用到了source1,等于2個任務(wù)共同維護source1的binlog狀態(tài),此時可能導(dǎo)致某個任務(wù)從錯誤的binlog位置讀取數(shù)據(jù),從而導(dǎo)致數(shù)據(jù)丟失。
最佳實踐:一個同步任務(wù)中(一個insert語句)使用到的每個source表都對應(yīng)一個不同server-id。同一個source表如在多個job或任務(wù)中使用,就在每個job或每個任務(wù)中設(shè)置不同的表名及server-id。這樣對于相同的source表在每個job或任務(wù)中都各自維護一份binlog狀態(tài)了。
舉例說明:如order表需要在3個job或同步任務(wù)中使用,job1中name=order1,server-id=5401;job2中name=order2,server-id=5402;job3中name=order3,server-id=5403;這樣3個job就會各自維護各自的關(guān)于order表的binlog狀態(tài)。
4)數(shù)據(jù)不一致
問題:同步完成后通過es查詢索引的數(shù)據(jù)量和mysql查詢的數(shù)據(jù)量不一致。
問題定位:通過在flink的sql-client.sh查詢發(fā)現(xiàn)和mysql查詢的數(shù)據(jù)不一致,定位到是flink的sql和mysql的sql語法存在區(qū)別。
?flink的sql-client.sh查詢select * from product01_source where (test_char is NULL or test_char = '2');
?
?通過以上模擬,可以發(fā)現(xiàn)mysql查詢的結(jié)果和flink查詢的結(jié)果不一致。
問題解決:最初是想通過union來合并兩個條件的結(jié)果來解決,但總感覺還有其他解決方法,最終發(fā)現(xiàn)是flink的版本bug導(dǎo)致,將原來的flink-1.13.6換成1.14.6,并將flink-sql-connector-mysql-cdc-1.4.0.jar換成flink-sql-connector-mysql-cdc-2.3.0.jar,問題得以解決。
問題鏈接:https://issues.apache.org/jira/browse/FLINK-22015
?遇到的新問題:flink升級為1.14.6版本后雖然解決了數(shù)據(jù)一致性的問題,但是當(dāng)提交多個復(fù)雜(join的表多)的job的時候,自測的時候提交2-3個時就會卡死,一直提交不了。報連接池連接不可用超時。
?問題解決:給source表加上? ?'connection.pool.size'?=?'200'? 參數(shù),增加連接數(shù)量即可。文章來源:http://www.zghlxwxcb.cn/news/detail-638267.html
總結(jié):至此可正常提交job并保證數(shù)據(jù)的一致性。文章來源地址http://www.zghlxwxcb.cn/news/detail-638267.html
到了這里,關(guān)于flink-cdc同步mysql數(shù)據(jù)到elasticsearch的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!