Postgres CDC Connector — CDC Connectors for Apache Flink documentation
flink cdc捕獲postgresql數(shù)據(jù)
1)更改配置文件
需要更改
linux>vi postgresql.conf
# 更改wal日志方式為logical
wal_level = logical # minimal, replica, or logical
# 更改復(fù)制槽(replication slots)最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個(gè)slot
max_replication_slots = 20 # max number of replication slots
# 更改wal發(fā)送最大進(jìn)程數(shù)(默認(rèn)值為10),這個(gè)值和上面的slots設(shè)置一樣
max_wal_senders = 20 # max number of walsender processes
# 中斷那些停止活動(dòng)超過(guò)指定毫秒數(shù)的復(fù)制連接,可以適當(dāng)設(shè)置大一點(diǎn)(默認(rèn)60s)
wal_sender_timeout = 180s # 0 disables the timeout(默認(rèn)60s,可適當(dāng)調(diào)大)
2)注意
注意:wal_level = logical源表的數(shù)據(jù)修改時(shí),默認(rèn)的邏輯復(fù)制流只包含歷史記錄的primary key,如果需要輸出更新記錄的歷史記錄的所有字段,需要在表級(jí)別修改參數(shù):ALTER TABLE tableName REPLICA IDENTITY FULL; 這樣才能捕獲到源表所有字段更新后的值
3) 將jar包導(dǎo)入flink lib目錄
flink-sql-connector-postgres-cdc-2.2.0.jar 到 flink/lib下
4)新建用戶并且給用戶復(fù)制流權(quán)限
-- Create a dedicated CDC user.
-- NOTE: "user" is a reserved word in PostgreSQL and must be double-quoted
-- (unquoted, CREATE USER user is a syntax error); picking a non-reserved
-- role name such as cdc_user is the better long-term choice.
CREATE USER "user" WITH PASSWORD 'pwd';
5) 給用戶復(fù)制流權(quán)限
-- Grant the REPLICATION attribute so the role may create and consume
-- replication slots ("user" is a reserved word and must be quoted).
ALTER ROLE "user" REPLICATION;
6) 給用戶登錄數(shù)據(jù)庫(kù)權(quán)限
-- Allow the role to connect to the target database ("user" must be quoted).
GRANT CONNECT ON DATABASE test TO "user";
7)把當(dāng)前庫(kù)public下所有表查詢權(quán)限賦給用戶
-- Grant read access on every existing table in schema public
-- ("user" is reserved and must be quoted).
GRANT SELECT ON ALL TABLES IN SCHEMA public TO "user";
8) 發(fā)布表
-- Publish all tables. FOR ALL TABLES already records puballtables = true,
-- so the original direct UPDATE of the pg_publication system catalog is
-- unnecessary (and writing system catalogs by hand is unsafe — it also ran
-- BEFORE the publication existed, so it updated zero rows).
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查詢哪些表已經(jīng)發(fā)布 (verify which tables are published)
SELECT * FROM pg_publication_tables;
9) 更改表的復(fù)制標(biāo)識(shí)包含更新和刪除的值
-- Make UPDATE/DELETE events carry the full old-row image (required when the
-- CDC stream must expose all columns of the previous row — see step 2).
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- 查看復(fù)制標(biāo)識(shí)(為f標(biāo)識(shí)說(shuō)明設(shè)置成功)— check the SAME table that was altered
-- above; the original queried a different name ('testname').
SELECT relreplident FROM pg_class WHERE relname = 'test0425';
到這一步,設(shè)置已經(jīng)完全可以啦,上面步驟都是必須的
flink sql 端 創(chuàng)建postgresql 連接器
linux>bin/sql-client.sh //進(jìn)入flink sql客戶端
-- Flink SQL source table backed by the postgres-cdc connector.
CREATE TABLE flink_cdc_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'pg數(shù)據(jù)庫(kù)IP地址',
  'port' = '5432',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'username' = 'postgres',
  'password' = '123456',
  'table-name' = 'pg_cdc_source',
  -- pgoutput is PostgreSQL's built-in logical decoding plugin (PG 10+)
  'decoding.plugin.name' = 'pgoutput',
  -- Give every job its own replication slot. Without this the connector
  -- defaults to the slot name 'flink', which is exactly what causes the
  -- "復(fù)制槽名 \"flink\" 已經(jīng)存在" error handled later in this article.
  'slot.name' = 'flink_cdc_source_slot'
);
錯(cuò)誤: 復(fù)制槽名 "flink" 已經(jīng)存在
(解決復(fù)制槽名 "flink" 已經(jīng)存在的問(wèn)題)
1.切換用戶
# su - postgres
2.登陸用戶
-bash-4.2$ psql -U postgres
3. 查看復(fù)制槽
postgres=# select * from pg_replication_slots; -- 查看復(fù)制槽
4. 刪除復(fù)制槽
SELECT * FROM pg_drop_replication_slot('flink'); -- 刪除復(fù)制槽
5. 驗(yàn)證
postgres=# select * from pg_replication_slots; -- 查看復(fù)制槽,確認(rèn)已刪除
Flink CDC Stream Postgres變更捕獲 (java)文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-427006.html
package pg;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;
public class FlinkCdcPg {

    /**
     * Streams change events from a PostgreSQL table via the DataStream API
     * and prints each event as a JSON string.
     */
    public static void main(String[] args) throws Exception {
        // Debezium engine properties passed through to the underlying connector.
        Properties debeziumProps = new Properties();
        debeziumProps.setProperty("snapshot.mode", "initial");
        debeziumProps.setProperty("decimal.handling.mode", "double");
        // NOTE(review): "database.serverTimezone" looks like a MySQL JDBC option;
        // confirm it has any effect for the Postgres Debezium connector.
        debeziumProps.setProperty("database.serverTimezone", "GMT+8");

        SourceFunction<String> pgSource = PostgreSQLSource.<String>builder()
                .hostname("Pg數(shù)據(jù)庫(kù)IP地址")
                .port(5432)
                .database("postgres")            // database to monitor
                .schemaList("public")            // schema to monitor
                .tableList("public.sink2")       // table(s) to monitor
                .username("postgres")
                .password("123456")
                .decodingPluginName("pgoutput")  // logical decoding plugin
                .slotName("t_table_slot")        // replication slot name — must be unique per job
                .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord -> JSON string
                .debeziumProperties(debeziumProps)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 on the print sink keeps message ordering.
        env.addSource(pgSource)
           .print()
           .setParallelism(1);
        env.execute();
    }
}
Flink CDC? SQL TABLE pg讀?。╦ava)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-427006.html
package pg;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkCdcOracleExample {
public static void main(String[]args) throws Exception {
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.disableOperatorChaining();
StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);
String sourceDDL ="CREATE TABLEpg_source (\n" +
" ID INT, \n" +
" NAME STRING, \n" +
" PRIMARY KEY (ID) NOT ENFORCED \n" +
" ) WITH (\n" +
" 'connector' = 'postgres-cdc',\n" +
" 'hostname' = 'Pg數(shù)據(jù)庫(kù)IP地址',\n" +
" 'port' = '5432',\n" +
" 'username' = 'postgres',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'postgres',\n" +
" 'schema-name' = 'public',\n" + // 注意這里要大寫(xiě)
" 'table-name' = 'sink2',\n" +
" 'debezium.log.mining.strategy'='online_catalog'\n" +
)";
//執(zhí)行source表ddl
tableEnv.executeSql(sourceDDL);
TableResult tableResult =tableEnv.executeSql("select * from pg_source");
tableResult.print();
env.execute();
}
}
到了這里,關(guān)于FLINK CDC postgresql (Stream與SQL)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!