一、TimescaleDB
TimescaleDB 是一個開源數(shù)據(jù)庫,旨在使 SQL 對于時間序列數(shù)據(jù)具有可擴展性。它是作為 PostgreSQL 數(shù)據(jù)庫的擴展實現(xiàn)的。這一事實促使我們重新使用標(biāo)準(zhǔn) Debezium PostgreSQL 連接器,并將 TimescaleDB 支持實現(xiàn)為單個消息轉(zhuǎn)換 (SMT)。
TimescaleDB 提供了三個基本構(gòu)建塊/概念:
- Hypertables
- Continuous aggregates
- Compression
描述實例定義的元數(shù)據(jù)(目錄)和原始數(shù)據(jù)通常存儲在 _timescaledb_internal_schema 中。TimescaleDb SMT 連接到數(shù)據(jù)庫并讀取和處理元數(shù)據(jù)。然后,從數(shù)據(jù)庫讀取的原始消息會使用存儲在 Kafka Connect 標(biāo)頭中的元數(shù)據(jù)進(jìn)行豐富,從而創(chuàng)建物理數(shù)據(jù)和 TimescaleDB 邏輯結(jié)構(gòu)之間的關(guān)系。
二、完整案例
Debezium 示例存儲庫包含基于 Docker Compose 的部署,該部署提供了完整的環(huán)境來演示 TimescaleDB 集成。
第一步,開始部署
$ docker-compose -f docker-compose-timescaledb.yaml up --build
該命令將啟動 Debezium(Zookeeper、Kafka、Kafka Connect)和源 TimescaleDB 數(shù)據(jù)庫。
啟動的數(shù)據(jù)庫已準(zhǔn)備好以下數(shù)據(jù)庫對象:
- 將溫度和濕度測量值表示為時間序列數(shù)據(jù)的超穩(wěn)定條件;使用 DDL
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time')
- 測量數(shù)據(jù)的單一記錄
INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)
PostgreSQL 出版物用于將時間序列數(shù)據(jù)發(fā)布到復(fù)制槽中,因為演示使用 pgoutput 解碼插件
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')
下一步需要注冊 Debezium PostgreSQL 連接器以捕獲數(shù)據(jù)庫中的更改
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml
注冊請求文件與常規(guī)文件不同,增加了這些行
{
"name": "inventory-connector",
"config": {
...
"schema.include.list": "_timescaledb_internal",
"transforms": "timescaledb",
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
"transforms.timescaledb.database.hostname": "timescaledb",
"transforms.timescaledb.database.port": "5432",
"transforms.timescaledb.database.user": "postgres",
"transforms.timescaledb.database.password": "postgres",
"transforms.timescaledb.database.dbname": "postgres"
}
}
三、Hypertables
連接器將捕獲內(nèi)部 TimescaleDB 架構(gòu)以及包含原始數(shù)據(jù)的物理表,并且將應(yīng)用 TimescaleDb SMT 來豐富消息并根據(jù)邏輯名稱將它們路由到正確命名的主題。 SMT 配置選項包含連接到數(shù)據(jù)庫所需的信息。在這種情況下,條件超表將物理存儲在 _timescaledb_internal._hyper_1_1_chunk 中,并且當(dāng)由 SMT 處理時,它將重新路由到根據(jù)固定配置的前綴 timescaledb 和邏輯名稱 public.conditions 命名的 timescaledb.public.conditions 主題符合超表名稱。
讓我們在表中添加更多測量值
docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);
并讀取捕獲的主題消息(在命令中啟用打印密鑰和標(biāo)題)
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property print.headers=true \
--topic timescaledb.public.conditions
這些消息包含兩個標(biāo)頭 debezium_timescaledb_chunk_table:_hyper_1_1_chunk、debezium_timescaledb_chunk_schema:_timescaledb_internal,它們描述了邏輯超表名稱與從中捕獲它們的物理源表之間的映射。
四、Continuous aggregates
連續(xù)聚合對存儲在超表中的數(shù)據(jù)提供自動統(tǒng)計計算。聚合被定義為物化視圖,由其自己的超表支持,而超表又由一組物理表支持。重新計算聚合后(手動或自動),新值將存儲在超表中,可以從中捕獲和流式傳輸這些值。連接器捕獲物理表中的新值,SMT 通過將物理目標(biāo)重新映射回聚合邏輯名稱來再次解決路由問題。還添加了帶有原始超表和物理表名稱的 Kafka Connect 標(biāo)頭。
讓我們創(chuàng)建一個名為conditions_summary的連續(xù)聚合,用于計算每個位置和時間間隔的平均、最低和最高溫度
postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY location, bucket;
并閱讀捕獲的主題消息
docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property print.headers=true \
--topic timescaledb.public.conditions_summary
這些消息包含兩個標(biāo)頭 debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal 公開哪個支持超表用于存儲聚合,以及兩個附加標(biāo)頭 debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal 公開存儲聚合的物理表。
`__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.
如果添加新的測量并觸發(fā)聚合重新計算,則更新的聚合將發(fā)送到主題
postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);
看起來像
{
"schema":{
...
},
"payload":{
"before":null,
"after":{
"location":"Ostrava",
"bucket":"2024-01-09T13:00:00.000000Z",
"avg":10.0,
"max":10.0,
"min":10.0
},
"source":{
"version":"2.5.0.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1704806938840,
"snapshot":"false",
"db":"postgres",
"sequence":"[\"29727872\",\"29728440\"]",
"schema":"public",
"table":"conditions_summary",
"txId":764,
"lsn":29728440,
"xmin":null
},
"op":"c",
"ts_ms":1704806939163,
"transaction":null
}
}
因此,該主題包含針對兩個不同位置計算的兩條或多條消息。
五、Compression
TimescaleDB SMT 不會增強壓縮數(shù)據(jù)塊(物理表記錄),而只是將其作為存儲在超表中的副產(chǎn)品。壓縮后的數(shù)據(jù)被捕獲并存儲在 Kafka 主題中。通常,帶有壓縮塊的消息會被丟棄,并且不會被管道中的后續(xù)作業(yè)處理。
讓我們?yōu)槌韱⒂脡嚎s并壓縮它
postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
postgres=# SELECT show_chunks('conditions');
show_chunks
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
(1 row)
postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');
消息寫入 timescaledb._timescaledb_internal._compressed_hypertable_3。
停止服務(wù)
docker-compose -f docker-compose-timescaledb.yaml down
六、結(jié)論
在這篇文章中,我們演示了從 TimescaleDB 時間序列數(shù)據(jù)庫捕獲數(shù)據(jù)以及通過 TimescaleDb SMT 對其進(jìn)行處理。我們已經(jīng)展示了如何根據(jù)作為數(shù)據(jù)源的超表和連續(xù)聚合來路由和豐富消息。文章來源:http://www.zghlxwxcb.cn/news/detail-798894.html
深入了解Debezium請閱讀博主專欄:文章來源地址http://www.zghlxwxcb.cn/news/detail-798894.html
- Debezium專欄
到了這里,關(guān)于Debezium日常分享系列之:Debezium and TimescaleDB的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!