国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)

這篇具有很好參考價(jià)值的文章主要介紹了FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

本文介紹了? 來源單表->目標(biāo)源單表同步,多來源單表->目標(biāo)源單表同步。

注:1.16版本、1.17版本都可以使用火焰圖,生產(chǎn)上最好關(guān)閉,詳情見文章末尾

Flink版本:1.16.2

環(huán)境:Linux CentOS 7.0、jdk1.8

基礎(chǔ)文件:

flink-1.16.2-bin-scala_2.12.tgz、

flink-connector-jdbc-3.0.0-1.16.jar、(maven倉(cāng)庫目錄:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven倉(cāng)庫目錄:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安裝Flink步驟詳見文章第二篇

?

Flink版本:1.17.1

環(huán)境:Linux CentOS 7.0、jdk1.8

基礎(chǔ)文件:

flink-1.17.1-bin-scala_2.12.tgz、

flink-connector-jdbc-3.0.0-1.16.jar、(maven倉(cāng)庫目錄:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven倉(cāng)庫目錄:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)

支持的mysql版本:?

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

本次測(cè)試使用版本flink1.16.2?

一、?來源單表->目標(biāo)源單表同步:數(shù)據(jù)源ip為***.50的源表,同步數(shù)據(jù)到數(shù)據(jù)源ip為***.134的目標(biāo)表中,需要以下幾個(gè)步驟:

1. 啟動(dòng)flink服務(wù):

[root@localhost bin]#? ./start-cluster.sh

2. 停止flink服務(wù):

[root@localhost bin]#? ./stop-cluster.sh

3. 啟動(dòng)FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 編寫FlinkSql,創(chuàng)建臨時(shí)表和job:

FlinkSql與mysql字段的類型映射

?把寫好的Sql粘貼到FlinkSql客戶端命令行中,分號(hào)'? ;? '是語句結(jié)束標(biāo)識(shí)符,按回車創(chuàng)建:

?創(chuàng)建來源表結(jié)構(gòu):

來源表鏈接類型為'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
> ? id STRING NOT NULL,
> ? AlarmTypeID STRING,
> ? `Time` timestamp,
> ? PRIMARY KEY (`id`) NOT ENFORCED
> ?) WITH (
> ? ? 'connector' = 'mysql-cdc',
> ? ? 'hostname' = '***',
> ? ? 'port' = '3306',
> ? ? 'username' = '***',
> ? ? 'password' = '***',
> ? ? 'database-name' = 'alarm',
> ? ? 'server-time-zone' = 'Asia/Shanghai',
> ? ? 'table-name' = 'alarminfo'
> ?);

[INFO] Execute statement succeed.

?創(chuàng)建目標(biāo)表結(jié)構(gòu)(目標(biāo)表結(jié)構(gòu)可比來源表字段多,可使用視圖指定字段默認(rèn)值):

目標(biāo)表鏈接類型為'connector' = 'jdbc',注意url需要跟后面以下屬性值

Flink SQL> CREATE TABLE target_alarminfo134 (
> ? id STRING NOT NULL,
> ? AlarmTypeID STRING,
> ? `Time` timestamp,
> ? sourceLine int,
> ? PRIMARY KEY (`id`) NOT ENFORCED
> ?) WITH (
> ? ? 'connector' = 'jdbc',
> ? ? 'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
> ? ? 'username' = '***',
> ? ? 'password' = '****',
> ? ? 'table-name' = 'alarminfo',
> ? ? 'driver' = 'com.mysql.cj.jdbc.Driver',
> ? ? 'scan.fetch-size' = '1000'
> ?);

[INFO] Execute statement succeed.

?'scan.fetch-size' = '1000'? 的含義:

?在 Flink SQL 中,scan.fetch-size 屬性用于配置批處理查詢中的每次批量獲取記錄的大小。具體地說,它指定了每次從數(shù)據(jù)源讀取的記錄數(shù)。

如何設(shè)置:

以下是一些建議:
考慮數(shù)據(jù)源的吞吐量:如果你的數(shù)據(jù)源的吞吐量較高,網(wǎng)絡(luò)延遲較低,可以適當(dāng)增大 scan.fetch-size 的值,以減少網(wǎng)絡(luò)往返次數(shù)和請(qǐng)求開銷。
考慮網(wǎng)絡(luò)環(huán)境和帶寬限制:如果數(shù)據(jù)源位于遠(yuǎn)程服務(wù)器或網(wǎng)絡(luò)環(huán)境較差,可以選擇適當(dāng)較小的 fetch size 值,以減少網(wǎng)絡(luò)傳輸?shù)呢?fù)載,避免出現(xiàn)大量的網(wǎng)絡(luò)超時(shí)和傳輸失敗情況。
考慮內(nèi)存開銷:fetch size 值過大可能會(huì)占用較多的內(nèi)存資源,特別是對(duì)于批處理查詢。如果你的查詢涉及大量的中間狀態(tài)(intermediate state)或內(nèi)存密集型操作,可以選擇適當(dāng)較小的 fetch size 值。
一般來說,可以先嘗試將 scan.fetch-size 設(shè)置為一個(gè)較默認(rèn)的值,例如 1000 或 5000。然后觀察任務(wù)的性能和執(zhí)行效果,根據(jù)實(shí)際情況進(jìn)行微調(diào)??梢愿鶕?jù)實(shí)際性能測(cè)試和系統(tǒng)資源情況,逐步調(diào)整 fetch size 值,以找到性能和資源利用的平衡點(diǎn)。
需要注意的是,scan.fetch-size 屬性值是一個(gè)相對(duì)的配置,不同的數(shù)據(jù)源和查詢場(chǎng)景可能有不同的最佳值。因此,針對(duì)具體的數(shù)據(jù)源和查詢條件,最好進(jìn)行一些實(shí)際的性能測(cè)試和調(diào)優(yōu),以獲得最優(yōu)的性能和資源使用。

?最后創(chuàng)建同步關(guān)系:

INSERT INTO target_alarminfo134 SELECT *,50 AS sourceLine FROM source_alarminfo50

?若目標(biāo)表比源表結(jié)構(gòu)少字段屬性則執(zhí)行完同步關(guān)系后如下:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

創(chuàng)建完表結(jié)構(gòu)可使用下列語句查看和刪除:

查看表:show tables;

刪除表:drop table if exists ?target_alarminfo;?

flink-UI頁面效果:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink?FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

數(shù)據(jù)同步效果:

源表:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

目標(biāo)表數(shù)據(jù):首次數(shù)據(jù)全量,后面數(shù)據(jù)變更增量?

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

二、?多來源單表->目標(biāo)源單表同步:數(shù)據(jù)源ip為***.50、***.51的兩個(gè)源表,同步數(shù)據(jù)到數(shù)據(jù)源ip為***.134的目標(biāo)表中,使用sourceLine 用于區(qū)分?jǐn)?shù)據(jù)來源,需要以下幾個(gè)步驟:

?1. 創(chuàng)建自定義初始化腳本文件?init.sql、flinkSqlInit.sql,flinkSqlInit.sql文件中包含了在FlinkSql中需要執(zhí)行的語句,用于自動(dòng)化創(chuàng)建臨時(shí)表和視圖,這兩個(gè)放在flink的bin目錄下:

init.sql內(nèi)容如下:

SET execution.runtime-mode=streaming;
SET pipeline.name=my_flink_job;
SET parallism.default=4;

?SET execution.runtime-mode=streaming 設(shè)置了作業(yè)的運(yùn)行模式為流處理模式。這表示作業(yè)將以流處理的方式運(yùn)行,即實(shí)時(shí)處理每個(gè)輸入事件,并根據(jù)輸入數(shù)據(jù)的到達(dá)順序進(jìn)行處理。

SET pipeline.name=my_flink_job 設(shè)置了作業(yè)的流水線名稱為 "my_flink_job"。流水線名稱主要用于標(biāo)識(shí)作業(yè),以便在運(yùn)行時(shí)進(jìn)行管理和監(jiān)控。

SET parallelism.default=4 設(shè)置了作業(yè)的默認(rèn)并行度為 4。并行度表示同時(shí)執(zhí)行作業(yè)任務(wù)的任務(wù)數(shù)量。通過設(shè)置并行度,可以控制作業(yè)在集群上使用的資源量和執(zhí)行的并行度。默認(rèn)并行度將應(yīng)用于作業(yè)的所有算子,除非為某個(gè)算子單獨(dú)指定了并行度。

這些設(shè)置屬性可以在 Flink 的初始化腳本中使用,并在作業(yè)啟動(dòng)時(shí)生效。可以根據(jù)作業(yè)的需求和資源情況調(diào)整這些屬性,以獲得最佳的性能和資源利用率。

注:mysql-cdc和jdbc的區(qū)別:mysql-cdc 標(biāo)注 數(shù)據(jù)來源的表,jdbc標(biāo)注 同步到的目標(biāo)表

flinkSqlInit.sql內(nèi)容如下:

SET execution.checkpointing.interval = 60s;
drop table if exists ?source_alarminfo50;
CREATE TABLE source_alarminfo50 (
? id STRING NOT NULL,
? AlarmTypeID STRING,
? `Time` timestamp,
? PRIMARY KEY (`id`) NOT ENFORCED
?) WITH (
? ? 'connector' = 'mysql-cdc',
? ? 'hostname' = '**',
? ? 'port' = '3306',
? ? 'username' = '**',
? ? 'password' = '**',
? ? 'database-name' = 'alarm',
? ? 'server-time-zone' = 'Asia/Shanghai',
? ? 'table-name' = 'alarminfo'
?);
drop table if exists ?source_alarminfo51;
CREATE TABLE source_alarminfo51 (
? id STRING NOT NULL,
? AlarmTypeID STRING,
? `Time` timestamp,
? PRIMARY KEY (`id`) NOT ENFORCED
?) WITH (
? ? 'connector' = 'mysql-cdc',
? ? 'hostname' = '**',
? ? 'port' = '3306',
? ? 'username' = '**',
? ? 'password' = '**',
? ? 'database-name' = 'alarm',
? ? 'server-time-zone' = 'Asia/Shanghai',
? ? 'table-name' = 'alarminfo'
?);
drop table if exists ?target_alarminfo134;
CREATE TABLE target_alarminfo134 (
? id STRING NOT NULL,
? AlarmTypeID STRING,
? `Time` timestamp,
? sourceLine int,
? PRIMARY KEY (`id`) NOT ENFORCED
?) WITH (
? ? 'connector' = 'jdbc',
? ? 'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
? ? 'username' = '**',
? ? 'password' = '**',
? ? 'table-name' = 'alarminfo',
? ? 'driver' = 'com.mysql.cj.jdbc.Driver',
? ? 'scan.fetch-size' = '200'
?);

BEGIN STATEMENT SET;

INSERT INTO target_alarminfo134
SELECT *,50 AS sourceLine FROM source_alarminfo50
UNION ALL
SELECT *,51 AS sourceLine FROM source_alarminfo51;

END;

其中涉及flinksql的語法:

BEGIN STATEMENT SET 是 Flink SQL 中的一個(gè)特殊語法,用于將一組 SQL 語句作為一個(gè)事務(wù)進(jìn)行處理。它用于將多個(gè) SQL 語句作為一個(gè)原子操作執(zhí)行,要么全部成功提交,要么全部回滾。
在 Flink SQL 中,可以使用 BEGIN STATEMENT SET 將多個(gè) SQL 語句組合成一個(gè)事務(wù),以確保這些語句的原子性。
以下是 BEGIN STATEMENT SET 的使用示例:
BEGIN STATEMENT SET;
-- SQL 語句 1
-- SQL 語句 2
-- ...
COMMIT;
在上述示例中,BEGIN STATEMENT SET 表示事務(wù)的開始,COMMIT 表示事務(wù)的提交。你可以在 BEGIN STATEMENT SET 和 COMMIT 之間編寫需要執(zhí)行的多個(gè) SQL 語句。
如果在 BEGIN STATEMENT SET 和 COMMIT 之間的任何一條語句執(zhí)行失敗,整個(gè)事務(wù)將回滾,即已經(jīng)執(zhí)行的語句會(huì)被撤銷。
需要注意的是,BEGIN STATEMENT SET 和 COMMIT 語句是 Flink SQL 的擴(kuò)展語法,它們可能在某些特定的 Flink 版本或環(huán)境中才可用。在使用時(shí),請(qǐng)確保你的 Flink 版本和環(huán)境支持該語法。?

檢查點(diǎn)間隔設(shè)置:
SET execution.checkpointing.interval = 60s;
通過設(shè)置適當(dāng)?shù)臋z查點(diǎn)間隔,可以在容忍一定故障的同時(shí),控制檢查點(diǎn)的頻率和資源使用。較短的檢查點(diǎn)間隔可以提供更高的容錯(cuò)性,但也會(huì)增加系統(tǒng)開銷。
檢查點(diǎn)是 Flink 中用于實(shí)現(xiàn)容錯(cuò)性的機(jī)制,它會(huì)定期將作業(yè)的狀態(tài)保存到持久化存儲(chǔ)中,以便在發(fā)生故障時(shí)進(jìn)行恢復(fù)。檢查點(diǎn)間隔定義了兩個(gè)連續(xù)檢查點(diǎn)之間的時(shí)間間隔。?

?2. 重啟Flink服務(wù):

停止flink服務(wù):

[root@localhost bin]#? ./stop-cluster.sh

啟動(dòng)flink服務(wù):

[root@localhost bin]#? ./start-cluster.sh

啟動(dòng)FinkSQL:

[root@localhost bin]# ./sql-client.sh

?3.1 在flink的bin目錄下執(zhí)行初始化文件flinkSqlInit.sql:

有兩種方式:

方式一:可設(shè)置job名稱及資源參數(shù)配置

[root@localhost bin]#? ./sql-client.sh -i init.sql -f flinkSqlInit.sql??

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

?使用這個(gè)語句的好處是可以根據(jù)作業(yè)的需求和資源情況調(diào)整這些屬性,以獲得最佳的性能和資源利用率。

flink-UI頁面效果:

?FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

方式二:不可設(shè)置job名稱及資源參數(shù)配置

[root@localhost bin]#? ./sql-client.sh -f ?flinkSqlInit.sql??

?FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

?4. 數(shù)據(jù)同步效果:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

三、打開火焰圖:

編輯flink-conf.yaml:最后面添加?

rest.flamegraph.enabled: true

配置后重啟flink服務(wù),重新創(chuàng)建任務(wù)。

火焰圖效果:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink

注:

在分析火焰圖時(shí),可以關(guān)注以下幾點(diǎn):
函數(shù)的執(zhí)行時(shí)間:縱向的軸顯示了函數(shù)的嵌套層級(jí),越往下表示越深層的函數(shù)調(diào)用。橫向軸表示時(shí)間,通過不同顏色的方塊來表示函數(shù)的執(zhí)行時(shí)間。
熱點(diǎn)函數(shù):尋找占據(jù)執(zhí)行時(shí)間大部分的函數(shù),這些函數(shù)可能是需要優(yōu)化的關(guān)鍵點(diǎn)。
函數(shù)之間的關(guān)系:觀察函數(shù)之間的調(diào)用關(guān)系,查看是否有不必要的函數(shù)調(diào)用或循環(huán)。
I/O 操作:關(guān)注是否有大量的數(shù)據(jù)讀取、寫入或網(wǎng)絡(luò)通信,這可能是性能瓶頸的來源。
根據(jù)火焰圖的分析結(jié)果,您可以進(jìn)一步定位和排查潛在的性能問題,并在代碼、配置或資源分配方面進(jìn)行優(yōu)化。
請(qǐng)注意,為了準(zhǔn)確分析火焰圖,建議在負(fù)載較高的情況下生成火焰圖,并保持足夠的監(jiān)視時(shí)間。此外,F(xiàn)link 的火焰圖功能在生產(chǎn)環(huán)境中可能會(huì)造成一定的開銷,因此建議在測(cè)試或開發(fā)環(huán)境中使用。

四、源表、目標(biāo)表結(jié)構(gòu)with下的屬性介紹:

源表with下的屬性:

chunk-key.even-distribution.factor.lower-bound:塊鍵(Chunk Key)的均勻分布因子下限。

chunk-key.even-distribution.factor.upper-bound:塊鍵的均勻分布因子上限。

chunk-meta.group.size:塊元數(shù)據(jù)的分組大小。

connect.max-retries:連接重試的最大次數(shù)。

connect.timeout:連接的超時(shí)時(shí)間。

connection.pool.size:連接池的大小。

connector:使用的連接器的名稱。

database-name:數(shù)據(jù)庫的名稱。

heartbeat.interval:心跳間隔時(shí)間。

hostname:主機(jī)名或 IP 地址。

password:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的密碼。

port:連接的端口號(hào)。

property-version:屬性版本。

scan.incremental.snapshot.chunk.key-column:增量快照的塊鍵列。

scan.incremental.snapshot.chunk.size:增量快照的塊大小。

scan.incremental.snapshot.enabled:是否啟用增量快照。

scan.newly-added-table.enabled:是否啟用新加入表的掃描。

scan.snapshot.fetch.size:從狀態(tài)快照中獲取的每次批量記錄數(shù)。

scan.startup.mode:掃描啟動(dòng)模式。

scan.startup.specific-offset.file:指定啟動(dòng)位置的文件名。

scan.startup.specific-offset.gtid-set:指定啟動(dòng)位置的 GTID 集合。

scan.startup.specific-offset.pos:指定啟動(dòng)位置的二進(jìn)制日志位置。

scan.startup.specific-offset.skip-events:跳過的事件數(shù)量。

scan.startup.specific-offset.skip-rows:跳過的行數(shù)。

scan.startup.timestamp-millis:指定啟動(dòng)時(shí)間戳(毫秒)。

server-id:服務(wù)器 ID。

server-time-zone:服務(wù)器時(shí)區(qū)。

split-key.even-distribution.factor.lower-bound:切分鍵(Split Key)的均勻分布因子下限。

split-key.even-distribution.factor.upper-bound:切分鍵的均勻分布因子上限。

table-name:表名。

username:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的用戶名。

Sink目標(biāo)表with下的屬性:

connection.max-retry-timeout:連接重試的最大超時(shí)時(shí)間。

connector:使用的連接器的名稱。

driver:JDBC 連接器中使用的數(shù)據(jù)庫驅(qū)動(dòng)程序的類名。

lookup.cache:查找表的緩存配置。

lookup.cache.caching-missing-key:是否緩存查找表中的缺失鍵。

lookup.cache.max-rows:查找表緩存中允許的最大行數(shù)。

lookup.cache.ttl:查找表緩存中行的生存時(shí)間。

lookup.max-retries:查找操作的最大重試次數(shù)。

lookup.partial-cache.cache-missing-key:是否緩存查找表部分缺失的鍵。

lookup.partial-cache.expire-after-access:查找表部分緩存中行的訪問到期時(shí)間。

lookup.partial-cache.expire-after-write:查找表部分緩存中行的寫入到期時(shí)間。

lookup.partial-cache.max-rows:查找表部分緩存中允許的最大行數(shù)。

password:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的密碼。

property-version:屬性版本。

scan.auto-commit:是否自動(dòng)提交掃描操作。

scan.fetch-size:每次批量獲取記錄的大小。

scan.partition.column:用于分區(qū)的列名。

scan.partition.lower-bound:分區(qū)的下限值。

scan.partition.num:要掃描的分區(qū)數(shù)量。

scan.partition.upper-bound:分區(qū)的上限值。

sink.buffer-flush.interval:將緩沖區(qū)的數(shù)據(jù)刷新到目標(biāo)系統(tǒng)的時(shí)間間隔。

sink.buffer-flush.max-rows:緩沖區(qū)中的最大行數(shù),達(dá)到此值時(shí)將刷新數(shù)據(jù)。

sink.max-retries:寫入操作的最大重試次數(shù)。

sink.parallelism:寫入任務(wù)的并行度。

table-name:表名。

url:連接到數(shù)據(jù)庫或其他系統(tǒng)的 URL。

username:連接到數(shù)據(jù)庫或其他系統(tǒng)所需的用戶名。

?

最后FlinkCDC目前不支持整庫同步:

FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2),運(yùn)行環(huán)境,mysql,數(shù)據(jù)庫,flink文章來源地址http://www.zghlxwxcb.cn/news/detail-667179.html

到了這里,關(guān)于FlinkCDC第三部分-同步mysql到mysql,ctrl就完事~(flink版本1.16.2)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Flink】FlinkCDC獲取mysql數(shù)據(jù)時(shí)間類型差8小時(shí)時(shí)區(qū)解決方案

    【Flink】FlinkCDC獲取mysql數(shù)據(jù)時(shí)間類型差8小時(shí)時(shí)區(qū)解決方案

    1、背景: 在我們使用FlinkCDC采集mysql數(shù)據(jù)的時(shí)候,日期類型是我們很常見的類型,但是FlinkCDC讀取出來會(huì)和數(shù)據(jù)庫的日期時(shí)間不一致,情況如下 FlinkCDC獲取的數(shù)據(jù)中create_time字段1694597238000轉(zhuǎn)換為時(shí)間戳2023-09-13 17:27:18 ?而數(shù)據(jù)庫中原始數(shù)據(jù)如下,并沒有到下午5點(diǎn),這就導(dǎo)致了

    2024年02月07日
    瀏覽(25)
  • MySQL FlinkCDC 通過Kafka實(shí)時(shí)同步到ClickHouse(自定義Debezium格式支持增加刪除修改)

    MySQL FlinkCDC 通過Kafka實(shí)時(shí)同步到ClickHouse(自定義Debezium格式支持增加刪除修改) 把MySQL多庫多表的數(shù)據(jù)通過FlinkCDC DataStream的方式實(shí)時(shí)同步到同一個(gè)Kafka的Topic中,然后下游再寫Flink SQL拆分把數(shù)據(jù)寫入到ClickHouse,F(xiàn)linkCDC DataStream通過自定義Debezium格式的序列化器,除了增加,還能進(jìn)行

    2024年02月15日
    瀏覽(23)
  • Flink CDC MySQL同步MySQL錯(cuò)誤記錄

    Flink CDC MySQL同步MySQL錯(cuò)誤記錄

    0、相關(guān)Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者從mvnrepository.com下載 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    瀏覽(21)
  • flinkcdc同步完全量數(shù)據(jù)就不同步增量數(shù)據(jù)了

    使用flinkcdc同步mysql數(shù)據(jù),使用的是全量采集模型 startupOptions(StartupOptions.earliest()) 全量階段同步完成之后,發(fā)現(xiàn)并不開始同步增量數(shù)據(jù),原因有以下兩個(gè): 1.mysql中對(duì)應(yīng)的數(shù)據(jù)庫沒有開啟binlog 在/etc/my.cnf配置文件中,在[ mysqld ]添加以下內(nèi)容 然后重啟數(shù)據(jù)庫 ,執(zhí)行命令 和chec

    2024年02月11日
    瀏覽(19)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 連接mysql(1)

    最新版Flink CDC MySQL同步MySQL(一)_flink 連接mysql(1)

    下載 連接器 SQL jar (或 自行構(gòu)建 )。 將下載的jar包放在FLINK_HOME/lib/. 重啟Flink集群。 注意 :目前2.4以上版本需要進(jìn)行自行編譯構(gòu)建。本文筆者自行進(jìn)行構(gòu)建上傳的 6.使用 Flink CDC 對(duì) MySQL 進(jìn)行流式 ETL 本教程將展示如何使用 Flink CDC 快速構(gòu)建 MySQL的流式 ETL。 假設(shè)我們將產(chǎn)品數(shù)

    2024年04月26日
    瀏覽(25)
  • FlinkCDC 入門之?dāng)?shù)據(jù)同步和故障恢復(fù)

    FlinkCDC 入門之?dāng)?shù)據(jù)同步和故障恢復(fù)

    FlinkCDC 是一款基于 Change Data Capture(CDC)技術(shù)的數(shù)據(jù)同步工具,可以用于將關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)實(shí)時(shí)同步到 Flink 流處理中進(jìn)行實(shí)時(shí)計(jì)算和分析,下圖來自官網(wǎng)的介紹。 下圖 1 是 FlinkCDC 與其它常見 開源 CDC 方案的對(duì)比: 可以看見的是相比于其它開源產(chǎn)品,F(xiàn)linkCDC 不僅支持增

    2024年02月07日
    瀏覽(17)
  • 最新版Flink CDC MySQL同步MySQL(一)

    最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ?的一組源連接器,使用變更數(shù)據(jù)捕獲 (CDC) 從不同數(shù)據(jù)庫中獲取變更。Apache Flink 的 CDC Connectors集成 Debezium 作為捕獲數(shù)據(jù)更改的引擎。所以它可以充分發(fā)揮 Debezium 的能力。 連接器 數(shù)據(jù)庫 驅(qū)動(dòng) mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    瀏覽(17)
  • 基于Flink CDC實(shí)時(shí)同步數(shù)據(jù)(MySQL到MySQL)

    基于Flink CDC實(shí)時(shí)同步數(shù)據(jù)(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠(yuǎn)程服務(wù)器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過程略) 準(zhǔn)備三個(gè)數(shù)據(jù)庫:flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實(shí)時(shí)同步到flink_sink和flink_sink_second的sink_test表。 (建庫建表過程略) 開發(fā)過程

    2024年02月06日
    瀏覽(27)
  • Flink實(shí)時(shí)同步MySQL與Doris數(shù)據(jù)

    Flink實(shí)時(shí)同步MySQL與Doris數(shù)據(jù)

    技術(shù)解析|Doris Connector 結(jié)合 Flink CDC 實(shí)現(xiàn) MySQL 分庫分表 Exactly Once 精準(zhǔn)接入-阿里云開發(fā)者社區(qū) 1. Flink環(huán)境: https://flink.apache.org/zh/ 下載flink-1.15.1 解壓,修改配置 修改配置 修改rest.bind-address為 0.0.0.0 下載依賴jar包 至 flink安裝目錄lib下 啟動(dòng)flink 訪問WebUI http://192.168.0.158:8081 2、

    2024年02月13日
    瀏覽(26)
  • Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表

    Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表

    環(huán)境說明: flink?1.15.2 mysql 版本5.7? ? 注意:需要開啟binlog,因?yàn)樵隽客绞腔赽inlog捕獲數(shù)據(jù) windows11 IDEA 本地運(yùn)行 先上官網(wǎng)使用說明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql開啟binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人測(cè)試是捕獲不到binlog日志的,增量相

    2024年02月10日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包