01 引言
官方文檔:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md
如果要使用flink cdc做sqlserver的實時同步,需要滿足以下條件:
- 需要安裝SQLServer(需要支持CDC的功能,SQLServer 2008之后的版本都支持);
- 需要開啟SQL Server代理;
- 啟用CDC功能。
ok,接下來開始講解。
02 SQLServer安裝
首先需要先安裝SqlServer(使用的是2019版本),有興趣的同學(xué)可以參考博主之前寫的《Docker下安裝SqlServer2019》。
主要就是兩個步驟:
## 拉取最新鏡像
docker pull mcr.microsoft.com/mssql/server:2019-latest
## 運行 SQL Server 容器(密碼必須是8個字符,并包含字母、數(shù)字和特殊字符,如:abc@123456 ,下面映射主機端口為30027)
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
03 開啟SQLServer代理
首先使用root用戶進入容器:
docker exec -it --user root sql_server_2019 bash
進入容器后,執(zhí)行命令啟用SqlServeragent:
/opt/mssql/bin/mssql-conf set sqlagent.enabled true
退出,并重啟容器:
exit
docker restart sql_server_2019
具體操作如下:
04 開啟CDC功能
step1:創(chuàng)建’cdc_test’數(shù)據(jù)庫,并使用連接工具登錄該數(shù)據(jù)庫,使用以下 SQL 命令啟用 CDC 功能:
-- 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE cdc_test;
-- 啟用CDC功能
EXEC sys.sp_cdc_enable_db;
-- 判斷當(dāng)前數(shù)據(jù)庫是否啟用了CDC(如果返回1,表示已啟用)
SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
step2:選擇要進行 CDC 跟蹤的表(這里使用orders表作為演示
)
-- 創(chuàng)建示例表(orders)
CREATE TABLE orders (
id int,
order_date date,
purchaser int,
quantity int,
product_id int,
PRIMARY KEY ([id])
);
-- schema_name 是表所屬的架構(gòu)(schema)的名稱。
-- table_name 是要啟用 CDC 跟蹤的表的名稱。
-- cdc_role 是 CDC 使用的角色的名稱。如果沒有指定角色名稱,系統(tǒng)將創(chuàng)建一個默認(rèn)角色。
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'orders',
@role_name = 'cdc_role';
執(zhí)行結(jié)果如下:
step3:啟用 CDC 后,SQL Server 將自動跟蹤啟用了 CDC 的表上的數(shù)據(jù)更改,并將更改信息存儲在 CDC 相關(guān)的表中,您可以使用這些信息進行數(shù)據(jù)更改追蹤和同步。
-- 查詢在當(dāng)前數(shù)據(jù)庫下所有的表:
SELECT * FROM INFORMATION_SCHEMA.TABLES
05 Flink SQL
ok,現(xiàn)在可以寫FlinkSQL了,如下:
-- 創(chuàng)建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)連接器從SQL Server數(shù)據(jù)庫讀取數(shù)據(jù)
CREATE TABLE t_source_sqlserver (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED -- 主鍵定義(可選)
) WITH (
'connector' = 'sqlserver-cdc', -- 使用SQL Server CDC連接器
'hostname' = '10.194.183.120', -- SQL Server主機名
'port' = '30027', -- SQL Server端口
'username' = 'sa', -- SQL Server用戶名
'password' = 'abc@123456', -- SQL Server密碼
'database-name' = 'cdc_test', -- 數(shù)據(jù)庫名稱
'schema-name' = 'dbo', -- 模式名稱
'table-name' = 'orders' -- 要捕獲更改的表名
);
-- 創(chuàng)建目標(biāo)表table_sink_mysql,使用JDBC連接器將數(shù)據(jù)寫入MySQL數(shù)據(jù)庫
CREATE TABLE table_sink_mysql (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED -- 主鍵定義(可選)
)
WITH (
'connector' = 'jdbc', -- 使用JDBC連接器
'url' = 'jdbc:mysql://10.194.183.120:30025/test', -- MySQL的JDBC URL
'username' = 'root', -- MySQL用戶名
'password' = 'root', -- MySQL密碼
'table-name' = 'orders' -- 要寫入的MySQL表名
);
-- 從t_source_sqlserver表中選擇數(shù)據(jù),并將其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
啟動程序,一切正常:
06 驗證
驗證新增:
驗證修改:文章來源:http://www.zghlxwxcb.cn/news/detail-717483.html
驗證刪除:文章來源地址http://www.zghlxwxcb.cn/news/detail-717483.html
到了這里,關(guān)于flink sqlserver cdc實時同步(含sqlserver安裝配置等)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!