国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

38、Flink 的CDC 格式:canal部署以及示例

這篇具有很好參考價(jià)值的文章主要介紹了38、Flink 的CDC 格式:canal部署以及示例。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫(kù)、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說(shuō)明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過(guò)鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文詳細(xì)的介紹了canal的部署、2個(gè)示例以及在Flink 中通過(guò)canal將數(shù)據(jù)變化信息同步到Kafka中,然后通過(guò)Flink SQL client進(jìn)行讀取。

如果需要了解更多內(nèi)容,可以在本人Flink 專欄中了解更新系統(tǒng)的內(nèi)容。

本文除了maven依賴外,還依賴Flink 、kafka和canal環(huán)境好用。

一、Canal Format

1、canal 介紹

38、Flink 的CDC 格式:canal部署以及示例,# Flink專欄,flink,大數(shù)據(jù),kafka,flink hive,flink sql,Flink CDC,flink kafka

Canal 是一個(gè) CDC(ChangeLog Data Capture,變更日志數(shù)據(jù)捕獲)工具,可以實(shí)時(shí)地將 MySQL 變更傳輸?shù)狡渌到y(tǒng)。Canal 為變更日志提供了統(tǒng)一的數(shù)據(jù)格式,并支持使用 JSON 或 protobuf 序列化消息(Canal 默認(rèn)使用 protobuf)。

Flink 支持將 Canal 的 JSON 消息解析為 INSERT / UPDATE / DELETE 消息到 Flink SQL 系統(tǒng)中。在很多情況下,利用這個(gè)特性非常的有用。

例如

  • 將增量數(shù)據(jù)從數(shù)據(jù)庫(kù)同步到其他系統(tǒng)
  • 日志審計(jì)
  • 數(shù)據(jù)庫(kù)的實(shí)時(shí)物化視圖
  • 關(guān)聯(lián)維度數(shù)據(jù)庫(kù)的變更歷史,等等。

Flink 還支持將 Flink SQL 中的 INSERT / UPDATE / DELETE 消息編碼為 Canal 格式的 JSON 消息,輸出到 Kafka 等存儲(chǔ)中。 但需要注意的是,截至 Flink 1.17版本 還不支持將 UPDATE_BEFORE 和 UPDATE_AFTER 合并為一條 UPDATE 消息。因此,F(xiàn)link 將 UPDATE_BEFORE 和 UPDATE_AFTER 分別編碼為 DELETE 和 INSERT 類型的 Canal 消息。

未來(lái)會(huì)支持 Canal protobuf 類型消息的解析以及輸出 Canal 格式的消息。

2、binlog設(shè)置及驗(yàn)證

設(shè)置binlog需要監(jiān)控的數(shù)據(jù)庫(kù),本示例使用的數(shù)據(jù)庫(kù)是mysql5.7

1)、配置

本示例設(shè)置的參數(shù)參考下面的配置

[root@server4 ~]# cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
......

log-bin=mysql-bin  # log-bin的名稱,可以是任意名稱
binlog-format=row  # 推薦該參數(shù),其他的參數(shù)視情況而定,比如mixed、statement
server_id=1 # mysql集群環(huán)境中不要重復(fù)
binlog_do_db=test # test是mysql的數(shù)據(jù)庫(kù)名稱,如果監(jiān)控多個(gè)數(shù)據(jù)庫(kù),可以添加多個(gè)binlog_do_db即可,例如下面示例
# binlog_do_db=test2
# binlog_do_db=test3
.....

  • STATEMENT模式(SBR)
    每一條會(huì)修改數(shù)據(jù)的sql語(yǔ)句會(huì)記錄到binlog中。優(yōu)點(diǎn)是并不需要記錄每一條sql語(yǔ)句和每一行的數(shù)據(jù)變化,減少了binlog日志量,節(jié)約IO,提高性能。缺點(diǎn)是在某些情況下會(huì)導(dǎo)致master-slave中的數(shù)據(jù)不一致(如sleep()函數(shù), last_insert_id(),以及user-defined functions(udf)等會(huì)出現(xiàn)問(wèn)題)

  • ROW模式(RBR)
    不記錄每條sql語(yǔ)句的上下文信息,僅需記錄哪條數(shù)據(jù)被修改了,修改成什么樣了。而且不會(huì)出現(xiàn)某些特定情況下的存儲(chǔ)過(guò)程、或function、或trigger的調(diào)用和觸發(fā)無(wú)法被正確復(fù)制的問(wèn)題。缺點(diǎn)是會(huì)產(chǎn)生大量的日志,尤其是alter table的時(shí)候會(huì)讓日志暴漲。

  • MIXED模式(MBR)
    以上兩種模式的混合使用,一般的復(fù)制使用STATEMENT模式保存binlog,對(duì)于STATEMENT模式無(wú)法復(fù)制的操作使用ROW模式保存binlog,MySQL會(huì)根據(jù)執(zhí)行的SQL語(yǔ)句選擇日志保存方式。

2)、重啟mysql

保存配置后重啟mysql

service mysqld restart

3)、驗(yàn)證

重啟后,可以通過(guò)2個(gè)簡(jiǎn)單的方法驗(yàn)證是否設(shè)置成功。

mysql默認(rèn)的安裝目錄:cd /var/lib/mysql

[root@server4 ~]# cd /var/lib/mysql
[root@server4 mysql]# ll
......
-rw-r----- 1 mysql mysql    154 110 2022 mysql-bin.000001
-rw-r----- 1 mysql mysql       1197 116 12:21 mysql-bin.index
.....

  • 查看mysql-bin.000001文件是否生成,且其大小為154字節(jié)。mysql-bin.000001是mysql重啟的次數(shù),重啟2次則為mysql-bin.000002
  • 在test數(shù)據(jù)庫(kù)中創(chuàng)建或添加數(shù)據(jù),mysql-bin.000001的大小是否增加

以上情況滿足,則說(shuō)明binlog配置正常

3、canal部署

1)、下載

去其官網(wǎng):https://github.com/alibaba/canal/wiki下載需要的版本。
本示例使用的是:canal.deployer-1.1.7.tar.gz

2)、解壓

先創(chuàng)建需要解壓的目錄/usr/local/bigdata/canal/


tar -zvxf canal.deployer-1.1.7.tar.gz -C /usr/local/bigdata/canal/
[alanchan@server3 canal]$ ll
總用量 20
drwxr-xr-x 2 root root 4096 116 05:30 bin
drwxr-xr-x 5 root root 4096 117 00:45 conf
drwxr-xr-x 2 root root 4096 1128 08:56 lib
drwxrwxrwx 4 root root 4096 1128 09:23 logs
drwxrwxrwx 2 root root 4096 1013 06:09 plugin

4、示例1:canal CDC 輸出至控制臺(tái)

本示例是將mysql變化的數(shù)據(jù)在控制臺(tái)中顯示,做該步操作需要自行編寫代碼,也就是做canal的client。

1)、修改canal的配置

需要修改2個(gè)配置文件,即
/usr/local/bigdata/canal/conf/canal.properties

/usr/local/bigdata/canal/conf/example/instance.properties。

  • canal.properties修改
    由于本處是通過(guò)client的控制臺(tái)展示,所以需要將該配置文件中的canal.serverMode = tcp
  • instance.properties
    修改配置文件的
    canal.instance.master.address=192.168.10.44:3306 # 監(jiān)控的數(shù)據(jù)庫(kù)
    canal.instance.dbUsername=root # 訪問(wèn)該數(shù)據(jù)庫(kù)的用戶名
    canal.instance.dbPassword=123456 # 訪問(wèn)該數(shù)據(jù)庫(kù)的用戶名對(duì)應(yīng)的密碼
    canal.instance.filter.regex=.\… #該參數(shù)是監(jiān)控?cái)?shù)據(jù)庫(kù)對(duì)應(yīng)的表的監(jiān)控配置,默認(rèn)是全表

2)、啟動(dòng)canal

[root@server3 bin]$ pwd
/usr/local/bigdata/canal/bin
[root@server3 bin]$ startup.sh
......
[root@server3 ~]# jps
20330 CanalLauncher

出現(xiàn)上面的進(jìn)程名稱,說(shuō)明啟動(dòng)成功。

3)、maven依賴

<dependencies>
	<dependency>
		<groupId>com.alibaba.otter</groupId>
		<artifactId>canal.client</artifactId>
		<version>1.1.4</version>
	</dependency>
</dependencies>

4)、代碼實(shí)現(xiàn)

本處僅僅是解析binlog文件內(nèi)容,以及將解析的內(nèi)容輸出。


import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCanalDemo {

    public static void main(String[] args) {
        // 創(chuàng)建鏈接
        // 這里填寫canal所配置的服務(wù)器ip,端口號(hào),destination(在canal.properties文件里)以及服務(wù)器賬號(hào)密碼
        // ip 是 canal的服務(wù)端地址
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.43", 11111),
                "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // connector.subscribe(".*\\..*");
            connector.subscribe("test.*"); // test 數(shù)據(jù)庫(kù)
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交確認(rèn)
                // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

5)、驗(yàn)證

需要 先啟動(dòng)canal服務(wù)端,再啟動(dòng)java應(yīng)用程序。
為簡(jiǎn)單起見,已經(jīng)在mysql創(chuàng)建好test數(shù)據(jù)庫(kù)和在該數(shù)據(jù)庫(kù)下創(chuàng)建的userscoressink表,其表結(jié)構(gòu)如下:


CREATE TABLE `userscoressink`  (
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `scores` float NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

應(yīng)用程序啟動(dòng)后,先刪除該表的數(shù)據(jù),然后新增數(shù)據(jù)和修改數(shù)據(jù)。
控制臺(tái)輸出如下

empty count : 1
empty count : 2
================&gt; binlog[mysql-bin.000063:6811] , name[test,userscoressink] , eventType : DELETE
name : alanchan    update=false
scores : 10.0    update=false
================&gt; binlog[mysql-bin.000063:7090] , name[test,userscoressink] , eventType : DELETE
name : alan    update=false
scores : 20.0    update=false
name : alanchan    update=true
scores : 20.0    update=true
empty count : 1
empty count : 2
================&gt; binlog[mysql-bin.000063:8477] , name[test,userscoressink] , eventType : INSERT
name : alanchanchn    update=true
scores : 30.0    update=true
empty count : 1
================&gt; binlog[mysql-bin.000063:8759] , name[test,userscoressink] , eventType : UPDATE
-------&gt; before
name : alanchanchn    update=false
scores : 30.0    update=false
-------&gt; after
name : alanchanchn    update=false
scores : 80.0    update=true
empty count : 1
empty count : 2
empty count : 3

至此,已經(jīng)完成了canal控制臺(tái)的輸出驗(yàn)證。

5、示例2:canal CDC 輸出值kafka

該步驟需要已經(jīng)安裝好kafka的環(huán)境。

1)、修改canal配置

需要修改2個(gè)配置文件,即
/usr/local/bigdata/canal/conf/canal.properties

/usr/local/bigdata/canal/conf/example/instance.properties。

  • canal.properties修改
    由于本處是通過(guò)client的控制臺(tái)展示,所以需要將該配置文件中的
    canal.serverMode = kafka
    kafka.bootstrap.servers = 192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092
    其他的使用默認(rèn)即可,如果需要的話,根據(jù)自己的環(huán)境進(jìn)行修改。
  • instance.properties
    修改配置文件的
    canal.instance.master.address=192.168.10.44:3306 # 監(jiān)控的數(shù)據(jù)庫(kù)
    canal.instance.dbUsername=root # 訪問(wèn)該數(shù)據(jù)庫(kù)的用戶名
    canal.instance.dbPassword=123456 # 訪問(wèn)該數(shù)據(jù)庫(kù)的用戶名對(duì)應(yīng)的密碼
    canal.instance.filter.regex=.\… #該參數(shù)是監(jiān)控?cái)?shù)據(jù)庫(kù)對(duì)應(yīng)的表的監(jiān)控配置,默認(rèn)是全表
    canal.mq.topic=alan_canal_to_kafka_topic # kafka接收數(shù)據(jù)的主題
    canal.mq.partition=0 # kafka主題對(duì)應(yīng)的分區(qū)

2)、啟動(dòng)canal

如果之前已經(jīng)啟動(dòng)了canal,則需要先stop。


[root@server3 bin]$ pwd
/usr/local/bigdata/canal/bin
[root@server3 bin]$ startup.sh
......
[root@server3 ~]# jps
20330 CanalLauncher

3)、驗(yàn)證

需要 先啟動(dòng)canal服務(wù)端,再啟動(dòng)java應(yīng)用程序。
為簡(jiǎn)單起見,已經(jīng)在mysql創(chuàng)建好test數(shù)據(jù)庫(kù)和在該數(shù)據(jù)庫(kù)下創(chuàng)建的userscoressink表,其表結(jié)構(gòu)如下:


CREATE TABLE `userscoressink`  (
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `scores` float NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

應(yīng)用程序啟動(dòng)后,先刪除該表的數(shù)據(jù),然后新增數(shù)據(jù)和修改數(shù)據(jù)。

  • 啟動(dòng)kafka命令行消費(fèi)模式
kafka-console-consumer.sh --bootstrap-server server1:9092 --topic alan_canal_to_kafka_topic --from-beginning
  • 在mysql中操作表, 觀察kafka輸出結(jié)果
 {
	"data": [{
		"name": "alanchanchn",
		"scores": "30.0"
	}],
	"database": "test",
	"es": 1705385155000,
	"gtid": "",
	"id": 5,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": [{
		"name": "alan"
	}],
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705385629948,
	"type": "UPDATE"
} {
	"data": [{
		"name": "alan_chan",
		"scores": "40.0"
	}],
	"database": "test",
	"es": 1705385193000,
	"gtid": "",
	"id": 6,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": null,
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705385668291,
	"type": "INSERT"
} {
	"data": [{
		"name": "alan_chan",
		"scores": "40.0"
	}],
	"database": "test",
	"es": 1705385489000,
	"gtid": "",
	"id": 7,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": null,
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705385963893,
	"type": "DELETE"
} {
	"data": [{
		"name": "alan_chan",
		"scores": "80.0"
	}],
	"database": "test",
	"es": 1705385976000,
	"gtid": "",
	"id": 8,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": null,
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705386450899,
	"type": "INSERT"
} {
	"data": [{
		"name": "alan_chan",
		"scores": "80.0"
	}],
	"database": "test",
	"es": 1705386778000,
	"gtid": "",
	"id": 10,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": null,
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705387252955,
	"type": "DELETE"
} {
	"data": [{
		"name": "alan1",
		"scores": "100.0"
	}],
	"database": "test",
	"es": 1705387290000,
	"gtid": "",
	"id": 14,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": null,
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705387765290,
	"type": "INSERT"
} 

以上,完成了通過(guò)canal監(jiān)控mysql的數(shù)據(jù)變化同步到kafka中。

二、Flink 與 canal 實(shí)踐

為了使用Canal格式,使用構(gòu)建自動(dòng)化工具(如Maven或SBT)的項(xiàng)目和帶有SQL JAR包的SQLClient都需要以下依賴項(xiàng)。

1、maven依賴

該依賴在flink自建工程中已經(jīng)包含。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.17.1</version>
</dependency>

有關(guān)如何部署 Canal 以將變更日志同步到消息隊(duì)列,請(qǐng)參閱上文的具體事例或想了解更多的信息參考 Canal 文檔。

2、Flink sql client 建表示例

Canal 為變更日志提供了統(tǒng)一的格式,下面是一個(gè)從 MySQL 庫(kù) userscoressink表中捕獲更新操作的簡(jiǎn)單示例:

{
	"data": [{
		"name": "alanchanchn",
		"scores": "30.0"
	}],
	"database": "test",
	"es": 1705385155000,
	"gtid": "",
	"id": 5,
	"isDdl": false,
	"mysqlType": {
		"name": "varchar(255)",
		"scores": "float"
	},
	"old": [{
		"name": "alan"
	}],
	"pkNames": null,
	"sql": "",
	"sqlType": {
		"name": 12,
		"scores": 7
	},
	"table": "userscoressink",
	"ts": 1705385629948,
	"type": "UPDATE"
}


有關(guān)各個(gè)字段的含義,請(qǐng)參閱 Canal 文檔

MySQL userscoressink表有2列(name,scores)。上面的 JSON 消息是 userscoressink表上的一個(gè)更新事件,表示 id = 5 的行數(shù)據(jù)上name 字段值從alan變更成為alanchanchn。

消息已經(jīng)同步到了一個(gè) Kafka 主題:alan_mysql_bycanal_to_kafka_topic2,那么就可以使用以下DDL來(lái)從這個(gè)主題消費(fèi)消息并解析變更事件。

具體啟動(dòng)canal參考本文的第一部分的kafka示例,其他不再贅述。下面的部分僅僅是演示canal環(huán)境都正常后,在Flink SQL client中的操作。

-- 元數(shù)據(jù)與 MySQL "userscoressink" 表完全相同
CREATE TABLE userscoressink (
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'alan_mysql_bycanal_to_kafka_topic2',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'canal-json' -- 使用 canal-json 格式
);

將 Kafka 主題注冊(cè)成 Flink 表之后,就可以將 Canal 消息用作變更日志源。

-- 驗(yàn)證,在mysql中新增、修改和刪除數(shù)據(jù),觀察flink sql client 的數(shù)據(jù)變化
Flink SQL> CREATE TABLE userscoressink (
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'alan_mysql_bycanal_to_kafka_topic2',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'canal-json'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from userscoressink;
+----+--------------------------------+--------------------------------+
| op |                           name |                         scores |
+----+--------------------------------+--------------------------------+
| +I |                           name |                          100.0 |
| +I |                           alan |                           80.0 |
| +I |                       alanchan |                          120.0 |
| -U |                       alanchan |                          120.0 |
| +U |                       alanchan |                          100.0 |
| -D |                           name |                          100.0 |


-- 關(guān)于MySQL "userscoressink" 表的實(shí)時(shí)物化視圖
-- 按name分組,對(duì)scores進(jìn)行求和

Flink SQL> select name,sum(scores) from userscoressink group by name;
+----+--------------------------------+--------------------------------+
| op |                           name |                         EXPR$1 |
+----+--------------------------------+--------------------------------+
| +I |                           name |                          100.0 |
| +I |                           alan |                           80.0 |
| +I |                       alanchan |                          120.0 |
| -D |                       alanchan |                          120.0 |
| +I |                       alanchan |                          100.0 |
| -D |                           name |                          100.0 |


3、Available Metadata

以下格式元數(shù)據(jù)可以在表定義中公開為只讀(VIRTUAL)列。
只有當(dāng)相應(yīng)的連接器轉(zhuǎn)發(fā)格式元數(shù)據(jù)時(shí),注意格式元數(shù)據(jù)字段才可用。

截至版本1.17,只有Kafka連接器能夠公開其值格式的元數(shù)據(jù)字段。
38、Flink 的CDC 格式:canal部署以及示例,# Flink專欄,flink,大數(shù)據(jù),kafka,flink hive,flink sql,Flink CDC,flink kafka
以下示例顯示了如何訪問(wèn)Kafka中的Canal元數(shù)據(jù)字段:

---- 建表sql
CREATE TABLE userscoressink_meta (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  name STRING,
  scores FLOAT
) WITH (
 'connector' = 'kafka',
 'topic' = 'alan_mysql_bycanal_to_kafka_topic2',
 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'canal-json' 
);

---- 驗(yàn)證
Flink SQL> CREATE TABLE userscoressink_meta (
>   origin_database STRING METADATA FROM 'value.database' VIRTUAL,
>   origin_table STRING METADATA FROM 'value.table' VIRTUAL,
>   origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
>   origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
>   name STRING,
>   scores FLOAT
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'alan_mysql_bycanal_to_kafka_topic2',
>  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'canal-json' 
> );
[INFO] Execute statement succeed.

Flink SQL> show tables;
+---------------------+
|          table name |
+---------------------+
|      userscoressink |
| userscoressink_meta |
+---------------------+
2 rows in set

Flink SQL> select * from userscoressink_meta;
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+
| op |                origin_database |                   origin_table |                origin_sql_type |                origin_pk_names |               origin_ts |                           name |                         scores |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+
| +I |                        cdctest |                 userscoressink |            {name=12, scores=7} |                         (NULL) | 2024-01-19 04:56:28.144 |                           name |                          100.0 |
| +I |                        cdctest |                 userscoressink |            {name=12, scores=7} |                         (NULL) | 2024-01-19 05:09:45.610 |                           alan |                           80.0 |
| +I |                        cdctest |                 userscoressink |            {name=12, scores=7} |                         (NULL) | 2024-01-19 05:09:55.529 |                       alanchan |                          120.0 |
| -U |                        cdctest |                 userscoressink |            {name=12, scores=7} |                         (NULL) | 2024-01-19 05:10:12.051 |                       alanchan |                          120.0 |
| +U |                        cdctest |                 userscoressink |            {name=12, scores=7} |                         (NULL) | 2024-01-19 05:10:12.051 |                       alanchan |                          100.0 |
| -D |                        cdctest |                 userscoressink |            {name=12, scores=7} |                         (NULL) | 2024-01-19 05:10:21.966 |                           name |                          100.0 |


4、Format 參數(shù)

38、Flink 的CDC 格式:canal部署以及示例,# Flink專欄,flink,大數(shù)據(jù),kafka,flink hive,flink sql,Flink CDC,flink kafka

5、重要事項(xiàng):重復(fù)的變更事件

在正常的操作環(huán)境下,Canal 應(yīng)用能以 exactly-once 的語(yǔ)義投遞每條變更事件。在這種情況下,F(xiàn)link 消費(fèi) Canal 產(chǎn)生的變更事件能夠工作得很好。 然而,當(dāng)有故障發(fā)生時(shí),Canal 應(yīng)用只能保證 at-least-once 的投遞語(yǔ)義。 這也意味著,在非正常情況下,Canal 可能會(huì)投遞重復(fù)的變更事件到消息隊(duì)列中,當(dāng) Flink 從消息隊(duì)列中消費(fèi)的時(shí)候就會(huì)得到重復(fù)的事件。 這可能會(huì)導(dǎo)致 Flink query 的運(yùn)行得到錯(cuò)誤的結(jié)果或者非預(yù)期的異常。因此,建議在這種情況下,將作業(yè)參數(shù) table.exec.source.cdc-events-duplicate 設(shè)置成 true,并在該 source 上定義 PRIMARY KEY。 框架會(huì)生成一個(gè)額外的有狀態(tài)算子,使用該 primary key 來(lái)對(duì)變更事件去重并生成一個(gè)規(guī)范化的 changelog 流。

6、數(shù)據(jù)類型映射

目前,Canal Format 使用 JSON Format 進(jìn)行序列化和反序列化。 有關(guān)數(shù)據(jù)類型映射的更多詳細(xì)信息,請(qǐng)參閱 JSON Format 文檔。

以上,本文詳細(xì)的介紹了canal的部署、2個(gè)示例以及在Flink 中通過(guò)canal將數(shù)據(jù)變化信息同步到Kafka中,然后通過(guò)Flink SQL client進(jìn)行讀取。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-817719.html

到了這里,關(guān)于38、Flink 的CDC 格式:canal部署以及示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 業(yè)務(wù)數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    業(yè)務(wù)數(shù)據(jù)同步工具介紹和使用(Sqoop、Datax、Canal、MaxWell、Flink CDC)

    介紹 Sqoop : SQ L-to-Had oop ( Apache已經(jīng)終止Sqoop項(xiàng)目 ) 用途:把關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)轉(zhuǎn)移到HDFS(Hive、Hbase)(重點(diǎn)使用的場(chǎng)景);Hadoop中的數(shù)據(jù)轉(zhuǎn)移到關(guān)系型數(shù)據(jù)庫(kù)中。Sqoop是java語(yǔ)言開發(fā)的,底層使用 mapreduce 。 需要注意的是,Sqoop主要使用的是Map,是數(shù)據(jù)塊的轉(zhuǎn)移,沒有使

    2024年02月15日
    瀏覽(44)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月12日
    瀏覽(21)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(23)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(27)
  • 什么是Flink CDC,以及如何使用

    什么是Flink CDC,以及如何使用

    數(shù)據(jù)庫(kù)中的CDC(Change Data Capture,變更數(shù)據(jù)捕獲)是一種用于實(shí)時(shí)跟蹤數(shù)據(jù)庫(kù)中數(shù)據(jù)變化的技術(shù)。CDC的主要目的是在數(shù)據(jù)庫(kù)中捕獲增量數(shù)據(jù),以便在需要時(shí)可以輕松地將這些數(shù)據(jù)合并到其他系統(tǒng)或應(yīng)用程序中。CDC在數(shù)據(jù)庫(kù)管理、數(shù)據(jù)同步、數(shù)據(jù)集成和數(shù)據(jù)備份等方面具有廣泛的

    2024年02月11日
    瀏覽(18)
  • Flink cdc同步mysql到starrocks(日期時(shí)間格式/時(shí)區(qū)處理)

    flink 1.15.3(此時(shí)最新版本為1.16.1) mysql 5.7+ starrocks 2.5.2 mysql同步表結(jié)構(gòu) mysql中的timestamp字段是可以正常同步的,但是多了8小時(shí),設(shè)置了mysql鏈接屬性也沒效果 參考下方的鏈接有兩種方式; 參考資料 https://blog.csdn.net/cloudbigdata/article/details/122935333 https://blog.csdn.net/WuBoooo/article/deta

    2024年02月16日
    瀏覽(27)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(26)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(27)
  • 60、Flink CDC 入門介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫(kù)數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    60、Flink CDC 入門介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫(kù)數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月19日
    瀏覽(21)
  • ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實(shí)踐

    ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實(shí)踐

    ApacheStreamPark是流處理極速開發(fā)框架,流批一體 湖倉(cāng)一體的云原生平臺(tái),一站式流處理計(jì)算平臺(tái)。 ??特性中的簡(jiǎn)單易用和文檔詳盡這兩點(diǎn)我也是深有體會(huì)的,部署一點(diǎn)都不簡(jiǎn)單,照著官方文檔都不一定能搞出來(lái),下面部署環(huán)節(jié)慢慢來(lái)吐槽吧。 ??之前我們寫 Flink SQL 基本上

    2024年02月11日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包