1.環(huán)境準(zhǔn)備
首先我們要基于Flink CDC MySQL同步MySQL的環(huán)境基礎(chǔ)上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。筆者已經(jīng)搭建好環(huán)境,這里不做具體演示了,如果需要Es的搭建教程情況筆者其他博客
注意: 建議生產(chǎn)環(huán)境統(tǒng)一使用穩(wěn)定版本Flink1.16.*。筆者這里只是作為教程編寫采用當(dāng)下最新版本,生產(chǎn)環(huán)境不推薦使用
2.編譯flink-sql-connector-mysql-cdc
最新版本flink-1.17.1 mysql同步Es具體jar依賴版本如下所示:
注意:下載鏈接僅適用于穩(wěn)定版本,SNAPSHOT依賴需要您自己構(gòu)建。
flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar(需要自行進(jìn)行構(gòu)建編譯,筆者構(gòu)建的已經(jīng)上次至次博客。需要可以進(jìn)行下載,csdn需要積分下載,無法設(shè)置免費的,需要免費版可以直接聯(lián)系筆者)
下載所需的JAR包并放在下面flink-1.17.1/lib/:
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
3.建立mysql和Es映射關(guān)系表
使用以下命令啟動 Flink SQL CLI:
./bin/sql-client.sh
我們應(yīng)該看到 CLI 客戶端的歡迎屏幕。首先,每 3 秒啟用一次檢查點
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
編輯源數(shù)據(jù)庫Flink Sql代碼,如下所示:
CREATE TABLE products (
id INT NOT NULL,
name STRING,
description STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', #引入的CDC jar包驅(qū)動,沒有引入會報錯提示需要引入
'hostname' = '192.168.50.163',#源數(shù)據(jù)庫連接host地址,可以根據(jù)自己的具體設(shè)置,此處為筆者本機的
'port' = '3306', #源數(shù)據(jù)庫端口
'username' = 'root',#源數(shù)據(jù)庫賬號
'password' = '*****',#源數(shù)據(jù)庫密碼
'database-name' = 'mydb',#源數(shù)據(jù)庫
'table-name' = 'products'#源數(shù)據(jù)庫表
);
在Flink SQL 執(zhí)行以下語句創(chuàng)建從相應(yīng)數(shù)據(jù)庫表捕獲更改數(shù)據(jù)的表
-- Flink SQL
Flink SQL> CREATE TABLE products (
> id INT,
> name STRING,
> description STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.168.50.163',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '****',
> 'database-name' = 'mydb',
> 'table-name' = 'products'
> );
在es創(chuàng)建要同步的目標(biāo)索引,具體語句如下:
PUT product1
{
"settings": {
"number_of_shards": 12,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "keyword"
},
"description": {
"type": "text"
}
}
}
}
編輯目標(biāo)ES映射Flink Sql代碼,如下所示:
CREATE TABLE product1 (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',#目標(biāo)ES版本,最新目前支持7
'hosts' = 'http://192.168.50.236:9200',#連接信息
'index' = 'product1'#索引信息
);
注意: 本文Es為測試版本沒有配置賬號密碼,如果有賬號密碼配置即可 ‘username’ = ‘xxxx’,‘password’=‘xxxx’
建立目標(biāo)索引與Flink SQL的映射關(guān)系,具體語句如下:
-- Flink SQL
CREATE TABLE product1 (
> id INT,
> name STRING,
> description STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',#目標(biāo)ES版本,最新目前支持7
> 'hosts' = 'http://192.168.50.236:9200',#連接信息
> 'index' = 'product1'#索引信息
> );
使用Flink SQL添加mysql和Es映射表數(shù)據(jù)關(guān)聯(lián)關(guān)系
-- Flink SQL
Flink SQL> insert into product1 select * from products;
4.時區(qū)問題處理
錯誤:
The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone Etc/UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
解決思路:
- Flink集群開啟NTP服務(wù)器 時間同步
- 把服務(wù)器時區(qū)改成和數(shù)據(jù)庫一樣的時間本文為(Asia/Shanghai)
- 配置Flink sql的時區(qū)為Asia/Shanghai,具體命令如下所示:
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
注意:這是筆者遇到的問題,具體問題具體解決即可
5.具體實現(xiàn)結(jié)果
整體實現(xiàn)結(jié)果如下圖所示:
Flink 運行任務(wù)
mysql 源數(shù)據(jù)表數(shù)據(jù)
Es目標(biāo)索引已經(jīng)數(shù)據(jù)查詢圖文章來源:http://www.zghlxwxcb.cn/news/detail-539187.html
至此,筆者的Flink CDC MySQL同步Elasticsearch第一篇講解完畢,希望能幫助到搭建文章來源地址http://www.zghlxwxcb.cn/news/detail-539187.html
到了這里,關(guān)于最新版Flink CDC MySQL同步Elasticsearch(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!