1. 環(huán)境信息
類型 | 版本/描述 |
---|---|
docker | 20.10.9 |
Postgresql | 10.6 |
初始化賬號密碼:postgres/postgres 普通用戶:test1/test123 數(shù)據(jù)庫:test_db |
|
flink | 1.13.6 |
2. 安裝
step1: 拉取 PostgreSQL 10.6 版本的鏡像:
docker pull postgres:10.6
step2:創(chuàng)建并啟動 PostgreSQL
容器,在這里,我們將把容器的端口 5432 映射到主機的端口 30028,賬號密碼設(shè)置為postgres
,并將 pgoutput
插件加載到 PostgreSQL
實例中:
docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
step3: 查看容器是否創(chuàng)建成功:
docker ps | grep postgres-10.6
3. 配置
step1:docker進(jìn)去Postgresql數(shù)據(jù)的容器:
docker exec -it postgres-10.6 bash
step2:編輯postgresql.conf
配置文件:
vi /var/lib/postgresql/data/postgresql.conf
配置內(nèi)容如下:
# 更改wal日志方式為logical(方式有:minimal、replica 、logical )
wal_level = logical
# 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個slots
max_replication_slots = 20
# 更改wal發(fā)送最大進(jìn)程數(shù)(默認(rèn)值為10),這個值和上面的solts設(shè)置一樣
max_wal_senders = 20
# 中斷那些停止活動超過指定毫秒數(shù)的復(fù)制連接,可以適當(dāng)設(shè)置大一點(默認(rèn)60s,0表示禁用)
wal_sender_timeout = 180s
step3:重啟容器:
docker restart postgres-10.6
連接數(shù)據(jù)庫,如果查詢一下語句,返回logical
表示修改成功:
SHOW wal_level;
4. 新建用戶并賦權(quán)
使用創(chuàng)建容器時的賬號密碼(postgres/postgres
)登錄Postgresql數(shù)據(jù)庫。
先創(chuàng)建數(shù)據(jù)庫和表:
-- 創(chuàng)建數(shù)據(jù)庫 test_db
CREATE DATABASE test_db;
-- 連接到新創(chuàng)建的數(shù)據(jù)庫 test_db
\c test_db
-- 創(chuàng)建 t_user 表
CREATE TABLE "public"."t_user" (
"id" int8 NOT NULL,
"name" varchar(255),
"age" int2,
PRIMARY KEY ("id")
);
新建用戶并且給用戶權(quán)限:
-- pg新建用戶
CREATE USER test1 WITH PASSWORD 'test123';
-- 給用戶復(fù)制流權(quán)限
ALTER ROLE test1 replication;
-- 給用戶登錄數(shù)據(jù)庫權(quán)限
GRANT CONNECT ON DATABASE test_db to test1;
-- 把當(dāng)前庫public下所有表查詢權(quán)限賦給用戶
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
5. 發(fā)布表
-- 設(shè)置發(fā)布為true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表進(jìn)行發(fā)布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查詢哪些表已經(jīng)發(fā)布
select * from pg_publication_tables;
更改表的復(fù)制標(biāo)識包含更新和刪除的值:文章來源:http://www.zghlxwxcb.cn/news/detail-672886.html
-- 更改復(fù)制標(biāo)識包含更新和刪除之前值(目的是為了確保表 t_user 在實時同步過程中能夠正確地捕獲并同步更新和刪除的數(shù)據(jù)變化。如果不執(zhí)行這兩條語句,那么 t_user 表的復(fù)制標(biāo)識可能默認(rèn)為 NOTHING,這可能導(dǎo)致實時同步時丟失更新和刪除的數(shù)據(jù)行信息,從而影響同步的準(zhǔn)確性)
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- 查看復(fù)制標(biāo)識(為f標(biāo)識說明設(shè)置成功,f(表示 full),否則為 n(表示 nothing),即復(fù)制標(biāo)識未設(shè)置)
select relreplident from pg_class where relname='t_user';
6. flink sql
-- 源表定義
CREATE TABLE `table_source_pg` (
id BIGINT,
name STRING,
age INT
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '10.194.183.120',
'port' = '30028',
'username' = 'test1',
'password' = 'test123',
'database-name' = 'test_db',
'schema-name' = 'public',
'table-name' = 't_user',
'decoding.plugin.name' = 'pgoutput'
)
-- 目標(biāo)表表定義
CREATE TABLE `table_sink_mysql` (
id BIGINT,
name STRING,
age INT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.194.183.120:30306/test',
'username' = 'root',
'password' = 'root',
'table-name' = 't_user_copy'
)
-- insert語句
INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`)
7. 命令匯總
-- pg新建用戶
CREATE USER test1 WITH PASSWORD 'test123';
-- 給用戶復(fù)制流權(quán)限
ALTER ROLE ODPS_ETL replication;
-- 給用戶數(shù)據(jù)庫權(quán)限
GRANT CONNECT ON DATABASE test_db to test1;
-- 設(shè)置發(fā)布開關(guān)
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表進(jìn)行發(fā)布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查詢哪些表已經(jīng)發(fā)布
select * from pg_publication_tables;
-- 給表查詢權(quán)限
grant select on TABLE aa to ODPS_ETL;
-- 給用戶讀寫權(quán)限
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to bd_test;
-- 把當(dāng)前庫所有表查詢權(quán)限賦給用戶
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;
-- 把當(dāng)前庫以后新建的表查詢權(quán)限賦給用戶
alter default privileges in schema public grant select on tables to ODPS_ETL;
-- 更改復(fù)制標(biāo)識包含更新和刪除之前值
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- 查看復(fù)制標(biāo)識
select relreplident from pg_class where relname='test0425';
-- 查看solt使用情況
SELECT * FROM pg_replication_slots;
-- 刪除solt
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- 查詢用戶當(dāng)前連接數(shù)
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- 設(shè)置用戶最大連接數(shù)
alter role odps_etl connection limit 200;
附:文章來源地址http://www.zghlxwxcb.cn/news/detail-672886.html
- 參考文章:https://www.cnblogs.com/xiongmozhou/p/14817641.html
到了這里,關(guān)于flink postgresql cdc實時同步(含pg安裝配置等)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!