国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

rabbitmq筆記-rabbitmq進階-數(shù)據(jù)可靠性,rabbitmq高級特性

這篇具有很好參考價值的文章主要介紹了rabbitmq筆記-rabbitmq進階-數(shù)據(jù)可靠性,rabbitmq高級特性。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

消息何去何從

mandatory和immediate是channel.basicPublish方法的兩個參數(shù),都有消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者的功能。

mandatory參數(shù)
  • true:交換器無法根據(jù)自身的類型 和路由鍵找到符合條件的隊列,rabbitmq調(diào)用Basic.Return命令將消息返回給生產(chǎn)者
    • 生產(chǎn)者調(diào)用channel.addReturnListener添加ReturnListener監(jiān)聽器實現(xiàn)
  • false:消息直接丟棄
immediate參數(shù)

告訴服務(wù)器至少將該消息路由到一個隊列中,否則將消息返回給生產(chǎn)者。

Rabbitmq3.0去掉了對immediate參數(shù)支持,建議采用TTL和DLX方法替代

  • true:如果交換器在將消息路由到隊列時發(fā)現(xiàn)隊列上并不存在任何消費者,這條消息將不會存入隊列中,當與路由鍵匹配的所有隊列都沒有消費者時,該消息會通過Basic.Return返回至生產(chǎn)者。
備份交換器(AE)

生產(chǎn)者發(fā)送消息不設(shè)置mandatory,消息未被路由會丟失,設(shè)置了,需要添加ReturnListener。如果不想編程復(fù)雜也不想消息丟失使用備份交換器。

使用:

  • 聲明交換器時添加alternate-exchange參數(shù)實現(xiàn)
    • channel.exchangeDeclare(“myAe”,“fanout”,true,false,null);
    • channel.queueDeclare(“unroutedQueue”,true,false,false,null);
    • channel.queueBind(“unroutedQueue”,“myAe”,“”);
  • 通過策略(Policy)實現(xiàn)
    • rabbitmqctl set_policy AE “^normalExchange$” ‘{“alternate-exchange”:“myAE”}’

特殊情況:

  • 如果設(shè)置了備份交換器不存在,客戶端和RabbitMQ服務(wù)端都不會有異常出現(xiàn),此時消息會丟失
  • 如果備份交換器沒有綁定任何隊列,客戶端和rabbitmq服務(wù)端都不會有異常出現(xiàn),此時消息會丟失
  • 如果備份交換器沒有任何匹配的隊列,客戶端和rabbitmq服務(wù)端都不會有異常出現(xiàn),此時消息會丟失
  • 如果備份交換器和mandatory參數(shù)一起使用,那么mandatory參數(shù)無效

過期時間(Time to Live ,TTL)

設(shè)置消息TTL
  • 通過隊列屬性設(shè)置,隊列中所有消息都有相同的過期時間(一旦過期,就從隊列中抹去,消息已經(jīng)在隊列頭部,只要定期從隊列頭部開始掃描即可)

    • channel.queueDeclare方法中加入x-message-ttl參數(shù)實現(xiàn),單位ms

      • Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-message-ttl",6000);
        channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
        
    • 通過Policy方式設(shè)置ttl

      • rabbitmqctl set_policy TTL ".*" '{"message-ttl":6000}' --apply-to queue
        
    • 通過調(diào)用http api接口設(shè)置

  • 通過對消息本身單獨設(shè)置,每條消息的ttl可以不同(即使過期,也不會馬上抹去,是否過期是在即將投遞到消費者之前判定的)

    • 代碼設(shè)置
      • 設(shè)置AMQP.BasicProperties屬性
      • set屬性:deliveryMode(持久化消息),expiration(ttl時間)
    • 通過 http api接口設(shè)置
  • 如果兩個方法一起使用,消息的ttl以兩者之間較小的數(shù)值為準,消息在隊列中一旦超過設(shè)置的ttl時,就會變成死信,消費者將無法再收到該消息

  • 不設(shè)置ttl,表示此消息不會過期,ttl=0,表示除非此時可以直接將消息投遞到消費者,否則立即丟棄。

設(shè)置隊列TTL

channel.queueDeclare方法中的x-expires參數(shù)可以控制隊列被自動刪除前處于未使用狀態(tài)的時間(未使用:隊列上沒有任何消費者,隊列也沒有被重新聲明,并在過期時間段內(nèi)也未調(diào)用過Basic.Get命令)

Map<String, Object> args = new HashMap<String,Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue",false,false,false,args);

死信隊列

當消息在一個隊列中變成死信后,能被重新被發(fā)送到另一個交換器中,這個交換器就是DLX,死信交換器,綁定DLX的隊列就是死信隊列

消息變成死信情況

  • 消息被拒絕
  • 消息過期
  • 隊列達到最大長度

當隊列中存在死信時,rabbitmq會自動將這個消息重新發(fā)布到設(shè)置的DLX上去,進而被路由到另一個隊列,死信隊列,可以監(jiān)聽這個隊列的消息進行相應(yīng)處理

設(shè)置方法

  • 代碼設(shè)置:
    • channel.queueDeclare方法中設(shè)置x-dead-letter-exchange為隊列添加DLX
  • 通過Policy方式設(shè)置

延遲隊列

延遲隊列存儲的對象是對應(yīng)的延遲消息(當消息被發(fā)送以后,并不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費)。

場景:

  • 訂單系統(tǒng),30min內(nèi)未支付,進行異常處理
  • 手機遙控設(shè)備指定時間工作

通過DLX 和TTL模擬延遲隊列的功能。

假設(shè)一個應(yīng)用中需要將每條消息都設(shè)置為10秒延遲,生產(chǎn)者通過exchange.normal交換器將發(fā)送的消息存儲在queue.normal隊列,消費者訂閱的是queue.dlx隊列,當消息從queue.normal整個隊列中過期之后被存入queue.dlx隊列,消費者恰巧消費到了延遲10秒的這條消息。

優(yōu)先級隊列

實現(xiàn):通過設(shè)置隊列的x-max-priority參數(shù)實現(xiàn)

默認最低優(yōu)先級為0,越高越優(yōu)先消費

前提:如果在消費速度大于生成者的速度且broker中沒有消息堆積的情況下,對發(fā)送的消息設(shè)置優(yōu)先級就沒什么意義了。

RPC實現(xiàn)

客戶端發(fā)送請求消息,服務(wù)端回復(fù)響應(yīng)的消息,為了接收響應(yīng)的消息,需要在請求消息中發(fā)送一個回調(diào)隊列

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());
  • replyTo:用來設(shè)置一個回調(diào)隊列
  • correlationId:用來關(guān)聯(lián)請求和其調(diào)用RPC之后的回復(fù),每一個請求設(shè)置一個唯一的correlationId

可以為每一個客戶端創(chuàng)建一個單一的回調(diào)隊列。

持久化

  • 交換器持久化:通過在聲明隊列是將durable參數(shù)置為true實現(xiàn)的。如果不持久化,rabbitmq服務(wù)重啟后,相關(guān)的交換器元數(shù)據(jù)會丟失,消息不丟失,只是不能將消息發(fā)送到這個交換器中了。
  • 隊列持久化:通過在聲明隊列時將durable置為true,如果不設(shè)置持久化,rabbitmq重啟后,相關(guān)隊列元數(shù)據(jù)會丟失,此時數(shù)據(jù)也會丟失。
  • 消息持久化:通過將消息的投遞模式BasicProperties中的diliveryMode屬性設(shè)置為2即可實現(xiàn)消息的持久化
  • 設(shè)置了隊列和消息的持久化,rabbitmq服務(wù)重啟后,消息依舊存在。

將交換器、隊列、消息都設(shè)置持久化后不能保證數(shù)據(jù)百分百丟失。

生產(chǎn)者確認

確定消息到底有沒有正確到達服務(wù)器??梢酝ㄟ^事務(wù)機制和發(fā)送方確認機制

事務(wù)機制

rabbitmq客戶端與事務(wù)機制相關(guān)方法

  • channel.txSelect:用于當前的信道設(shè)置成事務(wù)模式
  • channel.txCommit:用于提交事務(wù)
  • channel.txRollback:用于事務(wù)回滾

開啟事務(wù)流程

  • 客戶端發(fā)送Tx.select,將信道置為事務(wù)模式
  • Broker回復(fù)Tx.Select-Ok,確認已將信道置為事務(wù)模式
  • 在發(fā)送完消息后,客戶端發(fā)送Tx.Commit提交事務(wù)
  • Broker回復(fù)Tx.Commit-Ok,確認提交事務(wù)
  • 如果發(fā)生異常,在捕獲異常后,channel.txRollback()回滾

缺點:會有性能損失

發(fā)送方確認機制
  • 生產(chǎn)者將信道設(shè)置成confirm模式(channel.confirmSelect),rabbitmq同意:Confirm.Select-Ok;
  • 一旦信道進入confirm模式,所有在該信道上發(fā)布的消息都會被指派一個唯一id
  • 一旦消息被投遞到匹配的隊列后,rabbitmq會發(fā)送一個確認給生產(chǎn)者(包含消息唯一id),使得生產(chǎn)者知曉消息已經(jīng)正確到達了目的地。

事務(wù)機制在一條消息發(fā)送后會使發(fā)送端阻塞,等待rabbitmq回應(yīng)后才發(fā)下一條消息,而發(fā)送發(fā)確認機制最大好處是異步的。生產(chǎn)者通過回調(diào)方法處理該確認消息。如果rabbitmq因自身內(nèi)部錯誤導(dǎo)致消息丟失,會發(fā)送一條nack命令,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理nack命令。

publisher confirm優(yōu)勢

  • 批量confirm方法,每發(fā)送一批消息后,調(diào)用channel.waitForConfirms方法,等待服務(wù)器的確認返回
  • 異步confirm方法:提供一個回調(diào)方法,服務(wù)端確認了一條或多條消息后客戶端會回調(diào)這個方法進行處理。

消費端要點介紹

消息分發(fā)

rabbitmq隊列擁有多個消費者時,隊列收到的消息將以輪詢的分發(fā)方式發(fā)送給消費者,每條消息只會發(fā)送給訂閱列表里的一個消費者。

  • 問題:如果某些空閑,某些忙碌造成整體下降
  • 方法:channel.basicQos方法允許限制信道上的消費者所能保持的最大未確認消息的數(shù)量。如果達到上限,就不會向這個消費者再發(fā)送任何消息,知道消費者確認了某條消息后,相應(yīng)計數(shù)減1,之后消費者可以繼續(xù)接受消息。
消息順序性

指消費者消費到的消息和發(fā)送者發(fā)布的消息的順序是一致的。

打破順序性的情形

  • 如果生產(chǎn)者使用了事務(wù)機制,發(fā)送消息遇到異常進行了事務(wù)回滾,需重新補償發(fā)送,如果是另一個線程實現(xiàn),則出現(xiàn)亂序。
  • 如果生產(chǎn)者發(fā)送的消息設(shè)置了不同的超時時間,并設(shè)置了死信隊列,順序不一致。
  • 設(shè)置了優(yōu)先級,也不是順序的。

要保證消息的順序性,需要業(yè)務(wù)方使用rabbitmq之后進一步處理,例如在消息體內(nèi)添加全局有序標識實現(xiàn)。

棄用QueueingConsumer

缺陷

  • 內(nèi)存溢出問題:隊列堆積較多的消息,導(dǎo)致消費者客戶端內(nèi)存溢出假死,不斷堆積
    • 使用Basic.Qos限制某個消費者所保持未確認消息的數(shù)量。
  • 會拖累同一個connection下的所有信道,性能降低
  • 同步遞歸調(diào)用QueueingConsumer會產(chǎn)生死鎖
  • rabbitmq的自動連接恢復(fù)機制不支持Queueing Consumer這種形式
  • QueueingConsumer不是事件驅(qū)動的

消息傳輸保障

一般消息中間件消息傳輸保障分為三個層級

  • 最多一次
  • 最少一次
  • 恰好一次

rabbitmq支持其中的最多一次和最少一次,其中最少一次投遞實現(xiàn)需要考慮

  • 消息生產(chǎn)者需要開啟事務(wù)機制或publisher confirm機制,以確保消息可以可靠地傳輸?shù)絩abbitmq中
  • 消息生產(chǎn)者需要配合使用mandatory參數(shù)或者備份交換器來確保消息能夠從交換器路由到隊列中,進而能夠保存下來而不會被丟棄
  • 消息和隊列都需要進行持久化處理,以確保rabbitmq服務(wù)器在遇到異常情況時不會造成消息丟失
  • 消費者在消費消息的同時需要將autoAck設(shè)置為false,然后通過手動確認的方式去確認已經(jīng)正確消費的消息,以避免在消費端引起不必要的消息丟失。

參考:《RabbitMQ實戰(zhàn)指南》文章來源地址http://www.zghlxwxcb.cn/news/detail-687828.html

到了這里,關(guān)于rabbitmq筆記-rabbitmq進階-數(shù)據(jù)可靠性,rabbitmq高級特性的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ --- 消息可靠性

    RabbitMQ --- 消息可靠性

    消息隊列在使用過程中,面臨著很多實際問題需要思考: ?? ? 消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 co

    2024年02月14日
    瀏覽(29)
  • RabbitMQ消息的可靠性

    面試題: Rabbitmq怎么保證消息的可靠性? 1.消費端消息可靠性保證: 消息確認(Acknowledgements) : 消費者在接收到消息后,默認情況下RabbitMQ會自動確認消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費者在處理完消息后手動發(fā)送確認(basicAck)。如果消費

    2024年04月14日
    瀏覽(25)
  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(31)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場景 1.2 MQ導(dǎo)致消息丟失 1.3 消費者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機制 2.2 生產(chǎn)者確認機制 2.3 實現(xiàn)生產(chǎn)者確認 2.3.1 配置yml開啟生產(chǎn)者確認 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(25)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • RabbitMQ----生產(chǎn)者可靠性

    RabbitMQ----生產(chǎn)者可靠性

    生產(chǎn)者可靠性主要分為兩個方面: 生產(chǎn)者重連 生產(chǎn)者確認 ????????有的時候由于網(wǎng)絡(luò)波動,可能會出現(xiàn)客戶端連接MO失敗的情況。通過配置我們可以開啟連接失敗后的重連機制: 注意: ????????當網(wǎng)絡(luò)不穩(wěn)定的時候,利用重試機制可以有效提高消息發(fā)送的成功率。不

    2024年01月21日
    瀏覽(35)
  • RabbitMQ之MQ可靠性

    RabbitMQ之MQ可靠性

    RabbitMQ實現(xiàn)數(shù)據(jù)持久化包括3個方面 (1)交換機持久化 (2)隊列持久化 (3)消息持久化 注:開啟持久化和生產(chǎn)者確認時,RabbitMQ只有在消息持久化完成后才會給生產(chǎn)者返回ACK回執(zhí) 從RabbitMQ的3.6.0版本開始,就增加了Lazy Queue的概念,也就是惰性隊列 注:從3.12版本后,所有隊

    2024年01月21日
    瀏覽(25)
  • RabbitMQ-生產(chǎn)者可靠性

    RabbitMQ-生產(chǎn)者可靠性

    ? ? ? ? 由于網(wǎng)絡(luò)波動導(dǎo)致客戶端無法連接上MQ,這是可以開啟MQ的失敗后重連機制。 ? ? ? ? 注意: ? ? ? ? ? ? ? ? 是連接失敗的重試,而不是消息發(fā)送失敗后的重試。 ? ? ? ? 這種超時重連的方式是 阻塞式 的,后面的代碼沒辦法執(zhí)行,如果說業(yè)務(wù)要求比較嚴格,則需

    2024年01月21日
    瀏覽(30)
  • RabbitMQ之消息的可靠性傳遞

    提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動添加 RabbitMQ之消息的可靠性傳遞 提示:寫完文章后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內(nèi)容: 在當今的信息化時代,消息傳遞在企業(yè)級應(yīng)用和分布式

    2024年01月19日
    瀏覽(22)
  • RabbitMQ消息可靠性問題及解決

    RabbitMQ消息可靠性問題及解決

    說明:在RabbitMQ消息傳遞過程中,有以下問題: 消息沒發(fā)到交換機 消息沒發(fā)到隊列 MQ宕機,消息在隊列中丟失 消息者接收到消息后,未能正常消費(程序報錯),此時消息已在隊列中移除 針對以上問題,提供以下解決方案: 消息確認:確認消息是否發(fā)送到交換機、隊列;

    2024年02月16日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包