国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

通過kafka connector實(shí)現(xiàn)mysql數(shù)據(jù)自動同步es

這篇具有很好參考價(jià)值的文章主要介紹了通過kafka connector實(shí)現(xiàn)mysql數(shù)據(jù)自動同步es。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

整體思路:

1、使用?io.debezium.connector.mysql.MySqlConnector 自動同步數(shù)據(jù)到kafka消息隊(duì)列

2、通過listener監(jiān)聽消息隊(duì)列,代碼控制數(shù)據(jù)插入es

ps:其實(shí)有更簡單的方式:在此基礎(chǔ)上使用ElasticsearchSinkConnector、ksql,完成數(shù)據(jù)的轉(zhuǎn)換與自動同步es,全程無需代碼控制,后續(xù)本地跑通流程后再來記錄

一、連接器的下載與配置

下載debezium mysql connector

在kafka中建立connect文件夾,并解壓連接器

在kafka/config下的connect-distributed.properties文件中,修改plugin.path=連接器地址

啟動連接器:

bin/connect-distributed.sh -daemon config/connect-distributed.properties

postman查詢連接器是否配置成功

http://localhost:8083/connector-plugins

如果返回連接器,則表示配置成功

[
   
    {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "2.1.2.Final"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.3.2"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "3.3.2"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.3.2"
    }
]

二、創(chuàng)建同步連接器實(shí)例

post請求地址:

http://localhost:8083/connectors

請求體:

{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "127.0.0.1", // 數(shù)據(jù)庫ip
        "database.port": "3306", 
        "database.user": "root", // 數(shù)據(jù)庫登陸用戶名
        "database.password": "123456", // 登陸密碼
        "database.server.id": "2", 
        "database.server.name": "hc",
        "database.include.list": "store", // 需要同步的庫
        "table.include.list": "store.product", // 需要同步的表
        "database.history.kafka.bootstrap.servers": "localhost:9092", // kafka地址
        "database.history.kafka.topic": "schema-changes-inventory", 
        "topic.prefix": "pro",
        "include.schema.changes": "true",
        "transforms": "unwrap,Cast",
        "transforms.Cast.type": 
        "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.Cast.spec": "amount:float64,unit_price:float64",
        "transforms.unwrap.type": 
        "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}

查看是否建立成功:

get請求:

http://localhost:8083/connectors

返回結(jié)果:

[
    "mysql-connector"
]

三、代碼里監(jiān)聽消息

@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class HcCustomerListener {

    private final EsSearchService esSearchService;

    private final IEsRepository esRepository;

    private final String INDEX = "product";

    /**
     * 監(jiān)聽產(chǎn)品表
     * @param record
     */
    @KafkaListener(topics = "test.store.product")
    public void onMessage(ConsumerRecord<String, String> record) {

        String kafkaMessage = record.value();
        if (StrUtil.isBlank(kafkaMessage)) {
            return;
        }

        // 檢查索引是否存在,沒有則新建
        if (!esRepository.checkIndex(INDEX)) {
            if (!esRepository.createIndex(INDEX)) {
                log.error("建立索引失?。∷饕? + INDEX);
            }
        }

        // 數(shù)據(jù)轉(zhuǎn)換為要存儲的對象
        Product item = JSONObject.toJavaObject(JSONObject.parseObject(kafkaMessage), Product.class);

        // 數(shù)據(jù)同步
        if (!esRepository.dataSync(Product, INDEX, QueryBuilders.termQuery("code", Product.getCode()))) {
            log.error("產(chǎn)品信息同步es失?。‘a(chǎn)品編號:" + customer.getCode());
        }
    }
}

ps:關(guān)于數(shù)據(jù)存儲es,數(shù)據(jù)查詢es的具體方法,會寫一篇專門的文章記錄

中間也遇到了一些坑,比如建connector的時(shí)候一直報(bào)錯(cuò)缺少什么值,最后把jdk1.8改到j(luò)dk17就好了

比如同步數(shù)據(jù)一直報(bào)數(shù)據(jù)轉(zhuǎn)換錯(cuò)誤,才發(fā)現(xiàn)bigdecimal類型的字段需要在建connector時(shí)顯示的去做轉(zhuǎn)換:"transforms.Cast.spec": "unit_price:float64,amount:float64"這樣就行文章來源地址http://www.zghlxwxcb.cn/news/detail-721507.html

到了這里,關(guān)于通過kafka connector實(shí)現(xiàn)mysql數(shù)據(jù)自動同步es的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【kafka】JDBC connector進(jìn)行表數(shù)據(jù)增量同步過程中的源表與目標(biāo)表時(shí)間不一致問題解決...

    【kafka】JDBC connector進(jìn)行表數(shù)據(jù)增量同步過程中的源表與目標(biāo)表時(shí)間不一致問題解決...

    〇、參考資料 時(shí)間不一致,差了8個(gè)小時(shí) (1)source (2)sink 即sink和source都加 ?\\\"db.timezone\\\":?\\\"Asia/Shanghai\\\", 并需要保持一直

    2024年02月11日
    瀏覽(17)
  • 通過零代碼ETLCloud實(shí)現(xiàn)金蝶云星空數(shù)據(jù)自動化同步

    通過零代碼ETLCloud實(shí)現(xiàn)金蝶云星空數(shù)據(jù)自動化同步

    金蝶云星空是一款基于云計(jì)算架構(gòu)打造的全面財(cái)務(wù)管理軟件,旨在為企業(yè)提供全方位、一站式的財(cái)務(wù)解決方案。其功能包括 財(cái)務(wù)核算、現(xiàn)金管理、應(yīng)付應(yīng)收管理、成本核算、固定資產(chǎn)管理、稅務(wù)管理等,覆蓋了財(cái)務(wù)管理的各個(gè)方面,可以幫助企業(yè)提高財(cái)務(wù)管理效率,降低財(cái)務(wù)

    2024年02月09日
    瀏覽(20)
  • 通過logstash實(shí)現(xiàn)mysql與es的雙向數(shù)據(jù)同步

    通過logstash實(shí)現(xiàn)mysql與es的雙向數(shù)據(jù)同步

    參考題目 一種基于MySQL和Elasticsearch的數(shù)據(jù)同步方法及系統(tǒng) 基于MySQL和Elasticsearch的數(shù)據(jù)同步方法 一種基于MySQL和Elasticsearch的數(shù)據(jù)同步系統(tǒng) 基于MySQL和Elasticsearch的數(shù)據(jù)同步技術(shù) 目錄 1【理論調(diào)研】 方案1:使用Logstash實(shí)現(xiàn)數(shù)據(jù)同步 方案2:使用Canal實(shí)現(xiàn)數(shù)據(jù)同步 方案3:使用Debe

    2024年02月15日
    瀏覽(21)
  • 60、Flink CDC 入門介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    60、Flink CDC 入門介紹及Streaming ELT示例(同步Mysql數(shù)據(jù)庫數(shù)據(jù)到Elasticsearch)-CDC Connector介紹及示例 (1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月19日
    瀏覽(21)
  • MySQL FlinkCDC 通過Kafka實(shí)時(shí)同步到ClickHouse(自定義Debezium格式支持增加刪除修改)

    MySQL FlinkCDC 通過Kafka實(shí)時(shí)同步到ClickHouse(自定義Debezium格式支持增加刪除修改) 把MySQL多庫多表的數(shù)據(jù)通過FlinkCDC DataStream的方式實(shí)時(shí)同步到同一個(gè)Kafka的Topic中,然后下游再寫Flink SQL拆分把數(shù)據(jù)寫入到ClickHouse,F(xiàn)linkCDC DataStream通過自定義Debezium格式的序列化器,除了增加,還能進(jìn)行

    2024年02月15日
    瀏覽(23)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進(jìn)行實(shí)時(shí)數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進(jìn)行實(shí)時(shí)數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務(wù)變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標(biāo)端進(jìn)行實(shí)時(shí)數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進(jìn)行實(shí)時(shí)數(shù)

    2024年02月12日
    瀏覽(36)
  • Debezium同步Mysql數(shù)據(jù)到Kafka

    Kafka:3.3.2 mysql-connector:1.8.1 (0)前提是安裝好mysql,開啟binlog (1)下載kafka (2)下載mysql-connector插件 (3)編輯配置文件 (4)啟動kafka自帶的zk (5)啟動kafka (6)啟動connect (7)調(diào)用api 注意:當(dāng)成功調(diào)用api,創(chuàng)建此連接器后會有如下主題產(chǎn)生:dbhistory.inventory、mysql1、

    2024年02月10日
    瀏覽(22)
  • cancel框架同步mysql數(shù)據(jù)到kafka

    1、下載cancel 2、修改conf文件夾下的canal.properties配置文件 3、修改conf/example文件夾下的instance.properties配置文件 在sql查詢show binary logs語句得到binlog日志 4、啟動 在bin目錄下執(zhí)行 啟動程序 注:MySQL需要?jiǎng)?chuàng)建新用戶

    2024年02月15日
    瀏覽(17)
  • 通過ETLCloud自動化數(shù)據(jù)處理:用友U8數(shù)據(jù)一鍵同步

    通過ETLCloud自動化數(shù)據(jù)處理:用友U8數(shù)據(jù)一鍵同步

    用友U8是一款成熟的企業(yè)管理軟件,是一套適用于企業(yè)全面管理的ERP(Enterprise Resource Planning)軟件。主要用于管理企業(yè)的財(cái)務(wù)、人力資源、供應(yīng)鏈、生產(chǎn)制造等業(yè)務(wù)。它具有模塊化設(shè)計(jì)和高度可定制化的特點(diǎn),可以根據(jù)企業(yè)的實(shí)際需求進(jìn)行配置和部署。同時(shí),用友U8還提供了

    2024年02月09日
    瀏覽(20)
  • 從 MySQL 到 DolphinDB,Debezium + Kafka 數(shù)據(jù)同步實(shí)戰(zhàn)

    從 MySQL 到 DolphinDB,Debezium + Kafka 數(shù)據(jù)同步實(shí)戰(zhàn)

    Debezium 是一個(gè)開源的分布式平臺,用于實(shí)時(shí)捕獲和發(fā)布數(shù)據(jù)庫更改事件。它可以將關(guān)系型數(shù)據(jù)庫(如 MySQL、PostgreSQL、Oracle 等)的變更事件轉(zhuǎn)化為可觀察的流數(shù)據(jù),以供其他應(yīng)用程序?qū)崟r(shí)消費(fèi)和處理。 本文中我們將采用 Debezium 與 Kafka 組合的方式來實(shí)現(xiàn)從 MySQL 到 DolphinDB 的數(shù)

    2024年02月02日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包