国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

FLINK CDC postgresql (Stream與SQL)

這篇具有很好參考價(jià)值的文章主要介紹了FLINK CDC postgresql (Stream與SQL)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Postgres CDC Connector — CDC Connectors for Apache Flink? documentation

flink cdc捕獲postgresql數(shù)據(jù)

1)更改配置文件

需要更改

linux>vi postgresql.conf

# 更改wal日志方式為logical

wal_level = logical # minimal, replica, or logical

# 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個(gè)

slotsmax_replication_slots = 20 # max number of replication slots

# 更改wal發(fā)送最大進(jìn)程數(shù)(默認(rèn)值為10),這個(gè)值和上面的solts設(shè)置一樣

max_wal_senders = 20 # max number of walsender processes

# 中斷那些停止活動(dòng)超過(guò)指定毫秒數(shù)的復(fù)制連接,可以適當(dāng)設(shè)置大一點(diǎn)(默認(rèn)60s)

wal_sender_timeout = 180s # in milliseconds; 0 disable

2)注意

注意:wal_level = logical源表的數(shù)據(jù)修改時(shí),默認(rèn)的邏輯復(fù)制流只包含歷史記錄的primary key,如果需要輸出更新記錄的歷史記錄的所有字段,需要在表級(jí)別修改參數(shù):ALTER TABLE tableName REPLICA IDENTITY FULL; 這樣才能捕獲到源表所有字段更新后的值

3) 將jar包導(dǎo)入flink lib目錄

flink-sql-connector-postgres-cdc-2.2.0.jar 到 flink/lib下

4)新建用戶并且給用戶復(fù)制流權(quán)限
-- pg新建用戶

CREATE USER user WITH PASSWORD 'pwd';

5) 給用戶復(fù)制流權(quán)限

ALTER ROLE user replication;

6) 給用戶登錄數(shù)據(jù)庫(kù)權(quán)限

grant CONNECT ON DATABASE test to user;

7)把當(dāng)前庫(kù)public下所有表查詢權(quán)限賦給用戶

GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

8) 發(fā)布表

-- 設(shè)置發(fā)布為true

update pg_publication set puballtables=true where pubname is not null;

-- 把所有表進(jìn)行發(fā)布

CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查詢哪些表已經(jīng)發(fā)布

select * from pg_publication_tables;

9) 更改表的復(fù)制標(biāo)識(shí)包含更新和刪除的值

-- 更改復(fù)制標(biāo)識(shí)包含更新和刪除之前值

ALTER TABLE test0425 REPLICA IDENTITY FULL;

-- 查看復(fù)制標(biāo)識(shí)(為f標(biāo)識(shí)說(shuō)明設(shè)置成功)

select relreplident from pg_class where relname='testname';

到這一步,設(shè)置已經(jīng)完全可以啦,上面步驟都是必須的

flink sql 端 創(chuàng)建postgresql 連接器

linux>bin/sql-client.sh     //進(jìn)入flink sql客戶端

CREATE TABLE flink_cdc_source (
id INT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'pg數(shù)據(jù)庫(kù)IP地址',
'port' = '5432',
'database-name' = 'postgres',
'schema-name' = 'public',
'username' = 'postgres',
'password' = '123456',
'table-name' = 'pg_cdc_source',
'decoding.plugin.name' = 'pgoutput'
);

錯(cuò)誤: 復(fù)制槽名 "flink" 已經(jīng)存在

FLINK CDC postgresql (Stream與SQL)

(?解決復(fù)制槽名 "flink" 已經(jīng)存在)

1.切換用戶

# su - postgres

2.登陸用戶

-bash-4.2$ psql -U postgres

3. 查看復(fù)制槽

postgres=# select * from pg_replication_slots; 查看復(fù)制槽

FLINK CDC postgresql (Stream與SQL)

?4. 刪除復(fù)制槽

SELECT * FROM pg_drop_replication_slot('flink'); 刪除復(fù)制槽

FLINK CDC postgresql (Stream與SQL)

5.驗(yàn)證

postgres=# select * from pg_replication_slots; 查看復(fù)制槽

Flink CDC Stream Postgres變更捕獲 (java)

package pg;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;


public class FlinkCdcPg {
public static void main(String[]args) throws Exception {
Properties properties = new Properties();
properties.setProperty("snapshot.mode", "initial");
properties.setProperty("decimal.handling.mode", "double"); 
properties.setProperty("database.serverTimezone", "GMT+8"); //設(shè)置時(shí)區(qū)


SourceFunction<String>sourceFunction = PostgreSQLSource.<String>builder()
.hostname("Pg數(shù)據(jù)庫(kù)IP地址")
.port(5432)
.database("postgres") // monitor postgresdatabase
.schemaList("public") // monitor inventory schema
.tableList("public.sink2") // monitor productstable
.username("postgres")
.password("123456")
.decodingPluginName("pgoutput") // pg解碼插件
.slotName("t_table_slot") // 復(fù)制槽名稱(chēng) 不能重復(fù)
.deserializer(new JsonDebeziumDeserializationSchema())// converts SourceRecord to JSON String
.debeziumProperties(properties)
.build();

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism1 for sink to keep message ordering


env.execute();

}
}

Flink CDC? SQL TABLE pg讀?。╦ava)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-427006.html

package pg;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcOracleExample {
public static void main(String[]args) throws Exception {

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.disableOperatorChaining();
StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);


String sourceDDL ="CREATE TABLEpg_source (\n" +
" ID INT, \n" +
" NAME STRING, \n" +
" PRIMARY KEY (ID) NOT ENFORCED \n" +
" ) WITH (\n" +
" 'connector' = 'postgres-cdc',\n" +
" 'hostname' = 'Pg數(shù)據(jù)庫(kù)IP地址',\n" +
" 'port' = '5432',\n" +
" 'username' = 'postgres',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'postgres',\n" +
" 'schema-name' = 'public',\n" + // 注意這里要大寫(xiě)
" 'table-name' = 'sink2',\n" +
" 'debezium.log.mining.strategy'='online_catalog'\n" +
)";



//執(zhí)行source表ddl
tableEnv.executeSql(sourceDDL);
TableResult tableResult =tableEnv.executeSql("select * from pg_source");
tableResult.print();
env.execute();

}
}

到了這里,關(guān)于FLINK CDC postgresql (Stream與SQL)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink Oracle CDC Connector源碼解讀

    Flink Oracle CDC Connector源碼解讀

    flink cdc是在flink的基礎(chǔ)上對(duì)oracle的數(shù)據(jù)進(jìn)行實(shí)時(shí)采集,底層使用的是debezium框架來(lái)實(shí)現(xiàn),debezium使用oracle自帶的logminer技術(shù)來(lái)實(shí)現(xiàn)。logminer的采集需要對(duì)數(shù)據(jù)庫(kù)和采集表添加補(bǔ)充日志,由于oracle18c不支持對(duì)數(shù)據(jù)添加補(bǔ)充日志,所以目前支持的oracle11、12、19三個(gè)版本。 flink oracle

    2024年02月02日
    瀏覽(22)
  • 【大數(shù)據(jù)】基于 Flink CDC 構(gòu)建 MySQL 和 Postgres 的 Streaming ETL

    【大數(shù)據(jù)】基于 Flink CDC 構(gòu)建 MySQL 和 Postgres 的 Streaming ETL

    這篇教程將展示如何基于 Flink CDC 快速構(gòu)建 MySQL 和 Postgres 的流式 ETL。本教程的演示都將在 Flink SQL CLI 中進(jìn)行,只涉及 SQL,無(wú)需一行 Java / Scala 代碼,也無(wú)需安裝 IDE。 假設(shè)我們正在經(jīng)營(yíng)電子商務(wù)業(yè)務(wù),商品和訂單的數(shù)據(jù)存儲(chǔ)在 MySQL 中,訂單對(duì)應(yīng)的物流信息存儲(chǔ)在 Postgres 中。

    2024年02月03日
    瀏覽(15)
  • 第3.4章:StarRocks數(shù)據(jù)導(dǎo)入--Flink Connector與CDC秒級(jí)數(shù)據(jù)同步

    Flink作為當(dāng)前流行的流式計(jì)算框架,在對(duì)接StarRocks時(shí),若直接使用JDBC的方式“流式”寫(xiě)入數(shù)據(jù),對(duì)StarRocks是不友好的,StarRocks作為一款MVCC的數(shù)據(jù)庫(kù),其導(dǎo)入的核心思想還是“攢微批+降頻率”。為此,StarRocks單獨(dú)開(kāi)發(fā)了flink-connector-starrocks,其內(nèi)部實(shí)現(xiàn)仍是通過(guò)對(duì)數(shù)據(jù)緩存攢批

    2023年04月15日
    瀏覽(51)
  • 60、Flink CDC 入門(mén)介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫(kù)數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    60、Flink CDC 入門(mén)介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫(kù)數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    一、Flink 專(zhuān)欄 Flink 專(zhuān)欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月19日
    瀏覽(21)
  • flink postgresql cdc實(shí)時(shí)同步(含pg安裝配置等)

    類(lèi)型 版本/描述 docker 20.10.9 Postgresql 10.6 初始化賬號(hào)密碼:postgres/postgres 普通用戶:test1/test123 數(shù)據(jù)庫(kù):test_db flink 1.13.6 step1 : 拉取 PostgreSQL 10.6 版本的鏡像: step2 :創(chuàng)建并啟動(dòng) PostgreSQL 容器,在這里,我們將把容器的端口 5432 映射到主機(jī)的端口 30028,賬號(hào)密碼設(shè)置為 postgre

    2024年02月11日
    瀏覽(27)
  • Flink SQL Hive Connector使用場(chǎng)景

    目錄 1.介紹 2.使用 2.1注冊(cè)HiveCatalog 2.2Hive Read 2.2.1流讀關(guān)鍵配置 2.2.2示例

    2024年02月06日
    瀏覽(24)
  • Flink Upsert Kafka SQL Connector 介紹

    Flink Upsert Kafka SQL Connector 介紹

    在某些場(chǎng)景中,比方GROUP BY聚合之后的后果,須要去更新之前的結(jié)果值。這個(gè)時(shí)候,須要將 Kafka 記錄的 key 當(dāng)成主鍵解決,用來(lái)確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來(lái)解決。在 Flink1.11 中,能夠通過(guò) flink-cdc-connectors 項(xiàng)目提供的 changelog-json format 來(lái)實(shí)現(xiàn)該性能。 在

    2024年02月20日
    瀏覽(21)
  • SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    最近做的一個(gè)項(xiàng)目,使用的是pg數(shù)據(jù)庫(kù),公司沒(méi)有成熟的DCD組件,為了實(shí)現(xiàn)數(shù)據(jù)變更消息發(fā)布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka。 監(jiān)聽(tīng)數(shù)據(jù)變化,進(jìn)行異步通知,做系統(tǒng)內(nèi)異步任務(wù)。 架構(gòu)方案(懶得寫(xiě)了,看圖吧): -- 創(chuàng)建pg 高線數(shù)據(jù)同步用

    2024年02月02日
    瀏覽(31)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭(zhēng)取搞完最后這一部分,學(xué)完趕緊把 Kafka 和 Flume 學(xué)完,就要開(kāi)始做實(shí)時(shí)數(shù)倉(cāng)了。據(jù)說(shuō)是應(yīng)屆生得把實(shí)時(shí)數(shù)倉(cāng)搞個(gè) 80%~90% 才能差不多找個(gè)工作,太牛馬了。 ????????之前我們已經(jīng)用過(guò)了一些簡(jiǎn)單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(51)
  • 關(guān)于flink-sql-connector-phoenix的重寫(xiě)邏輯

    目錄 重寫(xiě)意義 代碼結(jié)構(gòu)? 調(diào)用鏈路 POM文件配置 代碼解析 一、PhoenixJdbcD

    2024年02月12日
    瀏覽(25)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包