国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ高級特性解析:消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制探究

這篇具有很好參考價值的文章主要介紹了RabbitMQ高級特性解析:消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制探究。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

問題

消息可靠性

消息冪等性

mq高可用


代碼準(zhǔn)備

1. 新建生產(chǎn)者

https://gitee.com/lixiaogou/rabbitmq/tree/master/extensions-spring-rabbitmq-producer

2. 新建消費(fèi)者

https://gitee.com/lixiaogou/rabbitmq/tree/master/extensions-spring-rabbitmq-consumer

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制


RabbitMQ 高級特性

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

1. 消息的可靠投遞☆

在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。

1.1 兩種模式

  • confirm 確認(rèn)模式

  • return 退回模式

RabbitMQ 整個消息投遞的路徑為:producer—>rabbitmq broker—>exchange—>queue—>consumer

  1. 消息從 producer --> exchange 會返回一個 confirmCallback 。

  2. 消息從 exchange --> queue 投遞失敗則會返回一個 returnCallback 。

我們將利用這兩個callback控制消息的可靠性投遞l

1.2 測試confirm 確認(rèn)模式

開啟確認(rèn)模式

定義ConfirmCallBack回調(diào)函數(shù)

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

ConfirmCallBack源碼

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制


1.3 測試return 退回模式

開啟回退模式

定義ReturnCallback回調(diào)函數(shù)

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

ReturnCallback源碼

1.4 小結(jié)

確認(rèn)模式

  • 消息從 producer --> exchange 會返回一個 confirmCallback 。

  • 設(shè)置 ConnectionFactory 的 publisher-confirms="true" 開啟 確認(rèn)模式

  • 使用 rabbitTemplate.setConfirmCallback 設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。

退回模式

  • 消息從 exchange --> queue 投遞失敗會返回一個 returnCallback 。

  • 設(shè)置 ConnectionFactory 的 publisher-returns="true" 開啟 退回模式。

  • 使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),設(shè)置rabbitTemplate.setMandatory(true)參數(shù),當(dāng)消息從exchange路由到queue失敗后,會將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。

事務(wù)

  • 在RabbitMQ中也提供了事務(wù)機(jī)制,但是性能較差,此處不做講解。

  • 使用channel下列方法,完成事務(wù)控制:

    • txSelect(),用于將當(dāng)前channel設(shè)置成transaction模式

    • txCommit(),用于提交事務(wù)

    • txRollback(),用于回滾事務(wù)


2. Consumer ACK☆

ack指Acknowledge,確認(rèn)。 表示消費(fèi)端收到消息后的確認(rèn)方式。

2.1 三種ACK

  • 自動確認(rèn):acknowledge=“none” (默認(rèn))

當(dāng)消息一旦被Consumer接收到,則自動確認(rèn)收到,并將相應(yīng)消息從 RabbitMQ 的消息緩存中移除。在實際業(yè)務(wù)處理中,很可能消息接收到,但是業(yè)務(wù)處理出現(xiàn)異常,而此時消息已經(jīng)從緩存中移除,那么該消息就會丟失

  • 手動確認(rèn):acknowledge=“manual”

在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),進(jìn)行手動簽收,如果業(yè)務(wù)出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓生產(chǎn)者自動重新發(fā)送消息。

  • 根據(jù)異常情況確認(rèn):acknowledge=“auto”,(這種方式使用麻煩,不常用

2.2 測試手動ACK

  1. 生產(chǎn)者配置文件

  2. 生產(chǎn)者測試代碼

  3. 消費(fèi)者配置文件

  4. 消費(fèi)者監(jiān)聽器

  5. 消費(fèi)者測試類

2.3 小結(jié)

  • 消費(fèi)者配置文件中,將rabbit:listener-container標(biāo)簽中設(shè)置acknowledge=“manual”

  • 如果在消費(fèi)端沒有異常,則調(diào)用channel.basicAck(deliveryTag,false)方法,手動確認(rèn)簽收消息

  • 如果在消費(fèi)端出現(xiàn)異常,則在catch中調(diào)用 basicNack或 basicReject,拒絕消息,讓MQ重發(fā)消息。

2.4 消息可靠性總結(jié)

  1. 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  2. 生產(chǎn)方確認(rèn)Confirm

    • confirm 確認(rèn)模式

    • return 退回模式

  3. 消費(fèi)方確認(rèn)Ack

    • 自動確認(rèn):acknowledge=“none”

    • 手動確認(rèn):acknowledge=“manual”

    • 根據(jù)異常情況確認(rèn):acknowledge=“auto”

  4. Broker高可用(集群)

3. 消費(fèi)端限流

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

測試消費(fèi)端限流

消費(fèi)端配置文件prefetch="xx"

監(jiān)聽器代碼

消費(fèi)端測試類

發(fā)送端測試類

小結(jié)

  • 消費(fèi)端rabbit:listener-container 中配置 prefetch屬性設(shè)置消費(fèi)端一次拉取多少消息

  • 消費(fèi)端的確認(rèn)模式一定為手動確認(rèn)。acknowledge=“manual”

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

4. TTL(Time To Live)

TTL 全稱 Time To Live(存活時間/過期時間)。當(dāng)消息到達(dá)存活時間后,還沒有被消費(fèi),會被自動清除。RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

控制臺添加ttl隊列

test_queue_ttl

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

添加交換機(jī)

test_exchange_ttl

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制


RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

綁定交換機(jī)和隊列

消息發(fā)布

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

等待10秒過了存活時間,消息沒有被消費(fèi),自動清除

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

代碼演示

生產(chǎn)者配置文件

生產(chǎn)者發(fā)送消息

每次測試之后都查看控制臺

小結(jié)

  • 設(shè)置隊列過期時間:x-message-ttl,單位:ms(毫秒),會對整個隊列消息統(tǒng)一過期。

  • 設(shè)置消息過期時間:expiration。單位:ms(毫秒),當(dāng)該消息在隊列頭部時(消費(fèi)時),會單獨(dú)判斷這一消息是否過期,根據(jù)是否過去決定是否需要移除。

  • 如果兩者都進(jìn)行了設(shè)置,以時間短的為準(zhǔn)。

5.死信隊列DLX☆

死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機(jī)),當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個交換機(jī),這個交換機(jī)就是DLX。

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

消息成為死信的三種情況☆

  1. 隊列消息長度到達(dá)限制;

  2. 消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,且消息不重新放入原目標(biāo)隊列,requeue=false;

  3. 原隊列存在消息過期設(shè)置,消息到達(dá)過期時間未被消費(fèi);

隊列綁定死信交換機(jī)

給隊列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

代碼測試

步驟

  1. 生產(chǎn)者配置聲明死信隊列(queue_dlx)和死信交換機(jī)(exchange_dlx)

  2. 生產(chǎn)者配置聲明正常的隊列(test_queue_dlx)和交換機(jī)(test_exchange_dlx)
    3. 生產(chǎn)者配置正常隊列綁定死信交換機(jī) 設(shè)置兩個參數(shù):
    1. x-dead-letter-exchange:死信交換機(jī)名稱
    2. x-dead-letter-routing-key:發(fā)送給死信交換機(jī)的routingkey

生產(chǎn)者配置文件

生產(chǎn)者發(fā)送消息

消費(fèi)者配置文件監(jiān)聽死信隊列

測試結(jié)果
  1. 發(fā)送到正常隊列,正常隊列消息過期后,消息自動進(jìn)入到死信隊列,死信隊列有1條消息

    RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

  2. 正常隊列有十條消息,死信隊列也有十條,正常隊列消息過期后,死信隊列有20條消息

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

  1. 正常隊列有1條消息,然后消費(fèi)端拒絕接收之后,死信隊列就有1條消息

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

小結(jié)

  1. 死信交換機(jī)、死信隊列 和 普通的沒有區(qū)別

  2. 當(dāng)消息成為死信后,如果該隊列綁定了死信交換機(jī),則消息會被死信交換機(jī)重新路由到死信隊列

  3. 消息成為死信的三種情況:

    1. 隊列消息長度到達(dá)限制;

    2. 消費(fèi)者拒接消費(fèi)消息,并且不重回隊列;

    3. 原隊列存在消息過期設(shè)置,消息到達(dá)超時時間未被消費(fèi);

6.延遲隊列☆

延遲隊列,即消息進(jìn)入隊列后不會立即被消費(fèi),只有到達(dá)指定時間后,才會被消費(fèi)。

  • 需求:

    1. 下單后,30分鐘未支付,取消訂單,回滾庫存。

    2. 新用戶注冊成功7天后,發(fā)送短信問候。

  • 實現(xiàn)方式:

    1. 定時器

    2. 延遲隊列

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

很可惜,在RabbitMQ中并未提供延遲隊列功能。但是可以使用:TTL+DLX實現(xiàn)延遲隊列的效果。

代碼演示

生產(chǎn)者配置文件

       1. 定義死信交換機(jī)(order_exchange_dlx)和隊列(order_queue_dlx)
       2. 定義正常交換機(jī)(order_exchange)和隊列(order_queue)
       3. 綁定,設(shè)置正常隊列過期時間為30分鐘


生產(chǎn)者發(fā)送消息

消息發(fā)送結(jié)果:10秒內(nèi)位于正常隊列中,十秒之后進(jìn)入死信隊列

消費(fèi)端配置

消費(fèi)端接收消息

消息接收結(jié)果:啟動消費(fèi)端之后,私信隊列中的消息都被消費(fèi)端手動ACK了

小結(jié)

  • 延遲隊列,即消息進(jìn)入隊列后不會立即被消費(fèi),只有到達(dá)指定時間后,才會被消費(fèi)。

  • 在RabbitMQ中并未提供延遲隊列功能。但是可以使用:TTL+DLX實現(xiàn)延遲隊列的效果。


7.日志與監(jiān)控(了解)

RabbitMQ日志

RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務(wù)節(jié)點(diǎn)名稱、cookie的hash值、RabbitMQ配置文件地址、內(nèi)存限制、磁盤限制、默認(rèn)賬戶guest的創(chuàng)建以及權(quán)限配置等等。

web管控臺監(jiān)控

rabbitmqctl管理和監(jiān)控

查看隊列# rabbitmqctl list_queues查看exchanges# rabbitmqctl list_exchanges查看用戶# rabbitmqctl list_users查看連接# rabbitmqctl list_connections查看消費(fèi)者信息# rabbitmqctl list_consumers查看環(huán)境變量# rabbitmqctl environment查看未被確認(rèn)的隊列# rabbitmqctl list_queues  name messages_unacknowledged查看單個隊列的內(nèi)存使用# rabbitmqctl list_queues name memory查看準(zhǔn)備就緒的隊列# rabbitmqctl list_queues name messages_ready



8.消息追蹤

在使用任何消息中間件的過程中,難免會出現(xiàn)某條消息異常丟失的情況。對于RabbitMQ而言,可能是因為生產(chǎn)者或消費(fèi)者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機(jī)制;也有可能是因為交換器與隊列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒有與任何隊列進(jìn)行綁定,生產(chǎn)者又不感知或者沒有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個時候就需要有一個較好的機(jī)制跟蹤記錄消息的投遞過程,以此協(xié)助開發(fā)和運(yùn)維人員進(jìn)行問題的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現(xiàn)消息追蹤。

Firehose

firehose的機(jī)制是將生產(chǎn)者投遞給rabbitmq的消息,rabbitmq投遞給消費(fèi)者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個默認(rèn)的exchange的名稱為amq.rabbitmq.trace,它是一個topic類型的exchange。發(fā)送到這個exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實際exchange和queue的名稱,分別對應(yīng)生產(chǎn)者投遞到exchange的消息,和消費(fèi)者從queue上獲取的消息。

注意:打開 trace 會影響消息寫入功能,適當(dāng)打開后請關(guān)閉。

rabbitmqctl trace_on:開啟Firehose命令

rabbitmqctl trace_off:關(guān)閉Firehose命令

rabbitmq_tracing

rabbitmq_tracing和Firehose在實現(xiàn)上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。

啟用插件:rabbitmq-plugins enable rabbitmq_tracing


RabbitMQ應(yīng)用問題☆

1.消息可靠性保障

需求:100%確保消息發(fā)送成功

消息補(bǔ)償機(jī)制

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

流程:producer–>1業(yè)務(wù)數(shù)據(jù)入庫–>

  • 2發(fā)送消息到Q1–>consumer監(jiān)聽消息Q1、消費(fèi)Q1、操作consumerDB,consumer發(fā)送確認(rèn)消息到Q2–>回調(diào)檢查服務(wù)監(jiān)聽確認(rèn)消息Q2、消費(fèi)Q2、將消息寫入MDB數(shù)據(jù)庫

  • 3發(fā)送延遲消息到Q3–>回調(diào)檢查服務(wù)–>6監(jiān)聽延遲消息Q3–>消費(fèi)Q3–>比對MDB中是否有該消息,

    • 有該消息不做處理,

    • 沒有該消息–>8調(diào)用producer,重新發(fā)送消息,如此循環(huán)下去

定時檢查服務(wù)檢查數(shù)據(jù)庫數(shù)據(jù),重新調(diào)用producer發(fā)送消息

2.消息冪等性處理

冪等性指一次和多次請求某一個資源,對于資源本身應(yīng)該具有同樣的結(jié)果。也就是說,其任意多次執(zhí)行對資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

在MQ中指,消費(fèi)多條相同的消息的結(jié)果與消費(fèi)該消息一次的結(jié)果相同。

樂觀鎖解決方案

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制


RabbitMQ集群搭建

摘要:實際生產(chǎn)應(yīng)用中都會采用消息隊列的集群方案,如果選擇RabbitMQ那么有必要了解下它的集群方案原理

一般來說,如果只是為了學(xué)習(xí)RabbitMQ或者驗證業(yè)務(wù)工程的正確性那么在本地環(huán)境或者測試環(huán)境上使用其單實例部署就可以了,但是出于MQ中間件本身的可靠性、并發(fā)性、吞吐量和消息堆積能力等問題的考慮,在生產(chǎn)環(huán)境上一般都會考慮使用RabbitMQ的集群方案。

3.1 集群方案的原理

RabbitMQ這款消息隊列中間件產(chǎn)品本身是基于Erlang編寫,Erlang語言天生具備分布式特性(通過同步Erlang集群各節(jié)點(diǎn)的magic cookie來實現(xiàn))。因此,RabbitMQ天然支持Clustering。這使得RabbitMQ本身不需要像ActiveMQ、Kafka那樣通過ZooKeeper分別來實現(xiàn)HA方案和保存集群的元數(shù)據(jù)。集群是保證可靠性的一種方式,同時可以通過水平擴(kuò)展以達(dá)到增加消息吞吐量能力的目的。

RabbitMQ,RabbitMQ,高級特性,消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制

3.2 單機(jī)多實例部署

由于某些因素的限制,有時候你不得不在一臺機(jī)器上去搭建一個rabbitmq集群,這個有點(diǎn)類似zookeeper的單機(jī)版。真實生成環(huán)境還是要配成多機(jī)集群的。有關(guān)怎么配置多機(jī)集群的可以參考其他的資料,這里主要論述如何在單機(jī)中配置多個rabbitmq實例。

主要參考官方文檔:https://www.rabbitmq.com/clustering.html

首先確保RabbitMQ運(yùn)行沒有問題

[root@super ~]# rabbitmqctl statusStatus of node rabbit@super ...[{pid,10232}, {running_applications,     [{rabbitmq_management,"RabbitMQ Management Console","3.6.5"},      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.5"},      {webmachine,"webmachine","1.10.3"},      {mochiweb,"MochiMedia Web Server","2.13.1"},      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.5"},      {rabbit,"RabbitMQ","3.6.5"},      {os_mon,"CPO  CXC 138 46","2.4"},      {syntax_tools,"Syntax tools","1.7"},      {inets,"INETS  CXC 138 49","6.2"},      {amqp_client,"RabbitMQ AMQP Client","3.6.5"},      {rabbit_common,[],"3.6.5"},      {ssl,"Erlang/OTP SSL application","7.3"},      {public_key,"Public key infrastructure","1.1.1"},      {asn1,"The Erlang ASN1 compiler version 4.0.2","4.0.2"},      {ranch,"Socket acceptor pool for TCP protocols.","1.2.1"},      {mnesia,"MNESIA  CXC 138 12","4.13.3"},      {compiler,"ERTS  CXC 138 10","6.0.3"},      {crypto,"CRYPTO","3.6.3"},      {xmerl,"XML parser","1.3.10"},      {sasl,"SASL  CXC 138 11","2.7"},      {stdlib,"ERTS  CXC 138 10","2.8"},      {kernel,"ERTS  CXC 138 10","4.2"}]}, {os,{unix,linux}}, {erlang_version,     "Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"}, {memory,     [{total,56066752},      {connection_readers,0},      {connection_writers,0},      {connection_channels,0},      {connection_other,2680},      {queue_procs,268248},      {queue_slave_procs,0},      {plugins,1131936},      {other_proc,18144280},      {mnesia,125304},      {mgmt_db,921312},      {msg_index,69440},      {other_ets,1413664},      {binary,755736},      {code,27824046},      {atom,1000601},      {other_system,4409505}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,411294105}, {disk_free_limit,50000000}, {disk_free,13270233088}, {file_descriptors,     [{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]}, {processes,[{limit,1048576},{used,262}]}, {run_queue,0}, {uptime,43651}, {kernel,{net_ticktime,60}}]


停止rabbitmq服務(wù)

[root@super sbin]# service rabbitmq-server stopStopping rabbitmq-server: rabbitmq-server.


啟動第一個節(jié)點(diǎn):

[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start

              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /var/log/rabbitmq/rabbit1.log
  ######  ##        /var/log/rabbitmq/rabbit1-sasl.log
  ##########
              Starting broker...
 completed with 6 plugins.


啟動第二個節(jié)點(diǎn):

web管理插件端口占用,所以還要指定其web插件占用的端口號。

[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start

              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /var/log/rabbitmq/rabbit2.log
  ######  ##        /var/log/rabbitmq/rabbit2-sasl.log
  ##########
              Starting broker...
 completed with 6 plugins.


結(jié)束命令:

rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop


rabbit1操作作為主節(jié)點(diǎn):

[root@super ~]# rabbitmqctl -n rabbit1 stop_app  Stopping node rabbit1@super ...[root@super ~]# rabbitmqctl -n rabbit1 reset	 Resetting node rabbit1@super ...[root@super ~]# rabbitmqctl -n rabbit1 start_appStarting node rabbit1@super ...[root@super ~]#


rabbit2操作為從節(jié)點(diǎn):

[root@super ~]# rabbitmqctl -n rabbit2 stop_appStopping node rabbit2@super ...[root@super ~]# rabbitmqctl -n rabbit2 resetResetting node rabbit2@super ...[root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''內(nèi)是主機(jī)名換成自己的Clustering node rabbit2@super with rabbit1@super ...[root@super ~]# rabbitmqctl -n rabbit2 start_appStarting node rabbit2@super ...


查看集群狀態(tài):

[root@super ~]# rabbitmqctl cluster_status -n rabbit1
Cluster status of node rabbit1@super ...
[{nodes,[{disc,[rabbit1@super,rabbit2@super]}]},
 {running_nodes,[rabbit2@super,rabbit1@super]},
 {cluster_name,<<"rabbit1@super">>},
 {partitions,[]},
 {alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]


web監(jiān)控:

3.3 集群管理

rabbitmqctl join_cluster {cluster_node} [–ram]
將節(jié)點(diǎn)加入指定集群中。在這個命令執(zhí)行前需要停止RabbitMQ應(yīng)用并重置節(jié)點(diǎn)。

rabbitmqctl cluster_status
顯示集群的狀態(tài)。

rabbitmqctl change_cluster_node_type {disc|ram}
修改集群節(jié)點(diǎn)的類型。在這個命令執(zhí)行前需要停止RabbitMQ應(yīng)用。

rabbitmqctl forget_cluster_node [–offline]
將節(jié)點(diǎn)從集群中刪除,允許離線執(zhí)行。

rabbitmqctl update_cluster_nodes {clusternode}

在集群中的節(jié)點(diǎn)應(yīng)用啟動前咨詢clusternode節(jié)點(diǎn)的最新信息,并更新相應(yīng)的集群信息。這個和join_cluster不同,它不加入集群??紤]這樣一種情況,節(jié)點(diǎn)A和節(jié)點(diǎn)B都在集群中,當(dāng)節(jié)點(diǎn)A離線了,節(jié)點(diǎn)C又和節(jié)點(diǎn)B組成了一個集群,然后節(jié)點(diǎn)B又離開了集群,當(dāng)A醒來的時候,它會嘗試聯(lián)系節(jié)點(diǎn)B,但是這樣會失敗,因為節(jié)點(diǎn)B已經(jīng)不在集群中了。

rabbitmqctl cancel_sync_queue [-p vhost] {queue}
取消隊列queue同步鏡像的操作。

rabbitmqctl set_cluster_name {name}
設(shè)置集群名稱。集群名稱在客戶端連接時會通報給客戶端。Federation和Shovel插件也會有用到集群名稱的地方。集群名稱默認(rèn)是集群中第一個節(jié)點(diǎn)的名稱,通過這個命令可以重新設(shè)置。

3.4 RabbitMQ鏡像集群配置

上面已經(jīng)完成RabbitMQ默認(rèn)集群模式,但并不保證隊列的高可用性,盡管交換機(jī)、綁定這些可以復(fù)制到集群里的任何一個節(jié)點(diǎn),但是隊列內(nèi)容不會復(fù)制。雖然該模式解決一項目組節(jié)點(diǎn)壓力,但隊列節(jié)點(diǎn)宕機(jī)直接導(dǎo)致該隊列無法應(yīng)用,只能等待重啟,所以要想在隊列節(jié)點(diǎn)宕機(jī)或故障也能正常應(yīng)用,就要復(fù)制隊列內(nèi)容到集群里的每個節(jié)點(diǎn),必須要創(chuàng)建鏡像隊列。

鏡像隊列是基于普通的集群模式的,然后再添加一些策略,所以你還是得先配置普通集群,然后才能設(shè)置鏡像隊列,我們就以上面的集群接著做。

設(shè)置的鏡像隊列可以通過開啟的網(wǎng)頁的管理端Admin->Policies,也可以通過命令。

rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:“all”}’

  • Name:策略名稱

  • Pattern:匹配的規(guī)則,如果是匹配所有的隊列,是^.

  • Definition:使用ha-mode模式中的all,也就是同步所有匹配的隊列。問號鏈接幫助文檔。

3.5 負(fù)載均衡-HAProxy

HAProxy提供高可用性、負(fù)載均衡以及基于TCP和HTTP應(yīng)用的代理,支持虛擬主機(jī),它是免費(fèi)、快速并且可靠的一種解決方案,包括Twitter,Reddit,StackOverflow,GitHub在內(nèi)的多家知名互聯(lián)網(wǎng)公司在使用。HAProxy實現(xiàn)了一種事件驅(qū)動、單一進(jìn)程模型,此模型支持非常大的并發(fā)連接數(shù)。

3.5.1 安裝HAProxy

//下載依賴包
yum install gcc vim wget//上傳haproxy源碼包
//解壓tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//進(jìn)入目錄、進(jìn)行編譯、安裝cd /usr/local/haproxy-1.6.5make TARGET=linux31 PREFIX=/usr/local/haproxymake install PREFIX=/usr/local/haproxymkdir /etc/haproxy
//賦權(quán)groupadd -r -g 149 haproxyuseradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//創(chuàng)建haproxy配置文件mkdir /etc/haproxyvim /etc/haproxy/haproxy.cfg


3.5.2 配置HAProxy

配置文件路徑:/etc/haproxy/haproxy.cfg

#logging optionsglobal
	log 127.0.0.1 local0 info
	maxconn 5120
	chroot /usr/local/haproxy
	uid 99
	gid 99
	daemon
	quiet
	nbproc 20
	pidfile /var/run/haproxy.pid

defaults
	log global
	
	mode tcp

	option tcplog
	option dontlognull
	retries 3
	option redispatch
	maxconn 2000
	contimeout 5s
   
     clitimeout 60s

     srvtimeout 15s	
#front-end IP for consumers and producterslisten rabbitmq_cluster	bind 0.0.0.0:5672
	
	mode tcp	#balance url_param userid
	#balance url_param session_id check_post 64
	#balance hdr(User-Agent)
	#balance hdr(host)
	#balance hdr(Host) use_domain_only
	#balance rdp-cookie
	#balance leastconn
	#balance source //ip
	
	balance roundrobin
	
        server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
        server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2listen stats	bind 172.16.98.133:8100
	mode http
	option httplog
	stats enable
	stats uri /rabbitmq-stats
	stats refresh 5s


啟動HAproxy負(fù)載

/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy進(jìn)程狀態(tài)ps -ef | grep haproxy

訪問如下地址對mq節(jié)點(diǎn)進(jìn)行監(jiān)控
http://172.16.98.133:8100/rabbitmq-stats


代碼中訪問mq集群地址,則變?yōu)樵L問haproxy地址:5672文章來源地址http://www.zghlxwxcb.cn/news/detail-792066.html


到了這里,關(guān)于RabbitMQ高級特性解析:消息投遞的可靠性保證與消費(fèi)者ACK機(jī)制探究的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ保證消息的可靠投遞,Java實現(xiàn)RabbitMQ消息的可靠投遞,Springboot實現(xiàn)RabbitMQ消息的可靠投遞

    RabbitMQ保證消息的可靠投遞,Java實現(xiàn)RabbitMQ消息的可靠投遞,Springboot實現(xiàn)RabbitMQ消息的可靠投遞

    我們先看一串代碼,并思考一下為什么要先入庫然后發(fā)MQ: 如果先發(fā)MQ的話,如果入庫失敗,就會導(dǎo)致MQ消息無法回滾了。今天我們就好好聊一聊RabbitMQ消息可靠投遞的問題。 ① 消息從生產(chǎn)者發(fā)送到Broker 生產(chǎn)者把消息發(fā)送到Broker之后,如何知道自己的消息有沒有被Broker成功接

    2024年02月11日
    瀏覽(15)
  • 4.RabbitMQ高級特性 冪等 可靠消息 等等

    4.RabbitMQ高級特性 冪等 可靠消息 等等

    保障消息的成功發(fā)出 保障MQ節(jié)點(diǎn)的成功接收 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)確認(rèn)應(yīng)答 完善的消息進(jìn)行補(bǔ)償機(jī)制 消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個應(yīng)答。 生產(chǎn)者進(jìn)行接收應(yīng)答,用來確定這條消息是否正常的發(fā)送到了Broker,這種方式也是

    2024年02月11日
    瀏覽(25)
  • RabbitMQ --- 消息可靠性

    RabbitMQ --- 消息可靠性

    消息隊列在使用過程中,面臨著很多實際問題需要思考: ?? ? 消息從發(fā)送,到消費(fèi)者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 co

    2024年02月14日
    瀏覽(28)
  • rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制 rabbitmq在消息的發(fā)送與接收中,會經(jīng)過上面的流程,這些流程中每一步都有可能導(dǎo)致消息丟失,或者消費(fèi)失敗甚至直接是服務(wù)器宕機(jī)等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機(jī)制 生產(chǎn)者確認(rèn)機(jī)制 消息持久

    2024年02月08日
    瀏覽(37)
  • RabbitMQ消息的可靠性

    面試題: Rabbitmq怎么保證消息的可靠性? 1.消費(fèi)端消息可靠性保證: 消息確認(rèn)(Acknowledgements) : 消費(fèi)者在接收到消息后,默認(rèn)情況下RabbitMQ會自動確認(rèn)消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費(fèi)者在處理完消息后手動發(fā)送確認(rèn)(basicAck)。如果消費(fèi)

    2024年04月14日
    瀏覽(24)
  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費(fèi)者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(30)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場景 1.2 MQ導(dǎo)致消息丟失 1.3 消費(fèi)者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機(jī)制 2.2 生產(chǎn)者確認(rèn)機(jī)制 2.3 實現(xiàn)生產(chǎn)者確認(rèn) 2.3.1 配置yml開啟生產(chǎn)者確認(rèn) 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(24)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費(fèi)者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • RabbitMQ消息可靠性(一)-- 生產(chǎn)者消息確認(rèn)

    RabbitMQ消息可靠性(一)-- 生產(chǎn)者消息確認(rèn)

    目錄 前言 一、消息確認(rèn)流程圖 二、生產(chǎn)者消息確認(rèn) 1、publisher-confirm(發(fā)送者確認(rèn)) 2、publisher-return(發(fā)送者回執(zhí)) 三、代碼實現(xiàn) 1、修改application.yml 配置 2、ConfirmCallback函數(shù)和ReturnCallback函數(shù) 在項目中,引入了RabbitMQ這一中間件,必然也需要在業(yè)務(wù)中增加對數(shù)據(jù)安全性的一

    2024年02月04日
    瀏覽(23)
  • 【RabbitMQ】之消息的可靠性方案

    【RabbitMQ】之消息的可靠性方案

    一、數(shù)據(jù)丟失場景 二、數(shù)據(jù)可靠性方案 1、生產(chǎn)者丟失消息解決方案 2、MQ 隊列丟失消息解決方案 3、消費(fèi)者丟失消息解決方案 MQ 消息數(shù)據(jù)完整的鏈路為 :從 Producer 發(fā)送消息到 RabbitMQ 服務(wù)器中,再由 Broker 服務(wù)的 Exchange 根據(jù) Routing_Key 路由到指定的 Queue 隊列中,最后投送到消

    2024年02月14日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包