国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink postgresql cdc實時同步(含pg安裝配置等)

這篇具有很好參考價值的文章主要介紹了flink postgresql cdc實時同步(含pg安裝配置等)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1. 環(huán)境信息

類型 版本/描述
docker 20.10.9
Postgresql 10.6
初始化賬號密碼:postgres/postgres
普通用戶:test1/test123
數(shù)據(jù)庫:test_db
flink 1.13.6

2. 安裝

step1: 拉取 PostgreSQL 10.6 版本的鏡像:

docker pull postgres:10.6

step2:創(chuàng)建并啟動 PostgreSQL 容器,在這里,我們將把容器的端口 5432 映射到主機的端口 30028,賬號密碼設(shè)置為postgres,并將 pgoutput 插件加載到 PostgreSQL 實例中:

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'

step3: 查看容器是否創(chuàng)建成功:

docker ps | grep postgres-10.6

3. 配置

step1:docker進(jìn)去Postgresql數(shù)據(jù)的容器:

docker exec -it postgres-10.6  bash

step2:編輯postgresql.conf配置文件:

vi /var/lib/postgresql/data/postgresql.conf 

配置內(nèi)容如下:

# 更改wal日志方式為logical(方式有:minimal、replica 、logical  )
wal_level = logical  

# 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個slots
max_replication_slots = 20

# 更改wal發(fā)送最大進(jìn)程數(shù)(默認(rèn)值為10),這個值和上面的solts設(shè)置一樣
max_wal_senders = 20     

# 中斷那些停止活動超過指定毫秒數(shù)的復(fù)制連接,可以適當(dāng)設(shè)置大一點(默認(rèn)60s,0表示禁用)
wal_sender_timeout = 180s	          

step3:重啟容器:

docker restart postgres-10.6

連接數(shù)據(jù)庫,如果查詢一下語句,返回logical表示修改成功:

SHOW wal_level;

4. 新建用戶并賦權(quán)

使用創(chuàng)建容器時的賬號密碼(postgres/postgres)登錄Postgresql數(shù)據(jù)庫。

先創(chuàng)建數(shù)據(jù)庫和表:

-- 創(chuàng)建數(shù)據(jù)庫 test_db
CREATE DATABASE test_db;

-- 連接到新創(chuàng)建的數(shù)據(jù)庫 test_db
\c test_db

-- 創(chuàng)建 t_user 表
CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);

新建用戶并且給用戶權(quán)限:

-- pg新建用戶
CREATE USER test1 WITH PASSWORD 'test123';

-- 給用戶復(fù)制流權(quán)限
ALTER ROLE test1 replication;

-- 給用戶登錄數(shù)據(jù)庫權(quán)限
GRANT CONNECT ON DATABASE test_db to test1;

-- 把當(dāng)前庫public下所有表查詢權(quán)限賦給用戶
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

5. 發(fā)布表

-- 設(shè)置發(fā)布為true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表進(jìn)行發(fā)布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查詢哪些表已經(jīng)發(fā)布
select * from pg_publication_tables;

更改表的復(fù)制標(biāo)識包含更新和刪除的值:

-- 更改復(fù)制標(biāo)識包含更新和刪除之前值(目的是為了確保表 t_user 在實時同步過程中能夠正確地捕獲并同步更新和刪除的數(shù)據(jù)變化。如果不執(zhí)行這兩條語句,那么 t_user 表的復(fù)制標(biāo)識可能默認(rèn)為 NOTHING,這可能導(dǎo)致實時同步時丟失更新和刪除的數(shù)據(jù)行信息,從而影響同步的準(zhǔn)確性)
ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看復(fù)制標(biāo)識(為f標(biāo)識說明設(shè)置成功,f(表示 full),否則為 n(表示 nothing),即復(fù)制標(biāo)識未設(shè)置)
select relreplident from pg_class where relname='t_user';

6. flink sql

-- 源表定義
CREATE TABLE `table_source_pg` (
      id BIGINT,
      name STRING,
      age INT
      ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = '10.194.183.120',
      'port' = '30028',
      'username' = 'test1',
      'password' = 'test123',
      'database-name' = 'test_db',
      'schema-name' = 'public',
      'table-name' = 't_user',
      'decoding.plugin.name' = 'pgoutput'
)

-- 目標(biāo)表表定義
CREATE TABLE `table_sink_mysql` (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://10.194.183.120:30306/test',
      'username' = 'root',
      'password' = 'root',
      'table-name' = 't_user_copy'
)

-- insert語句
INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`)

7. 命令匯總

-- pg新建用戶
CREATE USER test1 WITH PASSWORD 'test123';

-- 給用戶復(fù)制流權(quán)限
ALTER ROLE ODPS_ETL replication;

-- 給用戶數(shù)據(jù)庫權(quán)限
GRANT CONNECT ON DATABASE test_db to test1;

-- 設(shè)置發(fā)布開關(guān)
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表進(jìn)行發(fā)布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查詢哪些表已經(jīng)發(fā)布
select * from pg_publication_tables;

-- 給表查詢權(quán)限
grant select on TABLE aa to ODPS_ETL;

-- 給用戶讀寫權(quán)限
grant select,insert,update,delete ON  ALL TABLES IN SCHEMA public to bd_test;

-- 把當(dāng)前庫所有表查詢權(quán)限賦給用戶
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;

-- 把當(dāng)前庫以后新建的表查詢權(quán)限賦給用戶
alter default privileges in schema public grant select on tables to ODPS_ETL;

-- 更改復(fù)制標(biāo)識包含更新和刪除之前值
ALTER TABLE test0425 REPLICA IDENTITY FULL;

-- 查看復(fù)制標(biāo)識
select relreplident from pg_class where relname='test0425';

-- 查看solt使用情況
SELECT * FROM pg_replication_slots;

-- 刪除solt
SELECT pg_drop_replication_slot('zd_org_goods_solt');

-- 查詢用戶當(dāng)前連接數(shù)
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;

-- 設(shè)置用戶最大連接數(shù)
alter role odps_etl connection limit 200;

附:文章來源地址http://www.zghlxwxcb.cn/news/detail-672886.html

  • 參考文章:https://www.cnblogs.com/xiongmozhou/p/14817641.html

到了這里,關(guān)于flink postgresql cdc實時同步(含pg安裝配置等)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    【實戰(zhàn)-01】flink cdc 實時數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對很多初入門的人來說是無法理解cdc到底是什么個東西。 有這樣一個需求,比如在mysql數(shù)據(jù)庫中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉庫(starrocks), 數(shù)據(jù)倉庫你可以理解為存儲了各種各樣來自不同數(shù)據(jù)庫中表。 數(shù)據(jù)的同步目前對

    2023年04月08日
    瀏覽(94)
  • flink oracle cdc實時同步(超詳細(xì))

    flink oracle cdc實時同步(超詳細(xì))

    官方文檔:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文參照官方文檔來記錄Oracle CDC 的配置。 在本文開始前,需要先安裝Oracle,有興趣的同學(xué)可以參考博主之前寫的《docker下安裝oracle11g(一次安裝成功)》。 如果要做oracle的實時同步

    2024年02月12日
    瀏覽(21)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進(jìn)行實時數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進(jìn)行實時數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務(wù)變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標(biāo)端進(jìn)行實時數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進(jìn)行實時數(shù)

    2024年02月12日
    瀏覽(36)
  • 基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

    基于Flink CDC實時同步數(shù)據(jù)(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠(yuǎn)程服務(wù)器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過程略) 準(zhǔn)備三個數(shù)據(jù)庫:flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實時同步到flink_sink和flink_sink_second的sink_test表。 (建庫建表過程略) 開發(fā)過程

    2024年02月06日
    瀏覽(27)
  • 用flink cdc sqlserver 將數(shù)據(jù)實時同步到clickhouse

    flink cdc 終于支持 sqlserver 了。 現(xiàn)在互聯(lián)網(wǎng)公司用sqlserver的不多,大部分都是一些國企的老舊系統(tǒng)。我們以前同步數(shù)據(jù),都是用datax,但是不能實時同步數(shù)據(jù)?,F(xiàn)在有了flinkcdc,可以實現(xiàn)實時同步了。 1、首先sqlserver版本:要求sqlserver版本為14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    瀏覽(32)
  • Flink CDC 基于mysql binlog 實時同步mysql表

    Flink CDC 基于mysql binlog 實時同步mysql表

    環(huán)境說明: flink?1.15.2 mysql 版本5.7? ? 注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù) windows11 IDEA 本地運行 先上官網(wǎng)使用說明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql開啟binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人測試是捕獲不到binlog日志的,增量相

    2024年02月10日
    瀏覽(24)
  • 基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    基于 Flink CDC 構(gòu)建 MySQL 到 Databend 的 實時數(shù)據(jù)同步

    這篇教程將展示如何基于 Flink CDC 快速構(gòu)建 MySQL 到 Databend 的實時數(shù)據(jù)同步。本教程的演示都將在 Flink SQL CLI 中進(jìn)行,只涉及 SQL,無需一行 Java/Scala 代碼,也無需安裝 IDE。 假設(shè)我們有電子商務(wù)業(yè)務(wù),商品的數(shù)據(jù)存儲在 MySQL ,我們需要實時把它同步到 Databend 中。 接下來的內(nèi)容

    2024年02月10日
    瀏覽(29)
  • Flink CDC2.4 整庫實時同步MySql 到Doris

    ????????Flink 1.15.4? ? ? ? ? 目前有很多工具都支持無代碼實現(xiàn)Mysql - Doris 的實時同步 ? ? ? ? 如:SlectDB 已發(fā)布的功能包 ? ? ? ? ? ? ? ??Dinky?SeaTunnel?TIS?等等 ? ? ? ? ?不過好多要么不支持表結(jié)構(gòu)變動,要不不支持多sink,我們的業(yè)務(wù)必須支持對表結(jié)構(gòu)的實時級變動

    2024年02月11日
    瀏覽(35)
  • Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

    Flink CDC 基于mysql binlog 實時同步mysql表(無主鍵)

    環(huán)境說明: flink 1.15.2 mysql 版本5.7 ? ?注意:需要開啟binlog,因為增量同步是基于binlog捕獲數(shù)據(jù) windows11 IDEA 本地運行 具體前提設(shè)置,請看這篇,包含 binlog 設(shè)置、Maven...... Flink CDC 基于mysql binlog 實時同步mysql表_彩虹豆的博客-CSDN博客 經(jīng)過不懈努力,終于從阿里help頁面找到了支

    2024年02月08日
    瀏覽(27)
  • 使用Flink CDC將Mysql中的數(shù)據(jù)實時同步到ES

    最近公司要搞搜索,需要把mysql中的數(shù)據(jù)同步到es中來進(jìn)行搜索,由于公司已經(jīng)搭建了flink集群,就打算用flink來做這個同步。本來以為很簡單,跟著官網(wǎng)文檔走就好了,結(jié)果沒想到折騰了將近一周的時間…… 我也是沒想到,這玩意網(wǎng)上資源竟然這么少,找到的全部都是通過

    2024年02月11日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包