本文通過示例介紹如何處理ClickHouse實時更新。OLAP數據庫并不歡迎數據變更操作,ClickHouse也不例外,和其他OLAP產品一樣,剛開始ClickHouse甚至不支持更新,更新能力是后來才加上的,但是按照ClickHouse方式增加的。當前ClickHouse更新是異步的,使得在交互應用中難以使用。有很多場景中用戶需要修改已存在的數據并期望立即看到,ClickHouse如何滿足這樣需求呢。
ClickHouse更新歷史
早在2016年,當時ClickHouse不支持數據修改,ClickHouse團隊發(fā)布文章“ClickHouse如何更新數據” ,僅使用特殊的插入結構用于模擬更新,數據最終有分區(qū)刪除。
在GDPR(General Data Protection Regulation)的壓力下,ClickHouse團隊在2018年發(fā)布了更新和刪除。該文章ClickHouse中的更新和刪除,仍然是Altinity博客中閱讀量最大的文章之一。這些異步、非原子更新使用通過ALTER TABLE UPDATE 語句實現的,并可能打亂大量數據,對于批量操作和少量更新且不急于看到最終結果場景是有用的。標準的SQL更新仍沒有,盡管它們每年都出現在產品路線中。如果我們確實需要實時更新,則必須使用其他方法實現。下面基于實際應用場景對比ClickHouse不同實現方式。
使用場景
假設系統(tǒng)產生各類報警信息,用戶和機器學習算法不斷地查詢數據庫獲取新的報警信息并確認,確認操作需要修改報警記錄,一旦確認則報警記錄不會再次出現在用戶視圖中,這看起來像是ClickHouse所不熟悉的OLTP操作。
因為不能使用更新,需要使用插入新記錄代替。一旦數據庫中有兩個記錄,我們需要有效方式獲得最新的記錄。下面嘗試三種方式實現。
ReplacingMergeTree
首先創(chuàng)建表存儲報警信息:
CREATE TABLE alerts(
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data String,
acked UInt8 DEFAULT 0,
ack_time DateTime DEFAULT toDateTime(0),
ack_user LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);
為了簡化,所有報警信息打包放在alert_data列中,實際可能包括幾十個或幾百個列信息。另外alert_id在示例中是隨機字符串。
注意ReplacingMergeTree引擎,是基于order by 子句指定的字段判斷重復,如果兩條記錄重復,則最新記錄保留,最新記錄有ack_time字段決定。去重操作在后端合并操作中執(zhí)行,不會立刻發(fā)生,不會保證什么時間發(fā)生。所以需要關心一致性查詢結果。ClickHouse有特殊語法實現,后面就要提及。
在運行查詢之前,需要填充表一些數據,我們生成1000個租戶的10M報警數據:
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
toUInt32(rand(1)%1000+1) AS tenant_id,
randomPrintableASCII(64) as alert_id,
toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);
接下來,讓我們確認99%的警報,為’ ack_user ‘、’ ack_user ‘和’ ack_time '列提供新值。不是更新,而是插入新行。
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data, 1 as acked,
concat('user', toString(rand()%1000)) as ack_user, now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;
如果現在查詢表,可以看到:
SELECT count() FROM alerts
┌──count()─┐
│ 19898060 │
└──────────┘
1 rows in set. Elapsed: 0.008 sec.
顯然已確認和未確認的行都在表中,替代還沒有發(fā)生。為了看到最后數據,可以使用final關鍵字。
SELECT count() FROM alerts FINAL
┌──count()─┐
│ 10000000 │
└──────────┘
1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)
數量是正確的,但看查詢時間,使用final 關鍵字ClickHouse必須掃描所有行,并在查詢時按排序鍵進行合并。雖然獲得正確結果,但開銷很大?,F在看僅過濾尚未確認的行是否性能更好。
SELECT count() FROM alerts FINAL WHERE NOT acked
┌─count()─┐
│ 101940 │
└─────────┘
1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.)
查詢時間和處理的數據量沒有明顯差異,盡管計數要小得多。過濾無助于加快查詢速度。隨著表大小的增長,成本可能會更大。它無法擴展。
注意:為了可讀性,所有的查詢和查詢時間都像在“clickhouse-client”中運行一樣。事實上,我們多次嘗試查詢,以確保結果一致,并與“clickhouse-benchmark”實用程序進行確認。
查詢全部表意義不大,那么在我們的示例中還能使用ReplacingMergeTree引擎嗎?下面選擇隨機某個租戶,即該租戶下所有為確認的報警信息,想象該用戶正在看展示屏幕。因為alert_data是隨機數,這里僅計算下校驗和,就是為了看下結果:
SELECT
count(),
sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)
相當快,278毫秒查詢所有未確認數據。為什么這次快?過濾條件不同,tenant_id是主鍵的一部分,所以ClickHouse能在final之前過濾數據,這時ReplacingMergeTree有效的。
下面嘗試查詢某用戶已確認的數據。列的基數是相同的——我們有1000個用戶,可以試試user451。
SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked
┌─count()─┐
│ 9725 │
└─────────┘
1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)
這個查詢很慢,因為沒有使用索引,ClickHouse掃描了19.04M行記錄。我們不能增加ack_user作為索引,這樣會破壞ReplacingMergeTree 語義。我們嘗試采用prewhere:
SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked
┌─count()─┐
│ 9725 │
└─────────┘
1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)
PREWHERE是ClickHouse以不同方式應用過濾器的特殊方式。通常ClickHouse足夠聰明,可以自動將條件移動到PREWHERE,所以用戶不應該在意。但這個示例需要我們顯示指定。
Aggregate Functions
ClickHouse提供了大量的聚集函數,最新版本超過100多個,結合12個聚集合并器,極大地滿足了用戶需求。我們的示例僅需要三個函數:‘argMax’, ‘max’ and ‘any’.
下面使用argMax聚集函數查詢相同的租戶:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
argMax(alert_data, ack_time) alert_data,
argMax(acked, ack_time) acked,
max(ack_time) ack_time_,
argMax(ack_user, ack_time) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)
結果相同,但性能提升了4倍。這是ClickHouse聚集能力。缺點是查詢變得復雜,但我們可以簡化。我們注意到,當確認報警信息時,僅需要更新三列:
- acked: 0 => 1
- ack_tiem: 0 => now()
- ack_user: ‘’ => ‘user1’
三個值都在增加,所以可以使用max代替argMax。既然不改變alert_data, 就不需要任何實際聚集函數,ClickHouse提供any函數實現該功能,它返回任何一個值,減少額外開銷。
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
any(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)
查詢變得簡單了,速度也快了一點!原因是使用’ any ‘函數,ClickHouse不需要計算’ alert_data ‘列上的’ max ’ !
AggregatingMergeTree
AggregatingMergeTree 是ClickHouse最強特性之一。結合物化視圖能夠實現實時數據聚合。既然前面示例需要使用聚集函數,直接適應AggregatingMergeTree 會更好嗎?實際提升不明顯。因為每行僅更新一次,僅有兩行需要聚合為一組。對于這種場景,AggregatingMergeTree 不是最佳選項。
但可以結合需求變點戲法。需求中報警信息首先插入非確認信息,然后變成確認信息,一旦用戶確認了,三個字段需要修改。如果其他列不重復存儲會節(jié)約空間、提升性能。
首先創(chuàng)建AggregatingMergeTree 表引擎,使用max聚合函數。代替使用max,可以any,但需要列為非空,any會選擇非空值。
DROP TABLE alerts_amt_max;
CREATE TABLE alerts_amt_max (
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data SimpleAggregateFunction(any, String),
acked SimpleAggregateFunction(max, UInt8),
ack_time SimpleAggregateFunction(max, DateTime),
ack_user SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);
既然原始數據是隨機的,我們就使用已存在的表數據進行填充。像之前一樣進行兩次插入,分別為非確認報警信息和確認信息。
INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max
SELECT tenant_id, alert_id, timestamp,
'' as alert_data,
acked, ack_time, ack_user
FROM alerts WHERE acked;
注意,對于alert_data字段插入空字符串,因為不需要存儲兩次。聚合函數會獲取非空值,其他列保持缺省值不變。一旦有了數據,現在檢查下數據大?。?/p>
SELECT
table,
sum(rows) AS r,
sum(data_compressed_bytes) AS c,
sum(data_uncompressed_bytes) AS uc,
uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
│ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘
我們幾乎沒有壓縮,多虧了隨機字符串。但是aggregate要小兩倍,因為我們不需要存儲兩次’ alerts_data '?,F在讓我們嘗試對聚合表進行查詢:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
max(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts_amt_max
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)
多虧了AggregatingMergeTree,我們處理的數據更少了(40MB比之前的82MB),而且現在效率更高了。
實現更新
ClickHouse將盡其所能在后臺合并數據,刪除重復行并執(zhí)行聚合。但有時需要強制合并數據,為了釋放磁盤空間。如使用OPTIMIZE FINAL 語句,但該語句是阻塞的、且昂貴的操作。因此不能頻繁執(zhí)行,讓我們看看它是否對查詢性能有任何影響。
OPTIMIZE TABLE alerts FINAL
Ok.
0 rows in set. Elapsed: 105.675 sec.
OPTIMIZE TABLE alerts_amt_max FINAL
Ok.
0 rows in set. Elapsed: 70.121 sec.
執(zhí)行后兩者數量相同:
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
│ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘
不同方法之間的性能差異變得不那么顯著。以下是匯總表:
** After inserts** | After OPTIMIZE FINAL | |
---|---|---|
ReplacingMergeTree FINAL | 0.278 | 0.037 |
argMax | 0.059 | 0.034 |
any/max | 0.055 | 0.029 |
AggregatingMergeTree | 0.036 | 0.026 |
總結
ClickHouse提供豐富的工具集處理實時更新,如:ReplacingMergeTree, CollapsingMergeTree (本文未提及), AggregatingMergeTree 和aggregate 函數。所有這些方法都有三個共性:
- 數據通過插入新版本進行修改,插入在ClickHouse中很快
- 有多種有效方法實現類似OLTP中的更新語義
- 實際修改不會立刻發(fā)生
具體選擇哪種方法依賴具體應用場景。ReplacingMergeTree對用戶來說是最直接、方便,但一般用于數據量為中小量級或數據僅通過主鍵查詢場景。使用聚集函數更靈活,性能也不錯,但需要寫相對復雜的查詢。AggregatingMergeTree可以節(jié)約空間,僅保留修改列。這些都是ClickHouse DB設計師的好工具,可以在需要的時候使用。文章來源:http://www.zghlxwxcb.cn/news/detail-554243.html
參考文檔:https://dzone.com/articles/handling-real-time-updates-in-clickhouse文章來源地址http://www.zghlxwxcb.cn/news/detail-554243.html
到了這里,關于ClickHouse如何處理實時更新的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!