目錄
導(dǎo)言
Paimon CDC
Demo 說明
Demo 準(zhǔn)備
Demo 開始
總結(jié)
導(dǎo)言
MongoDB 是一個(gè)比較成熟的文檔數(shù)據(jù)庫(kù),在業(yè)務(wù)場(chǎng)景中,通常需要采集 MongoDB 的數(shù)據(jù)到數(shù)據(jù)倉(cāng)庫(kù)或數(shù)據(jù)湖中,面向分析場(chǎng)景使用。
Flink MongoDB CDC 是 Flink CDC 社區(qū)提供的一個(gè)用于捕獲變更數(shù)據(jù)(Change Data Capturing)的 Flink 連接器,可連接到 MongoDB 數(shù)據(jù)庫(kù)和集合,并捕獲其中的文檔增加、更新、替換、刪除等變更操作。
Apache Paimon (incubating) 是一項(xiàng)流式數(shù)據(jù)湖存儲(chǔ)技術(shù),可以為用戶提供高吞吐、低延遲的數(shù)據(jù)攝入、流式訂閱以及實(shí)時(shí)查詢能力。
Paimon CDC
Paimon CDC 是整合了 Flink CDC、Kafka、Paimon 的入湖工具,幫助你更好更方便的完成一鍵入湖。
你可以通過?Flink SQL?或者?Flink DataStream API?將?Flink CDC?數(shù)據(jù)寫入?Paimon?中,也可以通過Paimon?提供的?CDC?工具來完成入湖。那這兩種方式有什么區(qū)別呢?
上圖是使用?Flink SQL?來完成入湖,簡(jiǎn)單,但是當(dāng)源表添加新列后,同步作業(yè)不會(huì)同步新的列,下游?Paimon?表也不會(huì)增加新列。
上圖是使用?Paimon CDC?工具來同步數(shù)據(jù),可以看到,當(dāng)源表發(fā)生列的新增后,流作業(yè)會(huì)自動(dòng)新增列的同步,并傳導(dǎo)到下游的?Paimon?表中,完成?Schema Evolution?的同步。
另外?Paimon CDC?工具也提供了整庫(kù)同步:
整庫(kù)同步可以幫助你:
-
一個(gè)作業(yè)同步多張表,以低成本的方式同步大量小表
-
作業(yè)里同時(shí)自動(dòng)進(jìn)行 Schema Evolution
-
新表將會(huì)被自動(dòng)進(jìn)行同步,你不用重啟作業(yè),全自動(dòng)完成
Demo 說明
你可以跟隨?Demo?步驟體驗(yàn)?Paimon CDC?的全自動(dòng)同步之旅,Demo?展示同步?Mongo DB?的數(shù)據(jù)到Paimon?中,如下圖。
以下的?Demo?使用?Flink?來完成入湖,使用?Spark SQL?來查詢,當(dāng)然你可以使用?Flink SQL?來查詢,或者使用其它計(jì)算引擎,包括?Trino、Presto、StarRocks、Doris?、Hive?等等。
Demo 準(zhǔn)備
步驟一:
首先下載?MongoDB Community Server,免費(fèi)版,不用交錢。
https://www.mongodb.com/try/download/community
啟動(dòng)?MongoDB Server:
mkdir?/tmp/mongodata?
./mongod --replSet rs0 --dbpath /tmp/mongodata
注意:這里開啟了replSet,詳見?MongoDB?文檔,只有開啟了?replSet?的庫(kù)才會(huì)產(chǎn)生?changelog,也就才會(huì)被?Flink Mongo CDC?可以增量讀取?CDC?數(shù)據(jù)。
步驟二:
下載?MongoDB Shell:
https://www.mongodb.com/try/download/shell
并啟動(dòng):
./mongosh
另外需要初始化?replSet,否者?MongoDB Server?會(huì)一直報(bào)錯(cuò)。
rs.initiate()
步驟三:
下載?Flink,請(qǐng)到官網(wǎng)下載最新?Flink:
https://www.apache.org/dyn/closer.lua/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
并依次下載以下?Jars?到?Flink?的?lib?目錄中:
paimon-flink-1.18-0.6-*.jar,paimon-flink?集成?Jar:
https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.18/0.6-SNAPSHOT/
flink-shaded-hadoop-*.jar,Paimon?需要?hadoop?相關(guān)依賴:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
flink-sql-connector-mongodb-cdc-*.jar:
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.4.2/flink-sql-connector-mongodb-cdc-2.4.2.jar
在?flink/conf/flink-conf.yaml?文件中設(shè)置?checkpoint?間隔:
execution.checkpointing.interval: 10 s
生產(chǎn)中不推薦使用此間隔,太快會(huì)產(chǎn)生大量文件導(dǎo)致?Cost?上升,一般推薦的?Checkpoint?間隔是?1 - 5?分鐘。
啟動(dòng)?Flink?集群:
./bin/start-cluster.sh
啟動(dòng)?Flink?同步任務(wù):
./bin/flink?run?lib/paimon-flink-action-0.6-*.jar
mongodb-sync-database?
--warehouse?/tmp/warehouse1
???--database?test
???--mongodb-conf?hosts=127.0.0.1:27017
???--mongodb-conf?database=test
???--table-conf?bucket=1
參數(shù)說明:
-
Warehouse 指定 paimon 所在文件系統(tǒng)目錄,如你有 HDFS 集群或者對(duì)象存儲(chǔ),可以替換成你的目錄。
-
MongoDB 相關(guān)配置,如有密碼,請(qǐng)?zhí)顚懨艽a。
-
最后指定 bucket 個(gè)數(shù),目前整庫(kù)同步只支持了固定 Bucket 的表,如有特殊需求,可以修改個(gè)別表的 Bucket 個(gè)數(shù)。
可以看到,作業(yè)已成功啟動(dòng),拓?fù)渲饕齻€(gè)節(jié)點(diǎn):
-
Source:Flink MongoDB CDC Source,并完成 Schema Evolution 和自動(dòng)加表。
-
CDC MultiplexWriter:復(fù)雜多個(gè)表的 Paimon 表 Writer,自動(dòng)動(dòng)態(tài)加表。
-
Multiplex Global Committer:兩階段提交的文件提交節(jié)點(diǎn)。
Writer 和 Committer 都有可能成為瓶頸,Writer 和 Committer 的并發(fā)都可以通過 Flink 的配置影響。
你可以考慮打開全異步模式來避免 Writer 的 Compaction 瓶頸:
https://paimon.apache.org/docs/master/maintenance/write-performance/#asynchronous-compaction
步驟四:
下載?Spark,請(qǐng)到官網(wǎng)下載最新版本:
https://spark.apache.org/downloads.html
下載?Paimon Spark?集成?Jar:
https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-spark-3.5/0.6-SNAPSHOT/
啟動(dòng)?Spark SQL:
./bin/spark-sql?
??--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog
??--conf?spark.sql.catalog.paimon.warehouse=file:/tmp/warehouse1
使用?Paimon Catalog,指定?Database:
USE?paimon;
USE rs0;
Demo 開始
步驟一:
我們首先測(cè)試下寫入的數(shù)據(jù)可以被成功讀取到。
我們先給?MongoDB?插入一條數(shù)據(jù):
db.orders.insertOne({id: 1, price: 5})
然后我們?cè)?Spark SQL?里查詢:
可以看到這條數(shù)據(jù)被同步到?Paimon?里,并且可以看到?orders?表的?Schema?里多了一列?“_id”,這列是MongoDB?自動(dòng)生成的隱含的主鍵。
步驟二:
我們?cè)賮砜纯锤率侨绾伪煌降摹?/p>
在?Mongo Shell?里更新下數(shù)據(jù):
db.orders.update({id: 1}, {$set: { price: 8 }})
Spark?里查詢:
數(shù)據(jù)的?price?被更新為?8
步驟三:
我們?cè)賮砜纯刺砑幼侄蔚耐角闆r。
在?Mongo Shell?里新插入一條數(shù)據(jù),多了一列:
db.orders.insertOne({id: 2, price: 6, desc: “haha”})
Spark?里查詢:
可以看到,Paimon?對(duì)應(yīng)的表里已經(jīng)新增了一列,查詢數(shù)據(jù)顯示,老的數(shù)據(jù)默認(rèn)值為?NULL。
步驟四:
我們?cè)賮砜纯葱略霰淼耐角闆r。
在?Mongo Shell?里新插入一張表的數(shù)據(jù):
db.brands.insertOne({id: 1, brand: “NBA”})
Spark?里查詢:
Paimon?里已經(jīng)自動(dòng)多出來一張表,數(shù)據(jù)也被同步過來。
總結(jié)
通過上面的操作你感受到了嗎,通過 Paimon CDC 的入湖程序可以讓你全自動(dòng)的同步業(yè)務(wù)數(shù)據(jù)庫(kù)到?Paimon?里,數(shù)據(jù)、Schema Evolution、新增表,全部被自動(dòng)完成,你只用管好這一個(gè)?Flink?作業(yè)即可。這套入湖程序已經(jīng)被部署到各行各業(yè),各個(gè)公司里,給業(yè)務(wù)數(shù)據(jù)帶來非常方便的鏡像到湖存儲(chǔ)里面的能力。
更有其它數(shù)據(jù)源等你來體驗(yàn):Mysql、Kafka、MongoDB、Pulsar、PostgresSQL。
Paimon?的長(zhǎng)期使命包括:
-
極致易用性、高性能的數(shù)據(jù)入湖,方便的湖存儲(chǔ)管理,豐富生態(tài)的查詢。
-
方便的數(shù)據(jù)流讀,與?Flink?生態(tài)的良好集成,給業(yè)務(wù)帶來1分鐘新鮮度的數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-809360.html
-
加強(qiáng)的?Append?數(shù)據(jù)處理,時(shí)間旅行、數(shù)據(jù)排序帶來高效的查詢,升級(jí)?Hive?數(shù)倉(cāng)。文章來源地址http://www.zghlxwxcb.cn/news/detail-809360.html
到了這里,關(guān)于Flink 內(nèi)容分享(二十一):通過Flink CDC一鍵整庫(kù)同步MongoDB到Paimon的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!