一、背景
- 基于 Debezium 的端到端數(shù)據(jù)流用例,將數(shù)據(jù)流式傳輸?shù)?Elasticsearch 服務(wù)器,以利用其出色的功能對我們的數(shù)據(jù)進行全文搜索。
- 同時把數(shù)據(jù)流式傳輸?shù)?PostgreSQL 數(shù)據(jù)庫,通過 SQL 查詢語言來優(yōu)化對數(shù)據(jù)的訪問。
二、技術(shù)路線
下面的圖表顯示了數(shù)據(jù)如何流經(jīng)我們的分布式系統(tǒng)。首先,Debezium MySQL 連接器不斷捕獲 MySQL 數(shù)據(jù)庫中的更改,并將每個表的更改發(fā)送到單獨的 Kafka 主題。然后,Confluence JDBC 接收器連接器不斷讀取這些主題并將事件寫入 PostgreSQL 數(shù)據(jù)庫。同時,Confluence Elasticsearch 連接器不斷讀取這些相同的主題并將事件寫入 Elasticsearch。
我們將把這些組件部署到幾個不同的進程中。在此示例中,我們將所有三個連接器部署到單個 Kafka Connect 實例,該實例將代表所有連接器向 Kafka 寫入和讀取(在生產(chǎn)中,可能需要將連接器分開以實現(xiàn)更好的性能)。
三、配置
我們將使用此 Docker Compose 文件來快速部署演示。該部署由以下 Docker 映像組成:
-
Apache ZooKeeper
-
Apache Kafka
-
一個豐富的 Kafka Connect / Debezium 圖像,有一些變化:
- PostgreSQL JDBC 驅(qū)動程序放置在 /kafka/libs 目錄中
- Confluence JDBC 連接器放置在 /kafka/connect/kafka-connect-jdbc 目錄中
-
MySQL
-
PostgreSQL
-
Elasticsearch
Debezium 源連接器以及 JDBC 和 Elasticsearch 連接器的消息格式不同,因為它們是單獨開發(fā)的,并且各自關(guān)注的目標略有不同。 Debezium 發(fā)出更復(fù)雜的事件結(jié)構(gòu),以便捕獲所有可用信息。特別是,更改事件包含已更改記錄的舊狀態(tài)和新狀態(tài)。另一方面,兩個接收器連接器都期望一條簡單的消息,該消息僅表示要寫入的記錄狀態(tài)。
Debezium 的 UnwrapFromEnvelope 單消息轉(zhuǎn)換 (SMT) 將復(fù)雜的更改事件結(jié)構(gòu)折疊為兩個接收器連接器所期望的相同的基于行的格式,并有效地充當上述兩種格式之間的消息轉(zhuǎn)換器。
四、從mysql同步數(shù)據(jù)到Elasticsearch和PostgreSQL數(shù)據(jù)庫
當所有組件啟動后,我們將注冊 Elasticsearch Sink 連接器寫入 Elasticsearch 實例。我們希望在源以及 PostgreSQL 和 Elasticsearch 中使用相同的密鑰(主 id):
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d @es-sink.json
我們正在使用此注冊請求:
{
{
"name": "elastic-sink",
"config": {
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "http://elastic:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", (1)
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
"transforms.key.field": "id", (2)
"key.ignore": "false", (3)
"type.name": "customer" (4)
}
}
}
該請求配置這些選項:
- 1.從 Debezium 的更改數(shù)據(jù)消息中僅提取新行的狀態(tài)
- 2.從密鑰結(jié)構(gòu)中提取 id 字段,然后將相同的密鑰用于源和兩個目標。這是為了解決 Elasticsearch 連接器僅支持數(shù)字類型和字符串作為鍵的事實。如果我們不提取 ID,則由于密鑰類型未知,消息將被連接器過濾掉。
- 3.使用事件中的密鑰而不是生成合成密鑰
- 4.事件將在 Elasticsearch 中注冊的類型
接下來我們將注冊 JDBC Sink 連接器寫入 PostgreSQL 數(shù)據(jù)庫:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d @jdbc-sink.json
最后,必須設(shè)置源連接器:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d @source.json
讓我們檢查一下數(shù)據(jù)庫和搜索服務(wù)器是否同步??蛻舯淼乃行卸紤?yīng)該在源數(shù)據(jù)庫 (MySQL) 以及目標數(shù)據(jù)庫 (Postgres) 和 Elasticsearch 中找到:
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
curl 'http://localhost:9200/customers/_search?pretty'
{
"took" : 42,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 4,
"max_score" : 1.0,
"hits" : [
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1001",
"_score" : 1.0,
"_source" : {
"id" : 1001,
"first_name" : "Sally",
"last_name" : "Thomas",
"email" : "sally.thomas@acme.com"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1004",
"_score" : 1.0,
"_source" : {
"id" : 1004,
"first_name" : "Anne",
"last_name" : "Kretchmar",
"email" : "annek@noanswer.org"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1002",
"_score" : 1.0,
"_source" : {
"id" : 1002,
"first_name" : "George",
"last_name" : "Bailey",
"email" : "gbailey@foobar.com"
}
},
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1003",
"_score" : 1.0,
"_source" : {
"id" : 1003,
"first_name" : "Edward",
"last_name" : "Walker",
"email" : "ed@walker.com"
}
}
]
}
}
在連接器仍在運行的情況下,我們可以向 MySQL 數(shù)據(jù)庫添加一個新行,然后檢查它是否已復(fù)制到 PostgreSQL 數(shù)據(jù)庫和 Elasticsearch 中:
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
...
Doe | 1005 | John | john.doe@example.com
(5 rows)
curl 'http://localhost:9200/customers/_search?pretty'
...
{
"_index" : "customers",
"_type" : "customer",
"_id" : "1005",
"_score" : 1.0,
"_source" : {
"id" : 1005,
"first_name" : "John",
"last_name" : "Doe",
"email" : "john.doe@example.com"
}
}
...
五、總結(jié)
我們設(shè)置了一個復(fù)雜的流數(shù)據(jù)管道來將 MySQL 數(shù)據(jù)庫與另一個數(shù)據(jù)庫以及 Elasticsearch 實例同步。我們設(shè)法在所有系統(tǒng)中保留相同的標識符,這使我們能夠?qū)⒄麄€系統(tǒng)的記錄關(guān)聯(lián)起來。文章來源:http://www.zghlxwxcb.cn/news/detail-544016.html
將數(shù)據(jù)更改從主數(shù)據(jù)庫近乎實時地傳播到 Elasticsearch 等搜索引擎可以實現(xiàn)許多有趣的用例。除了全文搜索的不同應(yīng)用之外,我們還可以考慮使用 Kibana 創(chuàng)建儀表板和各種可視化效果,以進一步深入了解數(shù)據(jù)。文章來源地址http://www.zghlxwxcb.cn/news/detail-544016.html
到了這里,關(guān)于Debezium系列之:基于debezium將mysql數(shù)據(jù)庫數(shù)據(jù)更改流式傳輸?shù)?Elasticsearch和PostgreSQL數(shù)據(jù)庫的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!