一、概述
Debezium 信號機制提供了一種修改連接器行為或觸發(fā)一次性操作(例如啟動表的臨時快照)的方法。要使用信號觸發(fā)連接器執(zhí)行指定操作,可以將連接器配置為使用以下一個或多個通道:
- 源信號通道:可以發(fā)出 SQL 命令將信號消息添加到專門的信令數(shù)據(jù)集合中。在源數(shù)據(jù)庫上創(chuàng)建的信令數(shù)據(jù)集合專門用于與 Debezium 進行通信。
- Kafka信號通道;將信號消息提交到可配置的 Kafka 主題。
- Jmx信號通道:通過 JMX 信號操作提交信號。
- 文件信號通道:可以使用文件來發(fā)送信號。
- Custom:將信號提交到實施的自定義通道。當 Debezium 檢測到新的日志記錄或臨時快照記錄添加到通道時,它會讀取信號并啟動請求的操作。
信號傳輸可與以下 Debezium 連接器一起使用:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
可以通過設置 signal.enabled.channels 配置屬性來指定啟用哪個通道。該屬性列出了已啟用的通道的名稱。默認情況下,Debezium 提供以下渠道:source 和 kafka。源通道默認啟用,因為增量快照信號需要它。
二、激活源信號通道
默認情況下,Debezium 源信令通道已啟用。
必須為要使用它的每個連接器顯式配置信令。
程序:
- 在源數(shù)據(jù)庫上,創(chuàng)建信令數(shù)據(jù)收集表,用于向連接器發(fā)送信號。
- 對于實現(xiàn)本機變更數(shù)據(jù)捕獲 (CDC) 機制的源數(shù)據(jù)庫(例如 Db2 或 SQL Server),為信令表啟用 CDC。
- 將信令數(shù)據(jù)集合的名稱添加到 Debezium 連接器配置中。在連接器配置中,添加屬性 signal.data.collection,并將其值設置為您在步驟 1 中創(chuàng)建的信令數(shù)據(jù)集合的完全限定名稱。
例如,signal.data.collection = inventory.debezium_signals。
信令集合的完全限定名稱的格式取決于連接器。
以下示例顯示了每個連接器使用的命名格式:
- Db2:.
- MongoDB:.
- MySQL:.
- Oracle:..
- PostgreSQL:.
- SQL Server:..
三、信令數(shù)據(jù)集合的結構
信令數(shù)據(jù)集合或信令表存儲您發(fā)送到連接器以觸發(fā)指定操作的信號。信令表的結構必須符合以下標準格式。
- 包含三個字段(列)。
- 字段按特定順序排列,如表 1 所示。
表 1. 信令數(shù)據(jù)集合所需的結構
字段 | 類型 | 描述 |
---|---|---|
id(required) | string | 標識信號實例的任意唯一字符串。為提交到信令表的每個信號分配一個 ID。通常,ID 是 UUID 字符串??梢允褂眯盘枌嵗M行日志記錄、調(diào)試或重復數(shù)據(jù)刪除。當信號觸發(fā) Debezium 執(zhí)行增量快照時,它會生成帶有任意 id 字符串的信號消息。生成的消息包含的 id 字符串與提交信號中的 id 字符串無關。 |
type(required) | string | 指定要發(fā)送的信號類型??梢詫⒛承┬盘栴愋团c任何可提供信號傳輸?shù)倪B接器一起使用,而其他信號類型僅可用于特定的連接器。 |
data(optional) | string | 指定要傳遞給信號操作的 JSON 格式的參數(shù)。每種信號類型都需要一組特定的數(shù)據(jù)。 |
數(shù)據(jù)集合中的字段名稱是任意的。上表提供了建議的名稱。如果使用不同的命名約定,請確保每個字段中的值與預期內(nèi)容一致。
四、創(chuàng)建信令數(shù)據(jù)集合
可以通過向源數(shù)據(jù)庫提交標準 SQL DDL 查詢來創(chuàng)建信令表。
先決條件:
- 有足夠的訪問權限在源數(shù)據(jù)庫上創(chuàng)建表。
程序:
- 向源數(shù)據(jù)庫提交SQL查詢,創(chuàng)建符合所需結構的表,如下例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
注意:
分配給 id 變量的 VARCHAR 參數(shù)的空間量必須足以容納發(fā)送到信令表的信號 ID 字符串的大小。如果 ID 的大小超出可用空間,連接器將無法處理信號。
以下示例顯示了創(chuàng)建三列 debezium_signal 表的 CREATE TABLE 命令:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
五、激活kafka信號通道
可以通過將 Kafka 信令通道添加到 signal.enabled.channels 配置屬性,然后將接收信號的主題名稱添加到 signal.kafka.topic 屬性來啟用 Kafka 信令通道。啟用信令通道后,將創(chuàng)建 Kafka 消費者來消費發(fā)送到配置的信號主題的信號。
可供消費者使用的附加配置:
- Db2 connector Kafka signal configuration properties
- MongoDB connector Kafka signal configuration properties
- MySQL connector Kafka signal configuration properties
- Oracle connector Kafka signal configuration properties
- PostgreSQL connector Kafka signal configuration properties
- SQL Server connector Kafka signal configuration properties
注意:
- 要使用 Kafka 信令觸發(fā)大多數(shù)連接器的臨時增量快照,必須首先在連接器配置中啟用源信令通道。
- 源通道實現(xiàn)了水印機制,以對可能由增量快照捕獲并在流恢復后再次捕獲的事件進行重復數(shù)據(jù)刪除。
- 使用信令通道觸發(fā)啟用GTID的只讀MySQL數(shù)據(jù)庫的增量快照時,不需要啟用源通道。
六、數(shù)據(jù)格式
Kafka 消息的鍵必須與 topic.prefix 連接器配置選項的值匹配。
該值是具有類型和數(shù)據(jù)字段的 JSON 對象。
當信號類型設置為執(zhí)行快照時,數(shù)據(jù)字段必須包括下表中列出的字段:
表 2. 執(zhí)行快照數(shù)據(jù)字段
字段 | 默認值 | 值 |
---|---|---|
type | incremental | 要運行的快照的類型。目前 Debezium 支持增量和阻塞類型。 |
data-collections | N/A | 一組以逗號分隔的正則表達式,與要包含在快照中的數(shù)據(jù)集合的完全限定名稱相匹配。使用與 signal.data.collection 配置選項所需的格式相同的格式指定名稱。 |
additional-condition | N/A | 一個可選字符串,指定連接器評估的條件,以指定要包含在快照中的記錄子集。注意:此屬性已棄用,應由附加條件屬性替換。 |
additional-conditions | N/A | 一個可選數(shù)組,指定連接器評估的一組附加條件,以確定要包含在快照中的記錄子集。每個附加條件都是一個對象,指定過濾臨時快照捕獲的數(shù)據(jù)的條件。您可以為每個附加條件設置以下屬性:數(shù)據(jù)采集:過濾器應用到的 {data-collection} 的完全限定名稱。您可以對每個{data-collection}應用不同的過濾器。過濾:指定數(shù)據(jù)庫記錄中必須存在的列值,快照才能包含該列值,例如“color=‘blue’”。快照進程根據(jù)過濾器值評估 {data-collection} 中的記錄,并僅捕獲包含匹配值的記錄。分配給過濾器屬性的具體值取決于臨時快照的類型:對于增量快照,可以指定一個搜索條件片段,例如“color=‘blue’”,快照會將其附加到查詢的條件子句中。對于阻塞快照,可以指定完整的 SELECT 語句,例如您可以在 snapshot.select.statement.overrides 屬性中設置的語句。 |
以下示例顯示了典型的執(zhí)行快照 Kafka 消息:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
七、激活JMX信號通道
可以通過將 jmx 添加到連接器配置中的 signal.enabled.channels 屬性來啟用 JMX 信號,然后啟用 JMX MBean 服務器來公開信號 Bean。
程序
-
使用首選的 JMX 客戶端(例如 JConsole 或 JDK Mission Control)連接到 MBean 服務器。
-
搜索 Mbean debezium.<連接器類型>.management.signals.<服務器>。 Mbean 公開接受以下輸入?yún)?shù)的信號操作:
- p0:信號的 ID。
- p1:信號的類型,例如執(zhí)行快照。
- p2:包含有關指定信號類型的附加信息的 JSON 數(shù)據(jù)字段。
-
通過提供輸入?yún)?shù)的值來發(fā)送執(zhí)行快照信號。
在 JSON 數(shù)據(jù)字段中,包含下表中列出的信息:
表 2. 執(zhí)行快照數(shù)據(jù)字段
字段 | 默認值 | 值 |
---|---|---|
type | incremental | 要運行的快照的類型。目前 Debezium 支持增量和阻塞類型。 |
data-collections | N/A | 一組以逗號分隔的正則表達式,與要包含在快照中的數(shù)據(jù)集合的完全限定名稱相匹配。使用與 signal.data.collection 配置選項所需的格式相同的格式指定名稱。 |
additional-condition | N/A | 一個可選字符串,指定連接器評估的條件,以指定要包含在快照中的記錄子集。注意:此屬性已棄用,應由附加條件屬性替換。 |
additional-conditions | N/A | 一個可選數(shù)組,指定連接器評估的一組附加條件,以確定要包含在快照中的記錄子集。每個附加條件都是一個對象,指定過濾臨時快照捕獲的數(shù)據(jù)的條件。您可以為每個附加條件設置以下屬性:數(shù)據(jù)采集:過濾器應用到的 {data-collection} 的完全限定名稱。您可以對每個{data-collection}應用不同的過濾器。過濾:指定數(shù)據(jù)庫記錄中必須存在的列值,快照才能包含該列值,例如“color=‘blue’”。快照進程根據(jù)過濾器值評估 {data-collection} 中的記錄,并僅捕獲包含匹配值的記錄。分配給過濾器屬性的具體值取決于臨時快照的類型:對于增量快照,可以指定一個搜索條件片段,例如“color=‘blue’”,快照會將其附加到查詢的條件子句中。對于阻塞快照,可以指定完整的 SELECT 語句,例如可以在 snapshot.select.statement.overrides 屬性中設置的語句。 |
下圖顯示了如何使用 JConsole 發(fā)送信號的示例:
八、自定義信令通道
信令機制被設計為可擴展的??梢愿鶕?jù)需要實施通道,以最適合您環(huán)境的方式向 Debezium 發(fā)送信號。
添加信令通道涉及幾個步驟:
- 為通道創(chuàng)建一個Java項目來實現(xiàn)通道,并添加Debezium Core作為依賴項。
- 部署自定義信令通道。
- 通過修改連接器配置,使連接器能夠使用自定義信令通道。
提供自定義信令通道
自定義信號通道是實現(xiàn) io.debezium.pipeline.signal.channels.SignalChannelReader 服務提供者接口 (SPI) 的 Java 類。例如:
public interface SignalChannelReader {
String name();
void init(CommonConnectorConfig connectorConfig);
List<SignalRecord> read();
void close();
}
- 讀者姓名。要使 Debezium 能夠使用通道,請在連接器的 signal.enabled.channels 屬性中指定此名稱。
- 初始化通道所需的特定配置、變量或連接。
- 從通道讀取信號。 SignalProcessor 類調(diào)用此方法來檢索要處理的信號。
- 關閉所有分配的資源。 Debezium 在連接器停止時調(diào)用此方法。
九、Debezium 核心模塊依賴項
自定義信令通道 Java 項目具有對 Debezium 核心模塊的編譯依賴項。必須將這些編譯依賴項包含在項目的 pom.xml 文件中,如以下示例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
- ${version.debezium} 表示 Debezium 連接器的版本。
- 在 META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 中聲明實現(xiàn)
十、部署自定義信令通道
先決條件
- 有一個自定義信令通道 Java 程序。
程序
- 要將自定義信號通道與 Debezium 連接器結合使用,請將 Java 項目導出到 JAR 文件,然后將該文件復制到包含要與其一起使用的每個 Debezium 連接器的 JAR 文件的目錄。
- 例如,在典型部署中,Debezium 連接器文件存儲在 Kafka Connect 目錄 (/kafka/connect) 的子目錄中,每個連接器 JAR 位于其自己的子目錄中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。
注意:
- 要將自定義信號通道與多個連接器一起使用,必須將自定義信號通道 JAR 文件的副本放置在每個連接器的子目錄中。
配置連接器以使用自定義信號通道
- 將自定義信令通道的名稱添加到 signal.enabled.channels 配置屬性中。
十一、信號動作
可以使用信令來發(fā)起以下操作:
- 將消息添加到日志中。
- 觸發(fā)臨時增量快照。
- 停止執(zhí)行臨時快照。
- 暫停增量快照。
- 恢復增量快照。
- 觸發(fā)臨時阻塞快照。
- 自定義動作。
有些信號并不與所有連接器兼容。
十二、記錄信號
可以通過創(chuàng)建具有日志信號類型的信令表條目來請求連接器將條目添加到日志中。處理信號后,連接器將指定的消息打印到日志中?;蛘?,可以配置信號,以便生成的消息包含流坐標。
表 4. 用于添加日志消息的信令記錄示例
字段 | 值 | 描述 |
---|---|---|
id | 924e3ff8-2245-43ca-ba77-2af9af02fa07 | |
type | log | 信號的動作類型。 |
data | {“message”: “Signal message at offset {}”} | message 參數(shù)指定要打印到日志的字符串。 |
如果您向消息添加占位符 ({}),它將被替換為流坐標。 |
十三、即席快照信號
可以通過創(chuàng)建具有執(zhí)行快照信號類型的信號來請求連接器啟動臨時快照。處理信號后,連接器運行請求的快照操作。
與連接器首次啟動后運行的初始快照不同,臨時快照是在連接器已經(jīng)開始從數(shù)據(jù)庫傳輸更改事件之后在運行時期間發(fā)生的。可以隨時啟動臨時快照。
臨時快照可用于以下 Debezium 連接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
表 5. 臨時快照信號記錄示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | execute-snapshot |
data | {“data-collections”: [“public.MyFirstTable”, “public.MySecondTable”]} |
表 6. 即席快照信號消息示例
鍵 | 值 |
---|---|
test_connector | {“type”:“execute-snapshot”,“data”: {“data-collections”: [“public.MyFirstTable”], “type”: “INCREMENTAL”, “additional-conditions”:[{“data-collection”: “public.MyFirstTable”, “filter”:“color=‘blue’ AND brand=‘MyBrand’”]}} |
其他資源
- Db2 連接器增量快照
- MongoDB 連接器增量快照
- MySQL 連接器增量快照
- Oracle 連接器增量快照
- PostgreSQL 連接器增量快照
- SQL Server 連接器增量快照
十四、特別快照停止信號
可以通過創(chuàng)建具有停止快照信號類型的信號表條目來請求連接器停止正在進行的臨時快照。處理完信號后,連接器將停止當前正在進行的快照操作。
表 7. 停止臨時快照信號記錄示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | stop-snapshot |
data | {“type”:“INCREMENTAL”, “data-collections”: [“public.MyFirstTable”]} |
必須指定信號的類型。數(shù)據(jù)收集字段是可選的。將數(shù)據(jù)收集字段留空以請求連接器停止當前快照中的所有活動。如果希望繼續(xù)執(zhí)行增量快照,但希望從快照中排除特定集合,請?zhí)峁┮懦募匣蛘齽t表達式的名稱的逗號分隔列表。連接器處理信號后,增量快照將繼續(xù),但它會排除指定的集合中的數(shù)據(jù)。
十五、增量快照
增量快照是一種特定類型的臨時快照。在增量快照中,連接器捕獲您指定的表的基線狀態(tài),類似于初始快照。但是,與初始快照不同,增量快照以塊的形式捕獲表,而不是一次捕獲所有表。連接器使用水印方法來跟蹤快照的進度。
通過以塊的形式而不是在單個整體操作中捕獲指定表的初始狀態(tài),增量快照比初始快照過程具有以下優(yōu)勢:
- 當連接器捕獲指定表的基線狀態(tài)時,來自事務日志的近實時事件流將繼續(xù)不間斷。
- 如果增量快照過程中斷,可以從停止點恢復。
- 可以隨時啟動增量快照。
十六、增量快照暫停信號
可以通過創(chuàng)建具有暫??煺招盘栴愋偷男盘柋項l目來請求連接器暫停正在進行的增量快照。處理完信號后,連接器將停止暫停當前正在進行的快照操作。因此,無法指定數(shù)據(jù)收集,因為快照處理將暫停在處理信號時的位置。
表 8. 暫停增量快照信號記錄示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | pause-snapshot |
必須指定信號的類型。數(shù)據(jù)字段被忽略。
十七、增量快照恢復信號
可以通過創(chuàng)建具有恢復快照信號類型的信號表條目來請求連接器恢復暫停的增量快照。處理信號后,連接器將恢復之前暫停的快照操作。
表 9. 恢復增量快照信號記錄示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | resume-snapshot |
十八、阻止快照信號
可以通過創(chuàng)建具有執(zhí)行快照信號類型和具有值阻塞的 data.type 的信號來請求連接器啟動臨時阻塞快照。處理信號后,連接器運行請求的快照操作。
與連接器首次啟動后運行的初始快照不同,臨時阻塞快照在連接器停止從數(shù)據(jù)庫傳輸更改事件后在運行時發(fā)生。您可以隨時啟動臨時阻止快照。
表 10. 阻塞快照信號記錄示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | execute-snapshot |
data | {“type”: “blocking”, “data-collections”: [“schema1.table1”, “schema1.table2”], “additional-conditions”: [{“data-collection”: “schema1.table1”, “filter”: “SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC”}, {“data-collection”: “schema1.table2”, “filter”: “SELECT * FROM [schema1].[table2] WHERE column2 > 0”}]} |
表 11. 阻塞快照信號消息示例文章來源:http://www.zghlxwxcb.cn/news/detail-771907.html
鍵 | 值 |
---|---|
test_connector | {“type”:“execute-snapshot”,“data”: {“type”: “blocking”} |
十九、應用案例
- Debezium系列之:實現(xiàn)增量快照incremental技術的詳細步驟
- Debezium系列之:基于數(shù)據(jù)庫信號表和Kafka信號Topic兩種技術方案實現(xiàn)增量快照incremental技術的詳細步驟
- Debezium系列之:深入理解臨時阻塞快照
更多Debezium實戰(zhàn)應用可以參考博主Debezium專欄:文章來源地址http://www.zghlxwxcb.cn/news/detail-771907.html
- Debezium專欄,Debezium實戰(zhàn)應用詳細總結
到了這里,關于Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!