国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

服務(wù)異步通訊——RabbitMQ

這篇具有很好參考價值的文章主要介紹了服務(wù)異步通訊——RabbitMQ。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

RabbitMQ

微服務(wù)間通訊有同步和異步兩種方式:

同步通訊:就像打電話,需要實時響應(yīng)。

異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是一個人卻不能跟多個人同時通話。而發(fā)送郵件可以同時與多個人收發(fā)郵件,但是往往響應(yīng)會有延遲。

同步調(diào)用的優(yōu)點:

  • 時效性較強(qiáng),可以立即得到結(jié)果

同步調(diào)用的問題:

  • 耦合度高:每次加入新的需求,都要修改原來的代碼。
  • 性能和吞吐能力下降:調(diào)用者需要等待服務(wù)提供者響應(yīng),如果調(diào)用鏈過長則響應(yīng)時間等于每次調(diào)用的時間之和。
  • 有額外的資源消耗:調(diào)用鏈中的每個服務(wù)在等待響應(yīng)過程中,不能釋放請求占用的資源,高并發(fā)場景下會極度浪費系統(tǒng)資源。
  • 有級聯(lián)失敗問題:如果服務(wù)提供者出現(xiàn)問題,所有調(diào)用方都會跟著出問題,如同多米諾骨牌一樣,迅速導(dǎo)致整個微服務(wù)群故障。

異步通訊

異步調(diào)用則可以避免上述問題:

以購買商品為例,用戶支付后需要調(diào)用訂單服務(wù)完成訂單狀態(tài)修改,調(diào)用物流服務(wù),從倉庫分配響應(yīng)的庫存并準(zhǔn)備發(fā)貨。

在事件模式中,支付服務(wù)是事件發(fā)布者(publisher),在支付完成后只需要發(fā)布一個支付成功的事件(event),事件中帶上訂單id。

訂單服務(wù)和物流服務(wù)是事件訂閱者(Consumer),訂閱支付成功的事件,監(jiān)聽到事件后完成自己業(yè)務(wù)即可。

為了解除事件發(fā)布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個中間人(Broker)。發(fā)布者發(fā)布事件到Broker,不關(guān)心誰來訂閱事件。訂閱者從Broker訂閱事件,不關(guān)心誰發(fā)來的消息。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

Broker 是一個類似于數(shù)據(jù)總線一樣的東西,所有的服務(wù)要接收數(shù)據(jù)和發(fā)送數(shù)據(jù)都發(fā)到這個總線上,這個總線就像協(xié)議一樣,讓服務(wù)間的通訊變得標(biāo)準(zhǔn)和可控。

異步通訊的優(yōu)點:

  • 吞吐量提升:無需等待訂閱者處理完成,響應(yīng)更快速

  • 故障隔離:服務(wù)沒有直接調(diào)用,不存在級聯(lián)失敗問題

  • 調(diào)用間沒有阻塞,不會造成無效的資源占用

  • 耦合度極低,每個服務(wù)都可以靈活插拔,可替換

  • 流量削峰:不管發(fā)布事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件

缺點:

  • 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好管理
  • 需要依賴于Broker的可靠、安全、性能。

技術(shù)對比

現(xiàn)在開源軟件或云平臺上 Broker 的軟件是非常成熟的,比較常見的一種就是MQ技術(shù)。

MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅(qū)動架構(gòu)中的Broker。

比較常見的MQ實現(xiàn):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

幾種常見MQ的對比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社 區(qū) Rabbit Apache 阿里 Apache
開發(fā)語言 Erlang Java Java Scala&Java
協(xié)議支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協(xié)議 自定義協(xié)議
可用性 一般
單機(jī)吞吐量 一般 非常高
消息延遲 微秒級 毫秒級 毫秒級 毫秒以內(nèi)
消息可靠性 一般 一般

如果追求可用性,可以選擇Kafka、 RocketMQ 、RabbitMQ

如果追求可靠性,可以選擇RabbitMQ、RocketMQ

如果追求吞吐能力,選擇RocketMQ、Kafka

追求消息低延遲,選擇RabbitMQ、Kafka

為什么選擇RabbitMQ而不是其它的MQ?

kafka是以吞吐量高而聞名,不過其數(shù)據(jù)穩(wěn)定性一般,而且無法保證消息有序性。RabbitMQ基于面向并發(fā)的語言Erlang開發(fā),吞吐量不如Kafka,但消息可靠性較好,并且消息延遲極低,集群搭建比較方便。支持多種協(xié)議,并且有各種語言的客戶端,比較靈活。Spring對RabbitMQ的支持也比較好,使用起來比較方便。


安裝RabbitMQ

在Centos7虛擬機(jī)中使用Docker來安裝。

下載鏡像

在線拉取鏡像

docker pull rabbitmq:3-management

安裝MQ

執(zhí)行下面的命令來運行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq1 \  
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

RabbitMQ的基本結(jié)構(gòu):

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

RabbitMQ中的一些角色:

  • publisher:生產(chǎn)者
  • consumer:消費者
  • exchange:交換機(jī),負(fù)責(zé)消息路由
  • queue:隊列,存儲消息
  • Virtual Host:虛擬主機(jī),隔離不同租戶的exchange、queue、消息的隔離

RabbitMQ消息模型

MQ的官方文檔中給出了5個MQ的Demo示例,對應(yīng)了幾種不同的用法:

  • 基本消息隊列(BasicQueue)

  • 工作消息隊列(WorkQueue)

發(fā)布訂閱(Publish、Subscribe),又根據(jù)交換機(jī)類型不同分為三種:

  • Fanout Exchange:廣播

  • Direct Exchange:路由

  • Topic Exchange:主題

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

入門案例

簡單隊列模式的模型圖:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

官方的HelloWorld是基于最基礎(chǔ)的消息隊列模型來實現(xiàn)的,只包括三個角色:

  • publisher:消息發(fā)布者,將消息發(fā)送到隊列queue
  • queue:消息隊列,負(fù)責(zé)接受并緩存消息
  • consumer:訂閱隊列,處理隊列中的消息

publisher實現(xiàn)

實現(xiàn)思路:

  • 建立連接
  • 創(chuàng)建Channel
  • 聲明隊列
  • 發(fā)送消息
  • 關(guān)閉連接和channel

代碼實現(xiàn):

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
        factory.setHost("192.168.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();

        // 3.創(chuàng)建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.發(fā)送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("發(fā)送消息成功:【" + message + "】");

        // 5.關(guān)閉通道和連接
        channel.close();
        connection.close();

    }
}

consumer實現(xiàn)

實現(xiàn)思路:

  • 建立連接
  • 創(chuàng)建Channel
  • 聲明隊列
  • 訂閱消息

代碼實現(xiàn):

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
        factory.setHost("192.168.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();

        // 3.創(chuàng)建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.訂閱消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.處理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

總結(jié)

基本消息隊列的消息發(fā)送流程:

  1. 建立connection

  2. 創(chuàng)建channel

  3. 利用channel聲明隊列

  4. 利用channel向隊列發(fā)送消息

基本消息隊列的消息接收流程:

  1. 建立connection

  2. 創(chuàng)建channel

  3. 利用channel聲明隊列

  4. 定義consumer的消費行為handleDelivery()

  5. 利用channel將消費者與隊列綁定


SpringAMQP

SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

SpringAMQP提供了三個功能:

  • 自動聲明隊列、交換機(jī)及其綁定關(guān)系
  • 基于注解的監(jiān)聽器模式,異步接收消息
  • 封裝了RabbitTemplate工具,用于發(fā)送消息

Basic Queue 簡單隊列模型

在父工程mq-demo中引入依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

消息發(fā)送

首先配置MQ地址,在publisher服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.xxx.xxx # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī)
    username: admin # 用戶名
    password: 123456 # 密碼

然后在publisher服務(wù)中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現(xiàn)消息發(fā)送:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 隊列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息接收

首先配置MQ地址,在consumer服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.xxx.xxx # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī)
    username: admin # 用戶名
    password: 123456 # 密碼

然后在consumer服務(wù)的listener包中新建一個類SpringRabbitListener,代碼如下:

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消費者接收到消息:【" + msg + "】");
    }
}

啟動consumer服務(wù),然后在publisher服務(wù)中運行測試代碼,發(fā)送MQ消息。

WorkQueue

Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

當(dāng)消息處理比較耗時的時候,可能生產(chǎn)消息的速度會遠(yuǎn)遠(yuǎn)大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,多個消費者共同處理消息處理,效率就能大大提高了。

消息發(fā)送

利用循環(huán)發(fā)送,來模擬大量消息堆積現(xiàn)象。

在publisher服務(wù)中的SpringAmqpTest類中添加一個測試方法:

/**
     * workQueue
     * 向隊列中不停發(fā)送消息,模擬消息堆積。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 隊列名稱
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

要模擬多個消費者綁定同一個隊列,我們在consumer服務(wù)的SpringRabbitListener中添加2個新的方法:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到這個消費者sleep了1000秒,模擬任務(wù)耗時。

測試

啟動ConsumerApplication后,在執(zhí)行publisher服務(wù)中剛剛編寫的發(fā)送測試方法testWorkQueue。

可以看到消費者1很快完成了自己的25條消息。消費者2卻在緩慢的處理自己的25條消息。

也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。這樣顯然是有問題的。

在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務(wù)的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

總結(jié)

Work模型的使用過程:

  • 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
  • 通過設(shè)置prefetch來控制消費者預(yù)取的消息數(shù)量

發(fā)布/訂閱

發(fā)布訂閱的模型如圖:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊列中,而是發(fā)給X(交換機(jī))
  • Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊列
    • Direct:定向,把消息交給符合指定routing key 的隊列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
  • Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
  • Queue:消息隊列也與以前一樣,接收消息、緩存消息。

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!

Fanout

Fanout,英文翻譯是扇出,在MQ中譯為廣播更合適。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

在廣播模式下,消息發(fā)送流程是這樣的:

  1. 可以有多個隊列

  2. 每個隊列都要綁定到Exchange(交換機(jī))

  3. 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個隊列,生產(chǎn)者無法決定

  4. 交換機(jī)把消息發(fā)送給綁定過的所有隊列

  5. 訂閱隊列的消費者都能拿到消息

案例測試:

  • 創(chuàng)建一個交換機(jī) itcast.fanout,類型是Fanout
  • 創(chuàng)建兩個隊列fanout.queue1和fanout.queue2,綁定到交換機(jī)itcast.fanout

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

聲明隊列和交換機(jī)

Spring提供了一個接口Exchange,來表示所有不同類型的交換機(jī):

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

在consumer中創(chuàng)建一個類,聲明隊列和交換機(jī):

@Configuration
public class FanoutConfig {
    /**
     * 聲明交換機(jī)
     * @return Fanout類型交換機(jī)
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 第1個隊列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 綁定隊列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2個隊列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 綁定隊列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

@Test
public void testFanoutExchange() {
    // 隊列名稱
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收

在consumer服務(wù)的SpringRabbitListener中添加兩個方法,作為消費者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}

小結(jié)

交換機(jī)的作用是什么?

  • 接收publisher發(fā)送的消息。
  • 將消息按照規(guī)則路由到與之綁定的隊列。
  • 不能緩存消息,路由失敗,消息丟失。
  • FanoutExchange的會將消息路由到每個綁定的隊列。

聲明隊列、交換機(jī)、綁定關(guān)系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

Direct

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

在Direct模型下:

  • 隊列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

案例需求如下

  1. 利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2

  3. 在publisher中編寫測試方法,向itcast. direct發(fā)送消息。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

基于注解聲明隊列和交換機(jī)

基于@Bean的方式聲明隊列和交換機(jī)比較麻煩,Spring還提供了基于注解方式來聲明。

在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來聲明隊列和交換機(jī):

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】");
}

消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

@Test
public void testSendDirectExchange() {
    // 交換機(jī)名稱
    String exchangeName = "itcast.direct";
    // 消息
    String message = "紅色警報!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

總結(jié)

描述下Direct交換機(jī)與Fanout交換機(jī)的差異?

  • Fanout交換機(jī)將消息路由給每一個與之綁定的隊列。
  • Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個隊列。
  • 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似。

基于@RabbitListener注解聲明隊列和交換機(jī)有哪些常見注解?

  • @Queue
  • @Exchange

Topic

說明

Topic類型的ExchangeDirect相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

舉例:

item.#:能夠匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

圖示

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

解釋:

  • Queue1:綁定的是china.# ,因此凡是以 china.開頭的routing key 都會被匹配到。包括china.news和china.weather
  • Queue2:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會被匹配。包括china.news和japan.news

實現(xiàn)思路如下:

  1. 并利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽topic.queue1和topic.queue2

  3. 在publisher中編寫測試方法,向item. topic發(fā)送消息。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

/**
     * topicExchange
     */
@Test
public void testSendTopicExchange() {
    // 交換機(jī)名稱
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜報!孫悟空大戰(zhàn)哥斯拉,勝!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】");
}

總結(jié)

描述下Direct交換機(jī)與Topic交換機(jī)的差異?

  • Topic交換機(jī)接收的消息RoutingKey必須是多個單詞,以 **.** 分割
  • Topic交換機(jī)與隊列綁定時的bindingKey可以指定通配符
  • #:代表0個或多個詞
  • *:代表1個詞

消息轉(zhuǎn)換器

Spring會把發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時候,還會把字節(jié)反序列化為Java對象。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

只不過,默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 數(shù)據(jù)體積過大
  • 有安全漏洞
  • 可讀性差

默認(rèn)轉(zhuǎn)換器

修改消息發(fā)送的代碼,發(fā)送一個Map對象:

@Test
public void testSendMap() throws InterruptedException {
    // 準(zhǔn)備消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "Jack");
    msg.put("age", 21);
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("simple.queue","", msg);
}

停止consumer服務(wù)

發(fā)送消息后查看控制臺:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

配置JSON轉(zhuǎn)換器

顯然,JDK序列化方式并不合適。可以使用JSON方式來做序列化和反序列化,使得消息體的體積更小、可讀性更高。

在publisher和consumer兩個服務(wù)中都引入依賴:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

配置消息轉(zhuǎn)換器。

在啟動類中添加一個Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

消息隊列在使用過程中,面臨著很多實際問題需要思考:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

消息可靠性

消息從發(fā)送,到消費者接收,會經(jīng)理多個過程:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:

  • 發(fā)送時丟失:
    • 生產(chǎn)者發(fā)送的消息未送達(dá)exchange
    • 消息到達(dá)exchange后未到達(dá)queue
  • MQ宕機(jī),queue將消息丟失
  • consumer接收到消息后未消費就宕機(jī)

針對這些問題,RabbitMQ分別給出了解決方案:

  • 生產(chǎn)者確認(rèn)機(jī)制
  • mq持久化
  • 消費者確認(rèn)機(jī)制
  • 失敗重試機(jī)制

生產(chǎn)者消息確認(rèn)

RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個消息指定一個唯一ID。消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否處理成功。

返回結(jié)果有兩種方式:

  • publisher-confirm,發(fā)送者確認(rèn)
    • 消息成功投遞到交換機(jī),返回ack
    • 消息未投遞到交換機(jī),返回nack
  • publisher-return,發(fā)送者回執(zhí)
    • 消息投遞到交換機(jī)了,但是沒有路由到隊列。返回ACK,及路由失敗原因。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

注意:確認(rèn)機(jī)制發(fā)送消息時,需要給每個消息設(shè)置一個全局唯一id,以區(qū)分不同消息,避免ack沖突。

修改配置

首先,修改生產(chǎn)者服務(wù)中的application.yml文件,添加下面的內(nèi)容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
    • simple:同步等待confirm結(jié)果,直到超時
    • correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
  • publish-returns:開啟publish-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback
  • template.mandatory:定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息

定義Return回調(diào)

每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置。

在生產(chǎn)者服務(wù)中添加一個Config配置類

@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 設(shè)置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投遞失敗,記錄日志
            log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有業(yè)務(wù)需要,可以重發(fā)消息
        });
    }
}

定義ConfirmCallback

ConfirmCallback可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同。

在生產(chǎn)者服務(wù)的測試類中,定義一個單元測試方法

public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息體
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失敗
                log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息發(fā)送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.發(fā)送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一會兒,等待ack回執(zhí)
    Thread.sleep(2000);
}

消息持久化

生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。

要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。

  • 交換機(jī)持久化
  • 隊列持久化
  • 消息持久化

必須保證上述三點都持久化,才是真正的消息持久化。

交換機(jī)持久化

RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失。

SpringAMQP中可以通過代碼指定交換機(jī)持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三個參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時是否自動刪除
    return new DirectExchange("simple.direct", true, false);
}

默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。

可以在RabbitMQ控制臺看到持久化的交換機(jī)都會帶上D的標(biāo)示:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

隊列持久化

RabbitMQ中隊列默認(rèn)是非持久化的,mq重啟后就丟失。

SpringAMQP中可以通過代碼指定交換機(jī)持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder構(gòu)建隊列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

默認(rèn)情況下,由SpringAMQP聲明的隊列都是持久化的。

可以在RabbitMQ控制臺看到持久化的隊列都會帶上D的標(biāo)示:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

消息持久化

利用SpringAMQP發(fā)送消息時,可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代碼指定

@Test
public void testDurableMessage(){
    // 創(chuàng)建消息
    Message message = MessageBuilder
        .withBody("hello,TTL Queue".getBytes(StandardCharsets.UTF_8))
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("simple.queue",message,correlationData);
    // 記錄日志
    log.debug("發(fā)送消息成功!");
}

默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。

消費者消息確認(rèn)

RabbitMQ是閱后即焚機(jī)制,即RabbitMQ確認(rèn)消息被消費者消費后會立刻刪除此消息。

而RabbitMQ是通過消費者回執(zhí)來確認(rèn)消費者是否成功處理消息的:消費者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。

如果RabbitMQ投遞消息給消費者,消費者獲取消息后,并返回ACK給RabbitMQ,RabbitMQ刪除消息之后,此時消費者宕機(jī),接收到的消息還未來得及處理。這樣,消息就丟失了,因此消費者返回ACK的時機(jī)相當(dāng)重要。

而SpringAMQP則允許配置三種確認(rèn)模式:

?manual:手動ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack(根據(jù)業(yè)務(wù)情況,自行判斷什么時候該返回ack)。

?auto:自動ack,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack(類似事務(wù)機(jī)制,出現(xiàn)異常時返回nack,消息回滾到mq;沒有異常,返回ack)。

?none:關(guān)閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除(消息投遞是不可靠的,可能丟失)。

配置確認(rèn)模式

修改消費者服務(wù)的application.yml文件,添加下面內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自動ACK

修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個消息處理異常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    log.info("消費者接收到simple.queue的消息:【{}】", msg);
    // 模擬異常
    System.out.println(1 / 0);
    log.debug("消息處理完成!");
}

在異常位置打斷點,再次發(fā)送消息,程序卡在斷點時,可以發(fā)現(xiàn)此時消息狀態(tài)為unack(未確定狀態(tài))

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

拋出異常后,因為Spring會自動返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

消費失敗重試機(jī)制

當(dāng)消費者出現(xiàn)異常后,消息會不斷requeue(重入隊)到隊列,再重新發(fā)送給消費者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

此時有兩種解決辦法:1、本地重試。2、失敗策略。

本地重試

可以利用Spring的retry機(jī)制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列。

修改消費者服務(wù)的application.yml文件,添加內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 # 初識的失敗等待時長為1秒
          multiplier: 1 # 失敗的等待時長倍數(shù),下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟消費者服務(wù),重復(fù)之前的測試??梢园l(fā)現(xiàn):

  • 在重試3次后,SpringAMQP會拋出異常AmqpRejectAndDontRequeueException,說明觸發(fā)本地重試。
  • 查看RabbitMQ控制臺,發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是ack,mq將消息刪除了。

由此可見當(dāng)開啟本地重試時,消息處理過程中拋出異常,不會requeue到隊列,而是在消費者本地重試。重試達(dá)到最大次數(shù)后,Spring會返回ack,消息會被丟棄。

失敗策略

在之前的測試中,達(dá)到最大重試次數(shù)后,消息會被丟棄,這是由Spring內(nèi)部機(jī)制決定的。

在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到一個指定的,專門存放異常消息的隊列,后續(xù)由人工集中處理。

配置失敗策略

  1. 在消費者服務(wù)中定義處理失敗消息的交換機(jī)和隊列
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
  1. 定義一個RepublishMessageRecoverer,關(guān)聯(lián)隊列和交換機(jī)
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代碼

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

由此可以得出,確保RabbitMQ消息的可靠性的措施有:

  1. 開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊列。
  2. 開啟持久化功能,確保消息未消費前在隊列中不會丟失。
  3. 開啟消費者確認(rèn)機(jī)制為auto,由spring確定消息處理成功之后完成ACK。
  4. 開啟消費者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。

死信交換機(jī)

定義

當(dāng)一個隊列中的消息滿足下列情況之一時,可以成為死信(dead letter):

  • 消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數(shù)設(shè)置為false
  • 消息是一個過期消息,超時無人消費
  • 要投遞的隊列消息滿了,無法投遞

如果這個包含死信的隊列配置了dead-letter-exchange屬性,指定了一個交換機(jī),那么隊列中的死信就會投遞到這個交換機(jī)中,而這個交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡稱DLX)。

一個消息被消費者拒絕了,變成了死信:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

因為simple.queue綁定了死信交換機(jī) dl.direct,因此死信會投遞給這個交換機(jī):

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

如果這個死信交換機(jī)也綁定了一個隊列,則消息最終會進(jìn)入這個存放死信的隊列:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

另外,隊列將死信投遞給死信交換機(jī)時,必須知道兩個信息:

  • 死信交換機(jī)名稱
  • 死信交換機(jī)與死信隊列綁定的RoutingKey

這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊列。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

利用死信交換機(jī)接收死信

在失敗重試策略中,默認(rèn)的RejectAndDontRequeueRecoverer會在本地重試次數(shù)耗盡后,發(fā)送reject給RabbitMQ,消息變成死信,被丟棄。

可以給simple.queue添加一個死信交換機(jī),給死信交換機(jī)綁定一個隊列。這樣消息變成死信后也不會丟棄,而是最終投遞到死信交換機(jī),路由到與死信交換機(jī)綁定的隊列。

在消費者服務(wù)中,定義一組死信交換機(jī)、死信隊列

// 聲明普通的 simple.queue隊列,并且為其指定死信交換機(jī):dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定隊列名稱,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交換機(jī)
        .build();
}
// 聲明死信交換機(jī) dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 聲明存儲死信的隊列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 將死信隊列 與 死信交換機(jī)綁定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

使用場景:

  1. 隊列綁定了死信交換機(jī),死信會投遞到死信交換機(jī)。
  2. 利用死信交換機(jī)收集所有消費者處理失敗的消息(死信),交由人工處理,進(jìn)一步提高消息隊列的可靠性。

TTL

一個隊列中的消息如果超時未消費,則會變?yōu)樗佬?,超時分為兩種情況:

  • 消息所在的隊列設(shè)置了超時時間
  • 消息本身設(shè)置了超時時間

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

聲明隊列時指定TTL

在消費者服務(wù)的SpringRabbitListener中,定義一個新的消費者,并且聲明死信交換機(jī)、死信隊列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);
}

要給隊列設(shè)置超時時間,需要在聲明隊列時配置x-message-ttl屬性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定隊列名稱,并持久化
        .ttl(10000) // 設(shè)置隊列的超時時間,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交換機(jī)
        .build();
}

注意,這個隊列設(shè)定了死信交換機(jī)為dl.ttl.direct

聲明交換機(jī),將TTL與交換機(jī)綁定:

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

發(fā)送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {
    // 創(chuàng)建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 記錄日志
    log.debug("發(fā)送消息成功");
}

發(fā)送消息時指定TTL

在發(fā)送消息時,也可以指定TTL:

@Test
public void testTTLMsg() {
    // 創(chuàng)建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("10000")
        .build();
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("發(fā)送消息成功");
}

延遲隊列

利用TTL結(jié)合死信交換機(jī),我們實現(xiàn)了消息發(fā)出后,消費者延遲收到消息的效果。這種消息模式就稱為延遲隊列(Delay Queue)模式。

延遲隊列的使用場景包括:

  • 用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動取消
  • 預(yù)約工作會議,10分鐘后自動通知所有參會人員

因為延遲隊列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲隊列效果。

這個插件就是DelayExchange插件。

參考RabbitMQ的插件列表頁面:https://www.rabbitmq.com/community-plugins.html

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

使用方式可以參考官網(wǎng)地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

安裝DelayExchange插件

可以去對應(yīng)的GitHub頁面下載3.8.9版本的插件,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

這個對應(yīng)RabbitMQ的3.8.5以上版本。

上傳插件

因為是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應(yīng)的數(shù)據(jù)卷。但是前面安裝MQ時并未掛載數(shù)據(jù)卷,所以這里重新創(chuàng)建一個MQ容器。

先移除之前創(chuàng)建的容器

docker rm -f mq

創(chuàng)建MQ容器

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

使用下面命令查看數(shù)據(jù)卷

docker volume inspect mq-plugins

可以得知掛載的位置在/var/lib/docker/volumes/mq-plugins/_data

將下載好的插件上傳至該目錄即可。

安裝插件

安裝需要進(jìn)入MQ容器內(nèi)部執(zhí)行安裝,執(zhí)行以下命令

docker exec -it mq bash

進(jìn)入容器內(nèi)部后,執(zhí)行下面命令開啟插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安裝成功結(jié)果如下:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

DelayExchange原理

DelayExchange需要將一個交換機(jī)聲明為delayed類型。當(dāng)有消息發(fā)送到delayExchange時,流程如下:

  1. 接收消息

  2. 判斷消息是否具備x-delay屬性

  3. 如果有x-delay屬性,說明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時間

  4. 返回routing not found結(jié)果給消息發(fā)送者

  5. x-delay時間到期后,重新投遞消息到指定隊列

使用DelayExchange

插件的使用也非常簡單:聲明一個交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊列與其綁定即可。

聲明DelayExchange交換機(jī)

基于注解方式(推薦)

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.direct"),
    key = "dl" ))public void listenDlQueue(String msg){
    log.info("接收到 dl.queue的延遲消息:{}", msg);
}

基于@Bean方式

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定隊列名稱,并持久化 
        .ttl(10000) // 設(shè)置隊列的超時時間,10秒
        .deadLetterExchange("dl.direct") // 指定死信交換機(jī)
            .deadLetterRoutingKey("dl") // 指定死信RoutingKey
        .build();
}
@Bean
public Binding simpleBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

發(fā)送消息時,一定要攜帶x-delay屬性,并指定延遲的時間。

@Test
public void testTTLMsg() {
    // 創(chuàng)建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000") 
        .build();
    // 消息ID,需要封裝到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
}

惰性隊列

消息堆積問題

當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費者處理消息的速度,就會導(dǎo)致隊列中的消息堆積,直到隊列存儲消息達(dá)到上限。之后發(fā)送的消息就會成為死信,可能會被丟棄,這就是消息堆積問題。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

解決消息堆積有兩種思路:

  • 增加更多消費者,提高消費速度。也就是我們之前說的work queue模式
  • 擴(kuò)大隊列容積,提高堆積上限

惰性隊列

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊列。惰性隊列的特征如下:

  • 接收到消息后直接存入磁盤而非內(nèi)存
  • 消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存
  • 支持?jǐn)?shù)百萬條的消息存儲

基于命令行設(shè)置lazy-queue

要設(shè)置一個隊列為惰性隊列,只需要在聲明隊列時,指定x-queue-mode屬性為lazy即可??梢酝ㄟ^命令行將一個運行中的隊列修改為惰性隊列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達(dá)式匹配隊列的名字
  • '{"queue-mode":"lazy"}' :設(shè)置隊列模式為lazy模式
  • --apply-to queues :策略的作用對象,是所有的隊列

基于@Bean聲明lazy-queue

@Bean
public Queue lazyQueue(){
    return QueueBuilder
        .durable("lazy.queue")
        .lazy() // 開啟x-queue-mode為lazy
        .build();
}

基于@RabbitListener聲明LazyQueue

@RabbitListener(queuesToDeclare = @Queue(
	name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String message){
    log.info("接收到 lazy.queue 的消息:{}",message);
}

惰性隊列的優(yōu)點:

  • 基于磁盤存儲,消息上限高
  • 沒有間歇性的page-out,性能比較穩(wěn)定

惰性隊列的缺點:

  • 基于磁盤存儲,消息時效性會降低
  • 性能受限于磁盤的IO

MQ集群

集群的分類

RabbitMQ的是基于Erlang語言編寫,而Erlang又是一個面向并發(fā)的語言,天然支持集群模式。RabbitMQ的集群有兩種模式:

  • 普通集群:是一種分布式集群,將隊列分散到集群的各個節(jié)點,從而提高整個集群的并發(fā)能力。

  • 鏡像集群:是一種主從集群,普通集群的基礎(chǔ)上,添加了主從備份功能,提高集群的數(shù)據(jù)可用性。

鏡像集群雖然支持主從,但主從同步并不是強(qiáng)一致的,某些情況下可能有數(shù)據(jù)丟失的風(fēng)險。因此在RabbitMQ的3.8版本以后,推出了新的功能:

  • 仲裁隊列來代替鏡像集群,底層采用Raft協(xié)議確保主從的數(shù)據(jù)一致性。

普通集群

普通集群,或者叫標(biāo)準(zhǔn)集群(classic cluster),具備下列特征:

  • 會在集群的各個節(jié)點間共享部分?jǐn)?shù)據(jù),包括:交換機(jī)、隊列元信息。不包含隊列中的消息。
  • 當(dāng)訪問集群某節(jié)點時,如果隊列不在該節(jié)點,會從數(shù)據(jù)所在節(jié)點傳遞到當(dāng)前節(jié)點并返回
  • 隊列所在節(jié)點宕機(jī),隊列中的消息就會丟失

結(jié)構(gòu)如下

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

部署

計劃部署三節(jié)點的mq集群:

主機(jī)名 控制臺端口 amqp通信端口
mq1 8081 —> 15672 8071 —> 5672
mq2 8082 —> 15672 8072 —> 5672
mq3 8083 —> 15672 8073 —> 5672

集群中的節(jié)點標(biāo)示默認(rèn)都是:rabbit@[hostname],因此以上三個節(jié)點的名稱分別為:

  • rabbit@mq1
  • rabbit@mq2
  • rabbit@mq3
獲取Cookie

RabbitMQ底層依賴于Erlang,而Erlang虛擬機(jī)就是一個面向分布式的語言,默認(rèn)就支持集群模式。集群模式中的每個RabbitMQ 節(jié)點使用 cookie 來確定它們是否被允許相互通信。

要使兩個節(jié)點能夠通信,它們必須具有相同的共享秘密,稱為Erlang cookie。cookie 只是一串最多 255 個字符的字母數(shù)字字符。

每個集群節(jié)點必須具有相同的 cookie。實例之間也需要它來相互通信。

先在之前啟動的mq容器中獲取一個cookie值,作為集群的cookie。執(zhí)行下面的命令

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

記錄保存獲取到的Cookie值,以備后用。

接下來,停止并刪除當(dāng)前的mq容器,重新搭建集群。

docker rm -f mq
準(zhǔn)備集群配置

在/tmp目錄新建一個配置文件 rabbitmq.conf:

cd /tmp
# 創(chuàng)建文件
touch rabbitmq.conf

文件內(nèi)容如下:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

再創(chuàng)建一個文件,記錄cookie

cd /tmp
# 創(chuàng)建cookie文件
touch .erlang.cookie
# 寫入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的權(quán)限
chmod 600 .erlang.cookie

準(zhǔn)備三個目錄,mq1、mq2、mq3:

cd /tmp
# 創(chuàng)建目錄
mkdir mq1 mq2 mq3

然后拷貝rabbitmq.conf、cookie文件到mq1、mq2、mq3:

# 進(jìn)入/tmp
cd /tmp
# 拷貝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
啟動集群

創(chuàng)建網(wǎng)絡(luò)

docker network create mq-net

運行命令

# 運行mq1
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management

# 運行mq2
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management

# 運行mq3
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

到此,一個普通的集群就部署完畢。

鏡像集群

在剛剛的案例中,一旦創(chuàng)建隊列的主機(jī)宕機(jī),隊列就會不可用。不具備高可用能力。如果要解決這個問題,必須使用官方提供的鏡像集群方案。

官方文檔地址:https://www.rabbitmq.com/ha.html

鏡像集群:本質(zhì)是主從模式,具備下面的特征。

  • 交換機(jī)、隊列、隊列中的消息會在各個mq的鏡像節(jié)點之間同步備份。
  • 創(chuàng)建隊列的節(jié)點被稱為該隊列的主節(jié)點,備份到的其它節(jié)點叫做該隊列的鏡像節(jié)點。
  • 一個隊列的主節(jié)點可能是另一個隊列的鏡像節(jié)點
  • 所有操作都是主節(jié)點完成,然后同步給鏡像節(jié)點
  • 主宕機(jī)后,鏡像節(jié)點會替代成新的主(如果在主從同步完成前,主就已經(jīng)宕機(jī),可能出現(xiàn)數(shù)據(jù)丟失)
  • 不具備負(fù)載均衡功能,因為所有操作都會有主節(jié)點完成(但是不同隊列,其主節(jié)點可以不同,可以利用這個提高吞吐量)

結(jié)構(gòu)如下

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

默認(rèn)情況下,隊列只保存在創(chuàng)建該隊列的節(jié)點上。而鏡像模式下,創(chuàng)建隊列的節(jié)點被稱為該隊列的主節(jié)點,隊列還會拷貝到集群中的其它節(jié)點,也叫做該隊列的鏡像節(jié)點。

但是,不同隊列可以在集群中的任意節(jié)點上創(chuàng)建,因此不同隊列的主節(jié)點可以不同。甚至,一個隊列的主節(jié)點可能是另一個隊列的鏡像節(jié)點。

用戶發(fā)送給隊列的一切請求,例如發(fā)送消息、消息回執(zhí)默認(rèn)都會在主節(jié)點完成,如果是從節(jié)點接收到請求,也會路由到主節(jié)點去完成。鏡像節(jié)點僅僅起到備份數(shù)據(jù)作用。

當(dāng)主節(jié)點接收到消費者的ACK時,所有鏡像都會刪除節(jié)點中的數(shù)據(jù)。

部署

鏡像模式的配置有3種模式:

hamode haparams 效果
準(zhǔn)確模式exactly 隊列的副本量count 集群中隊列副本(主服務(wù)器和鏡像服務(wù)器之和)的數(shù)量。count如果為1意味著單個副本:即隊列主節(jié)點。count值為2表示2個副本:1個隊列主和1個隊列鏡像。換句話說:count = 鏡像數(shù)量 + 1。如果群集中的節(jié)點數(shù)少于count,則該隊列將鏡像到所有節(jié)點。如果有集群總數(shù)大于count+1,并且包含鏡像的節(jié)點出現(xiàn)故障,則將在另一個節(jié)點上創(chuàng)建一個新的鏡像。
all (none) 隊列在群集中的所有節(jié)點之間進(jìn)行鏡像。隊列將鏡像到任何新加入的節(jié)點。鏡像到所有節(jié)點將對所有群集節(jié)點施加額外的壓力,包括網(wǎng)絡(luò)I / O,磁盤I / O和磁盤空間使用情況。推薦使用exactly,設(shè)置副本數(shù)為(N / 2 +1)。
nodes node names 指定隊列創(chuàng)建到哪些節(jié)點,如果指定的節(jié)點全部不存在,則會出現(xiàn)異常。如果指定的節(jié)點在集群中存在,但是暫時不可用,會創(chuàng)建節(jié)點到當(dāng)前客戶端連接到的節(jié)點。
exactly模式

進(jìn)入任意容器

docker exec -it mq1 bash

執(zhí)行命令

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定寫法
  • ha-two:策略名稱,自定義
  • "^two\.":匹配隊列的正則表達(dá)式,符合命名規(guī)則的隊列才生效,這里是任何以two.開頭的隊列名稱
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略內(nèi)容
    • "ha-mode":"exactly":策略模式,此處是exactly模式,指定副本數(shù)量
    • "ha-params":2:策略參數(shù),這里是2,就是副本數(shù)量為2,1主1鏡像。
    • "ha-sync-mode":"automatic":同步策略,默認(rèn)是manual,即新加入的鏡像節(jié)點不會同步舊的消息。如果設(shè)置為automatic,則新加入的鏡像節(jié)點會把主節(jié)點中所有消息都同步,會帶來額外的網(wǎng)絡(luò)開銷。
all模式

進(jìn)入任意容器

docker exec -it mq2 bash

執(zhí)行命令

rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名稱,自定義
  • "^all\.":匹配所有以all.開頭的隊列名
  • '{"ha-mode":"all"}':策略內(nèi)容
    • "ha-mode":"all":策略模式,此處是all模式,即所有節(jié)點都會稱為鏡像節(jié)點
nodes模式

進(jìn)入容器

docker exec -it mq3 bash

執(zhí)行命令

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy`:固定寫法
  • ha-nodes:策略名稱,自定義
  • "^nodes\.":匹配隊列的正則表達(dá)式,符合命名規(guī)則的隊列才生效,這里是任何以nodes.開頭的隊列名稱
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略內(nèi)容
    • "ha-mode":"nodes":策略模式,此處是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略參數(shù),這里指定副本所在節(jié)點名稱

仲裁隊列

從RabbitMQ 3.8版本開始,引入了新的仲裁隊列,他具備與鏡像隊里類似的功能,但使用更加方便。

添加仲裁隊列

控制臺添加

在任意控制臺添加一個隊列,一定要選擇隊列類型為Quorum類型。

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

在任意控制臺查看隊列:

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

可以看到,仲裁隊列的 + 2 字樣。代表這個隊列有2個鏡像節(jié)點。

因為仲裁隊列默認(rèn)的鏡像數(shù)為5。如果你的集群有7個節(jié)點,那么鏡像數(shù)肯定是5;而我們集群只有3個節(jié)點,因此鏡像數(shù)量就是3。

SpringAMQP創(chuàng)建

@Configuration
public class QuorumConfig{
    @Bean
    public Queue quorumQueue() {
        return QueueBuilder
            .durable("quorum.queue") // 持久化 
            .quorum() // 仲裁隊列
            .build();
    }
}

SpringAMQP連接集群,只需要在yaml中配置即可

spring:
	rabbitmq:
		addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
		username: admin 
		password: 123456
		virtual-host: /

集群擴(kuò)容

啟動一個新的MQ容器

docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

進(jìn)入容器控制臺

docker exec -it mq4 bash

停止mq進(jìn)程

rabbitmqctl stop_app

重置RabbitMQ中的數(shù)據(jù):

rabbitmqctl reset

加入mq1

rabbitmqctl join_cluster rabbit@mq1

再次啟動mq進(jìn)程

rabbitmqctl start_app

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

增加仲裁隊列副本

先查看下quorum.queue這個隊列目前的副本情況,進(jìn)入mq1容器:

docker exec -it mq1 bash

執(zhí)行命令:

rabbitmq-queues quorum_status "quorum.queue"

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

現(xiàn)在讓mq4也加入進(jìn)來

rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

再次查看副本情況

rabbitmq-queues quorum_status "quorum.queue"

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)

查看控制臺,發(fā)現(xiàn)quorum.queue的鏡像數(shù)量也從原來的 +2 變成了 +3

服務(wù)異步通訊——RabbitMQ,微服務(wù),rabbitmq,運維,linux,微服務(wù)文章來源地址http://www.zghlxwxcb.cn/news/detail-799812.html

到了這里,關(guān)于服務(wù)異步通訊——RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器)

    RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器)

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實時響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個人同時通話。發(fā)送郵件可以同時與多個人收發(fā)郵件,但是往往響應(yīng)會有延遲。 1.

    2024年02月11日
    瀏覽(16)
  • RabbitMQ服務(wù)異步通信-高級篇

    RabbitMQ服務(wù)異步通信-高級篇

    提出問題:消息投遞過程中,生產(chǎn)者—— MQ —— 消費者 中間會出現(xiàn)消息丟失問題,導(dǎo)致信息沒有及時同步 先梳理一下流程 生產(chǎn)者生產(chǎn)個消息 —— 建立連接——通道傳遞進(jìn)mq交換機(jī)——交換機(jī)傳給隊列——消費者拉取數(shù)據(jù)消費 1.生產(chǎn)者生產(chǎn)完消息,相當(dāng)于寫好代碼,寫錯了

    2024年03月24日
    瀏覽(17)
  • 服務(wù)異步通信-高級篇(RabbitMQ)

    服務(wù)異步通信-高級篇(RabbitMQ)

    每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費就宕機(jī) RabbitMQ分別給出了解決方案: 生產(chǎn)者發(fā)送確認(rèn)機(jī)制 mq持久化 消費者消費確認(rèn)機(jī)制 失敗重

    2024年02月02日
    瀏覽(22)
  • 服務(wù)器的異步通信——RabbitMQ

    服務(wù)器的異步通信——RabbitMQ

    目錄 一、同步通信 VS 異步通信 二、MQ——消息隊列 RabbitMQ? RabbitMQ安裝? RabbitMQ的整體架構(gòu) 常見消息模型? ?基本消息隊列(BasicQueue) 工作消息隊列(WorkQueue) ?發(fā)布、訂閱(Publish、Subscribe) ?Fanout Exchange Direct Exchange? Topic Exchange? SpringAMQP-消息轉(zhuǎn)換器? 同步通信 :雙方在

    2024年01月24日
    瀏覽(21)
  • ElasticSearch - 在 微服務(wù)項目 中基于 RabbitMQ 實現(xiàn) ES 和 MySQL 數(shù)據(jù)異步同步(考點)

    ElasticSearch - 在 微服務(wù)項目 中基于 RabbitMQ 實現(xiàn) ES 和 MySQL 數(shù)據(jù)異步同步(考點)

    目錄 一、數(shù)據(jù)同步 1.1、什么是數(shù)據(jù)同步 1.2、解決數(shù)據(jù)同步面臨的問題 1.3、解決辦法 1.3.1、同步調(diào)用 1.3.2、異步通知(推薦) 1.3.3、監(jiān)聽 binlog 1.3、基于 RabbitMQ 實現(xiàn)數(shù)據(jù)同步 1.3.1、需求 1.3.2、在“酒店搜索服務(wù)”中 聲明 exchange、queue、routingKey,同時開啟監(jiān)聽 1.3.3、在“酒店

    2024年02月08日
    瀏覽(31)
  • 【RabbitMQ】Linux系統(tǒng)服務(wù)器安裝RabbitMQ

    【RabbitMQ】Linux系統(tǒng)服務(wù)器安裝RabbitMQ

    首先應(yīng)該下載erlang,rabbitmq運行需要有erland環(huán)境。 官網(wǎng)地址:https://www.erlang.org/downloads 下載rabbitmq 官網(wǎng)環(huán)境:https://www.rabbitmq.com/download.html 注意:el7對應(yīng)centos7,el8對應(yīng)centos8,centos7用erlang23版本或者23以下版本,centos8用erlang24版本。 博主的系統(tǒng)是centos 7的所以下載的是el7的 1、

    2024年02月14日
    瀏覽(22)
  • 【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_消息可靠性_死信交換機(jī)_惰性隊列_MQ集群

    【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_消息可靠性_死信交換機(jī)_惰性隊列_MQ集群

    消息隊列在使用過程中,面臨著很多實際問題需要思考: 消息從發(fā)送,到消費者接收,會經(jīng)歷多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收

    2024年02月11日
    瀏覽(99)
  • Linux Ubuntu安裝RabbitMQ服務(wù)

    Linux Ubuntu安裝RabbitMQ服務(wù)

    RabbitMQ是一個在 AMQP(高級消息隊列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是當(dāng)前最主流的消息中間件之一。 由erlang開發(fā)的AMQP(Advanced Message Queue 高級消息隊列協(xié)議 )的開源實現(xiàn),由于erlang 語言的高并發(fā)特性,性能較好,本質(zhì)是個隊列,F(xiàn)IFO 先入先出,里面存放的內(nèi)容

    2024年02月16日
    瀏覽(23)
  • 【RabbitMQ】集群和運維

    【RabbitMQ】集群和運維

    對于無狀態(tài)應(yīng)用(如普通的微服務(wù))很容易實現(xiàn)負(fù)載均衡、高可用集群。而對于有狀態(tài)的系統(tǒng)(如數(shù)據(jù)庫等)就比較復(fù)雜。 1. 業(yè)界實踐: 主備模式:單活,容量對等,可以實現(xiàn)故障轉(zhuǎn)移。使用獨立存儲時需要借助復(fù)制、鏡像同步等技術(shù),數(shù)據(jù)會有延遲、不一致等問題(CAP定

    2024年01月18日
    瀏覽(22)
  • RabbitMQ 運維 & 擴(kuò)展

    RabbitMQ 運維 & 擴(kuò)展

    ????????關(guān)于Rabbitmq 集群的搭建,詳見以下文章。簡單說來就是將多個單機(jī)rabbitmq服務(wù),通過給到一致的密鑰(.erlang.cookie)并且開放rabbitmq服務(wù)的 25672 端口,允許多節(jié)點間進(jìn)行互相通訊,就完成了集群的搭建。 Rabbitmq安裝_沿途欣賞i的博客-CSDN博客 Linux下源碼安裝RabbitMQ并

    2024年02月05日
    瀏覽(13)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包