1. 背景
ClickHouse全稱是Click Stream,Data WareHouse,是一款高性能的OLAP數(shù)據(jù)庫,既使用了ROLAP模型,又擁有著比肩MOLAP的性能。我們可以用ClickHouse用來做分析平臺快速出數(shù)。其中的bitmap結(jié)構(gòu)方便我們對人群進(jìn)行交并。Bitmap位圖的每一位表示一個數(shù)據(jù)(比如說一個用戶)。假設(shè)5億用戶(一個用戶4個字節(jié)),則需要5 * 10^8 * 4byte = 3GB,而壓縮到32位的bitmap里只需要2^32bit = 512MB。壓縮空間的同時,利用位運(yùn)算還能快速處理人群集合。
常見的bitmap使用方法是將數(shù)據(jù)按維度打成窄表<user_id, attr_name, attr_value>,這里user_id已經(jīng)mapping成int類型,然后明細(xì)存入ClickHouse中,并對attr_name or attr_value進(jìn)行g(shù)roup by聚合,最后對id進(jìn)行g(shù)roupBitmapState (groupBitmap | ClickHouse Docs) 壓縮成bitmap二進(jìn)制格式存儲到數(shù)據(jù)庫里。導(dǎo)入方法可以是使用定時批量進(jìn)行顯式導(dǎo)入,或者使用ClickHouse物化視圖的能力隱式導(dǎo)入。
總所周知,ClickHouse是一個MPP式的分布式數(shù)據(jù)庫,引擎既負(fù)責(zé)計(jì)算又負(fù)責(zé)存儲,因此ClickHouse的節(jié)點(diǎn)一般選擇性能不錯(也就是價格不便宜)的計(jì)算機(jī)。而且由于ClickHouse的任務(wù)和節(jié)點(diǎn)強(qiáng)綁定,因此節(jié)點(diǎn)數(shù)也不宜過多。在使用Replicated*引擎的時候(屬于主從復(fù)制,利用ZooKeeper選出自己的主副本),節(jié)點(diǎn)過多會影響分布式性能。ClickHouse的Distributed表原則是誰執(zhí)行誰負(fù)責(zé),每個節(jié)點(diǎn)都負(fù)責(zé)把各個分片(shard)的數(shù)據(jù)發(fā)到其他分片上,節(jié)點(diǎn)過多則會增加傳輸故障的風(fēng)險。簡而言之,直接壓縮bitmap對ClickHouse集群壓力很大,而且消耗很高。
因此,我們希望可以預(yù)先計(jì)算bitmap的方式,用于分?jǐn)侰lickHouse集群的壓力。
2. 方案和實(shí)現(xiàn)
相對于MPP架構(gòu),還有另一種shared nothing架構(gòu),那就是hadoop生態(tài)的批處理架構(gòu)。批處理架構(gòu)和MPP架構(gòu)的一大區(qū)別在于任務(wù)和節(jié)點(diǎn)分離。由于批處理架構(gòu)有主從結(jié)構(gòu)的存在,主節(jié)點(diǎn)主要做調(diào)配工作就可以了,如果有一臺從節(jié)點(diǎn)變慢了那就給它分配更少的task。因此批處理架構(gòu)可以使用大量廉價機(jī)器。如果我們可以利用hadoop生態(tài)事先對數(shù)據(jù)進(jìn)行預(yù)處理,最后輸出到ClickHouse,我們就即可以減少ClickHouse集群的負(fù)擔(dān)而且可以加大結(jié)點(diǎn)數(shù)增加速度。
所以,這里要做的,就是將已經(jīng)存入tdw的hive明細(xì)表,轉(zhuǎn)成ClickHouse的bitmap類型。
本人更熟悉Python生態(tài)圈,因此選用了PySpark來完成這個任務(wù)(對于scala/java生態(tài)我也走通并復(fù)現(xiàn)了文章SparkSQL & ClickHouse RoaringBitmap使用實(shí)踐_spark clickhouse bitmap-CSDN博客的細(xì)節(jié),有需要的可以私下交流,這里也不鋪開了)。我們解決問題的方式,是從簡單到復(fù)雜到最后實(shí)現(xiàn)。
2.1 PySpark udf聚合bitmap
第一步要解決的問題,是如何將多個用戶壓縮到bitmap里,這里我們先假定已經(jīng)有了一個bitmap類,add user進(jìn)去就可以了。我們先按照傳統(tǒng)做法,對user進(jìn)行id映射,將string類型的id轉(zhuǎn)化成int,然后將維度表打成窄表,類似于如下建表語句:
并讀取hive表,做一些需要的過濾和處理操作,載入到spark dataframe里。
PySpark可以使用udf很方便地去實(shí)現(xiàn)自定義操作。它有兩種udf,自帶的udf和pandas_udf,但是按照本人的經(jīng)驗(yàn),pandas_udf雖然更方便也更快(默認(rèn)提供了group by),但是不同的spark版本會有一些意想不到的bug,比如0之前會有偶發(fā)性的rdd位置錯亂。所以在不確定公司內(nèi)部具體spark情況,穩(wěn)妥起見使用自帶udf(pyspark.sql.functions.udf — PySpark 3.1.1 documentation, Python Aggregate UDFs in PySpark - Dan Vatterott )。
對于udf,我們可以使用collect_list,去實(shí)現(xiàn)group by后傳入udf輸出單列的功能。代碼類似于如下:
udf函數(shù)的輸入是一個list,返回了一個單值作為一個新的udf列。到這里,我們就完成了第一個部分。
2.2 bitmap二進(jìn)制序列化
接下來第二部分是最麻煩的部分,就是如何實(shí)現(xiàn)上文的bitmap。這里,這個bitmap的特性是序列化成二進(jìn)制后可以被ClickHouse讀取使用。
我們需要知道ClickHouse怎么使用bitmap的。這里對照著ClickHouse的源碼(AggregateFunctionGroupBitmapData.h)和ClickHouse遇見RoaringBitmap-CSDN博客這篇文章的解讀,我們可以一覽ClickHouse bitmap的設(shè)計(jì)和實(shí)現(xiàn)。
ClickHouse并不是簡單的壓縮。在文檔里也可以看到(https://clickhouse.com/docs/en/sql-reference/functions/bitmap-functions/):對于位圖基數(shù)cardinality小于等于32的時候,使用了Set對象(源碼里叫做SmallSet),對于大于32的時候,使用了RoaringBitmap對象。最后的輸出,是將bitmap序列化成一個緊密排列的二進(jìn)制對象。
這是寫入方法:https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionGroupBitmapData.h#L122-L138
我們先來看看小于等于32的情況,使用了smallset(ClickHouse使用內(nèi)部實(shí)現(xiàn)的SmallSet<T, small_set_size> https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HyperLogLogWithSmallSetOptimization.h#L28),它在數(shù)據(jù)量小的時候會更快。它分為3部分,1 byte占位符(這里是0,表示小于等于32),1 byte的基數(shù)(2^8次方,但這里基數(shù)最大是2^5=32),和byte array(這里注意,是一個數(shù)字占了4 byte)。PySpark代碼如下:
這里對byte array用一個for循環(huán)將bitmap輸入進(jìn)去,最后用b""將整個二進(jìn)制對象的數(shù)組拼接成一個完全的二進(jìn)制對象。
接下來看大于32的情況,它也是3部分,1 byte占位符(這里是1,表示大于32),bitmap數(shù)的Valint表示,最后一部分是bitmap的序列化直接做字節(jié)。
這里補(bǔ)充一下,無論是Smallset還是RoaringBitmap,ClickHouse的bitmap都是使用了小端序,如果沒有指定,可能會產(chǎn)生一些奇怪的結(jié)果。python的大小端序語法可參見通過python理解大端小端_小端是高位補(bǔ)零-CSDN博客。
第二部分使用Varint用來表示第三部分RoaringBitmap最后序列化后的字節(jié)數(shù)。在ClickHouse的源碼可見(ClickHouse/src/IO/VarInt.h at master · ClickHouse/ClickHouse · GitHub)。
Varint的長度計(jì)算在ClickHouse/src/IO/VarInt.h at master · ClickHouse/ClickHouse · GitHub。不過python的bytes不需要,這里可以跳過,對java/scala的ByteBuffer就需要(需要預(yù)先預(yù)留字節(jié)位)。Varint的計(jì)算邏輯見Carl's Blog和ClickHouse源碼閱讀(0000 1101) —— ClickHouse是如何讀寫數(shù)據(jù)的(readVarUInt和writeVarUInt方法解析)_clickhouse writevaruint-CSDN博客 。講得很詳細(xì),這里不贅述。
第三部分是整個ClickHouse bitmap的核心,這里使用了RoaringBitmap(GitHub - RoaringBitmap/RoaringBitmap: A better compressed bitset in Java: used by Apache Spark, Netflix Atlas, Apache Pinot, Tablesaw, and many others)。位圖bitmap的問題是數(shù)據(jù)越稀疏,空間就越浪費(fèi)。有很多算法嘗試解決這個問題,壓縮位圖,比如WAH、EWAH、Concise等,RoaringBitmap是其中的佼佼者。它的主要思路是對32位無符號整數(shù)高低分桶。具體可見(高效壓縮位圖RoaringBitmap的原理與應(yīng)用 - 簡書 )
RoaringBitmap有不同的實(shí)現(xiàn),對于ClickHouse,使用的是CRoaring(https://github.com/RoaringBitmap/CRoaring)。各種語言的api實(shí)現(xiàn)都共享同一種序列化格式,這讓我們可以使用python/java/scala進(jìn)行序列化,然后用c++(ClickHouse)進(jìn)行讀取。格式具體見(GitHub - RoaringBitmap/RoaringFormatSpec: Specification of the compressed-bitmap Roaring format),官方說法如下:
這里我們使用了python版本的api(https://pyroaringbitmap.readthedocs.io/en/stable),源碼(https://github.com/Ezibenroc/PyRoaringBitMap)。pip安裝,然后需要對整個spark集群都更新這個library。這個庫底層仍是CRoaring,因?yàn)閜ython很容易對c/c++做wrapper封裝。
PySpark入ClickHouse的代碼如下:
這里,get_statistics()["cardinality"]獲取基數(shù),sys.getsizeof(rb)可以獲取rb序列后的字節(jié)數(shù),見Expose roaring_bitmap_portable_size_in_bytes as __sizeof__. by urdvr · Pull Request #40 · Ezibenroc/PyRoaringBitMap · GitHub (類似的,java API里是rb.serializedSizeInBytes()),rb.serialize()按照RoaringBitmap的序列化規(guī)則進(jìn)行序列化,這里是已經(jīng)封裝好了。
在這里其實(shí)本人嘗試了很多方案,也繞了很多彎路,比如使用了bytearray去承接byte數(shù)組(bytearray的值最大是256),比如直接存python struct(https://docs.python.org/zh-cn/3/library/struct.html)的格式而不是string后的base64,比如to_bytes沒指定大小端序,對源碼結(jié)構(gòu)不熟沒指定正確的位數(shù)等等,太多就不贅述了。給個腦圖:
在這一步的最后,我們需要對bitmap進(jìn)行抽樣檢測。這個是從SparkSQL & ClickHouse RoaringBitmap使用實(shí)踐_spark clickhouse bitmap-CSDN博客里學(xué)到的,我們對二進(jìn)制對象,進(jìn)行base64編碼(encode)再轉(zhuǎn)成string類型,可以做到快速可視化二進(jìn)制并來校驗(yàn)我們的結(jié)果是否正確。
base64編碼是使用64個可打印的ASCII字符將二進(jìn)制字節(jié)序列數(shù)據(jù)編碼成字符串。通過使用base64將二進(jìn)制數(shù)據(jù)序列化為二進(jìn)制字符串來實(shí)現(xiàn)將內(nèi)存里的二進(jìn)制字節(jié)固化到磁盤或者網(wǎng)絡(luò)傳輸。具體資料可見 Base64編碼及編碼性能測試_base64性能-CSDN博客 。
簡單使用一個ClickHouse SQL去執(zhí)行一下,并編碼成base64的字符串
結(jié)果是
同樣地,我們打印在PySpark里的dataframe的值,可以看到
兩者的ASCII碼是一致的,說明我們的字節(jié)已經(jīng)做到了兼容ClickHouse讀取格式。
2.3 hive bitmap出庫ClickHouse
最后一步,我們已經(jīng)有了一個預(yù)處理好的兼容ClickHouse bitmap的hive表,我們需要hive出庫到ClickHouse里。
由于公司的tdw工具沒法往hive表里寫入binary類型,我們只能考慮直接寫入base64的string進(jìn)去。這里對id聚合,然后打成bitmap再寫入新的字段bmp_base64。建表語句如下:
利用ClickHouse自帶的物化表達(dá)式(CREATE TABLE | ClickHouse Docs ),我們可以很容易新增一列bitmap列。MATERIALIZED關(guān)鍵字的含義接近于DEFAULT和ALIAS,也就是默認(rèn)值。此外,當(dāng) SELECT 中有星號(select *)時,查詢也不返回這一列。這是為了在把用SELECT * 得到的數(shù)據(jù)插到回表中的時候,不指定列名也不會改變數(shù)據(jù)。
ClickHouse建表語句如下:
本地表:
分布式表:
select attr_value, bitmapCardinality(bitmap) from user_attrbute_bmp_di_cluster可以得到如下樣例:
文章來源:http://www.zghlxwxcb.cn/news/detail-854373.html
至此,我們的Spark-ClickHouse的bitmap之旅到了一段落了。文章來源地址http://www.zghlxwxcb.cn/news/detail-854373.html
3. 總結(jié)
- 減少了ClickHouse集群計(jì)算壓力,大大縮短了hive表出庫時間(不需要出庫明細(xì)數(shù)據(jù),只需要出庫對應(yīng)的bitmap)。
- 可以將明細(xì)長時間存于hive表里,而不用留在ClickHouse中(物化視圖需要明細(xì)),降低存儲成本。
- 使用Python生態(tài)圈,減少數(shù)據(jù)同學(xué)的上手難度,同時也打通了其他Python分析工具。
到了這里,關(guān)于PySpark預(yù)計(jì)算ClickHouse Bitmap實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!