国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

技術(shù)干貨|如何利用 ChunJun 實現(xiàn)數(shù)據(jù)實時同步?

這篇具有很好參考價值的文章主要介紹了技術(shù)干貨|如何利用 ChunJun 實現(xiàn)數(shù)據(jù)實時同步?。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

實時同步是 ChunJun 的?個重要特性,指在數(shù)據(jù)同步過程中,數(shù)據(jù)源與?標系統(tǒng)之間的數(shù)據(jù)傳輸和更新?乎在同?時間進?。

在實時同步場景中我們更加關(guān)注源端,當(dāng)源系統(tǒng)中的數(shù)據(jù)發(fā)?變化時,這些變化會?即傳輸并應(yīng)?到?標系統(tǒng),以保證兩個系統(tǒng)中的數(shù)據(jù)保持?致。這個特性需要作業(yè)運?過程中 source 插件不間斷地頻繁訪問源端。在?產(chǎn)場景下,對于這類?時間運?、資源可預(yù)估、需要穩(wěn)定性的作業(yè),我們推薦使? perjob 模式部署。

插件?持 JSON 腳本和 SQL 腳本兩種配置?式,具體的參數(shù)配置請參考「ChunJun連接器文檔」:https://sourl.cn/vxq6Zp

本文將為大家介紹如何使用 ChunJun 實時同步,以及 ChunJun ?持的 RDB 實時采集插件的特性、采集邏輯及其原理,幫助大家更好地理解 ChunJun 與實時同步。

如何使用 ChunJun 實時同步

為了讓?家能更深?了解如何使? ChunJun 做實時同步,我們假設(shè)有這樣?個場景:?個電商?站希望將其訂單數(shù)據(jù)從 MySQL 數(shù)據(jù)庫實時同步到 HBase 數(shù)據(jù)庫,以便于后續(xù)的數(shù)據(jù)分析和處理。

在這個場景中,我們將使? Kafka 作為中間消息隊列,以實現(xiàn) MySQL 和 HBase 之間的數(shù)據(jù)同步。這樣做的好處是 MySQL 表中變更可以實時同步到 HBase 結(jié)果表中,?不?擔(dān)?歷史數(shù)據(jù)被修改后 HBase 表未被同步。

如果在?家的實際應(yīng)用場景中,不關(guān)?歷史數(shù)據(jù)是否變更(或者歷史數(shù)據(jù)根本不會變更),且業(yè)務(wù)表有?個遞增的主鍵,那么可以參考本?之后的 JDBC-Polling 模式?節(jié)的內(nèi)容。

· 數(shù)據(jù)源組件的部署以及 ChunJun 的部署這?不做詳細描述

· 案例中的腳本均以 SQL 腳本為例,JSON 腳本也能實現(xiàn)相同功能,但在參數(shù)名上可能存在出?,使? JSON 的同學(xué)可以參考上文 「ChunJun 連接器」?檔中的參數(shù)介紹

采集 MySQL 數(shù)據(jù)到 Kafka

● 數(shù)據(jù)準備

?先,我們在 Kafka 中創(chuàng)建?個名為 order_dml 的 topic,然后在 MySQL 中創(chuàng)建?個訂單表,并插??些測試數(shù)據(jù)。創(chuàng)建表的 SQL 語句如下:

-- 創(chuàng)建?個名為ecommerce_db的數(shù)據(jù)庫,?于存儲電商?站的數(shù)據(jù)
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;
-- 創(chuàng)建?個名為orders的表,?于存儲訂單信息
CREATE TABLE IF NOT EXISTS orders (
 id INT AUTO_INCREMENT PRIMARY KEY, -- ?增主鍵
 order_id VARCHAR(50) NOT NULL, -- 訂單編號,不能為空
 user_id INT NOT NULL, -- ?戶ID,不能為空
 product_id INT NOT NULL, -- 產(chǎn)品ID,不能為空
 quantity INT NOT NULL, -- 訂購數(shù)量,不能為空
 order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- 訂單?期,默認值為當(dāng)前時間
戳,不能為空
);
-- 插??些測試數(shù)據(jù)到orders表
INSERT INTO orders (order_id, user_id, product_id, quantity) 
VALUES ('ORD123', 1, 101, 2), 
('ORD124', 2, 102, 1), 
('ORD125', 3, 103, 3), 
('ORD126', 1, 104, 1), 
('ORD127', 2, 105, 5);

● 使用 Binlog 插件采集數(shù)據(jù)到 Kafka

為了表示數(shù)據(jù)的變化類型和更好地處理數(shù)據(jù)變化,實時采集插件一般會用 RowData(Flink 內(nèi)部數(shù)據(jù)結(jié)構(gòu))中的 RowKind 記錄?志中的數(shù)據(jù)事件(insert、delete 等)類型,binlog 插件也?樣。而當(dāng)數(shù)據(jù)被打到 Kafka 中時,RowKind 信息應(yīng)該怎么處理呢?

這?我們就需要?到 upsert-kafka-x,upsert-kafka-x 會識別 RowKind。對各類時間的處理邏輯如下:

? insert 數(shù)據(jù):序列化后直接打?

? delete 數(shù)據(jù):只寫 key,value 置為 null

? update 數(shù)據(jù):分為?條 delete 數(shù)據(jù)和 insert 數(shù)據(jù)處理,即先根據(jù)主鍵刪除原本的數(shù)據(jù),再寫? update 后的數(shù)據(jù)

在下?步中我們再解釋如何將 Kafka 中的數(shù)據(jù)還原到 HBase 或者其他?持 upsert 語義的數(shù)據(jù)庫中,接下來我們來編寫 SQL 腳本,實現(xiàn) MySQL 數(shù)據(jù)實時采集到 Kafka 中的功能,示例如下:

CREATE TABLE binlog_source ( 
id int, 
order_id STRING, 
user_id INT, 
product_id int, 
quantity int, 
order_date TIMESTAMP(3) 
) WITH ( 
'connector' = 'binlog-x', 
'username' = 'root', 
'password' = 'root', 
'cat' = 'insert,delete,update', 
'url' = 'jdbc:mysql://localhost:3306/ecommerce_db?useSSL=false', 
'host' = 'localhost', 
'port' = '3306', 
'table' = 'ecommerce_db.orders', 
'timestamp-format.standard' = 'SQL', 
'scan.parallelism' = '1' 
); 
CREATE TABLE kafka_sink ( 
id int, 
order_id STRING, 
user_id INT, 
product_id int, 
quantity int, 
order_date TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED 
) WITH ( 
'connector' = 'upsert-kafka-x', 
'topic' = 'orders', 
'properties.bootstrap.servers' = 'localhost:9092', 
'key.format' = 'json', 
'value.format' = 'json', 
'value.fields-include' = 'ALL', 
'sink.parallelism' = '1' 
); 
insert into 
kafka_sink 
select 
* 
from 
binlog_source u;

還原 Kafka 中的數(shù)據(jù)到 HBase

上述步驟中,我們通過 binlog-x 和 upsert-kafka-x,將 MySQL 中的數(shù)據(jù)實時采集到了 Kafka 中。解鈴還須系鈴?,我們可以通過 upsert-kafka-x 再去將 Kafka 中的數(shù)據(jù)解析成帶有 upsert 語義的數(shù)據(jù)。

upsert-kafka-x 作為 source 插件時,會判斷 Kafka 中數(shù)據(jù)的 value 是否為 null,如果 value 為 null 則標記這條數(shù)據(jù)的 RowKind 為 DELETE,否則將數(shù)據(jù)的 ROWKIND 標記為 INSERT。

ChunJun的 hbase-x 插件?前已經(jīng)具備了 upsert 語句的能?,使? hbase-x 即可將 Kafka 中的數(shù)據(jù)還原到 hbase中。接下來是 SQL 腳本示例,為了?便在 HBase 中查看數(shù)據(jù)結(jié)果,我們將 int 數(shù)據(jù) cast 為 string 類型:

CREATE TABLE kafka_source ( 
id int, 
order_id STRING, 
user_id INT, 
product_id INT, 
quantity INT, 
order_date TIMESTAMP(3), 
PRIMARY KEY (id) NOT ENFORCED 
) WITH ( 
'connector' = 'upsert-kafka-x', 
'topic' = 'orders', 
'properties.bootstrap.servers' = 'localhost:9092', 
'properties.group.id' = 'test_group', 
'key.format' = 'json', 
'value.format' = 'json', 
'scan.parallelism' = '1' 
); 
CREATE TABLE hbase_sink( 
rowkey STRING, order_info ROW < order_id STRING, 
user_id STRING, 
product_id STRING, 
quantity STRING, 
order_date STRING >, 
PRIMARY KEY (rowkey) NOT ENFORCED 
) WITH( 
-- 這?以hbase14為例,如果hbase版本是2.x,我們可以使?hbase2-x插件代替 
'connector' = 'hbase14-x', 
'zookeeper.quorum' = 'localhost:2181', 
'zookeeper.znode.parent' = '/hbase', 
'table-name' = 'ecommerce_db:orders', 
'sink.parallelism' = '1' 
); 
INSERT INTO 
hbase_sink 
SELECT 
cast(id as STRING), 
ROW( 
cast(order_id as STRING), 
cast(user_id as STRING), 
cast(product_id as STRING), 
cast(quantity as STRING), 
cast(order_date as STRING) 
) 
FROM 
kafka_source

Tips:如果我們不需要 Kafka 中間件,也可以使? binlog-x 插件直接對接 hbase-x 插件。

ChunJun 支持的 RDB 實時采集插件

本節(jié)主要介紹 ChunJun 的 RDB 實時采集插件的特性、采集邏輯及其原理。

ChunJun 的 RDB 實時采集可以實時監(jiān)視數(shù)據(jù)庫中的更改,并在發(fā)?更改時讀取數(shù)據(jù)變化,例如插?、更新和刪除操作。使? ChunJun 實時采集,我們可以實時獲取有關(guān)數(shù)據(jù)庫中更改的信息,從?能夠及時響應(yīng)這些更改,如此便可以幫助我們更好地管理和利? RDB 數(shù)據(jù)庫中的數(shù)據(jù)。

并且 ChunJun 提供了故障恢復(fù)和斷點續(xù)傳功能來確保數(shù)據(jù)的完整性。ChunJun 實時采集類插件的?致實現(xiàn)步驟如下:

· 連接數(shù)據(jù)庫,確認讀取點位,讀取點位可以理解為?個 offset,如 Binlog 中,指?志的?件名和?件的 position 信息

· 根據(jù)讀取點位開始讀取 redolog,獲取其中關(guān)于數(shù)據(jù)變更相關(guān)的操作記錄

· 根據(jù) tableName、操作事件(如insert、delete、update)等過濾信息過濾出需要的 log ?志

· 解析 log ?志,解析后的事件信息包括表名、數(shù)據(jù)庫名、操作類型(插?、更新或刪除)和變更的數(shù)據(jù)?等

· 將解析出來的數(shù)據(jù)會加?為 ChunJun 內(nèi)部統(tǒng)?的 DdlRowData 供下游使?

ChunJun ?前已?持的實時采集 Connector 有:binlog(mysql)、oceanbasecdc、oraclelogminer、sqlservercdc。

Binlog 簡介

ChunJun binlog 插件的主要功能是讀取 MySQL 的?進制?志(binlog)?件。這些?件記錄了所有對數(shù)據(jù)的更改操作,如插?、更新和刪除等。?前,該插件依賴 Canal 組件來讀取 MySQL 的 binlog ?件。

核?操作步驟如下:

? 確認讀取點位:在 binlog 插件中,我們可以在腳本的 start 字段中直接指定 journal-name(binlog ?件名)和 position(?件的特定位置)

? 讀取binlog:binlog 插件將?身偽裝成 MySQL 的 Slave 節(jié)點,向 MySQL Master 發(fā)送請求,要求將 binlog ?件的數(shù)據(jù)流發(fā)送給它

? 故障恢復(fù)和斷點續(xù)傳:故障時,插件會記錄當(dāng)前的 binlog 位置信息,從 checkpoint/savepoint 恢復(fù)后,我們可以從上次記錄的位置繼續(xù)讀取 binlog ?件,確保數(shù)據(jù)變化的完整性

使? binlog 所需的權(quán)限在「binlog插件使??檔」中有詳細說明,鏈接如下:

https://sourl.cn/mvae9m

OracleLogminer 簡介

Logminer 插件借助 Oracle 提供的 Logminer ?具通過讀取視圖的?式獲取 Oracle redolog 中的信息。

核?操作步驟如下:

01 定位需讀取起始點位(start_scn)

?前 logminer ?持四種策略指定 StartScn:

· all:從 Oracle 數(shù)據(jù)庫中最早的歸檔?志組開始采集(不建議使?)

· current:任務(wù)運?時的 SCN 號

· time:指定時間點對應(yīng)的 SCN 號

· scn:直接指定 SCN 號

02 定位需要讀取的結(jié)束點位(end_scn)

插件根據(jù) start_scn 和 maxLogFileSize(默認5G)獲取可加載的 redolog ?件列表,end_scn 取這個?件列表中最?的 scn 值。

03 加載 redo ?志到 Logminer

通過?個存儲過程,將 scn 區(qū)間范圍內(nèi)的 redolog 加載到 Logminer ?。

04 從視圖中讀取數(shù)據(jù)

以 scn > ? 作為 where 條件直接查詢 v$logmnr_contents 視圖內(nèi)的信息即可獲取 redolog 中的數(shù)據(jù)。

05 重復(fù)1-4步驟,實現(xiàn)不斷的讀取

如標題。

06 故障恢復(fù)和斷點續(xù)傳

在發(fā)?故障時,插件會保存當(dāng)前消費的 scn 號,重啟時從上次的 scn 號開始讀取,確保數(shù)據(jù)完整性。

? 關(guān)于該插件原理的詳細介紹請參?「Oracle Logminer 實現(xiàn)原理說明?檔」:

https://sourl.cn/6vqz4b

? 使?lominer插件的前提條件詳?「Oracle配置LogMiner」:

https://sourl.cn/eteyZY

SqlServerCDC 簡介

SqlServerCDC 插件依賴 SQL Server 的 CDC Agent 服務(wù)提供的視圖獲取 redolog 中的信息。

核?操作步驟如下:

01 定位需讀取起始點位(from_lsn)

?前 SqlserverCDC 僅?持直接配置 lsn 號,如果 lsn 號未配置,則取數(shù)據(jù)庫中當(dāng)前最?的 lsn 號為 from_lsn。

02 定位需要讀取的結(jié)束點位(to_lsn)

SqlserverCDC 插件定期地(可通過 pollInterval 參數(shù)指定)獲取數(shù)據(jù)庫中的最? lsn 為 end_lsn。

03 從視圖中讀取數(shù)據(jù)

查詢 Agent 服務(wù)提供的視圖中 lsn 區(qū)間范圍內(nèi)的數(shù)據(jù),過濾出需要監(jiān)聽的表及事件類型。

04 重復(fù)1-3步驟,實現(xiàn)不斷的讀取

如標題。

05 故障恢復(fù)和斷點續(xù)傳

在發(fā)?故障時,插件會保存當(dāng)前消費的 lsn 號。重啟時從上次的 lsn 號開始讀取,確保數(shù)據(jù)完整性。

? 關(guān)于該插件原理的詳細介紹請參?「Sqlserver CDC 實現(xiàn)原理說明?檔」:

https://sourl.cn/5pQvEM

? 配置 SqlServer CDC Agent 服務(wù)詳?「Sqlserver 配置 CDC ?檔」:

https://sourl.cn/h5nd8j

OceanBaseCDC 簡介

OceanBase 是螞蟻集團開源的?款分布式關(guān)系型數(shù)據(jù)庫,它使??進制?志(binlog)記錄數(shù)據(jù)變更。OceanBaseCDC 的實現(xiàn)依賴于 OceanBase 提供的 LogProxy 服務(wù),LogProxy 提供了基于發(fā)布-訂閱模型的服務(wù),允許使? OceanBase 的 logclient 訂閱特定的 binlog 數(shù)據(jù)流。

OceanBaseCDC 啟動?個 Listener 線程。當(dāng) logclient 連接到 LogProxy 后,Listener 會訂閱經(jīng)過數(shù)據(jù)過濾的 binlog,然后將其添加到內(nèi)部維護的列表中。當(dāng)收到 COMMIT 信息后,Listener 會將?志變更信息傳遞給?個阻塞隊列,由主線程消費并將其轉(zhuǎn)換為 ChunJun 內(nèi)部的 DdlRowData,最終發(fā)送到下游。

JDBC-Polling 模式讀

JDBC 插件的 polling 讀取模式是基于 SQL 語句做數(shù)據(jù)讀取的,相對于基于重做?志的實時采集成本更低,但 jdbc 插件做實時同步對業(yè)務(wù)場景有更?的要求:

· 有?個數(shù)值類型或者時間類型的遞增主鍵

· 不更新歷史數(shù)據(jù)或者不關(guān)?歷史數(shù)據(jù)是否更新,僅關(guān)?新數(shù)據(jù)的獲取

實現(xiàn)原理簡介

? 設(shè)置遞增的業(yè)務(wù)主鍵作為 polling 模式依賴的增量鍵

? 在增量讀取的過程中,實時記錄 increColumn 對應(yīng)的值(state),作為下?次數(shù)據(jù)讀取的起始點位

? 當(dāng)?批數(shù)據(jù)讀取完后,間隔?段時間之后依據(jù) state 讀取下?批數(shù)據(jù)

polling 依賴部分增量同步的邏輯,關(guān)于增量同步的更多介紹可以點擊:

https://sourl.cn/UC8n6K

如何配置?個 jdbc-polling 作業(yè)

先介紹?下開啟 polling 模式需要關(guān)注的配置項:

技術(shù)干貨|如何利用 ChunJun 實現(xiàn)數(shù)據(jù)實時同步?

以 MySQL 為例,假設(shè)我們有?個存儲訂單信息的歷史表,且訂單的 order_id 是遞增的,我們希望定期地獲取這張表的新增數(shù)據(jù)。

CREATE TABLE order.realtime_order_archive ( 
order_id INT PRIMARY KEY COMMENT '訂單唯?標識', 
customer_id INT COMMENT '客戶唯?標識', 
product_id INT COMMENT '產(chǎn)品唯?標識', 
order_date TIMESTAMP COMMENT '訂單?期和時間', 
payment_method VARCHAR(255) COMMENT '?付?式(信?卡、?付寶、微信?付等)', 
shipping_method VARCHAR(255) COMMENT '配送?式(順豐速運、圓通速遞等)', 
shipping_address VARCHAR(255) COMMENT '配送地址', 
order_total DECIMAL(10,2) COMMENT '訂單總?額', 
discount DECIMAL(10,2) COMMENT '折扣?額', 
order_status VARCHAR(255) COMMENT '訂單狀態(tài)(已完成、已取消等)' 
); 

我們可以這樣配置 json 腳本的 reader 信息。

"name": "mysqlreader", 
"parameter": { 
"column" : [ 
"*" //這?假設(shè)我們讀取所有字段,可以填寫‘*’ 
], 
"increColumn": "id", 
"polling": true, 
"pollingInterval": 3000, 
"username": "username", 
"password": "password", 
"connection": [ 
{ 
"jdbcUrl": [ 
"jdbc:mysql://ip:3306/liuliu?useSSL=false" 
], 
"schema":"order", 
"table": [ 
"realtime_order_archive" ] 
} 
] 
} 
}

《數(shù)棧產(chǎn)品白皮書》:https://fs80.cn/cw0iw1

《數(shù)據(jù)治理行業(yè)實踐白皮書》下載地址:https://fs80.cn/380a4b

想了解或咨詢更多有關(guān)袋鼠云大數(shù)據(jù)產(chǎn)品、行業(yè)解決方案、客戶案例的朋友,瀏覽袋鼠云官網(wǎng):https://www.dtstack.com/?src=szbky

同時,歡迎對大數(shù)據(jù)開源項目有興趣的同學(xué)加入「袋鼠云開源框架釘釘技術(shù)qun」,交流最新開源技術(shù)信息,qun號碼:30537511,項目地址:https://github.com/DTStack文章來源地址http://www.zghlxwxcb.cn/news/detail-423678.html

到了這里,關(guān)于技術(shù)干貨|如何利用 ChunJun 實現(xiàn)數(shù)據(jù)實時同步?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Vue中如何利用websocket實現(xiàn)實時通訊

    Vue中如何利用websocket實現(xiàn)實時通訊

    原理很簡單,有點像VUE中的EventBus,用emit和on傳來傳去 首先我們可以先去自己去用node搭建一個本地服務(wù)器 步驟如下 1.新建一個app.js,然后創(chuàng)建pagejson.js文件,輸入以下指令 npm init -y 2.下載 express包 pnpm? i? express 3.在app.js里面去進行導(dǎo)包創(chuàng)建express實例 ? 4.重啟服務(wù)器,我們可

    2023年04月08日
    瀏覽(17)
  • MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal

    MySQL如何實時同步數(shù)據(jù)到ES?試試阿里開源的Canal

    前幾天在網(wǎng)上沖浪的時候發(fā)現(xiàn)了一個比較成熟的開源中間件——? Canal? 。在了解了它的工作原理和使用場景后,頓時產(chǎn)生了濃厚的興趣。今天,就讓我們跟隨我的腳步,一起來揭開它神秘的面紗吧。 目錄 前言 簡介? 工作原理? MySQL主備復(fù)制原理 canal 工作原理 Canal架構(gòu)? C

    2024年02月20日
    瀏覽(25)
  • canal實現(xiàn)mysql數(shù)據(jù)實時同步到es

    最近有一個需求:原有一些mysql數(shù)據(jù),這些數(shù)據(jù)量很大,且包含文本信息,需要對其進行搜索,這時如果使用mysql的like來匹配,效率會很低,且很可能影響整個系統(tǒng)的運行,經(jīng)過和同事的討論,最終決定使用es來做搜索。 但是源數(shù)據(jù)有很多關(guān)聯(lián)關(guān)系,搜索的時候也會帶上這些

    2024年02月16日
    瀏覽(87)
  • [大數(shù)據(jù) Flink,Java實現(xiàn)不同數(shù)據(jù)庫實時數(shù)據(jù)同步過程]

    目錄 ??前言: ??實現(xiàn)Mysql同步Es的過程包括以下步驟: ??配置Mysql數(shù)據(jù)庫連接 ??在Flink的配置文件中,添加Mysql數(shù)據(jù)庫的連接信息??梢栽趂link-conf.yaml文件中添加如下配置: ??在Flink程序中,使用JDBCInputFormat來連接Mysql數(shù)據(jù)庫,并定義查詢語句,獲取需要同步的數(shù)據(jù)。具體代

    2024年02月10日
    瀏覽(22)
  • ETLCloud+MaxCompute實現(xiàn)云數(shù)據(jù)倉庫的高效實時同步

    ETLCloud+MaxCompute實現(xiàn)云數(shù)據(jù)倉庫的高效實時同步

    MaxCompute是適用于數(shù)據(jù)分析場景的企業(yè)級SaaS(Software as a Service)模式云數(shù)據(jù)倉庫,以Serverless架構(gòu)提供快速、全托管的在線數(shù)據(jù)倉庫服務(wù),消除了傳統(tǒng)數(shù)據(jù)平臺在資源擴展性和彈性方面的限制,最小化用戶運維投入,使您可以經(jīng)濟并高效地分析處理海量數(shù)據(jù)。 MaxCompute提供離線

    2024年02月13日
    瀏覽(21)
  • 基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一)

    基于Canal與Flink實現(xiàn)數(shù)據(jù)實時增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    瀏覽(98)
  • MySQL如何實時同步數(shù)據(jù)到ES?試試這款阿里開源的神器!

    MySQL如何實時同步數(shù)據(jù)到ES?試試這款阿里開源的神器!

    mall 項目中的商品搜索功能,一直都沒有做實時數(shù)據(jù)同步。最近發(fā)現(xiàn)阿里巴巴開源的 canal 可以把MySQL中的數(shù)據(jù)實時同步到Elasticsearch中,能很好地解決數(shù)據(jù)同步問題。今天我們來講講 canal 的使用,希望對大家有所幫助! canal主要用途是對MySQL數(shù)據(jù)庫增量日志進行解析,提供增量

    2024年04月14日
    瀏覽(19)
  • 揭秘 ChunJun:如何實現(xiàn) e2e&session 日志隔離

    揭秘 ChunJun:如何實現(xiàn) e2e&session 日志隔離

    本文將從 e2e 的基本介紹,e2e 的使用與擴展,session 日志隔離三個維度為大家?guī)?ChunJun e2e session 日志隔離的分享。 大量具體代碼和演示請看視頻教程?? 視頻課程: https://www.bilibili.com/video/BV1ru411P7oZ/?spm_id_from=333.999.0.0 課件獲?。?https://www.dtstack.com/resources/1052?src=szsm ChunJu

    2024年02月08日
    瀏覽(22)
  • 基于Canal實現(xiàn)Mysql數(shù)據(jù)實時同步到Elasticsearch(Docker版)

    基于Canal實現(xiàn)Mysql數(shù)據(jù)實時同步到Elasticsearch(Docker版)

    1、Canal簡介 ??Canal主要用途是對MySQL數(shù)據(jù)庫增量日志進行解析,提供增量數(shù)據(jù)的訂閱和消費,簡單說就是可以對MySQL的增量數(shù)據(jù)進行實時同步,支持同步到MySQL、Elasticsearch、HBase等數(shù)據(jù)存儲中去。 ??Canal會模擬MySQL主庫和從庫的交互協(xié)議,從而偽裝成MySQL的從庫,然后向My

    2024年02月10日
    瀏覽(93)
  • 利用MQ實現(xiàn)mysql與elasticsearch數(shù)據(jù)同步

    利用MQ實現(xiàn)mysql與elasticsearch數(shù)據(jù)同步

    1.聲明exchange、queue、RoutingKey 2. 在hotel-admin中進行增刪改(SQL),完成消息發(fā)送 3. 在hotel-demo中完成消息監(jiān)聽,并更新elasticsearch數(shù)據(jù) 4. 測試同步 我這里的mq是掛在了docker上,虛擬機地址是192.168.116.128。到時候這個根據(jù)自己的項目改就行 在hotel-demo中,定義配置類,聲明隊列、交

    2024年02月09日
    瀏覽(16)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包