Clickhouse分布式表引擎(Distributed)寫入核心原理解析
- Clickhouse分布式表引擎(Distributed)寫入核心原理解析
- Clickhouse分布式表引擎(Distributed)查詢核心原理解析
Distributed表引擎是分布式表的代名詞,它自身不存儲(chǔ)任何數(shù)據(jù),而是作為數(shù)據(jù)分片的透明代理,能夠自動(dòng)路由數(shù)據(jù)至集群中的各個(gè)節(jié)點(diǎn),所以Distributed表引擎需要和其他數(shù)據(jù)表引擎一起協(xié)同工作。
從實(shí)體表層面來看,一張分片表由兩部分組成:
- 本地表:通常以_local為后綴進(jìn)行命名。本地表是承載數(shù)據(jù)的載體,可以使用非Distributed的任意表引擎,一張本地表對(duì)應(yīng)了一個(gè)數(shù)據(jù)分片。
- 分布式表:通常以_dist為后綴進(jìn)行命名。分布式表只能使用Distributed表引擎,它與本地表形成一對(duì)多的映射關(guān)系,日后將通過分布式表代理操作多張本地表。
對(duì)于分布式表與本地表之間表結(jié)構(gòu)的一致性檢查,Distributed表引擎采用了讀時(shí)檢查的機(jī)制,這意味著如果它們的表結(jié)構(gòu)不兼容,只有在查詢時(shí)才會(huì)拋出錯(cuò)誤,而在創(chuàng)建表時(shí)并不會(huì)進(jìn)行檢查。
定義形式
Distributed表引擎的定義形式如下所示:
ENGINE = Distributed(cluster, database, table, [,sharding_key])
其中,各個(gè)參數(shù)的含義分別如下:
- cluster:集群名稱,與集群配置中的自定義名稱相對(duì)應(yīng)。在對(duì)分布式表執(zhí)行寫入和查詢的過程中,它會(huì)使用集群的配置信息來找到相應(yīng)的host節(jié)點(diǎn)。
- database和table:分別對(duì)應(yīng)數(shù)據(jù)庫和表的名稱,分布式表使用這組配置映射到本地表。
- sharding_key:分片鍵,選填參數(shù)。在數(shù)據(jù)寫入的過程中,分布式表會(huì)依據(jù)分片鍵的規(guī)則,將數(shù)據(jù)分布到各個(gè)host節(jié)點(diǎn)的本地表。
比如如下分布式表:
CREATE TABLE test_shard_dist on cluster ch_cluster(
`id` Int8
) ENGINE=Distributed('ch_cluster', 'test', 'test_shard_local', rand());
上述建表語句將分片鍵指定為rand()函數(shù),此時(shí)在數(shù)據(jù)寫入時(shí)會(huì)根據(jù)隨機(jī)函數(shù)的取值決定數(shù)據(jù)寫入哪個(gè)分片。
值得注意的是,此時(shí)對(duì)應(yīng)的本地表還未創(chuàng)建,所以從這里也可以看出來,Distributed表運(yùn)用的是讀時(shí)檢查的機(jī)制,對(duì)創(chuàng)建分布式表和本地表的順序并沒有強(qiáng)制要求。
接著,創(chuàng)建本地表,一張本地表代表著一個(gè)數(shù)據(jù)分片。
CREATE TABLE test_shard_local on cluster ch_cluster(
`id` Int8
)ENGINE=MergeTree()
order by id
partition by id
在本次案例中,我們的ch_cluster由三個(gè)節(jié)點(diǎn)組成,因此會(huì)在三個(gè)節(jié)點(diǎn)上都創(chuàng)建出對(duì)應(yīng)的本地表和分布式表。
分布式寫入的核心流程
在向集群內(nèi)的分片寫入數(shù)據(jù)時(shí),通常有兩種思路:一種是借助外部計(jì)算系統(tǒng),事先將數(shù)據(jù)均勻分片,再借由計(jì)算系統(tǒng)直接將數(shù)據(jù)寫入Clickhouse集群的各個(gè)本地表。如下圖所示:
在這個(gè)流程中,繼續(xù)使用集群ch_cluster的示例,該集群由2個(gè)分片和0個(gè)副本組成。整個(gè)流程從上至下按照時(shí)間順序進(jìn)行,其大致分為5個(gè)步驟:
(1)在第一個(gè)分片節(jié)點(diǎn)寫入本地分片數(shù)據(jù)
首先在Linux121節(jié)點(diǎn),對(duì)分布式表test_shard_dist執(zhí)行INSERT查詢,嘗試寫入10、30、200、55四行數(shù)據(jù)。
執(zhí)行后分布式表主要會(huì)做兩件事情:
- 根據(jù)分片規(guī)則劃分?jǐn)?shù)據(jù),將不同的數(shù)據(jù)標(biāo)記劃分給不同的節(jié)點(diǎn)插入。
- 將屬于當(dāng)前分片的數(shù)據(jù)直接寫入本地表test_shard_local
(2)第一個(gè)分片建立遠(yuǎn)端連接,準(zhǔn)備發(fā)送遠(yuǎn)端分片數(shù)據(jù)
將歸至遠(yuǎn)端分片的數(shù)據(jù)以分區(qū)為單位,分別寫入test_shard_all存儲(chǔ)目錄下的臨時(shí)bin文件,如下圖所示:
這里的1.bin是一個(gè)[increase_num].bin,有幾個(gè)shard分片,則依次遞增。
(3)第一個(gè)分片向遠(yuǎn)端分片發(fā)送數(shù)據(jù)
此時(shí),Clickhouse會(huì)有另一組監(jiān)聽任務(wù)負(fù)責(zé)監(jiān)聽/test_shard_dist目錄下的文件變化,這些任務(wù)負(fù)責(zé)將目錄數(shù)據(jù)發(fā)送至遠(yuǎn)端分片。
<Debug> test.test_shard_dist.DirectoryMonitor: Started processing `/var/lib/clickhouse/data/test/test_shard_dist/shard2_replica1/1.bin` (2.00 rows, 2.00 B bytes)
<Debug> test.test_shard_dist.DirectoryMonitor: Started processing `/var/lib/clickhouse/data/test/test_shard_dist/shard3_all_replicas/2.bin` (2.00 rows, 2.00 B bytes)
從Linux121節(jié)點(diǎn)的Clickhouse日志中我們可以看到,其負(fù)責(zé)將分片數(shù)據(jù)發(fā)送到對(duì)應(yīng)的遠(yuǎn)端節(jié)點(diǎn)上。我這里一共有三個(gè)節(jié)點(diǎn),所以Linux121節(jié)點(diǎn)將分片數(shù)據(jù)發(fā)往對(duì)應(yīng)的Linux122、Linux123節(jié)點(diǎn),分別對(duì)應(yīng)數(shù)據(jù)塊1.bin和2.bin。
其中需要注意的是,每份目錄將會(huì)由獨(dú)立的線程負(fù)責(zé)發(fā)送,數(shù)據(jù)在傳輸之前會(huì)被壓縮。
(4)第二個(gè)分片接收數(shù)據(jù)并寫入本地
Linux122和Linux123節(jié)點(diǎn)確認(rèn)建立與Linux121的連接
<Trace> Connection (linux121:9000): Connected to ClickHouse server version 22.4.6.
在接收到來自Linux121節(jié)點(diǎn)發(fā)送的數(shù)據(jù)后,將它們寫入本地表中
Linux122節(jié)點(diǎn)服務(wù)日志:
<Debug> executeQuery: (from [::ffff:192.168.80.121]:56626, initial_query_id: 8579dc90-d839-4e63-8442-b31a8c7fe6cf) INSERT INTO test.test_shard_local (id) VALUES
<Trace> ContextAccess (default): Access granted: INSERT(id) ON test.test_shard_local
<Trace> test.test_shard_local (e43a2707-58a8-4bfb-8615-d9b175debdfe): Renaming temporary part tmp_insert_10_1_1_0 to 10_1_1_0
Linux123節(jié)點(diǎn)服務(wù)日志:
<Debug> executeQuery: (from [::ffff:192.168.80.121]:44996, initial_query_id: 8579dc90-d839-4e63-8442-b31a8c7fe6cf) INSERT INTO test.test_shard_local (id) VALUES
<Trace> ContextAccess (default): Access granted: INSERT(id) ON test.test_shard_local
<Trace> test.test_shard_local (e43a2707-58a8-4bfb-8615-d9b175debdfe): Renaming temporary part tmp_insert_-56_1_1_0 to -56_1_1_0.
接收數(shù)據(jù)–>執(zhí)行寫入命令->重命名臨時(shí)分區(qū)->實(shí)際分區(qū)(這里原表是按id作為分區(qū)的,所以是-56_1_1_0)
(5)由第一個(gè)分片確認(rèn)完成寫入
最后,還是由Linux121節(jié)點(diǎn)確認(rèn)所有的數(shù)據(jù)發(fā)送完畢:
<Trace> test.test_shard_dist.DirectoryMonitor: Finished processing `/var/lib/clickhouse/data/test/test_shard_dist/shard2_replica1/1.bin` (took 5 ms)
<Trace> test.test_shard_dist.DirectoryMonitor: Finished processing `/var/lib/clickhouse/data/test/test_shard_dist/shard3_all_replicas/2.bin` (took 5 ms)
至此,整個(gè)流程結(jié)束。
可以看到,在整個(gè)流程中,Distributed表負(fù)責(zé)所有分片的寫入工作。本著誰執(zhí)行誰負(fù)責(zé)的原則,在這個(gè)示例中,由Linux121節(jié)點(diǎn)的分布式表負(fù)責(zé)切分?jǐn)?shù)據(jù),并向所有其他分片節(jié)點(diǎn)發(fā)送數(shù)據(jù)。
在由Distributed表負(fù)責(zé)向遠(yuǎn)端分片發(fā)送數(shù)據(jù)時(shí),有異步寫入和同步寫入兩種模式:如果是異步寫,則在Distributed表寫完本地分片之后,INSERT查詢就會(huì)返回成功寫入的信息。如果是同步寫,則在執(zhí)行INSERT查詢之后,會(huì)等待所有分片完成寫入。
使用何種模式由參數(shù)insert_distributed_sync
參數(shù)控制,默認(rèn)為false,即為異步寫。如果將其設(shè)置為true,則可以進(jìn)一步通過insert_distributed_timeout
參數(shù)控制同步等待的超時(shí)時(shí)間。
分布式寫入時(shí)副本復(fù)制數(shù)據(jù)的核心流程
如果在集群的配置中包含了副本,那么除了剛才的分片寫入流程之外,還會(huì)觸發(fā)副本數(shù)據(jù)的復(fù)制流程。數(shù)據(jù)在多個(gè)副本之間,有兩種復(fù)制實(shí)現(xiàn)方式:一種是繼續(xù)借助Distributed表引擎,由它將數(shù)據(jù)寫入副本。另一種則是借助ReplicatedMergeTree表引擎實(shí)現(xiàn)副本數(shù)據(jù)的分發(fā)。
(1)通過Distributed復(fù)制數(shù)據(jù)
在這種實(shí)現(xiàn)方式下,即使本地表不使用ReplicatedMergeTree表引擎,也能實(shí)現(xiàn)數(shù)據(jù)副本的功能。Distributed會(huì)同時(shí)負(fù)責(zé)分片和副本的數(shù)據(jù)寫入工作,而副本數(shù)據(jù)的寫入流程與分片邏輯相同。
比如對(duì)于下面的配置
<!-- 1個(gè)分片,1個(gè)副本 -->
<ch_cluster>
<shard>
<replica>
<host>linux121</host>
<port>9000</port>
</replica>
<replica>
<host>linuxxxx</host>
<port>9000</port>
</replica>
</shard>
</ch_cluster>
Linux121和linuxxxx互為副本,此時(shí),按照上面介紹分布式數(shù)據(jù)寫入的邏輯,Linux121的分布式表不僅負(fù)責(zé)將數(shù)據(jù)寫入本地,還需要負(fù)責(zé)將數(shù)據(jù)發(fā)往副本所在的節(jié)點(diǎn)。
總結(jié),用這種方式時(shí),向Distributed表寫入數(shù)據(jù),它會(huì)負(fù)責(zé)將數(shù)據(jù)寫入集群內(nèi)的每個(gè)replica
細(xì)心的朋友應(yīng)該能發(fā)現(xiàn),在這種實(shí)現(xiàn)方案下,Distributed節(jié)點(diǎn)需要同時(shí)負(fù)責(zé)分片和副本的數(shù)據(jù)寫入工作,它很可能會(huì)成為寫入的單點(diǎn)瓶頸, 所有就有了接下來將要說明的第二種方案。
(2)通過ReplicatedMergeTree復(fù)制數(shù)據(jù)
如果在集群的shard配置中增加internal_replication
參數(shù)并將其設(shè)置為true(默認(rèn)為false),那么Distributed表在該shard中只會(huì)選擇一個(gè)合適的replica并對(duì)其寫入數(shù)據(jù)。此時(shí),如果使用ReplicatedMergeTree作為本地表的引擎,則在該shard內(nèi),多個(gè)replica副本之間的數(shù)據(jù)復(fù)制會(huì)交由ReplicatedMergeTree自己處理,不再由Distributed負(fù)責(zé),從而為其減負(fù)。
在shard中選擇replica的算法大致如下:首選,在Clickhouse的服務(wù)節(jié)點(diǎn)中,擁有一個(gè)全局及數(shù)據(jù)器errors_count。當(dāng)服務(wù)出現(xiàn)任何異常時(shí),該計(jì)數(shù)器累加1。接著,當(dāng)一個(gè)shard內(nèi)擁有多個(gè)replica時(shí),選擇errors_count錯(cuò)誤最少的那個(gè)。
至此,我們介紹了關(guān)于Clickhouse使用分布式表寫入的核心流程原理和副本復(fù)制原理。文章來源:http://www.zghlxwxcb.cn/news/detail-426907.html
但是在此還是需要注意,通過上面我們介紹可以知道,在使用Distributed表引擎寫入時(shí),由分布式表寫入的節(jié)點(diǎn)負(fù)責(zé)將數(shù)據(jù)分片,并發(fā)送到集群中的其他節(jié)點(diǎn)中,這種情況在數(shù)據(jù)量比較大時(shí),很有可能造成寫入的單點(diǎn)瓶頸。同時(shí)加重網(wǎng)絡(luò)中傳輸?shù)臄?shù)據(jù)量,容易造成網(wǎng)絡(luò)擁塞。因此,在實(shí)際生產(chǎn)中,更建議直接寫本地表。文章來源地址http://www.zghlxwxcb.cn/news/detail-426907.html
到了這里,關(guān)于Clickhouse分布式表引擎(Distributed)寫入核心原理解析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!