国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

服務(wù)異步通信-高級(jí)篇(RabbitMQ)

這篇具有很好參考價(jià)值的文章主要介紹了服務(wù)異步通信-高級(jí)篇(RabbitMQ)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1. 消息可靠性

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:

  • 發(fā)送時(shí)丟失:
    • 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
    • 消息到達(dá)exchange后未到達(dá)queue
  • MQ宕機(jī),queue將消息丟失
  • consumer接收到消息后未消費(fèi)就宕機(jī)

RabbitMQ分別給出了解決方案:

  • 生產(chǎn)者發(fā)送確認(rèn)機(jī)制
  • mq持久化
  • 消費(fèi)者消費(fèi)確認(rèn)機(jī)制
  • 失敗重試機(jī)制

1.1. 生產(chǎn)者消息確認(rèn)

RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。

這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。

返回結(jié)果有兩種方式:

  • publisher-confirm,發(fā)送者確認(rèn)
    • 消息成功投遞到交換機(jī),返回ack
    • 消息未投遞到交換機(jī),返回nack
  • publisher-return,發(fā)送者回執(zhí)
    • 消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。

注意:確認(rèn)機(jī)制在發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一的id,以區(qū)分不同的消息,避免ack沖突

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

1.1.1. 修改配置

修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:

spring:
  rabbitmq:
    # 啟用發(fā)送者確認(rèn),生產(chǎn)者到交換機(jī)
    publisher-confirm-type: correlated
    # 啟用發(fā)送者確認(rèn),從交換機(jī)到隊(duì)列
    publisher-returns: true
  	# 啟用失敗的回調(diào)
    template:
      mandatory: true

說明:

  • publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
    • simple:同步等待confirm結(jié)果,直到超時(shí)
    • correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
  • publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
  • template.mandatory:定義消息路由失敗時(shí)的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息

1.1.2. 定義ConfirmCallback

對(duì)發(fā)送者把消息發(fā)送到交換機(jī)進(jìn)行確認(rèn)

在發(fā)送消息時(shí)指定

@Test
    public void test() throws InterruptedException {
        String exchanges = "itcast.direct";
        String routingKey = "phone";
        String message = "hello, spring amqp!";
        CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
        //發(fā)送的通知回調(diào)
        correlation.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息發(fā)送異常",ex);
            }
            //生產(chǎn)者正常把消息發(fā)出來了
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                boolean ack = result.isAck();
                if (ack){
                    log.info("交換正常收到消息");
                }else {
                    log.info("交換機(jī)沒有收到消息");
                }
            }
        });
        rabbitTemplate.convertAndSend(exchanges, routingKey, message,correlation);
    }

1.1.3. 定義Return回調(diào)

從交換機(jī)到隊(duì)列的確認(rèn)

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)配置:

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        //發(fā)送到隊(duì)列失敗
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("交換機(jī)路由到隊(duì)列失敗,message:{} replyCode:{}  replyText:{} exchange:{} routingKey:{}",
                     message, replyCode, replyText, exchange, routingKey);
        }
    });
}

1.2. 消息持久化

生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。

要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。

  • 交換機(jī)持久化
  • 隊(duì)列持久化
  • 消息持久化

1.2.1. 交換機(jī)持久化

RabbitMQ中交換機(jī)默認(rèn)是持久化的。

SpringAMQP中可以通過代碼指定交換機(jī)持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除
    return new DirectExchange("simple.direct", true, false);
}

默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。

可以在RabbitMQ控制臺(tái)看到持久化的交換機(jī)都會(huì)帶上D的標(biāo)示:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

1.2.2. 隊(duì)列持久化

RabbitMQ中隊(duì)列默認(rèn)是持久化的。

SpringAMQP中可以通過代碼指定交換機(jī)持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder構(gòu)建隊(duì)列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。

1.2.3. 消息持久化

利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。

1.3. 消費(fèi)者消息確認(rèn)

RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。

RabbitMQ是通過消費(fèi)者回執(zhí)來確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。

回執(zhí)分為3種類型:ack、nack、reject。如果消費(fèi)者返回ack,MQ會(huì)把消息從隊(duì)列刪除;如果返回nack或者reject,如果requeue是true的話,MQ會(huì)把消息重新投遞給消費(fèi)者,如果requeue是false的話,MQ會(huì)把消息刪掉。

1.3.1. 場(chǎng)景:

  • 1)RabbitMQ投遞消息給消費(fèi)者
  • 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除消息
  • 4)消費(fèi)者宕機(jī),消息尚未處理

這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。

SpringAMQP則允許配置三種確認(rèn)模式:

  • manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。消息投遞是不可靠的,可能丟失。自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack。(不推薦)
  • auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack,并且requeue是true。類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒有異常,返回ack。
  • none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除(不推薦)

1.3.2. 演示none模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 關(guān)閉ack

修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理異常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
	log.info("消費(fèi)者接收到simple.queue的消息:【{}】", msg);
	// 模擬異常
	System.out.println(1 / 0);
	log.debug("消息處理完成!");
}

測(cè)試可以發(fā)現(xiàn),當(dāng)消息處理拋異常時(shí),消息依然被RabbitMQ刪除了。

1.3.3. 演示auto模式

把確認(rèn)機(jī)制修改為auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 關(guān)閉ack

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

1.4. 消費(fèi)失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

1.4.1. 本地重試

可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制的requeue到mq隊(duì)列。

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費(fèi)者失敗重試
          initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長為1秒
          multiplier: 3 # 失敗的等待時(shí)長倍數(shù),下次等待時(shí)長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false
          
 第一次  1秒
 第二次  3*1000 = 3秒
 第三次  3*3000 = 9秒
 第四次  3*9000 = 27秒

重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):

  • 在重試3次后,SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException,說明本地重試觸發(fā)了
  • 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是reject,并且不重新入隊(duì),mq刪除消息了

結(jié)論:

  • 開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
  • 重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄

1.4.2. 失敗策略

在之前的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。

在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer

失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    // 定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

1.5. 總結(jié)

如何確保RabbitMQ消息的可靠性?

  • 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
  • 開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
  • 開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
  • 開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理

2. 死信交換機(jī)

2.1. 什么是死信交換機(jī)

當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):

  • 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
  • 消息是一個(gè)過期消息,超時(shí)無人消費(fèi)
  • 要投遞的隊(duì)列消息滿了,無法投遞

如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,檢查DLX)。

一個(gè)消息被消費(fèi)者拒絕了,變成了死信。因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī)。如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列。

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:

  • 死信交換機(jī)名稱
  • 死信交換機(jī)與死信隊(duì)列綁定的RoutingKey

2.2. TTL

一個(gè)隊(duì)列中的消息如果超時(shí)未消費(fèi),則會(huì)變?yōu)樗佬?,超時(shí)分為兩種情況:

  • 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間
  • 消息本身設(shè)置了超時(shí)時(shí)間

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

2.2.1. 接收超時(shí)死信的死信交換機(jī)

在consumer服務(wù)的SpringRabbitListener中,定義一個(gè)新的消費(fèi)者,并且聲明 死信交換機(jī)、死信隊(duì)列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.exchange"),
    key = "dl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}

2.2.2. 聲明一個(gè)隊(duì)列,并且指定TTL

要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化
        .ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒
        .deadLetterExchange("dl.ttl.exchange") // 指定死信交換機(jī)
        .deadLetterRoutingKey("dl") // 指定死信交換機(jī)綁定key
        .build();
}

這個(gè)隊(duì)列設(shè)定了死信交換機(jī)為dl.ttl.exchange

聲明交換機(jī),將ttl與交換機(jī)綁定:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

發(fā)送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {
    // 創(chuàng)建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 記錄日志
    log.debug("發(fā)送消息成功");
}

發(fā)送消息的日志:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

查看下接收消息的日志:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

2.2.3. 發(fā)送消息時(shí),設(shè)定TTL

@Test
public void testTTLMsg() {
    // 創(chuàng)建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("發(fā)送消息成功");
}

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

這次,發(fā)送與接收的延遲只有5秒。說明當(dāng)隊(duì)列、消息都設(shè)置了TTL時(shí),任意一個(gè)到期就會(huì)成為死信。

2.2.4. 總結(jié)

消息超時(shí)的兩種方式是?

  • 給隊(duì)列設(shè)置ttl屬性,進(jìn)入隊(duì)列后超過ttl時(shí)間的消息變?yōu)樗佬?/li>
  • 給消息設(shè)置ttl屬性,隊(duì)列接收到消息超過ttl時(shí)間后變?yōu)樗佬?/li>

如何實(shí)現(xiàn)發(fā)送一個(gè)消息20秒后消費(fèi)者才收到消息?

  • 給消息的目標(biāo)隊(duì)列指定死信交換機(jī)
  • 將消費(fèi)者監(jiān)聽的隊(duì)列綁定到死信交換機(jī)
  • 發(fā)送消息時(shí)給消息設(shè)置超時(shí)時(shí)間為20秒

死信交換機(jī)和ttl實(shí)現(xiàn)消息延遲發(fā)送的缺點(diǎn)?

  • 存在隊(duì)頭阻塞問題

2.3. 延遲交換機(jī)

利用TTL結(jié)合死信交換機(jī),實(shí)現(xiàn)了消費(fèi)者延遲收到消息的效果。這種消息模式就稱為延遲隊(duì)列(Delay Queue)模式。

但是死信交換機(jī)存在隊(duì)頭阻塞問題,因此并不推薦使用。

因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了 DelayExchange 插件,原生支持延遲隊(duì)列效果。

2.3.1. 安裝DelayExchange插件

上傳插件

查看數(shù)據(jù)卷:

docker volume inspect mq-plugins

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

將插件上傳到這個(gè)目錄即可

安裝插件

進(jìn)入MQ容器內(nèi)部來執(zhí)行安裝。我的容器名為mq,所以執(zhí)行下面命令:

docker exec -it mq bash

進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開啟插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

2.3.2. DelayExchange原理

2.3.2.1. 聲明DelayExchange交換機(jī)

插件的使用也非常簡單:聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊(duì)列與其綁定即可。

基于注解方式(推薦):

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

也可以基于@Bean的方式:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

2.3.2.2. 發(fā)送消息

發(fā)送消息時(shí),一定要攜帶x-delay屬性,指定延遲的時(shí)間:

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

2.3.3. 總結(jié)

延遲隊(duì)列插件的使用步驟包括哪些?

  • 聲明一個(gè)交換機(jī),添加delayed屬性為true
  • 發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間

3. 惰性隊(duì)列

3.1. 消息堆積問題

當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

解決消息堆積有兩種思路:

  • 增加更多消費(fèi)者,提高消費(fèi)速度。也就是我們之前說的work queue模式
  • 擴(kuò)大隊(duì)列容積,提高堆積上限

3.2. 惰性隊(duì)列

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:

  • 接收到消息后直接存入磁盤而非內(nèi)存
  • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存
  • 支持?jǐn)?shù)百萬條的消息存儲(chǔ)

3.2.1. 基于命令行設(shè)置lazy-queue

而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可。

可以通過命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個(gè)策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字
  • '{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式
  • --apply-to queues:策略的作用對(duì)象,是所有的隊(duì)列

3.2.2. 基于@Bean聲明lazy-queue

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

3.2.3. Queue的管理控制臺(tái)

服務(wù)異步通信-高級(jí)篇(RabbitMQ),java,運(yùn)維

ready:已發(fā)送到隊(duì)列,但還未發(fā)送給消費(fèi)者的消息,也就是僅執(zhí)行了publish的消息。

unacked:已發(fā)送給消費(fèi)者但還未收到消費(fèi)者ack的信息,即執(zhí)行了publish和delivery的消息。

total:ready與unacked消息的總和。

in memory:表示在內(nèi)存中進(jìn)行緩存的消息數(shù)。

persistent:表示在磁盤上存儲(chǔ)的持久化的消息數(shù)。

transient, paged out:表示非持久化的消息,但實(shí)際存儲(chǔ)到磁盤上的消息數(shù)(可能因?yàn)閮?nèi)存達(dá)到一定水位觸發(fā)的置換,也可能是隊(duì)列為lazy模式,即便是非持久化的消息也存儲(chǔ)在磁盤上了)

3.2.4. 總結(jié)

消息堆積問題的解決方案?

  • 隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度
  • 使用惰性隊(duì)列,可以再mq中保存更多消息

惰性隊(duì)列的優(yōu)點(diǎn)有哪些?

  • 基于磁盤存儲(chǔ),消息上限高
  • 沒有間歇性的page-out,性能比較穩(wěn)定

惰性隊(duì)列的缺點(diǎn)有哪些?文章來源地址http://www.zghlxwxcb.cn/news/detail-786769.html

  • 基于磁盤存儲(chǔ),消息時(shí)效性會(huì)降低
  • 性能受限于磁盤的IO

到了這里,關(guān)于服務(wù)異步通信-高級(jí)篇(RabbitMQ)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 微服務(wù)——服務(wù)異步通訊RabbitMQ

    微服務(wù)——服務(wù)異步通訊RabbitMQ

    ?前置文章 消息隊(duì)列——RabbitMQ基本概念+容器化部署和簡單工作模式程序_北嶺山腳鼠鼠的博客-CSDN博客 消息隊(duì)列——rabbitmq的不同工作模式_北嶺山腳鼠鼠的博客-CSDN博客 消息隊(duì)列——spring和springboot整合rabbitmq_北嶺山腳鼠鼠的博客-CSDN博客 目錄 Work queues 工作隊(duì)列模式? 案例

    2024年02月15日
    瀏覽(22)
  • 服務(wù)異步通訊——RabbitMQ

    服務(wù)異步通訊——RabbitMQ

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊 :就像打電話,需要實(shí)時(shí)響應(yīng)。 異步通訊 :就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是一個(gè)人卻不能跟多個(gè)人同時(shí)通話。而發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延

    2024年01月18日
    瀏覽(21)
  • SpringCloud學(xué)習(xí)路線(9)——服務(wù)異步通訊RabbitMQ

    SpringCloud學(xué)習(xí)路線(9)——服務(wù)異步通訊RabbitMQ

    一、初見MQ (一)什么是MQ? MQ(MessageQueue) ,意思是 消息隊(duì)列 ,也就是事件驅(qū)動(dòng)架構(gòu)中的Broker。 (二)同步調(diào)用 1、概念: 同步調(diào)用是指,某一服務(wù)需要多個(gè)服務(wù)共同參與,但多個(gè)服務(wù)之間有一定的執(zhí)行順序,當(dāng)每一個(gè)服務(wù)都需要等待前面一個(gè)服務(wù)完成才能繼續(xù)執(zhí)行。

    2024年02月15日
    瀏覽(19)
  • 微服務(wù)之異步消息通信

    微服務(wù)之異步消息通信

    Informal Essay By English I’m sorry that I haven’t updated the article lately because the blogger has been busy with interviews and summarizing their experience. I will create a special article to describe the recent events. Next, let’s get to the topic! 參考書籍: “鳳凰架構(gòu)” “微服務(wù)架構(gòu)設(shè)計(jì)模式” 引言 以往的文章中我們

    2024年02月05日
    瀏覽(19)
  • 微服務(wù)通信[HTTP|RPC同步通信、MQ異步通信]

    微服務(wù)通信[HTTP|RPC同步通信、MQ異步通信]

    A服務(wù)調(diào)用B服務(wù),B服務(wù)調(diào)C服務(wù),C服務(wù)調(diào)D服務(wù),即微服務(wù)之間的通信(也可以叫微服務(wù)之間的調(diào)用) 一種輕量級(jí)的通信協(xié)議,常用于在不同的微服務(wù)之間進(jìn)行通信,也是最簡單的通信方式 使用REST ful為開發(fā)規(guī)范,將服務(wù)對(duì)外暴露的HTTP調(diào)用方式為REST API(如GET、POST、PUT、DELETE等),已經(jīng)成

    2024年02月09日
    瀏覽(49)
  • 微服務(wù)中間件--MQ服務(wù)異步通信

    微服務(wù)中間件--MQ服務(wù)異步通信

    MQ的一些常見問題 消息可靠性問題 消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī)

    2024年02月11日
    瀏覽(25)
  • 微服務(wù)RabbitMQ高級(jí)篇

    微服務(wù)RabbitMQ高級(jí)篇

    目錄 一.消息可靠性傳遞概述 二.生產(chǎn)者消息確認(rèn)機(jī)制 三.publisher-comfirm 四.publisher-return 五.消息持久化 六.消費(fèi)者消息確認(rèn)機(jī)制 七.如何確保RabbitMQ消息的可靠性? 八.死信交換機(jī) 九.延遲隊(duì)列 十.惰性隊(duì)列 十一.MQ集群 生產(chǎn)者發(fā)送消息到交換機(jī),交換機(jī)將消息路由到隊(duì)列,消費(fèi)者

    2024年02月20日
    瀏覽(17)
  • Java中如何使用消息隊(duì)列實(shí)現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息隊(duì)列實(shí)現(xiàn)異步處理。下面是一個(gè)簡單的示例代碼,用于說明如何使用 ActiveMQ 實(shí)現(xiàn)消息隊(duì)列異步處理: 添加 ActiveMQ 依賴 在 pom.xml 文件中添加以下依賴: 創(chuàng)建消息隊(duì)列 創(chuàng)建一個(gè)名為 “TestQueue” 的消息隊(duì)列,并配置 ActiveMQ 連接信息: 創(chuàng)建消息消費(fèi)者

    2024年02月16日
    瀏覽(33)
  • 探索 XMLHttpRequest:網(wǎng)頁與服務(wù)器的異步通信之道(下)

    探索 XMLHttpRequest:網(wǎng)頁與服務(wù)器的異步通信之道(下)

    ?? 前端開發(fā)工程師、技術(shù)日更博主、已過CET6 ?? 阿珊和她的貓_ CSDN 博客專家、23年度博客之星前端領(lǐng)域TOP1 ?? ???高級(jí)專題作者、打造專欄《前端面試必備》 、《2024面試高頻手撕題》 ?? 藍(lán)橋云課 簽約作者、上架課程《Vue.js 和 Egg.js 開發(fā)企業(yè)級(jí)健康管理項(xiàng)目》、《帶你

    2024年02月20日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包