国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring Boot集成Debezium實現(xiàn)postgres增量同步

這篇具有很好參考價值的文章主要介紹了Spring Boot集成Debezium實現(xiàn)postgres增量同步。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1.Debezium和postgres介紹

Debezium

是一個開源項目,為捕獲數(shù)據(jù)更改(change data capture,CDC)提供了一個低延遲的流式處理平臺。你可以安裝并且配置Debezium去監(jiān)控你的數(shù)據(jù)庫,然后你的應(yīng)用就可以消費對數(shù)據(jù)庫的每一個行級別(row-level)的更改。只有已提交的更改才是可見的,所以你的應(yīng)用不用擔心事務(wù)(transaction)或者更改被回滾(roll back)。Debezium為所有的數(shù)據(jù)庫更改事件提供了一個統(tǒng)一的模型,所以你的應(yīng)用不用擔心每一種數(shù)據(jù)庫管理系統(tǒng)的錯綜復雜性。另外,由于Debezium用持久化的、有副本備份的日志來記錄數(shù)據(jù)庫數(shù)據(jù)變化的歷史,因此,你的應(yīng)用可以隨時停止再重啟,而不會錯過它停止運行時發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉

postgres介紹

PostgreSQL是一個功能強大的開源數(shù)據(jù)庫系統(tǒng)。經(jīng)過長達15年以上的積極開發(fā)和不斷改進,PostgreSQL已在可靠性、穩(wěn)定性、數(shù)據(jù)一致性等獲得了業(yè)內(nèi)極高的聲譽。目前PostgreSQL可以運行在所有主流操作系統(tǒng)上,包括Linux、Unix(AIX、BSD、HP-UX、SGI IRIX、Mac OS X、Solaris和Tru64)和Windows。PostgreSQL是完全的事務(wù)安全性數(shù)據(jù)庫,完整地支持外鍵、聯(lián)合、視圖、觸發(fā)器和存儲過程(并支持多種語言開發(fā)存儲過程)。它支持了大多數(shù)的SQL:2008標準的數(shù)據(jù)類型,包括整型、數(shù)值型、布爾型、字節(jié)型、字符型、日期型、時間間隔型和時間型,它也支持存儲二進制的大對像,包括圖片、聲音和視頻。PostgreSQL對很多高級開發(fā)語言有原生的編程接口,如C/C++、Java、.Net、Perl、Python、Ruby、Tcl 和ODBC以及其他語言等,也包含各種文檔。

2.postgres測試環(huán)境搭建

1.安裝

step1:拉取 PostgreSQL 10.6 版本的鏡像:
docker pull postgres:10.6
step2:創(chuàng)建并啟動 PostgreSQL 容器,

在這里,我們將把容器的端口 5432 映射到主機的端口 30028,賬號密碼設(shè)置為postgres,并將 pgoutput 插件加載到 PostgreSQL 實例中:

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
step3:查看容器是否創(chuàng)建成功:
docker ps | grep postgres-10.6

2.配置

step1:docker進去Postgresql數(shù)據(jù)的容器:
docker exec -it postgres-10.6 bash
step2:編輯postgresql.conf配置文件:
vi /var/lib/postgresql/data/postgresql.conf

配置內(nèi)容如下:

# 更改wal日志方式為logical(方式有:minimal、replica 、logical  )
wal_level = logical


# 更改solts最大數(shù)量(默認值為10),flink-cdc默認一張表占用一個slots
max_replication_slots = 20


# 更改wal發(fā)送最大進程數(shù)(默認值為10),這個值和上面的solts設(shè)置一樣
max_wal_senders = 20


# 中斷那些停止活動超過指定毫秒數(shù)的復制連接,可以適當設(shè)置大一點(默認60s,0表示禁用)
wal_sender_timeout = 180s
step3:重啟容器:
docker restart postgres-10.6

連接數(shù)據(jù)庫,如果查詢一下語句,返回logical表示修改成功:

SHOW wal_level

3.新建用戶并賦權(quán)

使用創(chuàng)建容器時的賬號密碼(postgres/postgres)登錄Postgresql數(shù)據(jù)庫。

先創(chuàng)建數(shù)據(jù)庫和表:

--?創(chuàng)建數(shù)據(jù)庫test_db

CREATE DATABASE test_db;

--?連接到新創(chuàng)建的數(shù)據(jù)庫 test_db

\c test_db

--?創(chuàng)建 t_user 表

CREATE TABLE "public"."t_user" (
"id" int8 NOT NULL,
"name" varchar(255),
"age" int2,
PRIMARY KEY ("id")
);
新建用戶并且給用戶權(quán)限:

--?pg新建用戶

CREATE USER test1 WITH PASSWORD 'test123';

--?給用戶復制流權(quán)限

ALTER ROLE test1 replication;

--?給用戶登錄數(shù)據(jù)庫權(quán)限

GRANT CONNECT ON DATABASE test_db to test1;

--?把當前庫public下所有表查詢權(quán)限賦給用戶

GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

5. 發(fā)布表

--?設(shè)置發(fā)布為true

update pg_publication set puballtables=true where pubname is not null;

--?把所有表進行發(fā)布

CREATE PUBLICATION dbz_publication FOR ALL TABLES;

--?查詢哪些表已經(jīng)發(fā)布

select * from pg_publication_tables;

--?更改復制標識包含更新和刪除之前值(目的是為了確保表 t_user 在實時同步過程中能夠正確地捕獲并同步更新和刪除的數(shù)據(jù)變化。如果不執(zhí)行這兩條語句,那么 t_user 表的復制標識可能默認為 NOTHING,這可能導致實時同步時丟失更新和刪除的數(shù)據(jù)行信息,從而影響同步的準確性)

ALTER TABLE t_user REPLICA IDENTITY FULL;

--?查看復制標識(為f標識說明設(shè)置成功,f(表示 full),否則為 n(表示 nothing),即復制標識未設(shè)置)

select relreplident from pg_class where relname='t_user';

3.代碼工程

實驗?zāi)繕耍簻y試讀取postgres數(shù)據(jù)庫中t_user增量數(shù)據(jù)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>postgre</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.9.4.Final</debezium.version>


    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>


        <!-- debezium -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>


    </dependencies>
</project>

DebeziumConnectorConfig.java

package com.et.postgres.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;


import java.io.File;
import java.io.IOException;


@Configuration
public class DebeziumConnectorConfig {


    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        return io.debezium.config.Configuration.create()
            .with("name", "customer_postgres_connector")
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
            .with("offset.flush.interval.ms", "60000")
            .with("database.hostname", env.getProperty("customer.datasource.host"))
            .with("database.port", env.getProperty("customer.datasource.port")) // defaults to 5432
            .with("database.user", env.getProperty("customer.datasource.username"))
            .with("database.password", env.getProperty("customer.datasource.password"))
            .with("database.dbname", env.getProperty("customer.datasource.database"))
            .with("database.server.id", "10181")
            .with("database.server.name", "customer-postgres-db-server")
            .with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory")
            .with("table.include.list", "public.t_user")
            .with("column.include.list", "public.t_user.name,public.t_user.age")
            .with("publication.autocreate.mode", "filtered")
            .with("plugin.name", "pgoutput")
            .with("slot.name", "dbz_customerdb_listener")
            .build();
    }
}

DebeziumListener.java

package com.et.postgres.listener;


import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


@Slf4j
@Component
public class DebeziumListener {


    private final Executor executor = Executors.newSingleThreadExecutor();    
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;


    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
            .using(customerConnectorConfiguration.asProperties())
            .notifying(this::handleChangeEvent)
            .build();
    }


    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        Object sourceRecordChangeValue= (Struct) sourceRecord.value();
        log.info("SourceRecordChangeValue = '{}'",sourceRecordRecordChangeEvent);
        // if (sourceRecordChangeValue != null) {
        //     Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));


        // Operation.READ operation events are always triggered when application initializes
        // We're only interested in CREATE operation which are triggered upon new insert registry
        //     if(operation != Operation.READ) {
        //         String record = operation == Operation.DELETE ? BEFORE : AFTER; // Handling Update & Insert operations.


        //         Struct struct = (Struct) sourceRecordChangeValue.get(record);
        //         Map<String, Object> payload = struct.schema().fields().stream()
        //             .map(Field::name)
        //             .filter(fieldName -> struct.get(fieldName) != null)
        //             .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
        //             .collect(toMap(Pair::getKey, Pair::getValue));


        //         // this.customerService.replicateData(payload, operation);
        //         log.info("Updated Data: {} with Operation: {}", payload, operation.name());
        //     }
        // }
    }


    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }


    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
    }


}

DemoApplication.java

package com.et.postgres;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class DemoApplication {


   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

application.properties

customer.datasource.host=localhost
customer.datasource.port=30028
customer.datasource.database=test_db
customer.datasource.username=test1
customer.datasource.password=test123


logging.level.root=INFO
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG

以上只是一些關(guān)鍵代碼,所有代碼請參見下面代碼倉庫

代碼倉庫

  • https://github.com/Harries/springboot-demo

4.測試

  • 啟動spring boot應(yīng)用程序

  • 在表里面插入一些數(shù)據(jù)

 
 
INSERT INTO public.t_user(id, "name", age) VALUES(1, 'harries', 18);
  • 觀察控制臺輸出

2024-04-07 14:22:01.621 INFO 29260 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:42.015, last recorded offset: {transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : Key = Struct{id=1}, Value = Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : SourceRecordChangeValue = 'EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'

5.引用

  • https://github.com/davidarchanjo/spring-boot-debezium-postgres

  • http://www.liuhaihua.cn/archives/710398.html

  • https://debezium.io/documentation/reference/0.10/features.html文章來源地址http://www.zghlxwxcb.cn/news/detail-847489.html

到了這里,關(guān)于Spring Boot集成Debezium實現(xiàn)postgres增量同步的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Spring Boot集成Spring AI實現(xiàn)快速接入openAI

    Spring Boot集成Spring AI實現(xiàn)快速接入openAI

    Spring AI API 涵蓋了廣泛的功能。每個主要功能都在其專門的部分中進行了詳細介紹。為了提供概述,可以使用以下關(guān)鍵功能: 跨 AI 提供商的可移植 API,用于聊天、文本到圖像和嵌入模型。支持同步和流 API 選項。還支持下拉訪問模型特定功能。我們支持 OpenAI、Microsoft、Amaz

    2024年04月28日
    瀏覽(15)
  • spring-boot集成spring-brick實現(xiàn)動態(tài)插件

    spring-boot集成spring-brick實現(xiàn)動態(tài)插件

    spring-boot集成spring-brick實現(xiàn)動態(tài)插件 項目結(jié)構(gòu) 需求實現(xiàn) spring-boot集成spring-brick 環(huán)境說明 1. 主程序集成spring-brick 第一步:引入相關(guān)依賴 第二步:修改程序入口方法 第三步:編寫配置 第四步:設(shè)置maven插件 2. 準備plugin-api 第一步:引入相關(guān)依賴 第二步:引入相關(guān)依賴 3. 實現(xiàn)

    2024年02月14日
    瀏覽(23)
  • Spring Boot集成WebSocket實現(xiàn)消息推送

    Spring Boot集成WebSocket實現(xiàn)消息推送

    項目中經(jīng)常會用到消息推送功能,關(guān)于推送技術(shù)的實現(xiàn),我們通常會聯(lián)想到輪詢、comet長連接技術(shù),雖然這些技術(shù)能夠?qū)崿F(xiàn),但是需要反復連接,對于服務(wù)資源消耗過大,隨著技術(shù)的發(fā)展,HtML5定義了WebSocket協(xié)議,能更好的節(jié)省服務(wù)器資源和帶寬,并且能夠更實時地進行通訊。

    2023年04月08日
    瀏覽(20)
  • Spring Boot 集成 Redisson 實現(xiàn)分布式鎖

    Spring Boot 集成 Redisson 實現(xiàn)分布式鎖

    ????????Redisson 是一種基于 Redis 的 Java 駐留集群的分布式對象和服務(wù)庫,可以為我們提供豐富的分布式鎖和線程安全集合的實現(xiàn)。在 Spring Boot 應(yīng)用程序中使用 Redisson 可以方便地實現(xiàn)分布式應(yīng)用程序的某些方面,例如分布式鎖、分布式集合、分布式事件發(fā)布和訂閱等。本篇

    2024年02月08日
    瀏覽(24)
  • 【Spring Boot】集成Kafka實現(xiàn)消息發(fā)送和訂閱

    【Spring Boot】集成Kafka實現(xiàn)消息發(fā)送和訂閱

    最近忙著搞低代碼開發(fā),好久沒新建spring項目了,結(jié)果今天心血來潮準備建個springboot項目 注意Type選Maven,java選8,其他默認 點下一步后完成就新建了一個spring boot項目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉庫,不然可能依賴下載不下來 在maven配置沒問

    2024年02月09日
    瀏覽(31)
  • spring boot集成jasypt 并 實現(xiàn)自定義加解密

    由于項目中的配置文件 配置的地方過多,現(xiàn)將配置文件統(tǒng)一放到nacos上集中管理 且密碼使用加密的方式放在配置文件中 項目中組件使用的版本環(huán)境如下 spring cloud 2021.0.5 spring cloud alibaba 2021.0.5.0 spring boot 2.6.13 配置文件的加密使用 加密庫 jasypt 引入maven依賴 添加配置 使用jasy

    2024年02月11日
    瀏覽(21)
  • Spring Boot集成EasyExcel實現(xiàn)excel導入導出操作

    Spring Boot集成EasyExcel實現(xiàn)excel導入導出操作

    Easy Excel 官網(wǎng) Java解析、生成Excel比較有名的框架有Apache poi、jxl。但他們都存在一個嚴重的問題就是非常的耗內(nèi)存,poi有一套SAX模式的API可以一定程度的解決一些內(nèi)存溢出的問題,但POI還是有一些缺陷,比如07版Excel解壓縮以及解壓后存儲都是在內(nèi)存中完成的,內(nèi)存消耗依然很

    2024年02月14日
    瀏覽(19)
  • 【Redis系列】Spring Boot 集成 Redis 實現(xiàn)緩存功能

    【Redis系列】Spring Boot 集成 Redis 實現(xiàn)緩存功能

    ??????歡迎來到我的博客,很高興能夠在這里和您見面!希望您在這里可以感受到一份輕松愉快的氛圍,不僅可以獲得有趣的內(nèi)容和知識,也可以暢所欲言、分享您的想法和見解。 推薦:kwan 的首頁,持續(xù)學習,不斷總結(jié),共同進步,活到老學到老 導航 檀越劍指大廠系列:全面總

    2024年04月10日
    瀏覽(95)
  • Spring Boot 集成 EasyExcel 3.x 優(yōu)雅實現(xiàn)Excel導入導出

    Spring Boot 集成 EasyExcel 3.x 優(yōu)雅實現(xiàn)Excel導入導出

    本章節(jié)將介紹 Spring Boot 集成 EasyExcel(優(yōu)雅實現(xiàn)Excel導入導出)。 ?? Spring Boot 2.x 實踐案例(代碼倉庫) EasyExcel 是一個基于 Java 的、快速、簡潔、解決大文件內(nèi)存溢出的 Excel 處理工具。它能讓你在不用考慮性能、內(nèi)存的等因素的情況下,快速完成 Excel 的讀、寫等功能。 Ea

    2024年02月03日
    瀏覽(52)
  • Spring Boot 集成 WebSocket 實現(xiàn)服務(wù)端推送消息到客戶端

    Spring Boot 集成 WebSocket 實現(xiàn)服務(wù)端推送消息到客戶端

    ? ? ? 假設(shè)有這樣一個場景:服務(wù)端的資源經(jīng)常在更新,客戶端需要盡量及時地了解到這些更新發(fā)生后展示給用戶,如果是 HTTP 1.1,通常會開啟 ajax 請求詢問服務(wù)端是否有更新,通過定時器反復輪詢服務(wù)端響應(yīng)的資源是否有更新。 ? ? ? ? ? ? ? ?? ? ? ? ?在長時間不更新

    2024年02月16日
    瀏覽(86)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包