一.消息隊(duì)列
消息隊(duì)列:分布式系統(tǒng)必備的一個基礎(chǔ)軟件,能支持組件通信消息的快速讀寫
Redis本身支持?jǐn)?shù)據(jù)的快速訪問,滿足消息隊(duì)列的讀寫性能需求
二.Redis適合做消息隊(duì)列嗎?
消息隊(duì)列的消息存取需求
消息隊(duì)列存取消息的過程
- 在分布式系統(tǒng)中,兩個組件要基于消息隊(duì)列進(jìn)行通信,一個組件就會把要處理的數(shù)據(jù)以消息的形式傳遞給消息隊(duì)列,然后這個組件就可以繼續(xù)執(zhí)行其他操作;
- 遠(yuǎn)端的另一個組件從消息隊(duì)列中把消息讀取出來,在本地進(jìn)行處理。
需求:
- 組件1需要對采集到的數(shù)據(jù)進(jìn)行求和計(jì)算,并寫入數(shù)據(jù)庫;
- 消息到達(dá)速度很快,組件1沒有辦法及時既做采集又做計(jì)算,并寫入數(shù)據(jù)庫。
解決方案:
消息隊(duì)列:
- 組件1把數(shù)據(jù)x和y保存為JSON格式的消息,再把它發(fā)送到消息隊(duì)列,這樣就可以繼續(xù)接受新的數(shù)據(jù)。
- 組件2從消息隊(duì)列中 把數(shù)據(jù)讀取出來在服務(wù)器2上進(jìn)行求和計(jì)算上,再寫入數(shù)據(jù)庫。
通用的消息隊(duì)列的架構(gòu)模型:
? ? ??
消息隊(duì)列存取消息時候,必須要滿足的三個需求:
- ? ? 消息順序性?
- ? ? 消息冪等性
- ? ? 保證消息的可靠性
消息的順序性
? ? ?消息順序被消費(fèi)者異步處理,但是消費(fèi)者仍然按照生產(chǎn)者發(fā)送消息的順序來處理消息,避免后被發(fā)送的消息先被處理了。
? ? ?需求:對于消息順序性的場景來看,一旦出現(xiàn)消息亂序處理時,會導(dǎo)致業(yè)務(wù)邏輯被錯誤執(zhí)行,給業(yè)務(wù)方造成損失。
重復(fù)消息處理
? ? ?消費(fèi)者從 消息隊(duì)列讀取消息時,有時候會因?yàn)榫W(wǎng)絡(luò)堵塞出現(xiàn)消息重傳的情況。此時,消費(fèi)者可能會收到多條重復(fù)消息。對于重復(fù)消息,消費(fèi)者如果多次處理的話,可能造成一個業(yè)務(wù)邏輯被多次執(zhí)行,如果業(yè)務(wù)邏輯正好要修改數(shù)據(jù),就會出現(xiàn)數(shù)據(jù)被多次修改的問題。
消息可靠性
? ? ? 消費(fèi)者在處理消息的時候,可能出現(xiàn)因?yàn)楣收?或者宕機(jī)導(dǎo)致消息沒有處理完就丟失的情況。當(dāng)消費(fèi)者重啟時候,可以重新讀取消息再次進(jìn)行處理,否則就會 出現(xiàn)消息漏處理的問題。
Redis如何實(shí)現(xiàn)消息隊(duì)列的需求
? ? ?基于List消息隊(duì)列解決方案
? ? ?List本身就是按照先進(jìn)先出的順序對數(shù)據(jù)進(jìn)行存取,所以如果使用List作為消息隊(duì)列保存 消息的話,就可以滿足消息的順序性。
? ? 生產(chǎn)者使用LPUSH命令要把發(fā)送的消息依次寫入list,消費(fèi)者通過RPOP命令從LIST的另一端按照消息的寫入順序,依次讀取消息并處理。? ?
? ?存在問題:
? ? ?生產(chǎn)者往list寫入數(shù)據(jù)時,List并不會主動通知消費(fèi)者有新消息寫入,如果消費(fèi)者想要及時處理消息,就需要程序不斷調(diào)用RPOP命令(比如使用一個while(1)循環(huán)),如果新消息寫入,RPOP就會返回結(jié)果,否則,RPOP命令返回空值,再繼續(xù)循環(huán)。
? ? ?危害:
? ? ? ? 沒有新消息寫入LIST消費(fèi)者也要不停的調(diào)用RPOP命令,這就會導(dǎo)致消費(fèi)者程序cpu一直消耗在執(zhí)行RPOP命令上,帶來不必要的性能損失。
? ? 解決:
? ? ? ? ?Redis提供了BRPOP命令。BRPOP命令,也稱為阻塞式讀取,客戶端在沒有讀取到隊(duì)列數(shù)據(jù)時,自動阻塞,知道有新的數(shù)據(jù)寫入隊(duì)列,再開始讀取新數(shù)據(jù),和消費(fèi)者程序在自己不停調(diào)用RPOP命令相比,這種方式能節(jié)省CPU開銷。
? ? ? ??
重復(fù)消息的處理:消息的冪等性
? ? ? ?消費(fèi)者程序本身可以對重復(fù)消息進(jìn)行判斷。
? ? ? 消息隊(duì)列要能給每個消息提供全局唯一的ID號;另一方面,消費(fèi)者程序要把已經(jīng)處理過的消息ID記錄下來。當(dāng)收到一條消息后,消費(fèi)者程序可以對比收到的消息ID和記錄處理過的消息ID。來判斷當(dāng)前收到的消息有么有經(jīng)過處理。
? ? ?如果已經(jīng)處理 過了就不再處理了。這種處理特性被稱為消息 冪等性。
? ? ?冪等性:對于同一消息,消費(fèi)者收到生成一次的處理結(jié)果和收到多次的處理結(jié)果是一致的。
不過List本身不會為每個消息生成ID號的,所以,消息的全局唯一ID號就需要生產(chǎn)者程序發(fā)送消息前自行生成,生成之后,我們在用LPUSH命令把消息插入List中,需要在消息中包含這個全局唯一ID。
消息可靠性:
? ? ? List 類型是如何保證消息可靠性---?備份
? ? ?背景:? 消費(fèi)者List中讀取一條消息后,List就不會存留這條消息,所以如果消費(fèi)者程序在處理消息的過程中出現(xiàn)了故障或者宕機(jī),就會導(dǎo)致消息沒有處理完成,那么消費(fèi)者程序再次啟動就會導(dǎo)致消息丟失。
? ? 解決方案:為了存留消息,list提供了BRPOPLUSH命令,這個命令的作用就是讓消費(fèi)者從一個List中讀取消息,同時Redis會把這個消息再插入到另一個List(可以叫作備份 List)留存。
? ? ? 如果消費(fèi)者程序讀取了消息但是沒能正常處理,等它重啟以后就可以從備份List中重新讀取消息并進(jìn)行處理。
? ? ? 生產(chǎn)者消息發(fā)送很快,而消費(fèi)者處理消息的速度緩慢,這就導(dǎo)致List中消息堆積的很多,給Redis內(nèi)存帶來壓力。
? ? ?啟動多個消費(fèi)者程序組成消費(fèi)組,一起分擔(dān)處理 List中消息的消息。但是List類型并不支持消費(fèi)組的實(shí)現(xiàn)。
基于Stream消息隊(duì)列解決方案
streams是Redis專門為消息隊(duì)列設(shè)計(jì) 的數(shù)據(jù)類型:
- XADD插入消息,保證有序,可以自動生成全局唯一ID;
- XREAD用于讀取消息,可以按ID讀取數(shù)據(jù);
- XREADGROUP按消費(fèi)組的形式讀取消息;
- XPENDING和XACK:?XPENDING查詢每個消費(fèi)組內(nèi)所有消費(fèi)者已讀取但是尚未確認(rèn)消息,ASCK命令用于向消息隊(duì)列確認(rèn)消息處理已經(jīng)完成。
XADD命令
可以往消息隊(duì)列中插入新消息,消息的格式 是鍵-值對形式。對于插入的每一條消息,Streams可以自動為其生成一個全局唯一ID。
XADD mqstream * repo 5
"1599203861727-0"
可以往名稱為mqstream的消息隊(duì)列插入一條消息,消息的鍵為 repo, 值為5;
消息隊(duì)列中的* ,表示讓Redis為插入數(shù)據(jù)自動生成一個全局唯一的ID,例如"1599203861727-0"
也可以自行設(shè)定一個ID號,保證這個ID號是全局唯一的就行。不過使用*號會更加方便高效。
消息的全局唯一ID由兩部分組成
- ? ?第一部分"1599203861727"是指當(dāng)前時間戳 毫秒級
- ? ?第二部分表示插入消息在當(dāng)前毫秒內(nèi)的消息序列,這是從0開始編號的,
- ? “1599203861727-0”就表示在“1599203861727”毫秒內(nèi)的第 1 條消息。
XREAD 命令
? ? ? ?使用XREAD命令從消息隊(duì)列讀取
? ? ? ? XREAD在讀取消息時候,可以指定一個消息ID,并從這個消息ID的下一條消息開始進(jìn)行讀取。例如我們可以執(zhí)行下面的命令,從ID號為 1599203861727-0 的消息開始,讀取后續(xù)的所有消息:
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
2) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
3) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
消息者也可以在調(diào)用XREAD時設(shè)定block配置項(xiàng),實(shí)現(xiàn)類似于BRPOP的阻塞讀取操作。
當(dāng)消息隊(duì)列中沒有消息時,一旦設(shè)置了block配置項(xiàng),XREAD就會阻塞;
阻塞的時長可以在block配置項(xiàng)進(jìn)行設(shè)置。
XREAD block 10000 streams mqstream $
(nil)
(10.00s)
? ? ? ?,命令最后的$符號表示讀取最新消息,同時設(shè)置block 10000配置項(xiàng),1000的單位是毫秒,表示XREAD 在讀取最新消息時,如果沒有消息到來,XREAD 將阻塞 10000 毫秒(即 10 秒),然后再返回。上面命令中XREAD執(zhí)行后,消息隊(duì)列命令中mqstream 中一直沒有消息XREAD 在 10 秒后返回空值(nil)。
? ? ?
消費(fèi)組
? ? ??Stream本身可以使用XGROUP創(chuàng)建消費(fèi)組,創(chuàng)建消費(fèi)組后,Stream可以使用XREADGROUP命令讓消費(fèi)組內(nèi)的消費(fèi)者讀取消息
? ? ??
XGROUP create mqstream group1 0
ok
? ?我們再執(zhí)行一段命令,讓GROUP1消費(fèi)組中的消費(fèi)者consumer1 從 mqstream 中讀取所有消息
XREADGROUP group group1 cinsumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
2) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
3) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
4) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
讓group1消費(fèi)組里的消費(fèi)者consumer1從mqstream中讀取所有消息,
命令">"表示從第一天尚未被消費(fèi)的消息開始讀取。
因?yàn)樵赾onsumer1讀取消息前,group1并沒有其他消費(fèi)者讀取過消息,所以consumer1就得到了mqstream消息隊(duì)列中的所有消息。
消息隊(duì)列中的消息一旦被消費(fèi)組里的一個消息讀取了,就不能再被該消費(fèi)組內(nèi)的其他消費(fèi)者讀取。
我們繼續(xù)執(zhí)行下面命令
XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
比如說,我們執(zhí)行完剛才的 XREADGROUP 命令后,再執(zhí)行下面的命令,讓 group1 內(nèi)的 consumer2 讀取消息時,consumer2 讀到的就是空值,因?yàn)橄⒁呀?jīng)被 consumer1 讀取完了?
消費(fèi)組的目的?
? ? 讓組內(nèi)多個消費(fèi)者共同分擔(dān)讀取消息,通常會讓每個消費(fèi)者讀取部分消息,從而實(shí)現(xiàn)讓組內(nèi)的多個消費(fèi)者共同分擔(dān)讀取消息,實(shí)現(xiàn)消息讀取負(fù)載在多個消費(fèi)者間是均衡分布的。例如,我們執(zhí)行下列命令,讓 group2 中的 consumer1、2、3 各自讀取一條消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
保證消費(fèi)者在發(fā)生故障或者宕機(jī)再次重啟時,讓可以讀取未處理完的消息,stream會自動使用內(nèi)部隊(duì)列(PENDING List)留存消費(fèi)組里 每個消費(fèi)者讀取的消息;
直到消費(fèi)者使用XACK命令通知Streams消息已經(jīng)被處理完成。
如果消費(fèi)者沒有成功處理消息,他就不會給Stream發(fā)送XACK命令,消息仍然會留存。
此時消費(fèi)者可以在重啟后,用XPENDING 命令查看已讀取、但尚未確認(rèn)處理完成的消息。
XPEBDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
查看group2中各個消費(fèi)者已讀取,但是尚未確認(rèn)的消息個數(shù)。其中,XPENDING返回結(jié)果的第二行第三行分別表示group2中所有消費(fèi)者讀取的消息最小ID和最大ID。
XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)
consumer2 就可以使用 XACK 命令通知 Streams,然后這條消息就會被刪除。當(dāng)我們再使用 XPENDING 命令查看時,就可以看到,consumer2 已經(jīng)沒有已讀取、但尚未確認(rèn)處理的消息了。
基于List | 基于Streams | |
消息順序性 | LPUSH/RPOP | XADD/XREAD |
阻塞讀取 | BRPOP | XREAD block |
重復(fù)消息處理 | 生產(chǎn)者自行實(shí)現(xiàn)全局唯一ID | Streams自動生成全局唯一ID |
消息可靠性 | BRPOPLPUSH | 使用PENDING List自動存留消息,使用XPENDING查看,使XACK確認(rèn) |
適用場景 | Redis 5.0前版本 部署環(huán)境消息總量小 |
Redis 5.0以后版本文章來源:http://www.zghlxwxcb.cn/news/detail-407318.html 部署環(huán)境消息總量大,需要以消費(fèi)組的形式讀取數(shù)據(jù)文章來源地址http://www.zghlxwxcb.cn/news/detail-407318.html |
到了這里,關(guān)于Redis核心技術(shù)與實(shí)戰(zhàn)-學(xué)習(xí)筆記(十五):消息隊(duì)列(Redis的解決方案)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!