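Foreword: Flink CDC keeps getting more capable and supports a growing number of data sources. This post walks through using Flink CDC to build the following pipeline end to end:
SQL Server -> (Flink CDC) -> Flink -> (Flink StarRocks connector) -> StarRocks
1. Prepare the SQL Server environment (use a SQL Server version below 16, i.e. earlier than SQL Server 2022; Flink CDC currently supports only SQL Server versions below 16)
I'm using a Docker environment here: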
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker images
REPOSITORY                                          TAG                            IMAGE ID       CREATED         SIZE
starrocks.docker.scarf.sh/starrocks/allin1-ubuntu   latest                         4d3c0066a012   3 days ago      4.71GB
mcr.microsoft.com/mssql/server                      2019-latest                    e7fc0b49be3c   4 weeks ago     1.47GB
mcr.microsoft.com/mssql/server                      2022-latest                    683d523cd395   5 weeks ago     2.9GB
federatedai/standalone_fate                         latest                         6019ec787699   9 months ago    5.29GB
milvusdb/milvus                                     v2.1.4                         d9a5c977c414   11 months ago   711MB
starrocks/dev-env                                   main                           8f4edba3b115   16 months ago   7.65GB
minio/minio                                         RELEASE.2022-03-17T06-34-49Z   239acc52a73a   17 months ago   228MB
kicbase/stable                                      v0.0.29                        64d09634c60d   20 months ago   1.14GB
quay.io/coreos/etcd                                 v3.5.0                         a7908fd5fb88   2 years ago     110MB
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
docker exec -it --user root sql_server_2019 bash
Enable the SQL Server Agent, restart the SQL Server container, and connect:
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
root@99e196828047:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
SQL Server needs to be restarted in order to apply this setting. Please run
'systemctl restart mssql-server.service'.
root@99e196828047:/# exit
exit
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker restart sql_server_2019
sql_server_2019
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
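The transcript that follows switches to a database named cdc_test, but the post does not show it being created. Presumably it was created beforehand; a minimal sketch, assuming default database options, would be:

```sql
-- T-SQL, run at the sqlcmd prompt opened above.
-- Assumption: the post never shows this step; cdc_test is simply the
-- name used by the later "use cdc_test;" command.
CREATE DATABASE cdc_test;
GO
```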
Enable the SQL Server CDC feature:
root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
1> use cdc_test;
2> go
Changed database context to 'cdc_test'.
1> EXEC sys.sp_cdc_enable_db;
2> go
1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
2> go
is_cdc_enabled
1> CREATE TABLE orders (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id]))
2> go
1>
2>
3> EXEC sys.sp_cdc_enable_table
4> @source_schema = 'dbo',
5> @source_name = 'orders',
6> @role_name = 'cdc_role';
7> go
Job 'cdc.cdc_test_capture' started successfully.
Job 'cdc.cdc_test_cleanup' started successfully.
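The value returned by the is_cdc_enabled query did not survive in the transcript above. To double-check that capture is active for the table as well as the database, a check along these lines could be run (a sketch, not part of the original session):

```sql
-- T-SQL, run in sqlcmd while connected to cdc_test.
-- is_tracked_by_cdc is 1 for tables that have CDC enabled.
SELECT name, is_tracked_by_cdc
FROM sys.tables
WHERE name = 'orders';
GO

-- Lists the capture instances configured in the current database.
EXEC sys.sp_cdc_help_change_data_capture;
GO
```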
Insert some data:
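The INSERT statements themselves are not shown in the post. A sketch that would produce the five rows seen in the first query output below, with the values taken from that output:

```sql
-- T-SQL, run in sqlcmd while connected to cdc_test.
-- Reconstructed from the SELECT output in the post; the original
-- statements may have been written differently.
INSERT INTO orders (id, order_date, purchaser, quantity, product_id)
VALUES
    (1,  '2023-07-07', 1, 1, 1),
    (2,  '2023-07-07', 2, 2, 2),
    (3,  '2023-07-07', 3, 3, 3),
    (4,  '2023-07-07', 4, 4, 4),
    (45, '2023-07-07', 5, 5, 5);
GO
```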
1> select * from orders;
2> go
id order_date purchaser quantity product_id
----------- ---------------- ----------- ----------- -----------
1 2023-07-07 1 1 1
2 2023-07-07 2 2 2
3 2023-07-07 3 3 3
4 2023-07-07 4 4 4
45 2023-07-07 5 5 5
(5 rows affected)
1> update orders set quantity = 100 where id =1 ;
2> go
(1 rows affected)
1> select * from orders;
2> go
id order_date purchaser quantity product_id
----------- ---------------- ----------- ----------- -----------
1 2023-07-07 1 100 1
2 2023-07-07 2 2 2
3 2023-07-07 3 3 3
4 2023-07-07 4 4 4
45 2023-07-07 5 5 5
(5 rows affected)
1> update orders set quantity = 200 where id = 2;
2> go
2. Prepare the Flink environment:
- Download Flink 1.16.2 (from the official website)
- Download the Flink SQL Server CDC connector 2.2.0 (Central Repository: com/ververica/flink-cdc-connectors)
- Download the Flink StarRocks connector built for Flink 1.15 (a build matching 1.16.2 would be the right choice, but no official one had been released yet; I tested with the 1.15 build and it worked). Download link: Release 1.2.6 · StarRocks/starrocks-connector-for-apache-flink · GitHub
3. Prepare the StarRocks Docker environment:
See: Deploy StarRocks with Docker @ deploy_with_docker @ StarRocks Docs
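The sink table defined later in this post writes to the table orders in the StarRocks database test, but creating that target table is not shown. A minimal sketch, assuming a Primary Key table (so that updates and deletes can be applied by key) on a single-node Docker deployment; the bucket count and replication setting are assumptions, not from the post:

```sql
-- StarRocks SQL, run from a MySQL client connected to the FE query port
-- (9030 in this post's jdbc-url).
CREATE DATABASE IF NOT EXISTS test;
USE test;

CREATE TABLE IF NOT EXISTS orders (
    id         INT NOT NULL,   -- primary key columns must be NOT NULL
    order_date DATE,
    purchaser  INT,
    quantity   INT,
    product_id INT
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id) BUCKETS 1            -- assumption: tiny test table
PROPERTIES ("replication_num" = "1");         -- assumption: single BE node
```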
4. Start Flink (cd {FLINK_HOME}/bin):
xiuchenggong@xiuchengdeMacBook-Pro bin % ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host xiuchengdeMacBook-Pro.local.
Starting taskexecutor daemon on host xiuchengdeMacBook-Pro.local.
xiuchenggong@xiuchengdeMacBook-Pro bin % ./sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/flink-1.16.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[Flink SQL Client ASCII-art startup banner (BETA)]
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /Users/xiuchenggong/.flink-sql-history
Flink SQL>
Create the source table that reads from SQL Server into Flink:
Flink SQL> CREATE TABLE t_source_sqlserver (
> id INT,
> order_date DATE,
> purchaser INT,
> quantity INT,
> product_id INT,
> PRIMARY KEY (id) NOT ENFORCED -- primary key definition (optional)
> ) WITH (
> 'connector' = 'sqlserver-cdc', -- use the SQL Server CDC connector
> 'hostname' = 'localhost', -- SQL Server host name
> 'port' = '30027', -- SQL Server port (the host port mapped by docker run)
> 'username' = 'sa', -- SQL Server user name
> 'password' = 'abc@123456', -- SQL Server password
> 'database-name' = 'cdc_test', -- database name
> 'schema-name' = 'dbo', -- schema name
> 'table-name' = 'orders' -- table whose changes are captured
> );
Then create the sink table that writes from Flink to StarRocks:
Flink SQL>
>
> CREATE TABLE IF NOT EXISTS `orders_sink` (
> id int,
> order_date date,
> purchaser int,
> quantity int,
> product_id int,
> PRIMARY KEY(`id`) NOT ENFORCED
> ) with (
> 'load-url' = 'localhost:8030',
> 'sink.buffer-flush.interval-ms' = '15000',
> 'sink.properties.row_delimiter' = '\x02',
> 'sink.properties.column_separator' = '\x01',
> 'connector' = 'starrocks',
> 'database-name' = 'test',
> 'table-name' = 'orders',
> 'jdbc-url' = 'jdbc:mysql://localhost:9030',
> 'password' = '',
> 'username' = 'root'
> )
> ;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+--------------------+
| table name |
+--------------------+
| orders_sink |
| t_source_sqlserver |
+--------------------+
2 rows in set
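Before submitting the job, the CDC source can be sanity-checked by querying it directly in the same SQL client session. This is an optional step that is not part of the original walkthrough, and it assumes the connector jars are already in Flink's lib directory:

```sql
-- Flink SQL, run in the SQL client.
-- Print results inline as a changelog instead of opening the paged view.
SET 'sql-client.execution.result-mode' = 'tableau';

-- Starts a streaming query over the CDC source; it keeps running until
-- cancelled (Ctrl+C in the client).
SELECT * FROM t_source_sqlserver;
```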
Submit the job:
Flink SQL> insert into orders_sink select * from t_source_sqlserver;
[INFO] Submitting SQL update statement to the cluster...
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/xiuchenggong/flink/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 746cc173cd71133e96d080f25327e9bc
The long-running job is now visible in the Flink web UI.
5. Verify that the data in SQL Server has been synced to StarRocks (insert/update/delete):
StarRocks > select * from orders;
+------+------------+-----------+----------+------------+
| id   | order_date | purchaser | quantity | product_id |
+------+------------+-----------+----------+------------+
|    1 | 2023-07-07 |         1 |      100 |          1 |
|    3 | 2023-07-07 |         3 |        3 |          3 |
|    4 | 2023-07-07 |         4 |        4 |          4 |
|   45 | 2023-07-07 |         5 |        5 |          5 |
|    2 | 2023-07-07 |         2 |      200 |          2 |
+------+------------+-----------+----------+------------+
5 rows in set (0.016 sec)
StarRocks >
Inserts, updates, and deletes on the source were all synced over.
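The query output above reflects the initial rows and the two updates. To exercise an insert and a delete as well, something like the following could be run on the SQL Server side; the row with id 6 is a made-up example, not data from the post:

```sql
-- T-SQL, run in sqlcmd on the SQL Server side.
USE cdc_test;
GO

-- Hypothetical new row for testing insert propagation.
INSERT INTO orders (id, order_date, purchaser, quantity, product_id)
VALUES (6, '2023-07-08', 6, 6, 6);

-- Remove one of the existing rows to test delete propagation.
DELETE FROM orders WHERE id = 45;
GO
```

After the sink's flush interval has passed (15000 ms in the sink definition), re-running select * from orders in StarRocks should show the row with id 6 and no longer show id 45, assuming the StarRocks target is a Primary Key table.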