1. Introduction to Debezium and PostgreSQL
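Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). You install and configure Debezium to monitor your databases, and your applications then consume an event for every row-level change. Only committed changes are visible, so your application does not have to worry about transactions or rolled-back changes. Debezium provides a unified model for all database change events, so your application does not have to deal with the intricacies of each database management system. In addition, because Debezium records the history of data changes in durable, replicated logs, your application can be stopped and restarted at any time without missing the events that occurred while it was down, so every event is processed correctly and completely.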
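Introduction to PostgreSQL
PostgreSQL is a powerful open-source database system. After decades of active development and continuous improvement, it has earned a strong reputation for reliability, stability, and data integrity. PostgreSQL runs on all major operating systems, including Linux, Unix (AIX, BSD, HP-UX, SGI IRIX, Mac OS X, Solaris, and Tru64), and Windows. It is a fully transactional database with complete support for foreign keys, joins, views, triggers, and stored procedures (which can be written in several languages). It supports most SQL:2008 data types, including integer, numeric, boolean, byte, character, date, interval, and time types, and it can also store binary large objects such as images, audio, and video. PostgreSQL has native programming interfaces for many languages, such as C/C++, Java, .NET, Perl, Python, Ruby, and Tcl, as well as ODBC, and it ships with extensive documentation.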
2. Setting up a PostgreSQL test environment
1. Installation
Step 1: Pull the PostgreSQL 10.6 image:
docker pull postgres:10.6
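Step 2: Create and start the PostgreSQL container.
Here we map container port 5432 to host port 30028, set the password of the default postgres superuser to postgres, and pass shared_preload_libraries=pgoutput to the server. Note that pgoutput is the logical decoding plugin built into PostgreSQL 10 and later, so this extra setting should not be strictly required: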
docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
Step 3: Check that the container was created and is running:
docker ps | grep postgres-10.6
2. Configuration
Step 1: Open a shell inside the PostgreSQL container:
docker exec -it postgres-10.6 bash
Step 2: Edit the postgresql.conf configuration file:
vi /var/lib/postgresql/data/postgresql.conf
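Set the following values:
# Set the WAL level to logical (valid values: minimal, replica, logical)
wal_level = logical
# Raise the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20
# Raise the maximum number of WAL sender processes (default 10); keep it in line with the slot setting above
max_wal_senders = 20
# Terminate replication connections that have been inactive longer than this timeout; it can be set somewhat higher (default 60s, 0 disables the timeout)
wal_sender_timeout = 180s
Step 3: Restart the container: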
docker restart postgres-10.6
Connect to the database and run the following statement; a result of logical means the change took effect:
SHOW wal_level;
3. Create a user and grant privileges
Log in to PostgreSQL with the credentials set when the container was created (postgres/postgres).
First create the database and the table:
-- Create the database test_db
CREATE DATABASE test_db;
-- Connect to the newly created database test_db
\c test_db
-- Create the t_user table
CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);
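Create a new user and grant it privileges:
-- Create a new PostgreSQL user
CREATE USER test1 WITH PASSWORD 'test123';
-- Grant the user the replication privilege
ALTER ROLE test1 REPLICATION;
-- Allow the user to connect to the database
GRANT CONNECT ON DATABASE test_db TO test1;
-- Grant the user all privileges on every table in the public schema of the current database
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;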
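4. Publish the tables
-- Mark existing publications as covering all tables (this has no effect if no publication exists yet)
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List the tables that are published
select * from pg_publication_tables;
-- Set the replica identity to FULL so that update and delete events carry the complete previous row. With the default replica identity (DEFAULT), the old row image contains only the primary-key columns, so the previous values of the other columns would be missing from the synchronized change events.
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- Check the replica identity: f means FULL, so the setting took effect; d is DEFAULT, n is NOTHING, i is INDEX
select relreplident from pg_class where relname='t_user';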
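For reference, the single-character codes returned by the query above can be decoded as in the following minimal sketch. The class name ReplicaIdentity and the method describe are hypothetical helpers introduced only for illustration; the four codes are the values PostgreSQL stores in pg_class.relreplident.

```java
/** Hypothetical helper that explains the value of pg_class.relreplident. */
final class ReplicaIdentity {

    /** Maps a relreplident code to a short description of the old-row image it produces. */
    static String describe(String relreplident) {
        switch (relreplident) {
            case "d":
                return "DEFAULT: old row carries only the primary-key columns";
            case "n":
                return "NOTHING: no old-row data is recorded";
            case "f":
                return "FULL: old row carries every column";
            case "i":
                return "INDEX: old row carries the columns of a chosen unique index";
            default:
                throw new IllegalArgumentException("Unknown relreplident code: " + relreplident);
        }
    }

    public static void main(String[] args) {
        // After ALTER TABLE t_user REPLICA IDENTITY FULL the query returns "f".
        System.out.println(describe("f"));
    }
}
```

With FULL, update and delete events can include every column of the previous row, at the cost of writing more data to the WAL.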
3. Code project
Goal: read incremental changes to the t_user table in the PostgreSQL database.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>postgre</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.9.4.Final</debezium.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- debezium -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>
</project>
DebeziumConnectorConfig.java
package com.et.postgres.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.io.File;
import java.io.IOException;

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        // Offsets are stored in a temp file, so a fresh file is created on every start.
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "customer_postgres_connector")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                // Connection settings come from application.properties
                .with("database.hostname", env.getProperty("customer.datasource.host"))
                .with("database.port", env.getProperty("customer.datasource.port")) // defaults to 5432
                .with("database.user", env.getProperty("customer.datasource.username"))
                .with("database.password", env.getProperty("customer.datasource.password"))
                .with("database.dbname", env.getProperty("customer.datasource.database"))
                // MySQL connector setting; the PostgreSQL connector does not need it
                .with("database.server.id", "10181")
                // Logical server name, used as the prefix of the topic names
                .with("database.server.name", "customer-postgres-db-server")
                // Schema history setting used by other connectors; the PostgreSQL connector does not need it
                .with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory")
                // Capture only t_user, and only its name and age columns
                .with("table.include.list", "public.t_user")
                .with("column.include.list", "public.t_user.name,public.t_user.age")
                .with("publication.autocreate.mode", "filtered")
                // Use the pgoutput logical decoding plugin and a dedicated replication slot
                .with("plugin.name", "pgoutput")
                .with("slot.name", "dbz_customerdb_listener")
                .build();
    }
}
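One point worth noting about the configuration above: File.createTempFile returns a new, uniquely named file on each call, so the offsets written during one run are stored in a different file from the one the next run reads. A possible alternative, sketched here under assumptions, is to resolve a fixed path instead. The class OffsetFiles, the method resolve, and the directory name debezium-data are hypothetical and are not part of the original project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper that returns the same offset file path on every start. */
final class OffsetFiles {

    /** Creates dataDir if needed and returns an absolute path for the connector's offset file. */
    static Path resolve(String dataDir, String connectorName) {
        try {
            Path dir = Files.createDirectories(Paths.get(dataDir));
            return dir.resolve(connectorName + "-offsets.dat").toAbsolutePath();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // "debezium-data" is an assumed directory name chosen for this example.
        Path offsets = resolve("debezium-data", "customer_postgres_connector");
        System.out.println(offsets);
    }
}
```

The returned path, converted with toString(), could then be used as the value of offset.storage.file.filename in place of the temp file path.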
DebeziumListener.java
package com.et.postgres.listener;

import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Component
public class DebeziumListener {

    // The engine runs on its own thread so that it does not block application startup
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(customerConnectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
    }

    // Called once for every change event emitted by the connector
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        // Only referenced by the commented-out processing code below
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        // Logs the whole event wrapper rather than just the value
        log.info("SourceRecordChangeValue = '{}'", sourceRecordRecordChangeEvent);
        // if (sourceRecordChangeValue != null) {
        //     Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        //     Operation.READ operation events are always triggered when application initializes
        //     We're only interested in CREATE operation which are triggered upon new insert registry
        //     if (operation != Operation.READ) {
        //         String record = operation == Operation.DELETE ? BEFORE : AFTER; // Handling Update & Insert operations.
        //         Struct struct = (Struct) sourceRecordChangeValue.get(record);
        //         Map<String, Object> payload = struct.schema().fields().stream()
        //                 .map(Field::name)
        //                 .filter(fieldName -> struct.get(fieldName) != null)
        //                 .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
        //                 .collect(toMap(Pair::getKey, Pair::getValue));
        //         // this.customerService.replicateData(payload, operation);
        //         log.info("Updated Data: {} with Operation: {}", payload, operation.name());
        //     }
        // }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
    }
}
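The commented-out block in handleChangeEvent skips snapshot reads, takes the before image for deletes, and takes the after image for inserts and updates. The sketch below restates that selection with plain maps so that it runs without Debezium on the classpath. ChangeRouter and selectRowImage are hypothetical names, and the op codes c, u, d, and r are assumed to be the create, update, delete, and snapshot-read codes carried in the op field of a change event.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, Debezium-free restatement of the before/after selection logic. */
final class ChangeRouter {

    /** Returns the row image to replicate, or empty for snapshot reads and unknown operations. */
    static Optional<Map<String, Object>> selectRowImage(String op,
                                                        Map<String, Object> before,
                                                        Map<String, Object> after) {
        switch (op) {
            case "c": // insert
            case "u": // update
                return Optional.ofNullable(after);
            case "d": // delete: only the old row exists
                return Optional.ofNullable(before);
            default:  // "r" (snapshot read) and anything else is ignored
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("name", "harries");
        after.put("age", 18);
        // An insert event carries no before image, so the after image is selected.
        System.out.println(selectRowImage("c", null, after));
    }
}
```

In the real listener the two maps would be built from the before and after Struct fields of the event value, as the commented-out code does.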
DemoApplication.java
package com.et.postgres;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
application.properties
customer.datasource.host=localhost
customer.datasource.port=30028
customer.datasource.database=test_db
customer.datasource.username=test1
customer.datasource.password=test123
logging.level.root=INFO
logging.level.io.debezium=INFO
logging.level.com.et=DEBUG
The listings above cover only the key code; see the repository below for the complete project.
Code repository
https://github.com/Harries/springboot-demo
4. Testing
Start the Spring Boot application.
Insert a row into the table:
INSERT INTO public.t_user(id, "name", age) VALUES(1, 'harries', 18);
Watch the console output:
2024-04-07 14:22:01.621 INFO 29260 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:42.015, last recorded offset: {transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : Key = Struct{id=1}, Value = Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : SourceRecordChangeValue = 'EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
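The lsn value in the logged offset is a plain 64-bit number, whereas PostgreSQL itself (for example pg_current_wal_lsn()) prints log positions as two hexadecimal halves separated by a slash. The following small sketch converts one form to the other, assuming the logged number is the raw 64-bit log position; LsnFormat and toPgLsn are hypothetical names used only for this example.

```java
/** Hypothetical helper that renders a numeric LSN in PostgreSQL's textual pg_lsn form. */
final class LsnFormat {

    /** Formats the upper and lower 32 bits as uppercase hex, separated by a slash. */
    static String toPgLsn(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        // The lsn from the log output above
        System.out.println(toPgLsn(23559864L)); // prints 0/1677EB8
    }
}
```

The textual form makes it easier to compare the connector's position with what the database reports for its replication slots.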
5. References
https://github.com/davidarchanjo/spring-boot-debezium-postgres
http://www.liuhaihua.cn/archives/710398.html
https://debezium.io/documentation/reference/0.10/features.html