国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Redis系列14:使用List實現(xiàn)消息隊列

這篇具有很好參考價值的文章主要介紹了Redis系列14:使用List實現(xiàn)消息隊列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Redis系列1:深刻理解高性能Redis的本質(zhì)
Redis系列2:數(shù)據(jù)持久化提高可用性
Redis系列3:高可用之主從架構(gòu)
Redis系列4:高可用之Sentinel(哨兵模式)
Redis系列5:深入分析Cluster 集群模式
追求性能極致:Redis6.0的多線程模型
追求性能極致:客戶端緩存帶來的革命
Redis系列8:Bitmap實現(xiàn)億萬級數(shù)據(jù)計算
Redis系列9:Geo 類型賦能億級地圖位置計算
Redis系列10:HyperLogLog實現(xiàn)海量數(shù)據(jù)基數(shù)統(tǒng)計
Redis系列11:內(nèi)存淘汰策略
Redis系列12:Redis 的事務機制
Redis系列13:分布式鎖實現(xiàn)

1 介紹

在分布式系統(tǒng)中,很重要的一個能力就是消息中間件。我們通過消息隊列實現(xiàn) 功能解耦、消息有序性、消息路由、異步處理、流量削峰 等能力。
目前主流的Mq主要有 RabbitMQ 、RocketMQ、kafka,可以參考這篇《MQ系列2:消息中間件技術(shù)選型》。
那除了這些主流MQ之外,咱們的這一節(jié)要說的Redis也具備實現(xiàn)消息隊列的能力。
我們來看看消息隊列主要要實現(xiàn)哪些能力,原理是什么,以及如何在 Redission 中應用。

2 關(guān)于消息隊列

2.1 什么是消息隊列

消息中間件是指在分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件。
消息中間件也可以稱消息隊列(Message Queue / MQ),用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成。通過提供消息傳遞和消息隊列模型,可以在分布式環(huán)境下擴展進程的通信。
簡而言之,互聯(lián)網(wǎng)場景中經(jīng)常使用消息中間件進行消息路由、訂閱發(fā)布、異步處理等操作,來緩解系統(tǒng)的壓力。

Redis系列14:使用List實現(xiàn)消息隊列

  • Broker: 消息服務器,作為Server提供消息核心服務,一般會包含多個Q。
  • Producer: 消息生產(chǎn)者,業(yè)務的發(fā)起方,負責生產(chǎn)消息傳輸給broker,
  • Consumer: 消息消費者,業(yè)務的處理方,負責從broker獲取消息并進行業(yè)務邏輯處理

2.2 它解決了我們哪些問題

1、解耦: 比如說系統(tǒng)A會交給系統(tǒng)B去處理一些事情,但是A不想直接跟B有關(guān)聯(lián),避免耦合太強,就可以通過在A,B中間加入消息隊列,A將要任務的事情交給消息隊列 ,B訂閱消息隊列來執(zhí)行任務。

這種場景很常見,比如A是訂單系統(tǒng),B是庫存系統(tǒng),可以通過消息隊列把削減庫存的工作交予B系統(tǒng)去處理。如果A系統(tǒng)同時想讓B、C、D...多個系統(tǒng)處理問題的時候,這種優(yōu)勢就更加明顯了。

Redis系列14:使用List實現(xiàn)消息隊列

2、有序性: 先進先出原理,先來先處理,比如一個系統(tǒng)處理某件事需要很長一段時間,但是在處理這件事情時候,有其他人也發(fā)出了請求,可以把請求放在消息隊里,一個一個來處理。

對數(shù)據(jù)的順序性和一致性有強需求的業(yè)務,比如同一張銀行卡同時被多個入口使用,需要保證入賬出賬的順序性,避免出現(xiàn)數(shù)據(jù)不一致。

Redis系列14:使用List實現(xiàn)消息隊列

3、消息路由: 按照不同的規(guī)則,將隊列中消息發(fā)送到不同的其他隊列中

通過消息隊列將不同染色的請求發(fā)送到不同的服務去操作。這樣達成了流量按照業(yè)務拆分的目的。

4、異步處理: 處理一項任務的時候,有3個步驟A、B、C,需要先完成A操作, 然后做B、C 操作。任務執(zhí)行成功與否強依賴A的結(jié)果,但不依賴B、C 的結(jié)果。
如果我們使用串行的執(zhí)行方式,那處理任務的周期就會變長,系統(tǒng)的整體吞吐能力也會降低(在同一個系統(tǒng)中做異步其實也是比較大的開銷),所以使用消息隊列是比較好的辦法。

登錄操作就是典型的場景:A:執(zhí)行登錄并得到結(jié)果、B:記錄登錄日志、C:將用戶信息和Token寫入緩存。 執(zhí)行完A就可以從登錄頁跳到首頁了,B、C讓服務慢慢去消化,不阻塞當前操作。

Redis系列14:使用List實現(xiàn)消息隊列

5、削峰: 將峰值期間的操作削減,比如A同學的整個操作流程包含12個步驟,后續(xù)的11個步驟是不需要強關(guān)注結(jié)果的數(shù)據(jù),可以放在消息隊列中。

詳細可參考筆者這篇《MQ系列1:消息中間件執(zhí)行原理》。

2.3 消息隊列滿足的業(yè)務特性

2.3.1 消息有序性

正如上面提到的有序性一樣,他能夠保證消息按照生產(chǎn)的順序進行處理和消費,避免消息被無序處理的情況發(fā)生。

2.3.2 消息去重

同樣的,生產(chǎn)和消費的消息需要保證冪等性原理。避免出現(xiàn)重復執(zhí)行的情況,
而消息隊列的去重機制,也需要確保避免消息被重復消費的問題。

2.3.3 消息的可靠性傳輸

消息隊列的數(shù)據(jù)可以實現(xiàn)重試、持久化存儲、死信隊列記錄等,以避免消息無法成功傳遞所產(chǎn)生的不一致現(xiàn)象。
當消息服務器或者消費者恢復健康的時候,可以繼續(xù)讀取消息進行處理,防止消息遺漏。

3 使用Redis的List實現(xiàn)消息隊列

稍微學過數(shù)據(jù)結(jié)構(gòu)都知道。我們經(jīng)常說Queue(隊列),他的存儲和使用規(guī)則是【先進先出】,棧的存儲和使用規(guī)則是【先進后出】。
所以List本質(zhì)上是一個線性的有序結(jié)構(gòu),也就是Queue的存儲關(guān)系,它能夠保證消費的有序性,按照順序進行處理。

3.1 入列操作 LPUSH

即進行消息生產(chǎn),入列操作語法:

 LPUSH key element[element...] 

如果key存在,Producer 通過 LPUSH 將消息插入該隊列的頭部;如果 key 不存在,則是先創(chuàng)建一個空隊列,然后在進行數(shù)據(jù)插入。
下面舉個例子,往隊列中插入幾個消息,然后得到的返回值是插入消息的個數(shù)。

> LPUSH msg_queue msg1 msg2 msg3
(integer) 3

這邊往 key 為 msg_queue 的隊列中插入了三個消息 msg1、msg2、msg3。

3.2 出列操作 RPOP

即進行消息消費,消費的順序是先進先出(先生產(chǎn)先消費),出列使用的語法如下:

> RPOP msg_queue
"msg1"
> RPOP msg_queue
"msg2"
> RPOP msg_queue
"msg3"
> RPOP msg_queue
(nil)

都消費完成之后,就是nil了。
Redis系列14:使用List實現(xiàn)消息隊列

3.3 消費及時性問題

不同于常規(guī)的MQ,具備訂閱模式,消費者可以感知到有新的消息生產(chǎn)出來了,再進行消費。
List的問題在于,生產(chǎn)者向隊列插入數(shù)據(jù)的時候,List 并不會主動通知消費者,所以消費者做不到及時消費。
為了保證消費的及時,可能需要做一個心跳包(1秒執(zhí)行一次),不斷地執(zhí)行 RPOP 指令,當探測到有新消息就會取出消息進行消費,沒有消息的時候就返回nil。
但是這種也存在明顯的短板,就是不斷的調(diào)用 RPOP 指令,占用 I/O 資源和CPU資源。

比較好的解決辦法就是在隊列為空隊列的時候,暫停讀取,等有消息入列的時候,恢復取數(shù)和消費的工作,這樣也避免了無效的資源浪費。
Redis 提供了 BLPOP、BRPOP ,無數(shù)據(jù)的時候自動阻塞讀取的命令,有新消息進入的時候,恢復消息取數(shù),如下:

# BRPOP  key  timeout 
BRPOP  msg_queue  0

命令最后一個參數(shù) timeout 是超時時間,單位是秒,如果 timeout 大于0,則到達指定的秒數(shù)即使沒有彈出成功也會返回,如果 timeout 的值為0,則會一直阻塞等待其他連接向列表中插入元素, timeout 參數(shù)不允許為負數(shù)。

3.4 消息的重復消費問題

目前 List 沒有純冪等的鑒別能力,但是可以通過以下兩種方法來實現(xiàn):

  • List為每一條消息生成一個 Glocal ID,重復的Glocal ID 不進行重復消費。
  • Producer在生產(chǎn)消息的時候在消息中創(chuàng)建一個Glocal ID,當消費的時候把Glocal ID Record一下,后續(xù)的消費先判斷再消費,避免重復消費同一個消息。
    這樣就保證了對于同一條消息,消費者始終只處理一次,結(jié)果始終保持一致。

3.5 消息的可靠性傳輸問題

可靠性傳輸我們在MQ篇章用了一整節(jié)來介紹持久化存儲、消息ACK 、二次記錄保障。這邊我們也來看看Redis List中的可靠性傳輸?shù)谋U稀?br> Redis中缺少了一個消息確認(ACK)的機制,如果消費數(shù)據(jù)的時候運行崩潰了,沒有確認機制,很可能這條消息就被錯過了,無法保證數(shù)據(jù)的一致性。
解決方案:Redis 提供了 RPOPLPUSH 指令,當List讀取消息的時候,會同步的把該消息復制到另外一個List以作備份。
整個操作過程是具備原子性的,避免讀取消息了,但是同步備份不成功。

如果出現(xiàn)處理消息出現(xiàn)故障的情況,在故障回復之后,可以從備份的List中復制消息繼續(xù)消費。操作如下:

# 生產(chǎn)消息 msg1 msg2
> LPUSH list_queue msg1 msg2  
(integer) 2
# 消費消息并同步到備份
> RPOPLPUSH list_queue list_queue_bak
"msg1"
# 當發(fā)生故障的時候去消費備份的數(shù)據(jù),可以消費到
> RPOP list_queue_bak
"msg1"

如果消費成功則把 list_queue_bak 消息刪除即可,如果發(fā)生故障,則可以繼續(xù)從 list_queue_bak 再次讀取消息處理。
Redis系列14:使用List實現(xiàn)消息隊列

4 使用 Redission 實現(xiàn)隊列能力

這邊以Java SpringBoot為例子進行說明,可以參考官方文檔。文章來源地址http://www.zghlxwxcb.cn/news/detail-471891.html

4.1 添加maven依賴 和 配置基本連接

# maven信息
<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.8</version>
</dependency>
# 基本配置
spring:
  application:
    name: redission_test
  redis:
    host: x.x.x.x
    port: 6379
    ssl: false
    password: xxxx.xxxx

4.2 Java程序?qū)崿F(xiàn)

@Slf4j
@Service
public class RedisQueueService {

    @Autowired
    private RedissonClient redissonClient;

    private static final String REDIS_QUEUE = "listQueue";

    /**
     * 消息生產(chǎn)
     *
     * @param msg
     */
    public void msgProduce(String msg) {
        RBlockingDeque<String> blockDeque = redissonClient.getBlockingDeque(REDIS_QUEUE);
        try {
            blockDeque.putFirst(msg); // 消息寫入隊列頭部
        } catch (InterruptedException e) {
            log.error(e.printStackTrace());
        }
    }

    /**
     * 消息消費:阻塞
     */
    public void msgConsume() {
        RBlockingDeque<String> blockDeque = redissonClient.getBlockingDeque(REDIS_QUEUE);
		Boolen isCheck = true;
        while (isCheck) {
            try {
                String msg = blockDeque.takeLast();  // 從隊列中取出消息
            } catch (InterruptedException e) {
                log.error(e.printStackTrace());
            }
        }
    }

5 總結(jié)

  • Redis中使用List 數(shù)據(jù)結(jié)構(gòu)實現(xiàn)消息隊列,滿足FIFO的處理機制,使用 RPOP 進行消息讀取。
  • 使用 BRPOP 指令處理消費及時性問題
  • 使用 BRPOPLPUSH 命令進行消息數(shù)據(jù)備份,解決消息可靠性傳輸問題。
  • 相對于專業(yè)的MQ,如kafka和RocketMQ,處理能力會差很多。所以在在消息量不大的場景中使用,可以作為一個比較不錯的消息隊列解決方案。但是過于復雜的場景容易造成消息堆積。

到了這里,關(guān)于Redis系列14:使用List實現(xiàn)消息隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • redis實現(xiàn)消息隊列

    redis實現(xiàn)消息隊列

    消息隊列(Message Queue)是一種常見的軟件架構(gòu)模式,用于在分布式系統(tǒng)中傳遞和處理異步消息。它解耦了發(fā)送消息的應用程序和接收消息的應用程序之間的直接依賴關(guān)系,使得消息的發(fā)送者和接收者可以獨立地演化和擴展。 消息隊列的基本原理是發(fā)送者將消息發(fā)送到一個中

    2024年02月09日
    瀏覽(23)
  • 基于Redis實現(xiàn)消息隊列的實踐

    基于Redis實現(xiàn)消息隊列的實踐

    消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應用和分布式系統(tǒng)設(shè)計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應用系統(tǒng)必備的技術(shù)之一。目前,針對不同的業(yè)務場景,比較成熟可靠的消息中間件產(chǎn)品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實

    2024年02月07日
    瀏覽(16)
  • SpringBoot+Redis stream實現(xiàn)消息隊列

    目錄 一、前言 二、下載Redis及引入Redis依賴 三、配置消費者及消費組 四,配置Redsi及初始化stream、消費組、消費者 相較于?RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等重量級的消息隊列中間件,Redis在需求量小的情況下,也可以作為消息中間件來使用。Redis作為消息隊列使

    2024年02月16日
    瀏覽(29)
  • 【Redis實戰(zhàn)】有MQ為啥不用?用Redis作消息隊列?。縍edis作消息隊列使用方法及底層原理高級進階

    【Redis實戰(zhàn)】有MQ為啥不用?用Redis作消息隊列!?Redis作消息隊列使用方法及底層原理高級進階

    ?????歡迎光臨???? ??我是蘇澤,一位對技術(shù)充滿熱情的探索者和分享者。???? ??特別推薦給大家我的最新專欄 《Redis實戰(zhàn)與進階》 本專欄純屬為愛發(fā)電永久免費?。?! 這是蘇澤的個人主頁可以看到我其他的內(nèi)容哦???? 努力的蘇澤 http://suzee.blog.csdn.net/ 我們用的是云

    2024年02月20日
    瀏覽(16)
  • Spring Boot 整合Redis實現(xiàn)消息隊列

    ??本篇文章主要來講Spring Boot 整合Redis實現(xiàn)消息隊列,實現(xiàn)redis用作消息隊列有多種方式,比如: 基于 List 的 rpush+lpop 或 lpush+rpop 基于 List 的 rpush+blpop 或 lpush+brpop (阻塞式獲取消息) 基于 Sorted Set 的優(yōu)先級隊列 Redis Stream (Redis5.0版本開始) Pub/Sub 機制 ??不過這里講的是

    2024年02月13日
    瀏覽(34)
  • 5. Redis優(yōu)化秒殺、Redis消息隊列實現(xiàn)異步秒殺

    5. Redis優(yōu)化秒殺、Redis消息隊列實現(xiàn)異步秒殺

    承接Redis - 優(yōu)惠券秒殺、庫存超賣、分布式鎖、Redisson文章 代碼中有大量數(shù)據(jù)庫的操作,整個業(yè)務性能并不是很好 平均耗時達到了497毫秒 首先回顧一下之前秒殺業(yè)務的流程 前端發(fā)起請求到達我們的Nginx,然后Nginx會把我們的請求負載均衡到我們的tomcat 而在tomcat中執(zhí)行各種邏輯

    2024年02月13日
    瀏覽(17)
  • (六)、Springboot+Redis實現(xiàn)通用消息隊列stater

    其實除了主流的各大消息中間件ActiveMQ, RocketMQ,RabbitMQ,Kafka之外,其實Redis也是支持消息隊列功能的。 而有時候我們不需要引入消息隊列中間件,跟緩存中間件Redis一起一起共用一個Redis作為消息中間件也是可以的,這樣就少用了一個組件。 1)、使用stream實現(xiàn)點對點消息模式

    2024年02月10日
    瀏覽(21)
  • 基于redis stream實現(xiàn)一個可靠的消息隊列

    我們使用的庫為redisson。 添加元素到隊列很簡單,用RStream.add方法即可。 如何從隊列獲取元素?由于我們打算實現(xiàn)kafka那樣的consumer group機制,所以,讀操作要用RStream.readGroup函數(shù)(XREADGROUP命令),該命令有阻塞和非阻塞版本,簡單起見,我們使用非阻塞版本(不帶BLOCK參數(shù))

    2024年02月12日
    瀏覽(29)
  • Redis的發(fā)布訂閱模式:實現(xiàn)消息隊列和實時數(shù)據(jù)推送的利器

    當涉及到實時數(shù)據(jù)推送和消息隊列時,Redis的發(fā)布訂閱模式是一種非常有用的工具。Redis是一個開源的內(nèi)存數(shù)據(jù)庫,被廣泛用于緩存、隊列和實時數(shù)據(jù)處理等方面。 在本博客中,我們將重點介紹Redis的發(fā)布訂閱模式,并且提供一些示例代碼來幫助讀者更好地理解這個模式以及如

    2024年02月12日
    瀏覽(24)
  • Redis Stream 流的深度解析與實現(xiàn)高級消息隊列【一萬字】

    Redis Stream 流的深度解析與實現(xiàn)高級消息隊列【一萬字】

    詳細介紹了 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)Stream的使用方式以及原理,如何實現(xiàn)更加可靠的消息隊列。 基于Reids的消息隊列實現(xiàn)有很多種,比如基于PUB/SUB(訂閱/發(fā)布)模式、基于List的 PUSH和POP一系列命令的實現(xiàn)、基于Sorted-Set的實現(xiàn)。雖然它們都有各自的特點,比如List支持阻

    2024年02月15日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包