国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

(四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列

這篇具有很好參考價值的文章主要介紹了(四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列

消費端限流

(四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

之前我們講過MQ可以對請求進行“削峰填谷”,即通過消費端限流的方式限制消息的拉取速度,達到保護消費端的目的。

1、生產(chǎn)者批量發(fā)送消息

@Test
public void testSendBatch() {
    // 發(fā)送十條消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

2、消費端配置限流機制

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流機制必須開啟手動簽收
       acknowledge-mode: manual
        # 消費端最多拉取5條消息消費,簽收后不滿5條才會繼續(xù)拉取消息。
       prefetch: 5

3、消費者監(jiān)聽隊列

@Component
public class QosConsumer{
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.獲取消息
        System.out.println(new String(message.getBody()));
        // 2.模擬業(yè)務處理
        Thread.sleep(3000);
        // 3.簽收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
   }
}

利用限流實現(xiàn)不公平分發(fā)

在RabbitMQ中,多個消費者監(jiān)聽同一條隊列,則隊列默認采用的輪詢分發(fā)。但是在某種場景下這種策略并不是很好,例如消費者1處 理任務的速度非???,而其他消費者處理速度卻很慢。此時如果采用公平分發(fā),則消費者1有很大一部分時間處于空閑狀態(tài)。此時可以 采用不公平分發(fā),即誰處理的快,誰處理的消息多

1、生產(chǎn)者批量發(fā)送消息

@Test
public void testSendBatch() {
    // 發(fā)送十條消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

端配置不公平分發(fā)

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流機制必須開啟手動簽收
       acknowledge-mode: manual
        # 消費端最多拉取1條消息消費,這樣誰處理的快誰拉取下一條消息,實現(xiàn)了不公平分發(fā)
       prefetch: 1

編寫兩個消費者

@Component
public class UnfairConsumer {
    // 消費者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
     {
        //1.獲取消息
        System.out.println("消費者1:"+new String(message.getBody(),"UTF-8"));
        //2. 處理業(yè)務邏輯
        Thread.sleep(500); // 消費者1處理快
        //3. 手動簽收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
     }
    
    // 消費者2
    @RabbitListener(queues = "my_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception
     {
        //1.獲取消息
        System.out.println("消費者2:"+new String(message.getBody(),"UTF-8"));
        //2. 處理業(yè)務邏輯
        Thread.sleep(3000);// 消費者2處理慢
        //3. 手動簽收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

消息存活時間

RabbitMQ可以設(shè)置消息的存活時間(Time To Live,簡稱TTL), 當消息到達存活時間后還沒有被消費,會被移出隊列。RabbitMQ 可以對隊列的所有消息設(shè)置存活時間,也可以對某條消息設(shè)置存活時間。

設(shè)置隊列所有消息存活時間

1、在創(chuàng)建隊列時設(shè)置其存活時間:

@Configuration
public class RabbitConfig2 {
    private final String EXCHANGE_NAME="my_topic_exchange2";
    private final String QUEUE_NAME="my_queue2";
    // 1.創(chuàng)建交換機
    @Bean("bootExchange2")
    public Exchange getExchange2(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.創(chuàng)建隊列
    @Bean("bootQueue2")
    public Queue getMessageQueue2(){
        return QueueBuilder
               .durable(QUEUE_NAME)
               .ttl(10000) //隊列的每條消息存活10s
               .build();
   }
    // 3.將隊列綁定到交換機
    @Bean
    public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、生產(chǎn)者批量生產(chǎn)消息,測試存活時間

@Test
public void testSendBatch2() throws InterruptedException {
    // 發(fā)送十條消息
    for (int i = 0; i < 10; i++) {
       rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
   }
}

設(shè)置單條消息存活時間

@Test
public void testSendMessage() {
    //設(shè)置消息屬性
    MessageProperties messageProperties = new MessageProperties();
    //設(shè)置存活時間
    messageProperties.setExpiration("10000");
    // 創(chuàng)建消息對象
    Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果設(shè)置了單條消息的存活時間,也設(shè)置了隊列的存活時間,以時間短的為準。

2 消息過期后,并不會馬上移除消息,只有消息消費到隊列頂端時,才會移除該消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.創(chuàng)建消息屬性
MessageProperties messageProperties = new MessageProperties();
// 2.設(shè)置存活時間
messageProperties.setExpiration(“10000”);
// 3.創(chuàng)建消息對象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.發(fā)送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有過期時間,10s后消息并沒有 馬上被移除,但該消息已經(jīng)不會被消費了,當它到達隊列頂 端時會被移除。

優(yōu)先級隊列

假設(shè)在電商系統(tǒng)中有一個訂單催付的場景,即客戶在一段時間內(nèi)未付款會給用戶推送一條短信提醒,但是系統(tǒng)中分為大型商家和小型商家。比如像蘋果,小米這樣大商家一年能給我們創(chuàng)造很大的利潤,所以在訂單量大時,他們的訂單必須得到優(yōu)先處理,此時就需要為不同的消息設(shè)置不同的優(yōu)先級,此時我們要使用優(yōu)先級隊列。

1、創(chuàng)建隊列和交換機

@Configuration
public class RabbitConfig3 {
    private final String EXCHANGE_NAME="priority_exchange";
    private final String QUEUE_NAME="priority_queue";
    // 1.創(chuàng)建交換機
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.創(chuàng)建隊列
    @Bean(QUEUE_NAME)
    public Queue priorityQueue(){
        return QueueBuilder
               .durable(QUEUE_NAME)
                //設(shè)置隊列的最大優(yōu)先級,最大可以設(shè)置到255,官網(wǎng)推薦不要超過10,,如果設(shè)置太高比較浪費資源
               .maxPriority(10)
               .build();
   }
    // 3.將隊列綁定到交換機
    @Bean
    public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、編寫生產(chǎn)者

@Test
public void testPriority() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // i為5時消息的優(yōu)先級較高
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setPriority(9);
            Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
       } else {
           rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
       }
   }
}

3、編寫消費者文章來源地址http://www.zghlxwxcb.cn/news/detail-610949.html

@Component
public class PriorityConsumer {
    @RabbitListener(queues = "priority_queue")
    public void listenMessage(Message message, Channel channel) throws Exception
     {
        //獲取消息
        System.out.println(new String(message.getBody()));
        //手動簽收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

到了這里,關(guān)于(四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • 高級篇-rabbitmq的高級特性

    高級篇-rabbitmq的高級特性

    ? ? ?啟動MQ 創(chuàng)建Queues:? 兩種Callback: 1.ReturnCallback:全局callback? ?2.ComfirmCallback:?發(fā)送信息時候設(shè)置 ? ?執(zhí)行成功: ?監(jiān)控頁面: 模擬失?。??1.投遞到交互機失敗 2.投遞到交換機了,但是沒有進入隊列? ? ?注意: ? 演示數(shù)據(jù)是否默認持久化:? ? ? ?重啟mq: ?1. 交互機、

    2024年02月09日
    瀏覽(26)
  • RabbitMQ(四):RabbitMQ高級特性

    RabbitMQ(四):RabbitMQ高級特性

    消息隊列在使用過程中,面臨著很多實際問題需要思考: 消息可靠性問題:如何確保發(fā)送的消息至少被消費—次 延遲消息問題:如何實現(xiàn)消息的延遲投遞 消息堆積問題:如何解決數(shù)百萬消息堆積,無法及時消費的問題 高可用問題:如何避免單點的MQ故障而導致的不可用問題

    2024年01月19日
    瀏覽(15)
  • RabbitMQ消息隊列高級特性

    在線上生產(chǎn)環(huán)境中,RabbitMQ可能會產(chǎn)生消息丟失或者是投遞失敗的一個場景,RabbitMQ為了避免這種場景的發(fā)生,提供了兩種方式來控制消息傳遞的可靠性。 Confirm確認模式 消息從生產(chǎn)者到MQ的Exchange過程中,如果消息成功到達,則會返回一個ConfirmCallback的確認函數(shù)。 Return退回模

    2024年02月12日
    瀏覽(22)
  • 學習RabbitMQ高級特性

    學習RabbitMQ高級特性

    了解熟悉RabbitMQ的高級特性 1、消息可靠性投遞 【confirm 確認模式、return 退回模式】 2、Consumer ACK 【acknowledge】 3、消費端限流 【prefetch】 4、TTL過期時間 【time to live】 5、死信隊列 【Dead Letter Exchange】 6、延遲隊列 【rabbitmq-delayed-message-exchange】 7、優(yōu)先級隊列 【x-max-priority】

    2024年02月04日
    瀏覽(20)
  • RabbitMQ之高級特性

    RabbitMQ之高級特性

    提示:以下是本篇文章正文內(nèi)容,RabbitMQ 系列學習將會持續(xù)更新 官網(wǎng) :https://www.rabbitmq.com RabbitMQ 消息確定主要分為兩部分: 第一種是 消息發(fā)送確認 。這種是用來確認生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。 確認發(fā)送的第一步是確認是

    2023年04月10日
    瀏覽(15)
  • RabbitMQ的高級特性及其特點

    RabbitMQ的高級特性及其特點

    1、應用解耦 提高系統(tǒng)容錯性和可維護性 在訂單系統(tǒng)中,可以通過遠程調(diào)用直接調(diào)用庫存系統(tǒng),支付系統(tǒng),物流系統(tǒng)。 但是這三個系統(tǒng)耦合度太高了,因為訂單系統(tǒng)下完訂單首先去庫存系統(tǒng)將庫存-1,然后將返回值返回給訂單系統(tǒng),然后通過訂單系統(tǒng)的返回結(jié)果來在支付系統(tǒng)

    2024年02月08日
    瀏覽(26)
  • rabbitmq筆記-rabbitmq進階-數(shù)據(jù)可靠性,rabbitmq高級特性

    消息何去何從 mandatory和immediate是channel.basicPublish方法的兩個參數(shù),都有消息傳遞過程中不可達目的地時將消息返回給生產(chǎn)者的功能。 mandatory參數(shù) true:交換器無法根據(jù)自身的類型 和路由鍵找到符合條件的隊列,rabbitmq調(diào)用Basic.Return命令將消息返回給生產(chǎn)者 生產(chǎn)者調(diào)用channel.

    2024年02月10日
    瀏覽(45)
  • 4.RabbitMQ高級特性 冪等 可靠消息 等等

    4.RabbitMQ高級特性 冪等 可靠消息 等等

    保障消息的成功發(fā)出 保障MQ節(jié)點的成功接收 發(fā)送端收到MQ節(jié)點(Broker)確認應答 完善的消息進行補償機制 消息的確認,是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個應答。 生產(chǎn)者進行接收應答,用來確定這條消息是否正常的發(fā)送到了Broker,這種方式也是

    2024年02月11日
    瀏覽(25)
  • RabbitMQ養(yǎng)成記 (10.高級特性:死信隊列,延遲隊列)

    RabbitMQ養(yǎng)成記 (10.高級特性:死信隊列,延遲隊列)

    這個概念 在其他MQ產(chǎn)品里面也是有的,只不過在Rabbitmq中稍微特殊一點 什么叫私信隊列呢? 就是當消息成為 dead message之后,可以重新發(fā)到另外一臺交換機,這個交換機就是DLX。 注意這里的有翻譯歧義, 這里的DLX 指的是 交換機 ,而不是一個隊列。 隊列的消息長度 到達限制

    2024年02月05日
    瀏覽(18)
  • RabbitMQ高級特性2 、TTL、死信隊列和延遲隊列

    RabbitMQ高級特性2 、TTL、死信隊列和延遲隊列

    設(shè)置 消費者 測試 添加多條消息 拉取消息 每隔20秒拉取一次 一次拉取五條 然后在20秒內(nèi)一條一條消費 Time To Live(存活時間/過期時間)。 當消息到達存活時間后,還沒有被消費,會被自動清除。 RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。 可

    2024年01月16日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包