一、技術(shù)流程
- 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
- 創(chuàng)建 changefeed,將 TiDB 增量數(shù)據(jù)輸出至 Kafka
- 使用 go-tpc 寫(xiě)入數(shù)據(jù)到上游 TiDB
- 使用 Kafka console consumer 觀察數(shù)據(jù)被寫(xiě)入到指定的 Topic
- (可選)配置 Flink 集群消費(fèi) Kafka 內(nèi)數(shù)據(jù)
二、搭建環(huán)境
部署包含 TiCDC 的 TiDB 集群
在實(shí)驗(yàn)或測(cè)試環(huán)境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:
tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群狀態(tài)
tiup status
三、創(chuàng)建Kafka changefeed
1.創(chuàng)建 changefeed 配置文件
根據(jù) Flink 的要求和規(guī)范,每張表的增量數(shù)據(jù)需要發(fā)送到獨(dú)立的 Topic 中,并且每個(gè)事件需要按照主鍵值分發(fā) Partition。因此,需要?jiǎng)?chuàng)建一個(gè)名為 changefeed.conf 的配置文件,填寫(xiě)如下內(nèi)容:
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]
2.創(chuàng)建一個(gè) changefeed,將增量數(shù)據(jù)輸出到 Kafka
tiup ctl:v<CLUSTER_VERSION> cdc changefeed
create --server="http://127.0.0.1:8300"
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json"
--changefeed-id="kafka-changefeed"
--config="changefeed.conf"
如果命令執(zhí)行成功,將會(huì)返回被創(chuàng)建的 changefeed 的相關(guān)信息,包含被創(chuàng)建的 changefeed 的 ID 以及相關(guān)信息,內(nèi)容如下:
Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}
如果命令長(zhǎng)時(shí)間沒(méi)有返回,你需要檢查當(dāng)前執(zhí)行命令所在服務(wù)器到 sink-uri 中指定的 Kafka 機(jī)器的網(wǎng)絡(luò)可達(dá)性,保證二者之間的網(wǎng)絡(luò)連接正常。
生產(chǎn)環(huán)境下 Kafka 集群通常有多個(gè) broker 節(jié)點(diǎn),你可以在 sink-uri 中配置多個(gè) broker 的訪問(wèn)地址,這有助于提升 changefeed 到 Kafka 集群訪問(wèn)的穩(wěn)定性,當(dāng)部分被配置的 Kafka 節(jié)點(diǎn)故障的時(shí)候,changefeed 依舊可以正常工作。假設(shè) Kafka 集群中有 3 個(gè) broker 節(jié)點(diǎn),地址分別為 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以參考如下 sink-uri 創(chuàng)建 changefeed:
tiup ctl:v<CLUSTER_VERSION> cdc changefeed create
--server="http://127.0.0.1:8300"
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576"
--config="changefeed.conf"
3.Changefeed 創(chuàng)建成功后,執(zhí)行如下命令,查看 changefeed 的狀態(tài)
tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"
四、寫(xiě)入數(shù)據(jù)以產(chǎn)生變更日志
完成以上步驟后,TiCDC 會(huì)將上游 TiDB 的增量數(shù)據(jù)變更日志發(fā)送到 Kafka,下面對(duì) TiDB 寫(xiě)入數(shù)據(jù),以產(chǎn)生增量數(shù)據(jù)變更日志。
1.模擬業(yè)務(wù)負(fù)載
在測(cè)試實(shí)驗(yàn)環(huán)境下,可以使用 go-tpc 向上游 TiDB 集群寫(xiě)入數(shù)據(jù),以讓 TiDB 產(chǎn)生事件變更數(shù)據(jù)。如下命令,首先在上游 TiDB 創(chuàng)建名為 tpcc 的數(shù)據(jù)庫(kù),然后使用 TiUP bench 寫(xiě)入數(shù)據(jù)到這個(gè)數(shù)據(jù)庫(kù)中。
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s
2.消費(fèi) Kafka Topic 中的數(shù)據(jù)
changefeed 正常運(yùn)行時(shí),會(huì)向 Kafka Topic 寫(xiě)入數(shù)據(jù),你可以通過(guò)由 Kafka 提供的 kafka-console-consumer.sh,觀測(cè)到數(shù)據(jù)成功被寫(xiě)入到 Kafka Topic 中:
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`
至此,TiDB 的增量數(shù)據(jù)變更日志就實(shí)時(shí)地復(fù)制到了 Kafka。下一步,你可以使用 Flink 消費(fèi) Kafka 數(shù)據(jù)。當(dāng)然,你也可以自行開(kāi)發(fā)適用于業(yè)務(wù)場(chǎng)景的 Kafka 消費(fèi)端。
五、配置 Flink 消費(fèi) Kafka 數(shù)據(jù)
1.安裝 Flink Kafka Connector
在 Flink 生態(tài)中,F(xiàn)link Kafka Connector 用于消費(fèi) Kafka 中的數(shù)據(jù)并輸出到 Flink 中。Flink Kafka Connector 并不是內(nèi)建的,因此在 Flink 安裝完畢后,還需要將 Flink Kafka Connector 及其依賴(lài)項(xiàng)添加到 Flink 安裝目錄中。下載下列 jar 文件至 Flink 安裝目錄下的 lib 目錄中,如果你已經(jīng)運(yùn)行了 Flink 集群,請(qǐng)重啟集群以加載新的插件。
- flink-connector-kafka-1.17.1.jar
- flink-sql-connector-kafka-1.17.1.jar
- kafka-clients-3.5.1.jar
2.創(chuàng)建一個(gè)表
可以在 Flink 的安裝目錄執(zhí)行如下命令,啟動(dòng) Flink SQL 交互式客戶端:
[root@flink flink-1.15.0]# ./bin/sql-client.sh
隨后,執(zhí)行如下語(yǔ)句創(chuàng)建一個(gè)名為 tpcc_orders 的表:
CREATE TABLE tpcc_orders (
o_id INTEGER,
o_d_id INTEGER,
o_w_id INTEGER,
o_c_id INTEGER,
o_entry_d STRING,
o_carrier_id INTEGER,
o_ol_cnt INTEGER,
o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)
請(qǐng)將 topic 和 properties.bootstrap.servers 參數(shù)替換為環(huán)境中的實(shí)際值。
3.查詢(xún)表內(nèi)容
執(zhí)行如下命令,查詢(xún) tpcc_orders 表中的數(shù)據(jù):
SELECT * FROM tpcc_orders;
執(zhí)行成功后,可以觀察到有數(shù)據(jù)輸出,如下圖文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-659426.html
至此,就完成了 TiDB 與 Flink 的數(shù)據(jù)集成。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-659426.html
到了這里,關(guān)于TiDB數(shù)據(jù)庫(kù)從入門(mén)到精通系列之六:使用 TiCDC 將 TiDB 的數(shù)據(jù)同步到 Apache Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!