国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Debezium系列之:基于debezium將mysql數(shù)據(jù)庫數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫

這篇具有很好參考價值的文章主要介紹了Debezium系列之:基于debezium將mysql數(shù)據(jù)庫數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、背景

  • 基于 Debezium 的端到端數(shù)據(jù)流用例,將數(shù)據(jù)流式傳輸?shù)?Elasticsearch 服務(wù)器,以利用其出色的功能對我們的數(shù)據(jù)進行全文搜索。
  • 同時把數(shù)據(jù)流式傳輸?shù)?PostgreSQL 數(shù)據(jù)庫,通過 SQL 查詢語言來優(yōu)化對數(shù)據(jù)的訪問。

二、技術(shù)路線

下面的圖表顯示了數(shù)據(jù)如何流經(jīng)我們的分布式系統(tǒng)。首先,Debezium MySQL 連接器不斷捕獲 MySQL 數(shù)據(jù)庫中的更改,并將每個表的更改發(fā)送到單獨的 Kafka 主題。然后,Confluence JDBC 接收器連接器不斷讀取這些主題并將事件寫入 PostgreSQL 數(shù)據(jù)庫。同時,Confluence Elasticsearch 連接器不斷讀取這些相同的主題并將事件寫入 Elasticsearch。

Debezium系列之:基于debezium將mysql數(shù)據(jù)庫數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫,日常分享專欄,Debezium系列,mysql數(shù)據(jù)庫,數(shù)據(jù)更改,流式傳輸,Elasticsearch,Postgresql
我們將把這些組件部署到幾個不同的進程中。在此示例中,我們將所有三個連接器部署到單個 Kafka Connect 實例,該實例將代表所有連接器向 Kafka 寫入和讀取(在生產(chǎn)中,可能需要將連接器分開以實現(xiàn)更好的性能)。

Debezium系列之:基于debezium將mysql數(shù)據(jù)庫數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫,日常分享專欄,Debezium系列,mysql數(shù)據(jù)庫,數(shù)據(jù)更改,流式傳輸,Elasticsearch,Postgresql

三、配置

我們將使用此 Docker Compose 文件來快速部署演示。該部署由以下 Docker 映像組成:

  • Apache ZooKeeper

  • Apache Kafka

  • 一個豐富的 Kafka Connect / Debezium 圖像,有一些變化:

    • PostgreSQL JDBC 驅(qū)動程序放置在 /kafka/libs 目錄中
    • Confluence JDBC 連接器放置在 /kafka/connect/kafka-connect-jdbc 目錄中
  • MySQL

  • PostgreSQL

  • Elasticsearch

Debezium 源連接器以及 JDBC 和 Elasticsearch 連接器的消息格式不同,因為它們是單獨開發(fā)的,并且各自關(guān)注的目標略有不同。 Debezium 發(fā)出更復(fù)雜的事件結(jié)構(gòu),以便捕獲所有可用信息。特別是,更改事件包含已更改記錄的舊狀態(tài)和新狀態(tài)。另一方面,兩個接收器連接器都期望一條簡單的消息,該消息僅表示要寫入的記錄狀態(tài)。

Debezium 的 UnwrapFromEnvelope 單消息轉(zhuǎn)換 (SMT) 將復(fù)雜的更改事件結(jié)構(gòu)折疊為兩個接收器連接器所期望的相同的基于行的格式,并有效地充當上述兩種格式之間的消息轉(zhuǎn)換器。

四、從mysql同步數(shù)據(jù)到Elasticsearch和PostgreSQL數(shù)據(jù)庫

當所有組件啟動后,我們將注冊 Elasticsearch Sink 連接器寫入 Elasticsearch 實例。我們希望在源以及 PostgreSQL 和 Elasticsearch 中使用相同的密鑰(主 id):

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @es-sink.json

我們正在使用此注冊請求:

{
  {
    "name": "elastic-sink",
    "config": {
      "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "customers",
      "connection.url": "http://elastic:9200",
      "transforms": "unwrap,key",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
      "transforms.key.field": "id",                                                 (2)
      "key.ignore": "false",                                                        (3)
      "type.name": "customer"                                                       (4)
    }
  }
}

該請求配置這些選項:

  • 1.從 Debezium 的更改數(shù)據(jù)消息中僅提取新行的狀態(tài)
  • 2.從密鑰結(jié)構(gòu)中提取 id 字段,然后將相同的密鑰用于源和兩個目標。這是為了解決 Elasticsearch 連接器僅支持數(shù)字類型和字符串作為鍵的事實。如果我們不提取 ID,則由于密鑰類型未知,消息將被連接器過濾掉。
  • 3.使用事件中的密鑰而不是生成合成密鑰
  • 4.事件將在 Elasticsearch 中注冊的類型

接下來我們將注冊 JDBC Sink 連接器寫入 PostgreSQL 數(shù)據(jù)庫:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @jdbc-sink.json

最后,必須設(shè)置源連接器:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @source.json

讓我們檢查一下數(shù)據(jù)庫和搜索服務(wù)器是否同步??蛻舯淼乃行卸紤?yīng)該在源數(shù)據(jù)庫 (MySQL) 以及目標數(shù)據(jù)庫 (Postgres) 和 Elasticsearch 中找到:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

在連接器仍在運行的情況下,我們可以向 MySQL 數(shù)據(jù)庫添加一個新行,然后檢查它是否已復(fù)制到 PostgreSQL 數(shù)據(jù)庫和 Elasticsearch 中:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)

curl 'http://localhost:9200/customers/_search?pretty'
...
{
  "_index" : "customers",
  "_type" : "customer",
  "_id" : "1005",
  "_score" : 1.0,
  "_source" : {
    "id" : 1005,
    "first_name" : "John",
    "last_name" : "Doe",
    "email" : "john.doe@example.com"
  }
}
...

五、總結(jié)

我們設(shè)置了一個復(fù)雜的流數(shù)據(jù)管道來將 MySQL 數(shù)據(jù)庫與另一個數(shù)據(jù)庫以及 Elasticsearch 實例同步。我們設(shè)法在所有系統(tǒng)中保留相同的標識符,這使我們能夠?qū)⒄麄€系統(tǒng)的記錄關(guān)聯(lián)起來。

將數(shù)據(jù)更改從主數(shù)據(jù)庫近乎實時地傳播到 Elasticsearch 等搜索引擎可以實現(xiàn)許多有趣的用例。除了全文搜索的不同應(yīng)用之外,我們還可以考慮使用 Kibana 創(chuàng)建儀表板和各種可視化效果,以進一步深入了解數(shù)據(jù)。文章來源地址http://www.zghlxwxcb.cn/news/detail-544016.html

到了這里,關(guān)于Debezium系列之:基于debezium將mysql數(shù)據(jù)庫數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Debezium同步Mysql數(shù)據(jù)到Kafka

    Kafka:3.3.2 mysql-connector:1.8.1 (0)前提是安裝好mysql,開啟binlog (1)下載kafka (2)下載mysql-connector插件 (3)編輯配置文件 (4)啟動kafka自帶的zk (5)啟動kafka (6)啟動connect (7)調(diào)用api 注意:當成功調(diào)用api,創(chuàng)建此連接器后會有如下主題產(chǎn)生:dbhistory.inventory、mysql1、

    2024年02月10日
    瀏覽(22)
  • Debezium系列之:監(jiān)控 Debezium

    Debezium JMX相關(guān)的技術(shù)博客: Debezium系列之:安裝jmx導出器監(jiān)控debezium指標 Debezium系列之:為Debezium集群JMX頁面增加監(jiān)控,JMX頁面出現(xiàn)異常時發(fā)送飛書告警,確保任務(wù)能夠獲取debezium集群指標 Debezium系列之:深入解讀Debezium重要的jmx指標 Debezium系列之:mysql JMX metrics指標詳細解讀

    2024年02月11日
    瀏覽(27)
  • Debezium系列之:更新數(shù)據(jù),在數(shù)據(jù)的headers中添加數(shù)據(jù)發(fā)生變化的字段

    在保持debezium數(shù)據(jù)格式不變的情況下,更新數(shù)據(jù),把數(shù)據(jù)發(fā)生變化的字段和數(shù)據(jù)沒有發(fā)生變化的字段存放到headers中

    2024年02月15日
    瀏覽(16)
  • 從 MySQL 到 DolphinDB,Debezium + Kafka 數(shù)據(jù)同步實戰(zhàn)

    從 MySQL 到 DolphinDB,Debezium + Kafka 數(shù)據(jù)同步實戰(zhàn)

    Debezium 是一個開源的分布式平臺,用于實時捕獲和發(fā)布數(shù)據(jù)庫更改事件。它可以將關(guān)系型數(shù)據(jù)庫(如 MySQL、PostgreSQL、Oracle 等)的變更事件轉(zhuǎn)化為可觀察的流數(shù)據(jù),以供其他應(yīng)用程序?qū)崟r消費和處理。 本文中我們將采用 Debezium 與 Kafka 組合的方式來實現(xiàn)從 MySQL 到 DolphinDB 的數(shù)

    2024年02月02日
    瀏覽(26)
  • Debezium系列之:在 Kubernetes 上部署 Debezium

    K8s相關(guān)知識可以閱讀博主以下幾篇技術(shù)博客: K8s系列之:搭建高可用K8s v1.23.5集群詳細步驟,3個master節(jié)點,3個Node節(jié)點 K8s系列之:Pod的基本用法 k8s系列之:kubectl子命令詳解一 k8s系列之:kubectl子命令詳解二 更多K8s知識點詳見博主K8s系列文章 更多Debezium內(nèi)容請閱讀博主Debezi

    2024年02月11日
    瀏覽(28)
  • Debezium日常分享系列之:Debezium and TimescaleDB

    TimescaleDB 是一個開源數(shù)據(jù)庫,旨在使 SQL 對于時間序列數(shù)據(jù)具有可擴展性。它是作為 PostgreSQL 數(shù)據(jù)庫的擴展實現(xiàn)的。這一事實促使我們重新使用標準 Debezium PostgreSQL 連接器,并將 TimescaleDB 支持實現(xiàn)為單個消息轉(zhuǎn)換 (SMT)。 TimescaleDB 提供了三個基本構(gòu)建塊/概念: Hypertables Contin

    2024年01月17日
    瀏覽(20)
  • Debezium日常分享系列之:在 OpenShift 上部署 Debezium

    Debezium日常分享系列之:在 OpenShift 上部署 Debezium

    此過程用于在 Red Hat 的 OpenShift 容器平臺上設(shè)置 Debezium 連接器。要在 OpenShift 上進行開發(fā)或測試,您可以使用 CodeRady 容器。 為了使容器與集群上的其他工作負載分開,請為 Debezium 創(chuàng)建一個專用項目。在本文檔的其余部分中,將使用 debezium-example 命名空間: 對于 Debezium 部署,

    2024年02月16日
    瀏覽(25)
  • Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號

    Debezium日常分享系列之:向 Debezium 連接器發(fā)送信號

    Debezium 信號機制提供了一種修改連接器行為或觸發(fā)一次性操作(例如啟動表的臨時快照)的方法。要使用信號觸發(fā)連接器執(zhí)行指定操作,可以將連接器配置為使用以下一個或多個通道: 源信號通道:可以發(fā)出 SQL 命令將信號消息添加到專門的信令數(shù)據(jù)集合中。在源數(shù)據(jù)庫上創(chuàng)

    2024年02月03日
    瀏覽(24)
  • Debezium日常分享系列之:Debezium 信號發(fā)送和通知 - 第 1 部分

    本系列文章將介紹 Debezium 提供的信號和通知功能,并討論與平臺交互的可用渠道。在本系列的后續(xù)部分中,我們將更深入地研究自定義信令通道并探索其他主題,例如 JMX 信令和通知。 在當今互連的軟件應(yīng)用程序和系統(tǒng)中,與其他產(chǎn)品無縫集成對于構(gòu)建強大而高效的解決方案

    2024年02月16日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包