Redis系列1:深刻理解高性能Redis的本質(zhì)
Redis系列2:數(shù)據(jù)持久化提高可用性
Redis系列3:高可用之主從架構(gòu)
Redis系列4:高可用之Sentinel(哨兵模式)
Redis系列5:深入分析Cluster 集群模式
追求性能極致:Redis6.0的多線程模型
追求性能極致:客戶端緩存帶來的革命
Redis系列8:Bitmap實現(xiàn)億萬級數(shù)據(jù)計算
Redis系列9:Geo 類型賦能億級地圖位置計算
Redis系列10:HyperLogLog實現(xiàn)海量數(shù)據(jù)基數(shù)統(tǒng)計
Redis系列11:內(nèi)存淘汰策略
Redis系列12:Redis 的事務機制
Redis系列13:分布式鎖實現(xiàn)
1 介紹
在分布式系統(tǒng)中,很重要的一個能力就是消息中間件。我們通過消息隊列實現(xiàn) 功能解耦、消息有序性、消息路由、異步處理、流量削峰 等能力。
目前主流的Mq主要有 RabbitMQ 、RocketMQ、kafka,可以參考這篇《MQ系列2:消息中間件技術(shù)選型》。
那除了這些主流MQ之外,咱們的這一節(jié)要說的Redis也具備實現(xiàn)消息隊列的能力。
我們來看看消息隊列主要要實現(xiàn)哪些能力,原理是什么,以及如何在 Redission 中應用。
2 關(guān)于消息隊列
2.1 什么是消息隊列
消息中間件是指在分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件。
消息中間件也可以稱消息隊列(Message Queue / MQ),用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成。通過提供消息傳遞和消息隊列模型,可以在分布式環(huán)境下擴展進程的通信。
簡而言之,互聯(lián)網(wǎng)場景中經(jīng)常使用消息中間件進行消息路由、訂閱發(fā)布、異步處理等操作,來緩解系統(tǒng)的壓力。
- Broker: 消息服務器,作為Server提供消息核心服務,一般會包含多個Q。
- Producer: 消息生產(chǎn)者,業(yè)務的發(fā)起方,負責生產(chǎn)消息傳輸給broker,
- Consumer: 消息消費者,業(yè)務的處理方,負責從broker獲取消息并進行業(yè)務邏輯處理
2.2 它解決了我們哪些問題
1、解耦: 比如說系統(tǒng)A會交給系統(tǒng)B去處理一些事情,但是A不想直接跟B有關(guān)聯(lián),避免耦合太強,就可以通過在A,B中間加入消息隊列,A將要任務的事情交給消息隊列 ,B訂閱消息隊列來執(zhí)行任務。
這種場景很常見,比如A是訂單系統(tǒng),B是庫存系統(tǒng),可以通過消息隊列把削減庫存的工作交予B系統(tǒng)去處理。如果A系統(tǒng)同時想讓B、C、D...多個系統(tǒng)處理問題的時候,這種優(yōu)勢就更加明顯了。
2、有序性: 先進先出原理,先來先處理,比如一個系統(tǒng)處理某件事需要很長一段時間,但是在處理這件事情時候,有其他人也發(fā)出了請求,可以把請求放在消息隊里,一個一個來處理。
對數(shù)據(jù)的順序性和一致性有強需求的業(yè)務,比如同一張銀行卡同時被多個入口使用,需要保證入賬出賬的順序性,避免出現(xiàn)數(shù)據(jù)不一致。
3、消息路由: 按照不同的規(guī)則,將隊列中消息發(fā)送到不同的其他隊列中
通過消息隊列將不同染色的請求發(fā)送到不同的服務去操作。這樣達成了流量按照業(yè)務拆分的目的。
4、異步處理: 處理一項任務的時候,有3個步驟A、B、C,需要先完成A操作, 然后做B、C 操作。任務執(zhí)行成功與否強依賴A的結(jié)果,但不依賴B、C 的結(jié)果。
如果我們使用串行的執(zhí)行方式,那處理任務的周期就會變長,系統(tǒng)的整體吞吐能力也會降低(在同一個系統(tǒng)中做異步其實也是比較大的開銷),所以使用消息隊列是比較好的辦法。
登錄操作就是典型的場景:A:執(zhí)行登錄并得到結(jié)果、B:記錄登錄日志、C:將用戶信息和Token寫入緩存。 執(zhí)行完A就可以從登錄頁跳到首頁了,B、C讓服務慢慢去消化,不阻塞當前操作。
5、削峰: 將峰值期間的操作削減,比如A同學的整個操作流程包含12個步驟,后續(xù)的11個步驟是不需要強關(guān)注結(jié)果的數(shù)據(jù),可以放在消息隊列中。
詳細可參考筆者這篇《MQ系列1:消息中間件執(zhí)行原理》。
2.3 消息隊列滿足的業(yè)務特性
2.3.1 消息有序性
正如上面提到的有序性一樣,他能夠保證消息按照生產(chǎn)的順序進行處理和消費,避免消息被無序處理的情況發(fā)生。
2.3.2 消息去重
同樣的,生產(chǎn)和消費的消息需要保證冪等性原理。避免出現(xiàn)重復執(zhí)行的情況,
而消息隊列的去重機制,也需要確保避免消息被重復消費的問題。
2.3.3 消息的可靠性傳輸
消息隊列的數(shù)據(jù)可以實現(xiàn)重試、持久化存儲、死信隊列記錄等,以避免消息無法成功傳遞所產(chǎn)生的不一致現(xiàn)象。
當消息服務器或者消費者恢復健康的時候,可以繼續(xù)讀取消息進行處理,防止消息遺漏。
3 使用Redis的List實現(xiàn)消息隊列
稍微學過數(shù)據(jù)結(jié)構(gòu)都知道。我們經(jīng)常說Queue(隊列),他的存儲和使用規(guī)則是【先進先出】,棧的存儲和使用規(guī)則是【先進后出】。
所以List本質(zhì)上是一個線性的有序結(jié)構(gòu),也就是Queue的存儲關(guān)系,它能夠保證消費的有序性,按照順序進行處理。
3.1 入列操作 LPUSH
即進行消息生產(chǎn),入列操作語法:
LPUSH key element[element...]
如果key存在,Producer 通過 LPUSH 將消息插入該隊列的頭部;如果 key 不存在,則是先創(chuàng)建一個空隊列,然后在進行數(shù)據(jù)插入。
下面舉個例子,往隊列中插入幾個消息,然后得到的返回值是插入消息的個數(shù)。
> LPUSH msg_queue msg1 msg2 msg3
(integer) 3
這邊往 key 為 msg_queue 的隊列中插入了三個消息 msg1、msg2、msg3。
3.2 出列操作 RPOP
即進行消息消費,消費的順序是先進先出(先生產(chǎn)先消費),出列使用的語法如下:
> RPOP msg_queue
"msg1"
> RPOP msg_queue
"msg2"
> RPOP msg_queue
"msg3"
> RPOP msg_queue
(nil)
都消費完成之后,就是nil了。
3.3 消費及時性問題
不同于常規(guī)的MQ,具備訂閱模式,消費者可以感知到有新的消息生產(chǎn)出來了,再進行消費。
List的問題在于,生產(chǎn)者向隊列插入數(shù)據(jù)的時候,List 并不會主動通知消費者,所以消費者做不到及時消費。
為了保證消費的及時,可能需要做一個心跳包(1秒執(zhí)行一次),不斷地執(zhí)行 RPOP 指令,當探測到有新消息就會取出消息進行消費,沒有消息的時候就返回nil。
但是這種也存在明顯的短板,就是不斷的調(diào)用 RPOP 指令,占用 I/O 資源和CPU資源。
比較好的解決辦法就是在隊列為空隊列的時候,暫停讀取,等有消息入列的時候,恢復取數(shù)和消費的工作,這樣也避免了無效的資源浪費。
Redis 提供了 BLPOP、BRPOP ,無數(shù)據(jù)的時候自動阻塞讀取的命令,有新消息進入的時候,恢復消息取數(shù),如下:
# BRPOP key timeout
BRPOP msg_queue 0
命令最后一個參數(shù) timeout 是超時時間,單位是秒,如果 timeout 大于0,則到達指定的秒數(shù)即使沒有彈出成功也會返回,如果 timeout 的值為0,則會一直阻塞等待其他連接向列表中插入元素, timeout 參數(shù)不允許為負數(shù)。
3.4 消息的重復消費問題
目前 List 沒有純冪等的鑒別能力,但是可以通過以下兩種方法來實現(xiàn):
- List為每一條消息生成一個 Glocal ID,重復的Glocal ID 不進行重復消費。
- Producer在生產(chǎn)消息的時候在消息中創(chuàng)建一個Glocal ID,當消費的時候把Glocal ID Record一下,后續(xù)的消費先判斷再消費,避免重復消費同一個消息。
這樣就保證了對于同一條消息,消費者始終只處理一次,結(jié)果始終保持一致。
3.5 消息的可靠性傳輸問題
可靠性傳輸我們在MQ篇章用了一整節(jié)來介紹持久化存儲、消息ACK 、二次記錄保障。這邊我們也來看看Redis List中的可靠性傳輸?shù)谋U稀?br>
Redis中缺少了一個消息確認(ACK)的機制,如果消費數(shù)據(jù)的時候運行崩潰了,沒有確認機制,很可能這條消息就被錯過了,無法保證數(shù)據(jù)的一致性。
解決方案:Redis 提供了 RPOPLPUSH
指令,當List讀取消息的時候,會同步的把該消息復制到另外一個List以作備份。
整個操作過程是具備原子性的,避免讀取消息了,但是同步備份不成功。
如果出現(xiàn)處理消息出現(xiàn)故障的情況,在故障回復之后,可以從備份的List中復制消息繼續(xù)消費。操作如下:
# 生產(chǎn)消息 msg1 msg2
> LPUSH list_queue msg1 msg2
(integer) 2
# 消費消息并同步到備份
> RPOPLPUSH list_queue list_queue_bak
"msg1"
# 當發(fā)生故障的時候去消費備份的數(shù)據(jù),可以消費到
> RPOP list_queue_bak
"msg1"
如果消費成功則把 list_queue_bak 消息刪除即可,如果發(fā)生故障,則可以繼續(xù)從 list_queue_bak 再次讀取消息處理。文章來源:http://www.zghlxwxcb.cn/news/detail-471891.html
4 使用 Redission 實現(xiàn)隊列能力
這邊以Java SpringBoot為例子進行說明,可以參考官方文檔。文章來源地址http://www.zghlxwxcb.cn/news/detail-471891.html
4.1 添加maven依賴 和 配置基本連接
# maven信息
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.8</version>
</dependency>
# 基本配置
spring:
application:
name: redission_test
redis:
host: x.x.x.x
port: 6379
ssl: false
password: xxxx.xxxx
4.2 Java程序?qū)崿F(xiàn)
@Slf4j
@Service
public class RedisQueueService {
@Autowired
private RedissonClient redissonClient;
private static final String REDIS_QUEUE = "listQueue";
/**
* 消息生產(chǎn)
*
* @param msg
*/
public void msgProduce(String msg) {
RBlockingDeque<String> blockDeque = redissonClient.getBlockingDeque(REDIS_QUEUE);
try {
blockDeque.putFirst(msg); // 消息寫入隊列頭部
} catch (InterruptedException e) {
log.error(e.printStackTrace());
}
}
/**
* 消息消費:阻塞
*/
public void msgConsume() {
RBlockingDeque<String> blockDeque = redissonClient.getBlockingDeque(REDIS_QUEUE);
Boolen isCheck = true;
while (isCheck) {
try {
String msg = blockDeque.takeLast(); // 從隊列中取出消息
} catch (InterruptedException e) {
log.error(e.printStackTrace());
}
}
}
5 總結(jié)
- Redis中使用List 數(shù)據(jù)結(jié)構(gòu)實現(xiàn)消息隊列,滿足FIFO的處理機制,使用 RPOP 進行消息讀取。
- 使用 BRPOP 指令處理消費及時性問題
- 使用 BRPOPLPUSH 命令進行消息數(shù)據(jù)備份,解決消息可靠性傳輸問題。
- 相對于專業(yè)的MQ,如kafka和RocketMQ,處理能力會差很多。所以在在消息量不大的場景中使用,可以作為一個比較不錯的消息隊列解決方案。但是過于復雜的場景容易造成消息堆積。
到了這里,關(guān)于Redis系列14:使用List實現(xiàn)消息隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!