国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

這篇具有很好參考價值的文章主要介紹了flink-cdc同步mysql數(shù)據(jù)到elasticsearch。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1,什么是cdc

CDC是(Change Data Capture 變更數(shù)據(jù)獲取)的簡稱。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。

2,flink的cdc

cdc項目地址:https://github.com/ververica/flink-cdc-connectors

cdc項目文檔:https://ververica.github.io/flink-cdc-connectors/master/

flink-sql項目文檔:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sqlclient/

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

3,環(huán)境準(zhǔn)備

  • mysql
  • elasticsearch
  • flink on yarn

說明:如果沒有安裝hadoop,那么可以不用yarn,直接用flink standalone環(huán)境吧。

本例使用版本如下:

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?下面兩個地址下載flink的依賴包,放在lib目錄下面。 

  下載地址:

  1、https://repo.maven.apache.org/maven2/com/alibaba/ververica/

  flink-sql-connector-mysql-cdc-1.4.0.jar

  此倉庫提供的最新版本為1.4.0,如需新版本可自行編譯或者去https://mvnrepository.com/下載。

  2、https://repo.maven.apache.org/maven2/org/apache/flink/

  flink-sql-connector-elasticsearch7_2.11-1.13.5.jar

  小坑:此處使用的是es7,由于本地環(huán)境是es8導(dǎo)致無法創(chuàng)建索引,又重新安裝es7測試成功。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

4,啟動flink

啟動flink集群

./start-cluster.sh

啟動成功的話,可以在 http://localhost:8081/ 訪問到 Flink Web UI,如下所示:

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?啟動flink sql-client

./sql-client.sh
不加任何參數(shù)進入交互式界面。
./sql-client.sh -f /tmp/aa.sql
-f:就是接sql文件。即不用進行交互式查詢,這里注意:aa.sql文件里的insert語句會被分開成一個個job。

如果想要在一個job里提交就要注意寫法,即:
在1.15.0以前語法:
BEGIN STATEMENT SET;
-- one or more INSERT INTO statements
{ INSERT INTO|OVERWRITE <select_statement>; }+
END;

自定義job名稱: set pipeline.name = totalTask;

啟動成功后,可以看到如下的頁面:

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

5,數(shù)據(jù)同步初始化

1)mysql數(shù)據(jù)庫原始表

CREATE TABLE `product_view` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`server_id` int(11) NOT NULL,
`duration` int(11) NOT NULL,
`times` varchar(11) NOT NULL,
`time` datetime NOT NULL,
PRIMARY KEY (`id`),
KEY `time` (`time`),
KEY `user_product` (`user_id`,`product_id`) USING BTREE,
KEY `times` (`times`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 樣本數(shù)據(jù)
INSERT INTO `product_view` VALUES ('1', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('2', '1', '1', '1', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('3', '1', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('4', '1', '1', '2', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('5', '8', '1', '1', '120', '120', '2020-05-14 13:14:00');
INSERT INTO `product_view` VALUES ('6', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');
INSERT INTO `product_view` VALUES ('7', '8', '1', '3', '120', '120', '2020-04-24 13:14:00');
INSERT INTO `product_view` VALUES ('8', '8', '1', '3', '120', '120', '2020-04-23 13:14:00');
INSERT INTO `product_view` VALUES ('9', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');

2)flink?創(chuàng)建source數(shù)據(jù)庫關(guān)聯(lián)表

CREATE TABLE product_view_source (
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.34.100.209',
'port' = '3306',
'username' = 'root',
'password' = '123',
'database-name' = 'flinkcdc_test',
'table-name' = 'product_view',
'server-id' = '5401'
);

這樣,我們在flink-sql client操作這個表相當(dāng)于操作mysql里面的對應(yīng)表。

3)flink?創(chuàng)建sink,數(shù)據(jù)庫關(guān)聯(lián)表elasticsearch

CREATE TABLE product_view_sink(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://10.34.100.156:9200',
'index' = 'product_view_index'
);

這樣,es里的product_view_index這個索引在數(shù)據(jù)同步時會被自動創(chuàng)建,如果想指定一些屬性,可以提前手動創(chuàng)建好索引。往product_view_sink里面插入數(shù)據(jù),可以發(fā)現(xiàn)es中已經(jīng)有數(shù)據(jù)了。

查看flink創(chuàng)建的表

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?查看flink表數(shù)據(jù)

select * from product_view_source;

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

select * from product_view_sink;

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?由此可見,sink不能直接使用sql查詢。

4)建立同步任務(wù)

insert into product_view_sink select * from product_view_source;

這個時候是可以退出flink sql-client的,然后進入flink web-ui,可以看到mysql表數(shù)據(jù)已經(jīng)同步到elasticsearch中了,對mysql進行插入刪除更新,elasticsearch都是同步更新的。

查看任務(wù)

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?查看es數(shù)據(jù)

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

6,數(shù)據(jù)實時同步

1)新增記錄

mysql數(shù)據(jù)庫插入一條記錄

INSERT INTO `product_view` VALUES ('10', '8', '1', '2', '120', '120', '2020-05-13 13:14:00');

查詢es,新增一條記錄

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

2)刪除記錄

mysql數(shù)據(jù)庫刪除一條記錄

DELETE FROM `product_view` where id=10;

查詢es,減少一條記錄

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?3)更新記錄

es原始記錄

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?mysql更新一條記錄

UPDATE `product_view` SET user_id=100,product_id=101 WHERE id=2;

變更后es記錄

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

?4)修改表結(jié)構(gòu)

1、flink-sql不支持alter語句,因此flink-sql創(chuàng)建的source,sink表也不支持結(jié)構(gòu)的修改。

2、當(dāng)mysql源表增加列時,flink創(chuàng)建的source,sink表結(jié)構(gòu)都不會發(fā)生改變,job會忽略新增的列仍繼續(xù)執(zhí)行同步任務(wù),不會報錯。

3、當(dāng)mysql源表刪除job中正在同步的列時,job會報如下錯誤:time是手動刪除的字段。并且手動增加time列后job也不能恢復(fù)正常。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

7,故障恢復(fù)

1)背景

當(dāng)提交的job出錯時會導(dǎo)致同步任務(wù)中斷,重啟job會導(dǎo)致同步任務(wù)從頭開始,如果源表數(shù)據(jù)量巨大會耗費大量時間和資源,如何能以最小的代價快速恢復(fù)呢?這就要用到flink的重啟策略和checkpoint。

2)相關(guān)說明

官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/execution/task_failure_recovery/

Flink 作業(yè)如果沒有定義重啟策略,則會遵循集群啟動時加載的默認重啟策略。 如果提交作業(yè)時設(shè)置了重啟策略,該策略將覆蓋掉集群的默認策略。

通過 Flink 的配置文件?flink-conf.yaml?來設(shè)置默認的重啟策略。配置參數(shù)?restart-strategy?定義了采取何種策略。 如果沒有啟用 checkpoint,就采用“不重啟”策略。如果啟用了 checkpoint 且沒有配置重啟策略,那么就采用固定延時重啟策略fixed-delay,默認值:重啟次數(shù)Integer.MAX_VALUE?,重啟間隔1s。

3)重啟場景模擬

不開啟checkpoint就不會開啟重啟策略。

提交job后關(guān)掉mysql,job報如下錯誤:

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?開啟checkpoint就會自動開啟默認的重啟策略。

設(shè)置命令:


//每 3 秒做一次 checkpoint,用于測試,生產(chǎn)配置建議5到10分鐘
set execution.checkpointing.interval = 3s;

提交job后關(guān)掉mysql,job一只在重啟,重啟mysql后job自動恢復(fù)running狀態(tài)。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?4)checkpoint模擬

當(dāng)任務(wù)失敗并且重啟也失敗(job徹底掛掉)或任務(wù)被取消掉,此時可以從checkpoint中快速恢復(fù)最近的狀態(tài)。

默認checkpoint的存儲有3種方式

1、內(nèi)存中,當(dāng)job失敗無法從checkpoint中恢復(fù),感覺沒啥鳥用。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

2、持久化保存checkpoint,最好使用hdfs,我這里為了方便采用的是本地目錄存儲。

3、rockdb數(shù)據(jù)庫,沒有研究,有興趣的朋友自行研究。

設(shè)置命令:

// 設(shè)置本地存儲目錄
set state.checkpoints.dir = file:///tmp/checkpoint-dir;
//當(dāng)作業(yè)手動取消時,將會保留作業(yè)的 Checkpoint 狀態(tài)信息。注意,這種情況下,需要手動清除該作業(yè)保留的 Checkpoint 狀態(tài)信息,否則這些狀態(tài)信息將永遠保留在外部的持久化存儲中。
//為方便測試,本例采用此種方式
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
//僅當(dāng)作業(yè)失敗時,作業(yè)的 Checkpoint 才會被保留用于任務(wù)恢復(fù)。當(dāng)作業(yè)取消時,Checkpoint 狀態(tài)信息會被刪除,因此取消任務(wù)后,不能從 Checkpoint 位置進行恢復(fù)任務(wù)。
set execution.checkpointing.externalized-checkpoint-retention=DELETE_ON_CANCELLATION;
// 重置指定參數(shù)
reset execution.checkpointing.externalized-checkpoint-retention;
// 設(shè)置并行度
set parallelism.default = 1;
// 重置全部參數(shù)
reset;

提交job,手動取消job后,查看最近的checkpoint。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?從此最近的checkpoint文件中可恢復(fù)job

設(shè)置命令:

//設(shè)置要恢復(fù)的checkpoint
set 'execution.savepoint.path'='file:/tmp/checkpoint-dir/0bdf06a6e699f87b6703a39df6ea8b5d/chk-3';
//執(zhí)行原來的任務(wù)sql,是所有的sql,包括建表語句。最好提交剛開始提交任務(wù)時就用文件的方式,這樣方便恢復(fù)。
insert into product_view_sink select a2.* from product_view_source a1 inner join product_view_source2 a2 on a1.id=a2.id;

?從checkpoint中恢復(fù)時,新提交的任務(wù)checkpoint狀態(tài)變化如下:

表示從要恢復(fù)的job的id=118號開始恢復(fù),新job的id=舊job的結(jié)束id+新job已創(chuàng)建的id,123=118+5

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?實操模擬:

  1、源mysql中的數(shù)據(jù)如下:

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?  2、開啟checkpoint,設(shè)置存儲目錄,保留取消任務(wù)時checkpoint文件,提交job。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?  3、查詢es中同步的數(shù)據(jù),和mysql中記錄完全一樣

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?  4、取消job,刪除es中的索引

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?  4、修改、添加mysql中數(shù)據(jù)

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?  5、flink-sql設(shè)置恢復(fù)checkpoint,并重新提交任務(wù)

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?  6、觀察es中的數(shù)據(jù)發(fā)現(xiàn)取消job前的數(shù)據(jù)沒有了,只有取消后新增的2條數(shù)據(jù)和修改的那條數(shù)據(jù)。這說明不是從頭開始同步的,是從checkpoint中恢復(fù)出來的。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

8,任務(wù)文件模板

1)任務(wù)模板

提交任務(wù),最好以文件的方式提交,方便后續(xù)job的恢復(fù)。

-- 設(shè)置job名稱
set pipeline.name = totalTask;
-- 設(shè)置checkpoint時間間隔
set execution.checkpointing.interval = 300s;
-- 設(shè)置checkpoint持久化目錄
set state.checkpoints.dir = file:///tmp/checkpoint-dir;
-- 設(shè)置當(dāng)取消job時仍保留checkpoint文件
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;


建表語句source表
建表語句sink表

BEGIN STATEMENT SET;
insert同步語句
END;?

9,斷點續(xù)傳

job失敗后恢復(fù)步驟如下:

1、找到最新的checkpoint;
2、set 'execution.savepoint.path'='xxxxx';
3、執(zhí)行本原來任務(wù)sql文件;

10,遇到的問題

1)slots不足

flink默認taskmanager.numberOfTaskSlots=1即只能運行一個子任務(wù),一般設(shè)置為機器的CPU核心數(shù)。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

2)內(nèi)存不足

任務(wù)失敗,可能由于內(nèi)存不足導(dǎo)致,一般可調(diào)節(jié)flink-config.yaml以下配置,如內(nèi)存需求很大可以按照b來配置。

a.最簡單的總內(nèi)存配置,會按照默認比例給具體的內(nèi)存項分配。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

b.給具體的內(nèi)存項分配

jobmanager.memory.flink.size: 2048m

taskmanager.memory.task.heap.size : 2048m
taskmanager.memory.managed.size : 2048m

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?

3)重復(fù)server-id

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?flink-cdc同步mysql數(shù)據(jù)到elasticsearch

1、前提:單個提交job任務(wù),即每個insert語句形成一個job,就是一個同步任務(wù)。

結(jié)論:通過實踐可知,所有具有相同server-id的source表,只能選擇其中一個且被一個job使用。實用性很差,只是測試時踩的坑記錄一下。

場景:假如source1和source2表具有相同的server-id,如果job1中使用了source1(不能同時使用source1和source2),那其他job就不能在用source1、source2了。

分析:先提交一個job1并且已經(jīng)在同步了,此時如果提交的job2中有source表與job1中source表有相同的server-id,或job2中使用和job1中重復(fù)的source表,那job2也從job1已經(jīng)讀到的binlog位置開始讀就會有問題,直接報如下錯誤。

2、前提:批量提交同步任務(wù),即將多個insert語句放在一起形成一個job,一個insert對應(yīng)一個同步任務(wù),一個job包含多個同步任務(wù)。

結(jié)論:通過實踐可知,不同的同步任務(wù)(即不同的insert語句)可以使用同一個source表,但不建議共享,可能造成數(shù)據(jù)丟失。但一個同步任務(wù)不能使用相同的server-id的source表。

場景:假如source1和source2表具有相同的server-id,如果任務(wù)1使用source1(不能同時使用source1和source2),其他任務(wù)還可以使用source1。

分析:同時提交任務(wù)1和任務(wù)2并且都使用到了source1,等于2個任務(wù)共同維護source1的binlog狀態(tài),此時可能導(dǎo)致某個任務(wù)從錯誤的binlog位置讀取數(shù)據(jù),從而導(dǎo)致數(shù)據(jù)丟失。

最佳實踐:一個同步任務(wù)中(一個insert語句)使用到的每個source表都對應(yīng)一個不同server-id。同一個source表如在多個job或任務(wù)中使用,就在每個job或每個任務(wù)中設(shè)置不同的表名及server-id。這樣對于相同的source表在每個job或任務(wù)中都各自維護一份binlog狀態(tài)了。

舉例說明:如order表需要在3個job或同步任務(wù)中使用,job1中name=order1,server-id=5401;job2中name=order2,server-id=5402;job3中name=order3,server-id=5403;這樣3個job就會各自維護各自的關(guān)于order表的binlog狀態(tài)。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

4)數(shù)據(jù)不一致

問題:同步完成后通過es查詢索引的數(shù)據(jù)量和mysql查詢的數(shù)據(jù)量不一致。

問題定位:通過在flink的sql-client.sh查詢發(fā)現(xiàn)和mysql查詢的數(shù)據(jù)不一致,定位到是flink的sql和mysql的sql語法存在區(qū)別。

當(dāng)where 條件中同時包含 OR 和 IS NULL 的 SQL時,IS NULL條件會失效 。 例如:?select * from product01_source where (test_char is NULL or test_char = '2'); 這樣的結(jié)果篩選出來不會包含test_char? is null的結(jié)果,當(dāng)把test_char? = '2'去掉只有test_char? is null時這樣的結(jié)果是正確的。也就是說OR和IS NULL不可以同時使用。
場景模擬
mysql查詢select * from product01 where (test_char = '2' or test_char is null);

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?flink的sql-client.sh查詢select * from product01_source where (test_char is NULL or test_char = '2');

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?通過以上模擬,可以發(fā)現(xiàn)mysql查詢的結(jié)果和flink查詢的結(jié)果不一致。

問題解決:最初是想通過union來合并兩個條件的結(jié)果來解決,但總感覺還有其他解決方法,最終發(fā)現(xiàn)是flink的版本bug導(dǎo)致,將原來的flink-1.13.6換成1.14.6,并將flink-sql-connector-mysql-cdc-1.4.0.jar換成flink-sql-connector-mysql-cdc-2.3.0.jar,問題得以解決。

問題鏈接:https://issues.apache.org/jira/browse/FLINK-22015

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?遇到的新問題:flink升級為1.14.6版本后雖然解決了數(shù)據(jù)一致性的問題,但是當(dāng)提交多個復(fù)雜(join的表多)的job的時候,自測的時候提交2-3個時就會卡死,一直提交不了。報連接池連接不可用超時。

flink-cdc同步mysql數(shù)據(jù)到elasticsearch

?問題解決:給source表加上? ?'connection.pool.size'?=?'200'? 參數(shù),增加連接數(shù)量即可。

總結(jié):至此可正常提交job并保證數(shù)據(jù)的一致性。文章來源地址http://www.zghlxwxcb.cn/news/detail-638267.html

到了這里,關(guān)于flink-cdc同步mysql數(shù)據(jù)到elasticsearch的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 大數(shù)據(jù)技術(shù)之 Flink-CDC

    大數(shù)據(jù)技術(shù)之 Flink-CDC

    CDC 是 Change Data Capture(變更數(shù)據(jù)獲?。┑暮喎Q。在廣義的概念上,只要是能捕獲數(shù)據(jù)變更的技術(shù),我們都可以稱之為 CDC 。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以

    2024年02月05日
    瀏覽(21)
  • 基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

    基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠程服務(wù)器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過程略) 準(zhǔn)備三個數(shù)據(jù)庫:flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實時同步到flink_sink和flink_sink_second的sink_test表。 (建庫建表過程略) 開發(fā)過程

    2024年02月06日
    瀏覽(27)
  • 實戰(zhàn):大數(shù)據(jù)Flink CDC同步Mysql數(shù)據(jù)到ElasticSearch

    實戰(zhàn):大數(shù)據(jù)Flink CDC同步Mysql數(shù)據(jù)到ElasticSearch

    前面的博文我們分享了大數(shù)據(jù)分布式流處理計算框架Flink和其基礎(chǔ)環(huán)境的搭建,相信各位看官都已經(jīng)搭建好了自己的運行環(huán)境。那么,今天就來實戰(zhàn)一把使用Flink CDC同步Mysql數(shù)據(jù)導(dǎo)Elasticsearch。 CDC簡介 CDC 的全稱是 Change Data Capture(變更數(shù)據(jù)捕獲技術(shù)) ,在廣義的概念上,只要

    2024年02月09日
    瀏覽(19)
  • 基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    Flink CDC有兩種方式同步數(shù)據(jù)庫: 一種是通過FlinkSQL直接輸入兩表數(shù)據(jù)庫映射進行數(shù)據(jù)同步,缺點是只能單表進行同步; 一種是通過DataStream開發(fā)一個maven項目,打成jar包上傳到服務(wù)器運行。 本方案使用FlinkSQL方法,同步兩表中的數(shù)據(jù)。 其中Flink應(yīng)用可以部署在具有公網(wǎng)IP的服務(wù)

    2023年04月11日
    瀏覽(27)
  • 【現(xiàn)場問題】flink-cdc,Oracle2Mysql的坑,Oracle區(qū)分大小寫導(dǎo)致

    【現(xiàn)場問題】flink-cdc,Oracle2Mysql的坑,Oracle區(qū)分大小寫導(dǎo)致

    Column ‘id’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘DROP’ to suppress this exception and drop such records silently 大致意思就是不能插入為空的數(shù)值。 為什么會報這個錯誤,我們來看DML的執(zhí)行語句: insert into t_wx_target select

    2024年02月12日
    瀏覽(25)
  • 【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    我一開始是flink-cdc,oracle2Mysql,sql 我一開始直接用的oracle【date】類型,mysql【date】類型,sql的校驗通過了,但是真正操作數(shù)據(jù)的時候報錯,告訴我oracle的數(shù)據(jù)格式的日期數(shù)據(jù),不可以直接插入到mysql格式的日期數(shù)據(jù),說白了就是數(shù)據(jù)格式不一致導(dǎo)致的 我想的是既然格式不對

    2024年02月12日
    瀏覽(25)
  • Flink DataStream API CDC同步MySQL數(shù)據(jù)到StarRocks

    Flink DataStream API CDC同步MySQL數(shù)據(jù)到StarRocks

    Flink:1.16.1 pom文件如下 Java代碼 SourceAndSinkInfo 類,用于定義source和sink的IP、端口、賬號、密碼信息 DataCenterShine實體類,字段與數(shù)據(jù)庫一一對應(yīng)。 StarRocksPrimary 實體類 FieldInfo注解類,用于標(biāo)記字段序號、是否為主鍵、是否為空,后續(xù)生成TableSchema需要使用到。 TableName 注解類,

    2024年02月03日
    瀏覽(32)
  • 基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    這篇教程將展示如何基于 Flink CDC 快速構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步。本教程的演示都將在 Flink SQL CLI 中進行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。 假設(shè)我們有電子商務(wù)業(yè)務(wù),商品的數(shù)據(jù)存儲在 MySQL ,我們需要實時把它同步到 Databend 中。 接下來的內(nèi)容

    2024年02月10日
    瀏覽(29)
  • 基于大數(shù)據(jù)平臺(XSailboat)的計算管道實現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    基于大數(shù)據(jù)平臺(XSailboat)的計算管道實現(xiàn)MySQL數(shù)據(jù)源的CDC同步--flink CDC

    筆者在先前的一篇文檔《數(shù)據(jù)標(biāo)簽設(shè)計 – 大數(shù)據(jù)平臺(XSailboat)的數(shù)據(jù)標(biāo)簽?zāi)K》 提到了關(guān)于數(shù)據(jù)標(biāo)簽的模塊,現(xiàn)已實現(xiàn)并應(yīng)用于項目中。在項目中遇到這樣一種情形: 如果打標(biāo)信息和業(yè)務(wù)數(shù)據(jù)是在一個數(shù)據(jù)庫實例中,那么只需要連接兩張表進行查詢即可。但是數(shù)據(jù)標(biāo)簽作為

    2024年01月17日
    瀏覽(35)
  • SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    最近做的一個項目,使用的是pg數(shù)據(jù)庫,公司沒有成熟的DCD組件,為了實現(xiàn)數(shù)據(jù)變更消息發(fā)布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka。 監(jiān)聽數(shù)據(jù)變化,進行異步通知,做系統(tǒng)內(nèi)異步任務(wù)。 架構(gòu)方案(懶得寫了,看圖吧): -- 創(chuàng)建pg 高線數(shù)據(jù)同步用

    2024年02月02日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包