本文介紹了? 來源單表->目標(biāo)源單表同步,多來源單表->目標(biāo)源單表同步。
注:1.16版本、1.17版本都可以使用火焰圖,生產(chǎn)上最好關(guān)閉,詳情見文章末尾
Flink版本:1.16.2
環(huán)境:Linux CentOS 7.0、jdk1.8
基礎(chǔ)文件:
flink-1.16.2-bin-scala_2.12.tgz、
flink-connector-jdbc-3.0.0-1.16.jar、(maven倉(cāng)庫目錄:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)
flink-sql-connector-mysql-cdc-2.3.0.jar、(maven倉(cāng)庫目錄:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安裝Flink步驟詳見文章第二篇
?
Flink版本:1.17.1
環(huán)境:Linux CentOS 7.0、jdk1.8
基礎(chǔ)文件:
flink-1.17.1-bin-scala_2.12.tgz、
flink-connector-jdbc-3.0.0-1.16.jar、(maven倉(cāng)庫目錄:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)
flink-sql-connector-mysql-cdc-2.3.0.jar、(maven倉(cāng)庫目錄:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
支持的mysql版本:?
本次測(cè)試使用版本flink1.16.2?
一、?來源單表->目標(biāo)源單表同步:數(shù)據(jù)源ip為***.50的源表,同步數(shù)據(jù)到數(shù)據(jù)源ip為***.134的目標(biāo)表中,需要以下幾個(gè)步驟:
1. 啟動(dòng)flink服務(wù):
[root@localhost bin]#? ./start-cluster.sh
2. 停止flink服務(wù):
[root@localhost bin]#? ./stop-cluster.sh
3. 啟動(dòng)FinkSQL:
[root@localhost bin]# ./sql-client.sh
4. 編寫FlinkSql,創(chuàng)建臨時(shí)表和job:
FlinkSql與mysql字段的類型映射
?把寫好的Sql粘貼到FlinkSql客戶端命令行中,分號(hào)'? ;? '是語句結(jié)束標(biāo)識(shí)符,按回車創(chuàng)建:
?創(chuàng)建來源表結(jié)構(gòu):
來源表鏈接類型為'connector' = 'mysql-cdc'
Flink SQL> CREATE TABLE source_alarminfo51 (
> ? id STRING NOT NULL,
> ? AlarmTypeID STRING,
> ? `Time` timestamp,
> ? PRIMARY KEY (`id`) NOT ENFORCED
> ?) WITH (
> ? ? 'connector' = 'mysql-cdc',
> ? ? 'hostname' = '***',
> ? ? 'port' = '3306',
> ? ? 'username' = '***',
> ? ? 'password' = '***',
> ? ? 'database-name' = 'alarm',
> ? ? 'server-time-zone' = 'Asia/Shanghai',
> ? ? 'table-name' = 'alarminfo'
> ?);[INFO] Execute statement succeed.
?創(chuàng)建目標(biāo)表結(jié)構(gòu)(目標(biāo)表結(jié)構(gòu)可比來源表字段多,可使用視圖指定字段默認(rèn)值):
目標(biāo)表鏈接類型為'connector' = 'jdbc',注意url需要跟后面以下屬性值
Flink SQL> CREATE TABLE target_alarminfo134 (
> ? id STRING NOT NULL,
> ? AlarmTypeID STRING,
> ? `Time` timestamp,
> ? sourceLine int,
> ? PRIMARY KEY (`id`) NOT ENFORCED
> ?) WITH (
> ? ? 'connector' = 'jdbc',
> ? ? 'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
> ? ? 'username' = '***',
> ? ? 'password' = '****',
> ? ? 'table-name' = 'alarminfo',
> ? ? 'driver' = 'com.mysql.cj.jdbc.Driver',
> ? ? 'scan.fetch-size' = '1000'
> ?);[INFO] Execute statement succeed.
?'scan.fetch-size' = '1000'? 的含義:
?在 Flink SQL 中,scan.fetch-size 屬性用于配置批處理查詢中的每次批量獲取記錄的大小。具體地說,它指定了每次從數(shù)據(jù)源讀取的記錄數(shù)。
如何設(shè)置:
以下是一些建議:
考慮數(shù)據(jù)源的吞吐量:如果你的數(shù)據(jù)源的吞吐量較高,網(wǎng)絡(luò)延遲較低,可以適當(dāng)增大 scan.fetch-size 的值,以減少網(wǎng)絡(luò)往返次數(shù)和請(qǐng)求開銷。
考慮網(wǎng)絡(luò)環(huán)境和帶寬限制:如果數(shù)據(jù)源位于遠(yuǎn)程服務(wù)器或網(wǎng)絡(luò)環(huán)境較差,可以選擇適當(dāng)較小的 fetch size 值,以減少網(wǎng)絡(luò)傳輸?shù)呢?fù)載,避免出現(xiàn)大量的網(wǎng)絡(luò)超時(shí)和傳輸失敗情況。
考慮內(nèi)存開銷:fetch size 值過大可能會(huì)占用較多的內(nèi)存資源,特別是對(duì)于批處理查詢。如果你的查詢涉及大量的中間狀態(tài)(intermediate state)或內(nèi)存密集型操作,可以選擇適當(dāng)較小的 fetch size 值。
一般來說,可以先嘗試將 scan.fetch-size 設(shè)置為一個(gè)較默認(rèn)的值,例如 1000 或 5000。然后觀察任務(wù)的性能和執(zhí)行效果,根據(jù)實(shí)際情況進(jìn)行微調(diào)??梢愿鶕?jù)實(shí)際性能測(cè)試和系統(tǒng)資源情況,逐步調(diào)整 fetch size 值,以找到性能和資源利用的平衡點(diǎn)。
需要注意的是,scan.fetch-size 屬性值是一個(gè)相對(duì)的配置,不同的數(shù)據(jù)源和查詢場(chǎng)景可能有不同的最佳值。因此,針對(duì)具體的數(shù)據(jù)源和查詢條件,最好進(jìn)行一些實(shí)際的性能測(cè)試和調(diào)優(yōu),以獲得最優(yōu)的性能和資源使用。
?最后創(chuàng)建同步關(guān)系:
INSERT INTO target_alarminfo134 SELECT *,50 AS sourceLine FROM source_alarminfo50
?若目標(biāo)表比源表結(jié)構(gòu)少字段屬性則執(zhí)行完同步關(guān)系后如下:
創(chuàng)建完表結(jié)構(gòu)可使用下列語句查看和刪除:
查看表:show tables;
刪除表:drop table if exists ?target_alarminfo;?
flink-UI頁面效果:
?
數(shù)據(jù)同步效果:
源表:
目標(biāo)表數(shù)據(jù):首次數(shù)據(jù)全量,后面數(shù)據(jù)變更增量?
二、?多來源單表->目標(biāo)源單表同步:數(shù)據(jù)源ip為***.50、***.51的兩個(gè)源表,同步數(shù)據(jù)到數(shù)據(jù)源ip為***.134的目標(biāo)表中,使用sourceLine 用于區(qū)分?jǐn)?shù)據(jù)來源,需要以下幾個(gè)步驟:
?1. 創(chuàng)建自定義初始化腳本文件?init.sql、flinkSqlInit.sql,flinkSqlInit.sql文件中包含了在FlinkSql中需要執(zhí)行的語句,用于自動(dòng)化創(chuàng)建臨時(shí)表和視圖,這兩個(gè)放在flink的bin目錄下:
init.sql內(nèi)容如下:
SET execution.runtime-mode=streaming;
SET pipeline.name=my_flink_job;
SET parallism.default=4;
?SET execution.runtime-mode=streaming 設(shè)置了作業(yè)的運(yùn)行模式為流處理模式。這表示作業(yè)將以流處理的方式運(yùn)行,即實(shí)時(shí)處理每個(gè)輸入事件,并根據(jù)輸入數(shù)據(jù)的到達(dá)順序進(jìn)行處理。
SET pipeline.name=my_flink_job 設(shè)置了作業(yè)的流水線名稱為 "my_flink_job"。流水線名稱主要用于標(biāo)識(shí)作業(yè),以便在運(yùn)行時(shí)進(jìn)行管理和監(jiān)控。
SET parallelism.default=4 設(shè)置了作業(yè)的默認(rèn)并行度為 4。并行度表示同時(shí)執(zhí)行作業(yè)任務(wù)的任務(wù)數(shù)量。通過設(shè)置并行度,可以控制作業(yè)在集群上使用的資源量和執(zhí)行的并行度。默認(rèn)并行度將應(yīng)用于作業(yè)的所有算子,除非為某個(gè)算子單獨(dú)指定了并行度。
這些設(shè)置屬性可以在 Flink 的初始化腳本中使用,并在作業(yè)啟動(dòng)時(shí)生效。可以根據(jù)作業(yè)的需求和資源情況調(diào)整這些屬性,以獲得最佳的性能和資源利用率。
注:mysql-cdc和jdbc的區(qū)別:mysql-cdc 標(biāo)注 數(shù)據(jù)來源的表,jdbc標(biāo)注 同步到的目標(biāo)表
flinkSqlInit.sql內(nèi)容如下:
SET execution.checkpointing.interval = 60s;
drop table if exists ?source_alarminfo50;
CREATE TABLE source_alarminfo50 (
? id STRING NOT NULL,
? AlarmTypeID STRING,
? `Time` timestamp,
? PRIMARY KEY (`id`) NOT ENFORCED
?) WITH (
? ? 'connector' = 'mysql-cdc',
? ? 'hostname' = '**',
? ? 'port' = '3306',
? ? 'username' = '**',
? ? 'password' = '**',
? ? 'database-name' = 'alarm',
? ? 'server-time-zone' = 'Asia/Shanghai',
? ? 'table-name' = 'alarminfo'
?);
drop table if exists ?source_alarminfo51;
CREATE TABLE source_alarminfo51 (
? id STRING NOT NULL,
? AlarmTypeID STRING,
? `Time` timestamp,
? PRIMARY KEY (`id`) NOT ENFORCED
?) WITH (
? ? 'connector' = 'mysql-cdc',
? ? 'hostname' = '**',
? ? 'port' = '3306',
? ? 'username' = '**',
? ? 'password' = '**',
? ? 'database-name' = 'alarm',
? ? 'server-time-zone' = 'Asia/Shanghai',
? ? 'table-name' = 'alarminfo'
?);
drop table if exists ?target_alarminfo134;
CREATE TABLE target_alarminfo134 (
? id STRING NOT NULL,
? AlarmTypeID STRING,
? `Time` timestamp,
? sourceLine int,
? PRIMARY KEY (`id`) NOT ENFORCED
?) WITH (
? ? 'connector' = 'jdbc',
? ? 'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
? ? 'username' = '**',
? ? 'password' = '**',
? ? 'table-name' = 'alarminfo',
? ? 'driver' = 'com.mysql.cj.jdbc.Driver',
? ? 'scan.fetch-size' = '200'
?);BEGIN STATEMENT SET;
INSERT INTO target_alarminfo134
SELECT *,50 AS sourceLine FROM source_alarminfo50
UNION ALL
SELECT *,51 AS sourceLine FROM source_alarminfo51;END;
其中涉及flinksql的語法:
BEGIN STATEMENT SET 是 Flink SQL 中的一個(gè)特殊語法,用于將一組 SQL 語句作為一個(gè)事務(wù)進(jìn)行處理。它用于將多個(gè) SQL 語句作為一個(gè)原子操作執(zhí)行,要么全部成功提交,要么全部回滾。
在 Flink SQL 中,可以使用 BEGIN STATEMENT SET 將多個(gè) SQL 語句組合成一個(gè)事務(wù),以確保這些語句的原子性。
以下是 BEGIN STATEMENT SET 的使用示例:
BEGIN STATEMENT SET;
-- SQL 語句 1
-- SQL 語句 2
-- ...
COMMIT;
在上述示例中,BEGIN STATEMENT SET 表示事務(wù)的開始,COMMIT 表示事務(wù)的提交。你可以在 BEGIN STATEMENT SET 和 COMMIT 之間編寫需要執(zhí)行的多個(gè) SQL 語句。
如果在 BEGIN STATEMENT SET 和 COMMIT 之間的任何一條語句執(zhí)行失敗,整個(gè)事務(wù)將回滾,即已經(jīng)執(zhí)行的語句會(huì)被撤銷。
需要注意的是,BEGIN STATEMENT SET 和 COMMIT 語句是 Flink SQL 的擴(kuò)展語法,它們可能在某些特定的 Flink 版本或環(huán)境中才可用。在使用時(shí),請(qǐng)確保你的 Flink 版本和環(huán)境支持該語法。?
檢查點(diǎn)間隔設(shè)置:
SET execution.checkpointing.interval = 60s;
通過設(shè)置適當(dāng)?shù)臋z查點(diǎn)間隔,可以在容忍一定故障的同時(shí),控制檢查點(diǎn)的頻率和資源使用。較短的檢查點(diǎn)間隔可以提供更高的容錯(cuò)性,但也會(huì)增加系統(tǒng)開銷。
檢查點(diǎn)是 Flink 中用于實(shí)現(xiàn)容錯(cuò)性的機(jī)制,它會(huì)定期將作業(yè)的狀態(tài)保存到持久化存儲(chǔ)中,以便在發(fā)生故障時(shí)進(jìn)行恢復(fù)。檢查點(diǎn)間隔定義了兩個(gè)連續(xù)檢查點(diǎn)之間的時(shí)間間隔。?
?2. 重啟Flink服務(wù):
停止flink服務(wù):
[root@localhost bin]#? ./stop-cluster.sh
啟動(dòng)flink服務(wù):
[root@localhost bin]#? ./start-cluster.sh
啟動(dòng)FinkSQL:
[root@localhost bin]# ./sql-client.sh
?3.1 在flink的bin目錄下執(zhí)行初始化文件flinkSqlInit.sql:
有兩種方式:
方式一:可設(shè)置job名稱及資源參數(shù)配置
[root@localhost bin]#? ./sql-client.sh -i init.sql -f flinkSqlInit.sql??
?使用這個(gè)語句的好處是可以根據(jù)作業(yè)的需求和資源情況調(diào)整這些屬性,以獲得最佳的性能和資源利用率。
flink-UI頁面效果:
?
方式二:不可設(shè)置job名稱及資源參數(shù)配置
[root@localhost bin]#? ./sql-client.sh -f ?flinkSqlInit.sql??
?
?4. 數(shù)據(jù)同步效果:
三、打開火焰圖:
編輯flink-conf.yaml:最后面添加?
rest.flamegraph.enabled: true
配置后重啟flink服務(wù),重新創(chuàng)建任務(wù)。
火焰圖效果:
注:
在分析火焰圖時(shí),可以關(guān)注以下幾點(diǎn):
函數(shù)的執(zhí)行時(shí)間:縱向的軸顯示了函數(shù)的嵌套層級(jí),越往下表示越深層的函數(shù)調(diào)用。橫向軸表示時(shí)間,通過不同顏色的方塊來表示函數(shù)的執(zhí)行時(shí)間。
熱點(diǎn)函數(shù):尋找占據(jù)執(zhí)行時(shí)間大部分的函數(shù),這些函數(shù)可能是需要優(yōu)化的關(guān)鍵點(diǎn)。
函數(shù)之間的關(guān)系:觀察函數(shù)之間的調(diào)用關(guān)系,查看是否有不必要的函數(shù)調(diào)用或循環(huán)。
I/O 操作:關(guān)注是否有大量的數(shù)據(jù)讀取、寫入或網(wǎng)絡(luò)通信,這可能是性能瓶頸的來源。
根據(jù)火焰圖的分析結(jié)果,您可以進(jìn)一步定位和排查潛在的性能問題,并在代碼、配置或資源分配方面進(jìn)行優(yōu)化。
請(qǐng)注意,為了準(zhǔn)確分析火焰圖,建議在負(fù)載較高的情況下生成火焰圖,并保持足夠的監(jiān)視時(shí)間。此外,F(xiàn)link 的火焰圖功能在生產(chǎn)環(huán)境中可能會(huì)造成一定的開銷,因此建議在測(cè)試或開發(fā)環(huán)境中使用。
四、源表、目標(biāo)表結(jié)構(gòu)with下的屬性介紹:
源表with下的屬性:
chunk-key.even-distribution.factor.lower-bound:塊鍵(Chunk Key)的均勻分布因子下限。
chunk-key.even-distribution.factor.upper-bound:塊鍵的均勻分布因子上限。
chunk-meta.group.size:塊元數(shù)據(jù)的分組大小。
connect.max-retries:連接重試的最大次數(shù)。
connect.timeout:連接的超時(shí)時(shí)間。
connection.pool.size:連接池的大小。
connector:使用的連接器的名稱。
database-name:數(shù)據(jù)庫的名稱。
heartbeat.interval:心跳間隔時(shí)間。
hostname:主機(jī)名或 IP 地址。
password:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的密碼。
port:連接的端口號(hào)。
property-version:屬性版本。
scan.incremental.snapshot.chunk.key-column:增量快照的塊鍵列。
scan.incremental.snapshot.chunk.size:增量快照的塊大小。
scan.incremental.snapshot.enabled:是否啟用增量快照。
scan.newly-added-table.enabled:是否啟用新加入表的掃描。
scan.snapshot.fetch.size:從狀態(tài)快照中獲取的每次批量記錄數(shù)。
scan.startup.mode:掃描啟動(dòng)模式。
scan.startup.specific-offset.file:指定啟動(dòng)位置的文件名。
scan.startup.specific-offset.gtid-set:指定啟動(dòng)位置的 GTID 集合。
scan.startup.specific-offset.pos:指定啟動(dòng)位置的二進(jìn)制日志位置。
scan.startup.specific-offset.skip-events:跳過的事件數(shù)量。
scan.startup.specific-offset.skip-rows:跳過的行數(shù)。
scan.startup.timestamp-millis:指定啟動(dòng)時(shí)間戳(毫秒)。
server-id:服務(wù)器 ID。
server-time-zone:服務(wù)器時(shí)區(qū)。
split-key.even-distribution.factor.lower-bound:切分鍵(Split Key)的均勻分布因子下限。
split-key.even-distribution.factor.upper-bound:切分鍵的均勻分布因子上限。
table-name:表名。
username:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的用戶名。
Sink目標(biāo)表with下的屬性:
connection.max-retry-timeout:連接重試的最大超時(shí)時(shí)間。
connector:使用的連接器的名稱。
driver:JDBC 連接器中使用的數(shù)據(jù)庫驅(qū)動(dòng)程序的類名。
lookup.cache:查找表的緩存配置。
lookup.cache.caching-missing-key:是否緩存查找表中的缺失鍵。
lookup.cache.max-rows:查找表緩存中允許的最大行數(shù)。
lookup.cache.ttl:查找表緩存中行的生存時(shí)間。
lookup.max-retries:查找操作的最大重試次數(shù)。
lookup.partial-cache.cache-missing-key:是否緩存查找表部分缺失的鍵。
lookup.partial-cache.expire-after-access:查找表部分緩存中行的訪問到期時(shí)間。
lookup.partial-cache.expire-after-write:查找表部分緩存中行的寫入到期時(shí)間。
lookup.partial-cache.max-rows:查找表部分緩存中允許的最大行數(shù)。
password:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的密碼。
property-version:屬性版本。
scan.auto-commit:是否自動(dòng)提交掃描操作。
scan.fetch-size:每次批量獲取記錄的大小。
scan.partition.column:用于分區(qū)的列名。
scan.partition.lower-bound:分區(qū)的下限值。
scan.partition.num:要掃描的分區(qū)數(shù)量。
scan.partition.upper-bound:分區(qū)的上限值。
sink.buffer-flush.interval:將緩沖區(qū)的數(shù)據(jù)刷新到目標(biāo)系統(tǒng)的時(shí)間間隔。
sink.buffer-flush.max-rows:緩沖區(qū)中的最大行數(shù),達(dá)到此值時(shí)將刷新數(shù)據(jù)。
sink.max-retries:寫入操作的最大重試次數(shù)。
sink.parallelism:寫入任務(wù)的并行度。
table-name:表名。
url:連接到數(shù)據(jù)庫或其他系統(tǒng)的 URL。
username:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的用戶名。
?
最后FlinkCDC目前不支持整庫同步:文章來源:http://www.zghlxwxcb.cn/news/detail-667179.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-667179.html
到了這里,關(guān)于FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!