DB2是IBM的一款關(guān)系型數(shù)據(jù)庫管理系統(tǒng),JDBC DB2 Source Connector是一個用于通過JDBC讀取外部數(shù)據(jù)源數(shù)據(jù)的連接器。Apache SeaTunnel如何支持JDBC DB2 Sink Connector?請參考本文檔。
支持引擎
Spark
Flink
SeaTunnel Zeta
主要功能
使用
Xa 事務(wù)
來確保精確一次性
。因此,只支持對支持Xa 事務(wù)
的數(shù)據(jù)庫進行精確一次性
操作。您可以設(shè)置is_exactly_once=true
來啟用它。
描述
通過 JDBC 寫入數(shù)據(jù)。支持批處理模式和流式模式,支持并發(fā)寫入,支持精確一次性語義(使用 XA 事務(wù)保證)。
支持的數(shù)據(jù)源信息
數(shù)據(jù)源 | 支持的版本 | 驅(qū)動程序 | URL | Maven |
---|---|---|---|---|
DB2 | 不同的依賴版本有不同的驅(qū)動程序 | com.ibm.db2.jdbc.app.DB2Driver | jdbc:db2://127.0.0.1:50000/dbname | 下載 |
數(shù)據(jù)庫依賴
請下載與 'Maven' 相對應(yīng)的支持列表,并將其復(fù)制到
'$SEATNUNNEL_HOME/plugins/jdbc/lib/'
工作目錄中
例如,對于 DB2 數(shù)據(jù)源:cp db2-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
數(shù)據(jù)類型映射
Sink 選項
名稱 | 類型 | 必填 | 默認(rèn)值 | 描述 |
---|---|---|---|---|
url | 字符串 | 是 | - | JDBC 連接的 URL。例如:jdbc:db2://127.0.0.1:50000/dbname |
driver | 字符串 | 是 | - | 用于連接到遠(yuǎn)程數(shù)據(jù)源的 JDBC 類名,如果使用 DB2,則值為 com.ibm.db2.jdbc.app.DB2Driver 。 |
user | 字符串 | 否 | - | 連接實例的用戶名 |
password | 字符串 | 否 | - | 連接實例的密碼 |
query | 字符串 | 否 | - | 使用此 SQL 將上游輸入數(shù)據(jù)寫入數(shù)據(jù)庫。例如 INSERT ... ,query 具有更高的優(yōu)先級。 |
database | 字符串 | 否 | - | 使用此 database 和 table-name 自動生成 SQL,并接收上游輸入數(shù)據(jù)寫入數(shù)據(jù)庫。此選項與 query 互斥,并具有更高的優(yōu)先級。 |
table | 字符串 | 否 | - | 使用數(shù)據(jù)庫和此表名自動生成 SQL,接收上游輸入數(shù)據(jù)寫入數(shù)據(jù)庫。此選項與 query 互斥,并具有更高的優(yōu)先級。 |
primary_keys | 數(shù)組 | 否 | - | 此選項用于支持自動生成 SQL 時的 insert 、delete 和 update 操作。 |
support_upsert_by_query_primary_key_exist | 布爾 | 否 | false | 根據(jù)查詢主鍵是否存在選擇使用 INSERT SQL、UPDATE SQL 處理更新事件(INSERT、UPDATE_AFTER)。此配置僅在數(shù)據(jù)庫不支持 upsert 語法時使用。請注意,此方法性能較低。 |
connection_check_timeout_sec | 整數(shù) | 否 | 30 | 用于等待驗證連接的數(shù)據(jù)庫操作完成的時間(以秒為單位)。 |
max_retries | 整數(shù) | 否 | 0 | 提交失?。╡xecuteBatch)的重試次數(shù)。 |
batch_size | 整數(shù) | 否 | 1000 | 用于批處理寫入,當(dāng)緩沖記錄數(shù)量達(dá)到 batch_size 或時間達(dá)到 batch_interval_ms 時,數(shù)據(jù)將刷新到數(shù)據(jù)庫。 |
batch_interval_ms | 整數(shù) | 否 | 1000 | 用于批處理寫入,當(dāng)緩沖記錄數(shù)量達(dá)到 batch_size 或時間達(dá)到 batch_interval_ms 時,數(shù)據(jù)將刷新到數(shù)據(jù)庫。 |
is_exactly_once | 布爾 | 否 | false | 是否啟用精確一次性語義,將使用 XA 事務(wù)。如果啟用,需要設(shè)置 xa_data_source_class_name 。 |
generate_sink_sql | 布爾 | 否 | false | 基于要寫入的數(shù)據(jù)庫表自動生成 SQL 語句。 |
xa_data_source_class_name | 字符串 | 否 | - | 數(shù)據(jù)庫驅(qū)動程序的 XA 數(shù)據(jù)源類名,例如,DB2 為 com.db2.cj.jdbc.Db2XADataSource 。其他數(shù)據(jù)源請參考附錄。 |
max_commit_attempts | 整數(shù) | 否 | 3 | 事務(wù)提交失敗的重試次數(shù)。 |
transaction_timeout_sec | 整數(shù) | 否 | -1 | 事務(wù)打開后的超時時間,默認(rèn)為 -1(永不超時)。請注意,設(shè)置超時可能會影響精確一次性語義。 |
auto_commit | 布爾 | 否 | true | 默認(rèn)啟用自動事務(wù)提交。 |
common-options | 否 | - | Sink 插件的通用參數(shù),請參考 Sink Common Options 獲取詳細(xì)信息。 |
提示
如果未設(shè)置
partition_column
,則將以單一并發(fā)方式運行;如果設(shè)置了partition_column
,則根據(jù)任務(wù)的并發(fā)度并行執(zhí)行。
任務(wù)示例
簡單示例:
該示例定義了一個 SeaTunnel 同步任務(wù),通過 FakeSource 自動生成數(shù)據(jù)并發(fā)送到 JDBC Sink。FakeSource 生成總共 16 行數(shù)據(jù)(row.num=16),每行有兩個字段,name(字符串類型)和 age(整數(shù)類型)。最終的目標(biāo)表是 test_table,在表中也將有 16 行數(shù)據(jù)。在運行此作業(yè)之前,您需要在您的 DB2 中創(chuàng)建數(shù)據(jù)庫 test 和表 test_table。如果您尚未安裝和部署 SeaTunnel,請按照 安裝 SeaTunnel 中的說明安裝和部署 SeaTunnel。然后按照 使用 SeaTunnel 引擎快速入門 中的說明運行此作業(yè)。
# 定義運行時環(huán)境
env {
# 您可以在這里設(shè)置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}
source {
# 這是一個示例源插件,僅用于測試和演示源插件功能
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# 如果您想要獲取更多關(guān)于如何配置 SeaTunnel 并查看完整的源插件列表的信息,
# 請訪問 https://seatunnel.apache.org/docs/category/source-v2
}
transform {
# 如果您想要獲取更多關(guān)于如何配置 SeaTunnel 并查看完整的轉(zhuǎn)換插件列表的信息,
# 請訪問 https://seatunnel.apache.org/docs/category/transform-v2
}
生成 Sink SQL
該示例不需要編寫復(fù)雜的 SQL 語句,您可以配置數(shù)據(jù)庫名稱和表名稱,以自動生成要插入的語句。
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
# 如果您想要獲取更多關(guān)于如何配置 SeaTunnel 并查看完整的接收插件列表的信息,
# 請訪問 https://seatunnel.apache.org/docs/category/sink-v2
}
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
user = "root"
password = "123456"
# 根據(jù)數(shù)據(jù)庫表名自動生成 SQL 語句
generate_sink_sql = true
database = test
table = test_table
}
}
精確一次性:
為了確保精確寫入場景,我們保證精確一次性。
sink {
jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
is_exactly_once = "true"
xa_data_source_class_name = "com.db2.cj.jdbc.Db2XADataSource"
}
}文章來源:http://www.zghlxwxcb.cn/news/detail-855112.html
本文由 白鯨開源 提供發(fā)布支持!文章來源地址http://www.zghlxwxcb.cn/news/detail-855112.html
到了這里,關(guān)于SeaTunnel JDBC DB2 Sink Connector支持的工作原理,快來學(xué)習(xí)吧!的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!