考大家一個(gè)問(wèn)題,如果想要把數(shù)據(jù)庫(kù)的數(shù)據(jù)同步到別的地方,比如es,mongodb,大家會(huì)采用哪些方案呢? :::
-
定時(shí)掃描同步?
-
實(shí)時(shí)日志同步?
定時(shí)同步是一個(gè)很好的方案,比較簡(jiǎn)單,但是如果對(duì)實(shí)時(shí)要求比較高的話(huà),定時(shí)同步就有點(diǎn)不合適了。今天給大家介紹一種實(shí)時(shí)同步方案,就是是使用flinkcdc 來(lái)讀取數(shù)據(jù)庫(kù)日志,并且寫(xiě)入到elasticsearch中。
1.什么是flinkcdc?
Flink CDC(Change Data Capture)是指通過(guò) Apache Flink 實(shí)現(xiàn)的一種數(shù)據(jù)變化捕獲技術(shù)。CDC 可以實(shí)時(shí)捕獲數(shù)據(jù)庫(kù)中的數(shù)據(jù)變化,如插入、更新、刪除操作,并將這些變化數(shù)據(jù)流式地傳輸?shù)狡渌到y(tǒng)或存儲(chǔ)中。通過(guò) Flink CDC,用戶(hù)可以實(shí)時(shí)監(jiān)控?cái)?shù)據(jù)庫(kù)中的數(shù)據(jù)變化,并將這些變化數(shù)據(jù)用于實(shí)時(shí)分析、ETL(Extract, Transform, Load)等應(yīng)用場(chǎng)景。Flink CDC 通常用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道,幫助用戶(hù)實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)同步和分析。
2.flinkcdc發(fā)展趨勢(shì)?
目前在github 上大概有5k 的star,也有越來(lái)越多的人使用。
3.flinkcdc有什么優(yōu)勢(shì)?
說(shuō)到實(shí)時(shí)同步,canal 是比較常用的方案
canal,譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。 這句介紹有幾個(gè)關(guān)鍵字:增量日志,增量數(shù)據(jù)訂閱和消費(fèi)。
canal的把自己偽裝成MySQL slave,模擬MySQL slave的交互協(xié)議向MySQL Mater發(fā)送 dump協(xié)議,MySQL mater收到canal發(fā)送過(guò)來(lái)的dump請(qǐng)求,開(kāi)始推送binary log給canal,然后canal解析binary log,再發(fā)送到存儲(chǔ)目的地,比如MySQL,Kafka,Elastic Search等等。
那么 flinkcdc 和canal 對(duì)比,有什么不同呢?
這是網(wǎng)上的一個(gè)對(duì)比??梢钥吹?flinkcdc 和canal 一樣,也是通過(guò)讀取數(shù)據(jù)庫(kù)日志的方式做到實(shí)時(shí)同步的,這個(gè)和很多實(shí)時(shí)同步的工具原理相同,比如 ogg debezium 都是這樣做的,flinkcdc 的優(yōu)勢(shì)是基于flink 這個(gè)強(qiáng)大的實(shí)時(shí)計(jì)算引擎,可以做到集群部署,高可用等等,并且社區(qū)活躍,支持的平臺(tái)多,像 mysql oracle mongodb 主流數(shù)據(jù)庫(kù)都是支持的。而canal只支持mysql。
還有一個(gè)優(yōu)勢(shì),flinkcdc 是基于java實(shí)現(xiàn)的,背靠大數(shù)據(jù)這個(gè)大平臺(tái),解決方案也是比較多的。源碼閱讀修改起來(lái)也是比較方便的。
4.一個(gè)例子
光說(shuō)不練假把式,簡(jiǎn)單的寫(xiě)一個(gè)把mysql 數(shù)據(jù)實(shí)時(shí)同步到es的例子,使用flinksql的方式,只需要簡(jiǎn)單的幾行sql
依賴(lài)
flink-1.15.0
flink-sql-connector-elasticsearch7-1.15.0.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
mysql 5.7
es 7.9.3
安裝好flink 之后,把 flink-sql-connector-elasticsearch7-1.15.0.jar flink-sql-connector-mysql-cdc-2.2.1.jar 上傳到 flink lib 目錄 啟動(dòng)flink
./start-cluster.sh
打開(kāi)flink sql 窗口
./start-cluster.sh
創(chuàng)建和mysql 關(guān)聯(lián)的表
CREATE?TABLE?products?(
????id?INT,
????name?STRING,
????description?STRING,
????PRIMARY?KEY?(id)?NOT?ENFORCED
??)?WITH?(
????'connector'?=?'mysql-cdc',
????'hostname'?=?'localhost',
????'port'?=?'3306',
????'username'?=?'root',
????'password'?=?'123456',
????'database-name'?=?'mydb',
????'table-name'?=?'products'
??);
CREATE?TABLE?orders?(
???order_id?INT,
???order_date?TIMESTAMP(0),
???customer_name?STRING,
???price?DECIMAL(10,?5),
???product_id?INT,
???order_status?BOOLEAN,
???PRIMARY?KEY?(order_id)?NOT?ENFORCED
?)?WITH?(
???'connector'?=?'mysql-cdc',
???'hostname'?=?'localhost',
???'port'?=?'3306',
???'username'?=?'root',
???'password'?=?'123456',
???'database-name'?=?'mydb',
???'table-name'?=?'orders'
?);
創(chuàng)建和es 同步的表
CREATE?TABLE?enriched_orders?(
???order_id?INT,
???order_date?TIMESTAMP(0),
???customer_name?STRING,
???price?DECIMAL(10,?5),
???product_id?INT,
???order_status?BOOLEAN,
???product_name?STRING,
???product_description?STRING,
???PRIMARY?KEY?(order_id)?NOT?ENFORCED
?)?WITH?(
?????'connector'?=?'elasticsearch-7',
?????'hosts'?=?'http://192.168.91.134:9200',
?????'index'?=?'enriched_orders'
?);
創(chuàng)建讀取mysql寫(xiě)入es任務(wù)
INSERT?INTO?enriched_orders
?SELECT?o.*,?p.name,?p.description
?FROM?orders?AS?o
?LEFT?JOIN?products?AS?p?ON?o.product_id?=?p.id;
執(zhí)行這個(gè)任務(wù)后,mysql 的數(shù)據(jù)就能實(shí)時(shí)同步至es了
當(dāng)然數(shù)據(jù)源也是支持很多種,比如 oracle mongodb sqlserver 寫(xiě)入端也支持 es kafka hive 等等,看大家需要。想我們的業(yè)務(wù)場(chǎng)景,是先將mysql 數(shù)據(jù)同步到kafka,然后再消費(fèi)kafka 消息,把數(shù)據(jù)寫(xiě)入到es, hive,starrocks 等等。并且使用了checkpoint 做為斷點(diǎn)恢復(fù)的保障。
5.最后
附上一些涉及的到網(wǎng)址,方便大家查閱
flinkcdc 官網(wǎng)?
flinkcdc github
flink 官網(wǎng)文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-769804.html
flink 文檔文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-769804.html
到了這里,關(guān)于FlinkCDC數(shù)據(jù)實(shí)時(shí)同步Mysql到ES的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!