国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

基于Flink CDC實時同步PostgreSQL與Tidb【Flink SQL Client模式下親測可行,詳細教程】

這篇具有很好參考價值的文章主要介紹了基于Flink CDC實時同步PostgreSQL與Tidb【Flink SQL Client模式下親測可行,詳細教程】。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。


操作系統(tǒng):ubuntu-22.04,運行于wsl 2【 注意,請務必使用wsl 2;wsl 1會出現(xiàn)各種各樣的問題】

軟件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0

一、PostgreSQL作為數(shù)據(jù)來源(source),由flink讀取

1.postgre安裝與配置

已有postgre的跳過此步

(1)pg安裝

https://zhuanlan.zhihu.com/p/143156636

sudo apt install postgresql
sudo -u postgres psql -c "SELECT version();"
sudo -u postgres psql # 連接進入postgre shell(以管理員用戶)

(2)pg配置

# 創(chuàng)建新用戶和數(shù)據(jù)庫
sudo su - postgres -c "createuser domeya"
sudo su - postgres -c "createdb domeya_db"

sudo -u postgres psql # 進入psql(管理員用戶postgres)
grant all privileges on database domeya_db to domeya; # 授權用戶操作數(shù)據(jù)庫
\password # 設置當前用戶密碼(\password domeya,可設置用戶domeya的密碼)
\q # 退出psql

# psql postgres://username:password@host:port/dbname  
psql postgres://domeya:123@localhost:5432/domeya_db # 新用戶測試連接

可能出現(xiàn)的問題

sudo -u postgres psql報錯:

psql: error: connection to server on socket “/var/run/postgresql/.s.PGSQL.5432” failed: No such file or directory
Is the server running locally and accepting connections on that socket?

https://stackoverflow.com/questions/69639250/pgconnectionbad-connection-to-server-on-socket-var-run-postgresql-s-pgsql

https://blog.csdn.net/psiitoy/article/details/7310003

【解決關鍵】:重啟pg服務

# 重啟
sudo service postgresql restart # 重要!
ps -ef | grep postgres

(可選)重裝pg

# 卸載
dpkg --list | grep postgresql
dpkg --purge postgresql postgresql-14 postgresql-client-14 postgresql-client-common postgresql-common # 根據(jù)dpkg --list | grep postgresql中展示的結果進行填寫
# rm -rf /var/lib/postgresql/
# 重裝
sudo apt install postgresql

2.flink安裝與配置

已有flink的跳過此步

flink安裝,配置環(huán)境變量

# https://flink.apache.org/downloads/
curl -O -L https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
tar zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt

sudo vim /etc/profile.d/flink.sh
# flink.sh
export FLINK_HOME=/opt/flink-1.17.1
export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

如果webUI無法外機訪問把rest.bind-address: 0.0.0.0這個設置放開權限即可

cd $FLINK_HOME/conf
cp flink-conf.yaml flink-conf.yaml.backup

vim flink-conf.yaml

# 修改以下設置
rest.bind-address: 0.0.0.0

啟動flink

cd $FLINK_HOME
./bin/start-cluster.sh # 啟動flink
jps # 查看是否啟動StandaloneSessionClusterEntrypoint, TaskManagerRunner

# ./bin/stop-cluster.sh # 關閉flink

3.flink cdc postgre配置

3.1 postgre配置(for flink cdc)

https://www.cnblogs.com/xiongmozhou/p/14817641.html

(1)修改配置文件

cd /etc/postgresql/14/main
cp postgresql.conf postgresql.conf.backup
vim postgresql.conf

postgresql.conf修改幾個關鍵配置如下:

# 更改wal日志方式為logical
wal_level = logical # minimal, replica, or logical

# 更改solts最大數(shù)量(默認值為10),flink-cdc默認一張表占用一個slots
max_replication_slots = 20 # max number of replication slots

# 更改wal發(fā)送最大進程數(shù)(默認值為10),這個值和上面的solts設置一樣
max_wal_senders = 20 # max number of walsender processes

# 中斷那些停止活動超過指定毫秒數(shù)的復制連接,可以適當設置大一點(默認60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

修改完之后重啟postgresql,service postgresql restart

(2)賦予權限

以管理員進入psql,sudo -u postgres psql

(可選)如果沒有測試表,可以新建一個

-- 如果沒有測試表,可創(chuàng)建一個
CREATE TABLE test_table1(
   id varchar(8),
   p_dt varchar(8)
);
-- 查看表
\d

insert into test_table1 values('1', '20230820');
select * from test_table1;

賦予普通用戶復制流權限、發(fā)布表、更改表的復制標識包含更新和刪除的值

-- 給用戶復制流權限
ALTER ROLE domeya replication;
-- 查看權限
\du


\c domeya_db -- 重要:進入到domeya_db數(shù)據(jù)庫(以管理員賬號進入)
-- 設置發(fā)布為true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表進行發(fā)布(包括以后新建的表);
-- 注意,此處PUBLICATION名字必須為dbz_publication,否則后續(xù)flink sql報錯must be superuser to create FOR ALL TABLES publication
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查詢哪些表已經(jīng)發(fā)布
select * from pg_publication_tables;

-- 更改復制標識包含更新和刪除之前值
ALTER TABLE test_table1 REPLICA IDENTITY FULL; -- 對應前面創(chuàng)建的測試表
-- 查看復制標識(為f標識說明設置成功)
select relreplident from pg_class where relname='test_table1'; -- 對應前面創(chuàng)建的測試表

-- 退出
\q

wal_level = logical源表的數(shù)據(jù)修改時,默認的邏輯復制流只包含歷史記錄的primary key,如果需要輸出更新記錄的歷史記錄的所有字段,需要在表級別修改參數(shù):ALTER TABLE tableName REPLICA IDENTITY FULL; 這樣才能捕獲到源表所有字段更新后的值

發(fā)布所有表可能太多,也可以創(chuàng)建publication,添加指定表到publication。

update pg_publication set puballtables=false where pubname is not null; -- 默認發(fā)布所有表為false
CREATE PUBLICATION flink_cdc_publication;
alter publication flink_cdc_publication add table test_table1;
select * from pg_publication;
select * from pg_publication_tables;

3.2 flink cdc postgres的jar包下載

下載flink cdc postgres相關jar包,放在$FLINK_HOME/lib

cd $FLINK_HOME/lib

# 以下用于flink cdc postgres連接
# 注意:用于flink sql的jar包是flink-sql-connector-postgres-cdc,不是flink-connector-postgres-cdc
# https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/2.4.0
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4.0/flink-sql-connector-postgres-cdc-2.4.0.jar

如果flink在運行狀態(tài),需要重啟flink,之后再啟動flink sql client

cd $FLINK_HOME
./bin/stop-cluster.sh
./bin/start-cluster.sh

4.flink cdc postgre測試

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html【官方文檔demo】

啟動flink sql client(之前重啟了flink cluster)

cd $FLINK_HOME
./bin/sql-client.sh

在flink sql client創(chuàng)建表,與pg中的表結構對應,表名字可以不同

CREATE TABLE source_table (
    id STRING,
    p_dt STRING
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'domeya',
    'password' = '123',
    'database-name' = 'domeya_db',
    'schema-name' = 'public',
    'table-name' = 'test_table1',
    'slot.name' = 'flink',
    -- experimental feature: incremental snapshot (default off)
    -- 'scan.incremental.snapshot.enabled' = 'true'
    'decoding.plugin.name' = 'pgoutput' -- 必須加,否則報錯could not access file "decoderbufs"
);

select * from source_table;

可能出現(xiàn)的問題

運行select * from source_table;時報錯

報錯1:

[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: could not access file “decoderbufs”: No such file or directory

https://github.com/ververica/flink-cdc-connectors/issues/37

table sql加:WITH('decoding.plugin.name' = 'pgoutput')【flink sql】

dataStream加:PostgreSQLSource.<String>builder().decodingPluginName("pgoutput").build()

報錯2:

[ERROR] Could not execute SQL statement. Reason:
org.postgresql.util.PSQLException: ERROR: must be superuser to create FOR ALL TABLES publication

https://gist.github.com/alexhwoods/4c4c90d83db3c47d9303cb734135130d

檢查之前的操作(psql):

CREATE PUBLICATION dbz_publication FOR ALL TABLES;
select * from pg_publication_tables;

報錯3:

Caused by: org.postgresql.util.PSQLException: ERROR: replication slot “flink” already exists

https://zhuanlan.zhihu.com/p/449066277

當上面debezium.slot.name的值超過20個,就會報錯,即使之前的job已經(jīng)下線,這個slot文件依舊在,此時需要執(zhí)行下面語句并刪除slot即可:

psql:

-- https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION
SELECT slot_name, slot_type, active FROM pg_replication_slots;
SELECT pg_drop_replication_slot('flink'); # 這個和之前flink sql中的'slot.name' = 'flink'對應

注意,flink postgres-cdc只能讀(作為source),不能寫(作為sink)

Flink SQL> insert into source_table values('3', '20230820');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'postgres-cdc' can only be used as a source. It cannot be used as a sink.

二、Tidb作為數(shù)據(jù)去向(sink),由flink寫入

1.tidb安裝與配置

已有tidb的跳過此步

https://docs.pingcap.com/zh/tidb/stable/quick-start-with-tidb

su xxx # 切換到你的普通用戶
curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
source /home/xxx/.bashrc # 按上個命令輸出的路徑來,上面顯示的是Shell profile:  /home/xxx/.bashrc

tiup playground # 下載鏡像,并啟動某個版本的集群
  • 以這種方式執(zhí)行的 playground,在結束部署測試后 TiUP 會清理掉原集群數(shù)據(jù),重新執(zhí)行該命令后會得到一個全新的集群。
  • 若希望持久化數(shù)據(jù),可以執(zhí)行 TiUP 的 --tag 參數(shù):tiup --tag <your-tag> playground ...

下載完畢,啟動成功之后展示信息:

Connect TiDB: mysql --comments --host 127.0.0.1 --port 4000 -u root
TiDB Dashboard: http://127.0.0.1:2379/dashboard
Grafana: http://127.0.0.1:3000

連接tidb

# 使用mysql client連接tidb
sudo apt install mysql-client
mysql --comments --host 127.0.0.1 --port 4000 -u root

# 設置root密碼 
# https://docs.pingcap.com/zh/tidb/stable/user-account-management#%E8%AE%BE%E7%BD%AE%E5%AF%86%E7%A0%81
# https://blog.csdn.net/qq_45675449/article/details/106866700
SET PASSWORD FOR 'root'@'%' = '123'; # root的localhost是%,可通過 select user,host from mysql.user; 查看

exit;

# mysql -uroot -p無法連接,必須加上port和host,并且host不能寫成localhost
# https://blog.csdn.net/hjf161105/article/details/78850658
mysql -uroot --port 4000 -h 127.0.0.1 -p

2.flink cdc tidb的jar包下載

下載用于jdbc mysql連接的jar包,用于flink cdc tidb連接。

特別注意:Tidb的sink模式得用jdbc+mysql連接,不用官方提供的tidb cdc因為其不能作為sink,只能曲線救國參考這種方法了。

cd $FLINK_HOME/lib
# 以下用于jdbc mysql(用于flink cdc tidb連接)
# https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc/3.1.1-1.17
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.1.0/mysql-connector-j-8.1.0.jar

如果flink在運行狀態(tài),需要重啟flink,之后再啟動flink sql client

cd $FLINK_HOME
./bin/stop-cluster.sh
./bin/start-cluster.sh

3.flink cdc tidb測試

基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

flink cdc tidb 官方文檔demo(無法作為sink,只能作為source)

(可選)tidb創(chuàng)建測試表

# mysql -uroot --port 4000 -h 127.0.0.1 -p

# 創(chuàng)建測試表
CREATE TABLE test.test_table1(
   id varchar(8),
   p_dt varchar(8)
);
insert into test.test_table1 values('3', '20230819');

啟動flink sql client(之前重啟了flink cluster)

cd $FLINK_HOME
./bin/sql-client.sh

flink sql連接tidb,仿照mysql的連接

-- checkpoint every 3000 milliseconds                       
SET 'execution.checkpointing.interval' = '3s';

-- register a TiDB table in Flink SQL
CREATE TABLE sink_table (
    id STRING,
    p_dt STRING,
    PRIMARY KEY(id) NOT ENFORCED 
    -- 必須寫PRIMARY KEY,否則報錯:[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:4000/test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '123',
    'table-name' = 'test_table1'
);
  
-- read snapshot and binlogs from table
SELECT * FROM sink_table;

三、用Flink SQL Client同步PostgreSQL到Tidb

# 將會提交一個作業(yè),進行source_table->sink_table的單向同步
insert into sink_table select * from source_table;

[INFO] Submitting SQL update statement to the cluster…
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8e47bfa3ea78da4c47b395f7517c2812

在flink web ui上可以看到作業(yè)運行狀態(tài)。

只要這個作業(yè)是正常runnning,那么對source_table的任何修改都會同步到sink_table。注意這種是單向同步,source_table的變動(增/刪/改)會同步到sink_table,但反過來sink_table的變動不會影響到source_table(不會觸發(fā)source_table->sink_table的同步)。文章來源地址http://www.zghlxwxcb.cn/news/detail-668737.html

到了這里,關于基于Flink CDC實時同步PostgreSQL與Tidb【Flink SQL Client模式下親測可行,詳細教程】的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 基于 Flink CDC 構建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    基于 Flink CDC 構建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    這篇教程將展示如何基于 Flink CDC 快速構建 MySQL 到 Databend 的實時數(shù)據(jù)同步。本教程的演示都將在 Flink SQL CLI 中進行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。 假設我們有電子商務業(yè)務,商品的數(shù)據(jù)存儲在 MySQL ,我們需要實時把它同步到 Databend 中。 接下來的內(nèi)容

    2024年02月10日
    瀏覽(29)
  • Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

    Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

    環(huán)境說明: flink 1.15.2 mysql 版本5.7 ? ?注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù) windows11 IDEA 本地運行 具體前提設置,請看這篇,包含 binlog 設置、Maven...... Flink CDC 基于mysql binlog 實時同步mysql表_彩虹豆的博客-CSDN博客 經(jīng)過不懈努力,終于從阿里help頁面找到了支

    2024年02月08日
    瀏覽(27)
  • Flink CDC 基于Oracle log archiving 實時同步Oracle表到Mysql

    Flink CDC 基于Oracle log archiving 實時同步Oracle表到Mysql

    環(huán)境說明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運行 先上官網(wǎng)使用說明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 開啟 log archiving (1).啟用 log archiving ?? ??? ?a:以DBA用戶連接數(shù)據(jù)庫? ??

    2024年02月11日
    瀏覽(44)
  • Flink CDC 基于Oracle log archiving 實時同步Oracle表到Mysql(無主鍵)

    環(huán)境說明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地運行 具體環(huán)境設置和maven依賴請看上篇:Flink CDC 基于Oracle log archiving 實時同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 現(xiàn)在操作的是源表和目標表都無主鍵數(shù)

    2024年02月15日
    瀏覽(30)
  • 基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    Flink CDC有兩種方式同步數(shù)據(jù)庫: 一種是通過FlinkSQL直接輸入兩表數(shù)據(jù)庫映射進行數(shù)據(jù)同步,缺點是只能單表進行同步; 一種是通過DataStream開發(fā)一個maven項目,打成jar包上傳到服務器運行。 本方案使用FlinkSQL方法,同步兩表中的數(shù)據(jù)。 其中Flink應用可以部署在具有公網(wǎng)IP的服務

    2023年04月11日
    瀏覽(27)
  • flink oracle cdc實時同步(超詳細)

    flink oracle cdc實時同步(超詳細)

    官方文檔:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文參照官方文檔來記錄Oracle CDC 的配置。 在本文開始前,需要先安裝Oracle,有興趣的同學可以參考博主之前寫的《docker下安裝oracle11g(一次安裝成功)》。 如果要做oracle的實時同步

    2024年02月12日
    瀏覽(21)
  • 【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數(shù)據(jù)庫中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉庫(starrocks), 數(shù)據(jù)倉庫你可以理解為存儲了各種各樣來自不同數(shù)據(jù)庫中表。 數(shù)據(jù)的同步目前對

    2023年04月08日
    瀏覽(94)
  • Flink CDC實時同步PG數(shù)據(jù)庫

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git? 1、更改配置文件postgresql.conf # 更改wal日志方式為logical wal_level = logical # minimal, replica, or logical # 更改solts最大數(shù)量(默認值為10),flink-cdc默認一張表占用一個slots max_replication_slots = 20 # m

    2024年02月13日
    瀏覽(35)
  • FLINK CDC postgresql (Stream與SQL)

    FLINK CDC postgresql (Stream與SQL)

    Postgres CDC Connector — CDC Connectors for Apache Flink? documentation flink cdc捕獲postgresql數(shù)據(jù) 1)更改配置文件 需要更改 # 更改wal日志方式為logical # 更改solts最大數(shù)量(默認值為10),flink-cdc默認一張表占用一個 # 更改wal發(fā)送最大進程數(shù)(默認值為10),這個值和上面的solts設置一樣 # 中斷

    2023年04月27日
    瀏覽(21)
  • 用flink cdc sqlserver 將數(shù)據(jù)實時同步到clickhouse

    flink cdc 終于支持 sqlserver 了。 現(xiàn)在互聯(lián)網(wǎng)公司用sqlserver的不多,大部分都是一些國企的老舊系統(tǒng)。我們以前同步數(shù)據(jù),都是用datax,但是不能實時同步數(shù)據(jù)?,F(xiàn)在有了flinkcdc,可以實現(xiàn)實時同步了。 1、首先sqlserver版本:要求sqlserver版本為14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包