Flink CDC: connecting to a PostgreSQL database
01、Flink PostgreSQL CDC
Prerequisites
1. Modify the configuration file postgresql.conf
# change the WAL level to logical
wal_level = logical # minimal, replica, or logical
# increase the maximum number of replication slots (default 10); by default Flink CDC uses one slot per table
max_replication_slots = 20 # max number of replication slots
# increase the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20 # max number of walsender processes
# terminate replication connections that have been inactive for longer than this; consider raising it (default 60s)
wal_sender_timeout = 180s # in milliseconds; 0 disables
Only wal_level must be changed; the other parameters are optional, but if more than 10 tables are to be synchronized, adjust them to suitable values.
Changes to postgresql.conf only take effect after the PostgreSQL service is restarted, so they are usually applied during off-peak hours; the verification sketch after step 3 below can be used to confirm the settings.
2. Create a new user and grant it replication privileges
-- create a new PostgreSQL user
CREATE USER user WITH PASSWORD 'pwd';
-- grant the user replication privileges
ALTER ROLE user REPLICATION;
-- allow the user to connect to the database
GRANT CONNECT ON DATABASE test TO user;
-- grant the user SELECT on all tables in the public schema of the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
3. Publish the tables
-- set puballtables to true for existing publications
update pg_publication set puballtables=true where pubname is not null;
-- publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- check which tables have been published
select * from pg_publication_tables;
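The prerequisites can be double-checked with a few queries, for example over JDBC. The following is a minimal sketch: the class name, host, database, and credentials are placeholders, and the PostgreSQL JDBC driver is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PgCdcPreflightCheck {
    public static void main(String[] args) throws Exception {
        // placeholder connection details; replace with your own
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://192.168.1.xxx:5432/test", "user", "pwd");
             Statement stmt = conn.createStatement()) {
            // wal_level must be 'logical'
            try (ResultSet rs = stmt.executeQuery("SHOW wal_level")) {
                rs.next();
                System.out.println("wal_level = " + rs.getString(1));
            }
            // replication slots currently in use
            try (ResultSet rs = stmt.executeQuery("SELECT count(*) FROM pg_replication_slots")) {
                rs.next();
                System.out.println("replication slots in use = " + rs.getLong(1));
            }
            // tables covered by the publication
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT pubname, schemaname, tablename FROM pg_publication_tables")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " -> " + rs.getString(2) + "." + rs.getString(3));
                }
            }
        }
    }
}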
DataStream API
1. Maven dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.2</version>
</dependency>
2. PostgresqlCDC2Kafka.java
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class PostgresqlCDC2Kafka {
    public static void main(String[] args) throws Exception {
        String fileName = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileName);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(5000L);
        // checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));
        // state backend
        env.setStateBackend(new FsStateBackend("hdfs://ip:8020/../.."));
        // user name for accessing HDFS
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // Debezium options: these are handed to Debezium as-is, so no "debezium." prefix is needed here
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "initial");
        properties.setProperty("slot.name", "pg_cdc");
        properties.setProperty("slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");

        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("192.168.1.xxx")
                .port(5432)
                .database("databaseName")   // database to monitor
                .schemaList("schemaName")   // schema to monitor
                .tableList("schemaName.table1,schemaName.table2,...") // tables to monitor
                .username("userName")
                .password("password")
                .decodingPluginName("pgoutput")
                // custom DebeziumDeserializationSchema<String> that converts SourceRecord to a JSON string (not shown here)
                .deserializer(new CustomerDeserialization())
                .debeziumProperties(properties)
                .build();

        DataStreamSource<String> pgDataStream = env
                .addSource(sourceFunction)
                .setParallelism(1); // use parallelism 1 to keep message ordering

        // Kafka producer configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "ip1:9092");
        kafkaProps.setProperty("transaction.max.timeout.ms", "90000");

        // sink to Kafka
        FlinkKafkaProducer<String> flinkKafkaProducer =
                new FlinkKafkaProducer<>("topicName", new SimpleStringSchema(), kafkaProps);
        pgDataStream.addSink(flinkKafkaProducer).name("sink2Kafka");

        env.execute("pg_cdc job");
    }
}
Note: for PostgreSQL 11 and above, decodingPluginName should be pgoutput.
02、Flink CDC troubleshooting notes
1. mysql-cdc throws an exception when columns are excluded from capture:
that is, an exception is thrown when the 'debezium.column.blacklist' option is specified
org.apache.kafka.connect.errors.DataException: order_sales is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$createRowConverter$508c5858$1(RowDataDebeziumDeserializeSchema.java:364)
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$wrapIntoNullableConverter$7b91dc26$1(RowDataDebeziumDeserializeSchema.java:390)
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:126)
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:101)
at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:97)
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Analysis: debezium.column.blacklist makes Debezium drop the specified fields from the change record after the event is captured, so when Flink later parses and converts the record those fields cannot be found.
2. The CDC source locks the MySQL tables while scanning them.
Solutions:
Grant the MySQL user the RELOAD privilege. For details see: https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server
Alternatively, set 'debezium.snapshot.locking.mode' = 'none' (see the sketch below).
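A minimal sketch of how this option can be passed in a Flink SQL DDL via the Table API; the table name, columns, and connection details are made up, the mysql-cdc connector is assumed to be on the classpath, and 'debezium.'-prefixed options are forwarded to Debezium:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlCdcNoLockExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // hypothetical source table; 'debezium.*' options are passed through to Debezium
        tableEnv.executeSql(
                "CREATE TABLE orders_src (" +
                "  id BIGINT," +
                "  order_no STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = '192.168.1.xxx'," +
                "  'port' = '3306'," +
                "  'username' = 'user'," +
                "  'password' = 'pwd'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'orders'," +
                "  'debezium.snapshot.locking.mode' = 'none'" +
                ")");

        // quick check; this streams results until the job is cancelled
        tableEnv.executeSql("SELECT * FROM orders_src").print();
    }
}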
3. Table locked during snapshot
User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.
The cause is that the MySQL user used by the connector lacks the privileges required for CDC.
Flink CDC is built on Debezium. When the MySQL CDC source starts, it acquires a global read lock (FLUSH TABLES WITH READ LOCK), which blocks writes by other sessions; it then reads the current binlog position and the schemas of the databases and tables, after which the global read lock is released. It then scans the tables and reads the binlog from the previously recorded position, while Flink periodically takes checkpoints to record the binlog position. If a failure occurs, the job restarts and resumes from the binlog position of the last completed checkpoint, which guarantees exactly-once semantics.
Solution: create a new MySQL user and grant it the necessary privileges.
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';
mysql> FLUSH PRIVILEGES;
4. The Flink job fails over while scanning the full MySQL table
While the Flink job scans the full MySQL table, checkpoints time out and the job fails over.
Cause: scanning the whole table (ours has tens of millions of rows) with Flink CDC takes hours (it is back-pressured by the downstream aggregation), and during the full scan there is no offset to record, which means no checkpoint can complete. Flink, however, triggers checkpoints at a fixed interval regardless, so the mysql-cdc source takes a pragmatic approach: while the full scan is running, in-flight checkpoints are simply left waiting until they time out. A timed-out checkpoint is counted as a failed checkpoint, and under the default configuration this triggers Flink's failover mechanism, whose default behavior is not to restart the job. Hence the behavior described above.
Solution: configure the number of tolerated failed checkpoints and a restart strategy in flink-conf.yaml, as follows (a per-job equivalent in code is sketched after the YAML):
execution.checkpointing.interval: 10min # checkpoint interval
execution.checkpointing.tolerable-failed-checkpoints: 100 # number of failed checkpoints to tolerate
restart-strategy: fixed-delay # restart strategy
restart-strategy.fixed-delay.attempts: 2147483647 # number of restart attempts
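If these settings are preferred per job rather than cluster-wide, roughly the same effect can be achieved through the Java API. A sketch using the values from the YAML above; the 10-second restart delay is an assumption, pick whatever suits the job:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToleranceExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 minutes
        env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10));
        // tolerate up to 100 failed checkpoints before failing the job
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
        // restart "forever" with a fixed delay between attempts
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(10, TimeUnit.SECONDS)));

        // ... build the rest of the pipeline and call env.execute(...)
    }
}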
5. While the job is running, the mysql-cdc source reports: no viable alternative at input 'alter table std'
Cause: a column of another table in the database was modified; the CDC source picked up the ALTER DDL statement but failed to parse it and threw this exception.
Solution: this is fixed in newer flink-cdc-connectors releases (DDL statements that cannot be parsed are skipped). Upgrade the connector jar to version 1.1.0 (flink-sql-connector-mysql-cdc-1.1.0.jar) and replace the old jar under flink/lib.
6. Multiple jobs share the same source table without changing the server id, and some of the data read is lost.
Cause: MySQL binlog replication works by having the CDC source pose as a slave of the MySQL cluster (identified by the configured server id) and pull binlog data from MySQL. If several slaves in one MySQL cluster use the same id, the data they pull gets mixed up.
Solution: by default a random server id is generated, which carries a risk of collisions, so it is recommended to override the server id per query with a dynamic table option (table hint), as shown below:
FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;
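For context, a minimal sketch of how the hint fits into a complete query; the table definition, connection details, and server-id value are placeholders, and each job should use a server id that is unique within the MySQL cluster:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ServerIdHintExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // hypothetical mysql-cdc table; a 'server-id' could also be set here in the WITH clause
        tableEnv.executeSql(
                "CREATE TABLE bill_info (" +
                "  id BIGINT," +
                "  amount DECIMAL(10, 2)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = '192.168.1.xxx'," +
                "  'port' = '3306'," +
                "  'username' = 'user'," +
                "  'password' = 'pwd'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'bill_info'" +
                ")");

        // override the server id for this particular job via a table hint
        tableEnv.executeSql(
                "SELECT * FROM bill_info /*+ OPTIONS('server-id'='123456') */").print();
    }
}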
7. Flink SQL CDC timestamps are off by 8 hours
Set 'server-time-zone' = 'Asia/Shanghai' in the connector options, for example:
WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxx',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'xxx',
'table-name' = 'xxx',
'server-time-zone' = 'Asia/Shanghai'
)
If it is not set, time fields read from MySQL, such as datetime, may end up shifted by 8 hours.
In SQL statements, use LOCALTIMESTAMP or add the 8 hours to the timestamp manually, rather than CURRENT_DATE and similar functions.
Additionally:
If the data is also written to a MySQL sink, append &serverTimezone=Asia/Shanghai to the JDBC URL; otherwise the time zone will be off on the sink side as well. A full URL looks like:
jdbc:mysql://${hostname}/${db_name}?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true
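Putting both sides together, a sketch via the Table API: a mysql-cdc source with 'server-time-zone' set and a JDBC sink whose URL carries serverTimezone. All connection details, table names, and columns are placeholders, and the mysql-cdc and flink-connector-jdbc dependencies are assumed:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimezoneExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // mysql-cdc source with the session time zone pinned to Asia/Shanghai
        tableEnv.executeSql(
                "CREATE TABLE src (" +
                "  id BIGINT," +
                "  created_at TIMESTAMP(3)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'xxx'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'database-name' = 'xxx'," +
                "  'table-name' = 'xxx'," +
                "  'server-time-zone' = 'Asia/Shanghai'" +
                ")");

        // JDBC sink; serverTimezone on the URL keeps the sink side consistent
        tableEnv.executeSql(
                "CREATE TABLE dst (" +
                "  id BIGINT," +
                "  created_at TIMESTAMP(3)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://xxx:3306/xxx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai'," +
                "  'table-name' = 'dst'," +
                "  'username' = 'root'," +
                "  'password' = 'root'" +
                ")");

        // submit the streaming insert
        tableEnv.executeSql("INSERT INTO dst SELECT id, created_at FROM src");
    }
}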
8. Flink CDC: Encountered change event for table xxx.xxxx whose schema isn't known to this connector
Solution:
Set inconsistent.schema.handling.mode = 'warn'. In the DataStream API this is passed through debeziumProperties, e.g. properties.setProperty("inconsistent.schema.handling.mode", "warn"); in Flink SQL write it as 'debezium.inconsistent.schema.handling.mode' = 'warn'.
9. Flink SQL from a mysql-cdc source to an HBase sink loses data
Diagnosis:
1. Patch the source code and add logging
2. Inspect the write path
// open(): a scheduled task flushes the buffer periodically
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
    this.executor = Executors.newScheduledThreadPool(
            1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
    this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
        if (closed) {
            return;
        }
        try {
            flush();
        } catch (Exception e) {
            // fail the sink and skip the rest of the items
            // if the failure handler decides to throw an exception
            failureThrowable.compareAndSet(null, e);
        }
    }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
}
// invoke(): flush once the number of buffered mutations reaches the threshold
if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
    flush();
}
// snapshotState(): only flushes while there are still pending requests in the buffer
while (numPendingRequests.get() != 0) {
    flush();
}
Taking RowKey=0 as an example, the operation had already been wrapped in a Mutation and flushed, yet the key could not be found in HBase. The suspicion is that the BufferedMutator mishandled the out-of-order data.
References found while investigating:
https://www.jianshu.com/p/1a753ffcbe2a
https://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455
Solutions:
1. Short-term: set 'sink.buffer-flush.max-rows' = '2' to work around the problem for now, at the cost of noticeably more pressure on the RegionServers (a sketch follows this list).
2. Long-term: patch the source code based on the issue above.
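A minimal sketch of the short-term workaround, assuming the Flink HBase SQL connector (hbase-2.2) is on the classpath; the table, column family, columns, and ZooKeeper quorum are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HbaseSmallBufferSinkExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // hypothetical HBase sink; flushing after (almost) every mutation sidesteps the
        // reordering issue at the cost of much higher RegionServer load
        tableEnv.executeSql(
                "CREATE TABLE hbase_sink (" +
                "  rowkey STRING," +
                "  cf ROW<order_no STRING, amount STRING>," +
                "  PRIMARY KEY (rowkey) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'hbase-2.2'," +
                "  'table-name' = 'orders'," +
                "  'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181'," +
                "  'sink.buffer-flush.max-rows' = '2'" +
                ")");

        // an INSERT INTO hbase_sink SELECT ... FROM <mysql-cdc source> would follow here
    }
}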
10. Notes on related parameters:
Observed behavior of the various snapshot.mode values in testing:
properties.setProperty("snapshot.mode", "never"); // Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector
properties.setProperty("snapshot.mode", "initial"); // reads the full snapshot on every restart
properties.setProperty("snapshot.mode", "initial_only"); // no data could be read
properties.setProperty("snapshot.mode", "when_needed"); // behaves much like initial
properties.setProperty("snapshot.mode", "schema_only"); // only captures new changes; the historical full data is not read
properties.setProperty("snapshot.mode", "schema_only_recovery"); // Could not find existing binlog information while attempting schema only recovery snapshot