国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ - 基于 SpringAMQP 帶你實(shí)現(xiàn)五種消息隊(duì)列模型

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ - 基于 SpringAMQP 帶你實(shí)現(xiàn)五種消息隊(duì)列模型。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

一、SpringAMQP

1.1、概念

1.2、前置知識(shí)(實(shí)現(xiàn)案例前必看?。?/p>

1.2.1、創(chuàng)建隊(duì)列

1.2.2、創(chuàng)建交換機(jī)

1.2.3、創(chuàng)建綁定

1.2.4、@RabbitListener 注解

a)情況一:queue 存在

b)情況二:queue 不存在?

1.2.5、為什么更建議使用 @Bean 注解創(chuàng)建,而不是 @RabbitListener 注解創(chuàng)建?

1.3、案例實(shí)現(xiàn)

1.3.1、SpringAMQP 實(shí)現(xiàn)基礎(chǔ)消息隊(duì)列(BasicQueue)

1.3.2、SpringAMQP 實(shí)現(xiàn)工作消息隊(duì)列((Work Queue)

1.3.3、SpringAMQP 實(shí)現(xiàn)廣播交換機(jī)消息隊(duì)列(Fanout Exchange)

1.3.4、Spring AMQP 實(shí)現(xiàn)路由交換機(jī)消息隊(duì)列(Direct Exchange)

1.3.5、SpringAMQP 實(shí)現(xiàn)主題交換機(jī)消息模型(Topic Exchange)


一、SpringAMQP


1.1、概念

什么是 AMQP?

全稱?Advanced Message Queuing Protocol,是用于在應(yīng)用程序之間傳遞業(yè)務(wù)消息的開放標(biāo)準(zhǔn)。該協(xié)議與語言和平臺(tái)無關(guān),更符合微服務(wù)中獨(dú)立性的要求。

什么是 SpringAMQP?

Spring AMQP是基于AMQP協(xié)議定義的一套API規(guī)范,提供了模板用來發(fā)送和接收消息。包含兩部分,其中spring-amqp是基礎(chǔ)抽象,spring-rabbit是底層的默認(rèn)實(shí)現(xiàn)。

1.2、前置知識(shí)(實(shí)現(xiàn)案例前必看?。?/h3>

1.2.1、創(chuàng)建隊(duì)列

實(shí)際開發(fā)中,有兩種方式創(chuàng)建交換機(jī)和隊(duì)列,其中 Bean 方式用的最多,也是最推薦的,還有一種最后講.

隊(duì)列的創(chuàng)建,源碼中提供了如下幾種方法:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

  • name:隊(duì)列名.
  • durable:是否持久化.
  • exclusive:是否獨(dú)占.??true表示隊(duì)列只能被一個(gè)消費(fèi)者使用, false表示大家都能用.
  • autoDelete:自動(dòng)刪除,為 true 表示沒有人使用以后自動(dòng)刪除.
  • arguments:擴(kuò)展參數(shù),自定義選項(xiàng),例如實(shí)現(xiàn),隊(duì)列過期、隊(duì)列消息過期、死信隊(duì)列...

例如,創(chuàng)建名為 simple.queue 的隊(duì)列,并且支持持久化:在 Configuration 層中使用 @Bean 注入隊(duì)列即可(@Configuration 注解不能少).

?
@Configuration
public class MqConfig {

    /**
     * 創(chuàng)建隊(duì)列
     * @return
     */
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple.queue", true);
    }

}

?

Ps:建議將來開發(fā)的時(shí)候,將設(shè)置隊(duì)列名、交換機(jī)名都寫成固定的常量(static final)

1.2.2、創(chuàng)建交換機(jī)

a)直接交換機(jī)和扇出交換機(jī)

直接交換機(jī)的創(chuàng)建,源碼中提供了如下幾種方法:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

扇出交換機(jī)和主題交換機(jī)也是如此:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

  • name:交換機(jī)名.
  • durable:是否持久化.
  • autoDelete:自動(dòng)刪除,若當(dāng)前交換機(jī)沒人使用了,就會(huì)自動(dòng)刪除.
  • arguments:表示創(chuàng)建交換機(jī)時(shí)可以指定一些額外的選項(xiàng),例如延時(shí)消息、過期時(shí)間、死信交換機(jī).

例如,創(chuàng)建名為? direct 的交換機(jī),支持持久化和自動(dòng)刪除.

@Configuration
public class MqConfig {

    /**
     *  創(chuàng)建直接交換機(jī)
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct", true, true);
    }

}

1.2.3、創(chuàng)建綁定

通過 BindingBuilder 對(duì)象就可以將交換機(jī)和隊(duì)列綁定,綁定方式是 BindingBuilder.bind(調(diào)用創(chuàng)建隊(duì)列方法).to(調(diào)用創(chuàng)建交換機(jī)方法).with(bindingKey).

@Configuration
public class MqConfig {

    /**
     * 創(chuàng)建隊(duì)列
     * @return
     */
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple.queue", true);
    }

    /**
     *  創(chuàng)建直接交換機(jī)
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct", true, true);
    }

    /**
     * 創(chuàng)建綁定
     * @return
     */
    @Bean
    public Binding simpleBinding() {
        return BindingBuilder.bind(simpleQueue()).to(directExchange()).with("simple");
    }


}

Ps:如果是 fanout 扇出交換機(jī)則不用通過 with 指定 bindingKey.

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

1.2.4、@RabbitListener 注解

a)情況一:queue 存在

如果 queue 隊(duì)列存在,以下使用就可以通過 @RabbitListener 注解監(jiān)聽隊(duì)列消息,成為消費(fèi)者.

用法:直接指定注解中的queues參數(shù)即可,參數(shù)值為對(duì)列名(queueName)

例如如下

    @RabbitListener(queues = "simple.queue")
    public void SimpleConsumer(String msg) {
        System.out.println("消費(fèi)者收到消息: " + msg);
    }

Ps:當(dāng) queue 的 autoDelete 屬性為 false 時(shí),這種使用情況還是比較合適了;但是,當(dāng)這個(gè)屬性為 true 時(shí),沒有消費(fèi)者隊(duì)列就會(huì)自動(dòng)刪除了,可能會(huì)引發(fā)異常!

b)情況二:queue 不存在?

如果隊(duì)列存在,以下使用就可以通過 @RabbitListener 注解創(chuàng)建交換機(jī)、隊(duì)列、綁定關(guān)系.

例如,

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("queue1"),
            exchange = @Exchange(name = "exchange1", type = ExchangeTypes.TOPIC),
            key = "test"
    ))
    public void testListener(String msg) {
        System.out.println("消費(fèi)者收到消息: " + msg);
    }
  • value: @Queue 注解,用于聲明隊(duì)列,value 為 queueName, durable 表示隊(duì)列是否持久化, autoDelete 表示沒有消費(fèi)者之后隊(duì)列是否自動(dòng)刪除
  • exchange: @Exchange 注解,用于聲明 exchange, type 就是交換機(jī)類型.
  • key: 在 topic 方式下,這個(gè)就是我們熟知的 routingKey

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

1.2.5、為什么更建議使用 @Bean 注解創(chuàng)建,而不是 @RabbitListener 注解創(chuàng)建?

建議使用 @Bean 而不是@RabbitListener的原因如下:

  1. 使用 @Bean 創(chuàng)建相比于?@RabbitListener 實(shí)現(xiàn)了配置和業(yè)務(wù)上的解耦合(隊(duì)列、交換機(jī)、綁定的創(chuàng)建 和 業(yè)務(wù)代碼 解耦合).
  2. @Bean可以更好地控制RabbitMQ監(jiān)聽器的創(chuàng)建和銷毀,而@RabbitListener則需要在運(yùn)行時(shí)動(dòng)態(tài)創(chuàng)建和銷毀監(jiān)聽器,這可能會(huì)導(dǎo)致一些性能問題和內(nèi)存泄漏問題。
  3. 使用@Bean還可以更好地管理RabbitMQ連接和通道,從而提高應(yīng)用程序的可靠性和性能。

1.3、案例實(shí)現(xiàn)

1.3.1、SpringAMQP 實(shí)現(xiàn)基礎(chǔ)消息隊(duì)列(BasicQueue)

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

在mq上創(chuàng)建隊(duì)列:

Configuration 這一層創(chuàng)建隊(duì)列,否則不生效.

@Configuration
public class MqConfig {

    /**
     * 創(chuàng)建隊(duì)列
     * @return
     */
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple.queue", true);
    }

}

.?

服務(wù)生產(chǎn)者?publisher:

a)由于生產(chǎn)者和消費(fèi)者服務(wù)都需要 amqp 依賴,因此這里直接將依賴放到父工程中

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

b)在 publisher (消息發(fā)送)服務(wù)中編寫application.yml,添加mq連接信息:

spring:
  rabbitmq:
    host: 193.168.150.185 # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī) 
    username: root # 用戶名
    password: 1234 # 密碼

Ps:云服務(wù)器記得放行 5672 端口?。?!

c)在publisher服務(wù)中新建一個(gè)測(cè)試類,編寫測(cè)試方法:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessageSimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }

}

以上可得知 SpringAMQP 發(fā)送消息需要實(shí)現(xiàn)以下幾點(diǎn):?

  1. 引入 amqp 的 starter 依賴;
  2. 配置 RabbitMQ 地址;
  3. 利用 RabbitTemplate 的 convertAndSend 方法來指定隊(duì)列發(fā)送消息。

服務(wù)消費(fèi)者 consumer:

a)在consumer服務(wù)中編寫application.yml,添加mq連接信息:

spring:
  rabbitmq:
    host: 193.168.150.185 # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī) 
    username: root # 用戶名
    password: 1234 # 密碼

b)在consumer服務(wù)中新建一個(gè)類,編寫消費(fèi)邏輯:

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void ListenQueue(String msg) {
        System.out.println("消費(fèi)者接收到 simple.queue 的消息:" + msg);
    }

}

以上可得知 SpringAMQP 接收消息需要實(shí)現(xiàn)以下幾點(diǎn):?

  1. 引入amqp的starter依賴
  2. 配置RabbitMQ地址
  3. 定義類,添加@Component注解
  4. 類中聲明方法,添加@RabbitListener注解,方法參數(shù)就時(shí)消息

Ps:消息一旦消費(fèi)就會(huì)從隊(duì)列刪除,RabbitMQ沒有消息回溯功能

最終執(zhí)行效果如下:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

1.3.2、SpringAMQP 實(shí)現(xiàn)工作消息隊(duì)列((Work Queue)

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

Work queue工作隊(duì)列,可以提高消息處理速度,避免隊(duì)列消息堆積;

遇到的問題:當(dāng)消息進(jìn)入 queue 時(shí),消費(fèi)者1 和 消費(fèi)者2 都會(huì)先做一個(gè)操作叫做“消息預(yù)取”,也就是先取消息,但不執(zhí)行操作,等從 queue 中拿完(如果不設(shè)置,默認(rèn)是無限次拿)了以后再執(zhí)行具體的消費(fèi)操作,但這樣會(huì)導(dǎo)致一個(gè)問題——當(dāng) 消費(fèi)者1 的消費(fèi)處理速度是?消費(fèi)者2 處理速度的兩倍時(shí),但是由于“消息預(yù)取”,他們兩都會(huì)取到相等的消息數(shù)量,因此就會(huì)導(dǎo)致 消費(fèi)者1 提前處理完,消費(fèi)者2 還有很多沒有處理的情況,降低了總體處理的效率。

解決辦法:為了解決這種問題的發(fā)生,也可以通過修改消息預(yù)取的數(shù)量,設(shè)置為 1 ,就是拿一個(gè)處理完了以后再取下一個(gè),就可以避免以上問題(設(shè)置為1,就是根據(jù)自身處理能力,拿去相應(yīng)的數(shù)據(jù))。

具體實(shí)現(xiàn)如下

a)在 Configuration 中創(chuàng)建好隊(duì)列

@Configuration
public class MqConfig {

    /**
     * 創(chuàng)建隊(duì)列
     * @return
     */
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple.queue", true);
    }

}

b)在?publisher?服務(wù)中添加一個(gè)測(cè)試方法,循環(huán)發(fā)送?50?條消息到?simple.queue?隊(duì)列(控制時(shí)間為1秒內(nèi)發(fā)送完)

    @Test
    public void testSendMessageWorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, workQueue!";
        for(int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }

c)在consumer服務(wù)中創(chuàng)建兩個(gè)消費(fèi)者,也監(jiān)聽simple.queue:

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void ListenQueue(String msg) throws InterruptedException {
        System.err.println("消費(fèi)者接收到 simple.queue 的消息:" + msg + LocalDate.now());
        Thread.sleep(100);
    }

    @RabbitListener(queues = "simple.queue")
    public void ListenQueue2(String msg) throws InterruptedException {
        System.out.println("消費(fèi)者接收到 simple.queue 的消息:" + msg + LocalDate.now());
        Thread.sleep(25);
    }

}

d)消費(fèi)預(yù)取限制:修改application.yml文件,設(shè)置preFetch這個(gè)值,可以控制預(yù)取消息的上限

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 170.193.116.135
    port: 5672
    username: root
    password: 1234
    virtual-host: /
    listener:
      simple:
        prefetch: 1

執(zhí)行結(jié)果如下:?

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

1.3.3、SpringAMQP 實(shí)現(xiàn)廣播交換機(jī)消息隊(duì)列(Fanout Exchange)

發(fā)布訂閱模式與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個(gè)消費(fèi)者。實(shí)現(xiàn)方式是加入了exchange(交換機(jī))。

常見exchange類型包括:

1. Fanout:廣播
2. Direct:路由
3. Topic:話題

這里我們先來理解一下廣播消息隊(duì)列模型~

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

Fanout Exchange:會(huì)將接收到的消息廣播到每一個(gè)跟其綁定的queue,最后交給對(duì)應(yīng)的消費(fèi)者,值得注意的是,exchange負(fù)責(zé)消息路由,而不是存儲(chǔ),路由失敗則消息丟失。

具體實(shí)現(xiàn)如下:

a)在consumer服務(wù)創(chuàng)建一個(gè)類,添加@Configuration注解,創(chuàng)建隊(duì)列和交換機(jī),并聲明FanoutExchange、Queue和綁定關(guān)系對(duì)象?Binding ,綁定方式是 BindingBuilder.bind(隊(duì)列).to(交換機(jī))

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    //itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("itcast.fanout");
    }

    //itcast.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    //綁定隊(duì)列 1 到交換機(jī)
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    //itcast.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    //綁定隊(duì)列 2 到交換機(jī)
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }

}

b)運(yùn)行以后可以通過 RabbitMQ 客戶端看到以下綁定情況:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

?spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

c)添加測(cè)試方法

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testFanoutMessage() {
        String exchangeName = "itcast.fanout";
        String message = "hello! fanout";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

}

d)這里一個(gè)交換機(jī)綁定了兩個(gè)隊(duì)列,因此運(yùn)行結(jié)果如下?:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

1.3.4、Spring AMQP 實(shí)現(xiàn)路由交換機(jī)消息隊(duì)列(Direct Exchange)

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

?Direct Exchange 會(huì)將接收到的消息根據(jù)規(guī)則路由到指定的Queue,因此稱為路由模式(routes),通常一個(gè) Queue 都會(huì)與 Exchange 設(shè)置一個(gè) BindingKey,發(fā)布者發(fā)送消息時(shí)會(huì)先指定 RoutingKey,這時(shí)交換機(jī)就會(huì)將消息路由到 BindingKey?與消息?RoutingKey?一致的隊(duì)列。

具體實(shí)現(xiàn)如下:

a)在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽direct.queue1和direct.queue2,并利用@RabbitListener聲明Queue、Exchange、BoutingKey

Ps:之氣我們使用 Bean 的方式創(chuàng)建交換機(jī)和隊(duì)列,實(shí)際上也可以通過 @RabbitListener 注解去聲明.? 這就要求我們必須要在 @RabbitListener 中通過 @QueueBinding 指定綁定關(guān)系,并通過 @Queue 和 @Exchange 去創(chuàng)建隊(duì)列和交換機(jī)

但建議將來微服務(wù)開發(fā)的時(shí)候都是用 Bean 的方式創(chuàng)建交換機(jī)和隊(duì)列,將業(yè)務(wù)代碼和配置代碼解耦合.

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"), //綁定隊(duì)列
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), //綁定交換機(jī),type 可以不寫,默認(rèn)為 Direct
            key = {"java", "C++"} // Bindingkey
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消費(fèi)者 1 收到 Direct 消息:" + msg);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"), //綁定隊(duì)列
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), //綁定交換機(jī),type 可以不寫,默認(rèn)為 Direct
            key = {"java", "GO"} // Bindingkey
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println("消費(fèi)者 2 收到 Direct 消息:" + msg);
    }

b)運(yùn)行后可以觀察綁定情況:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

c)編寫測(cè)試方法:

    @Test
    public void testDirectMessage() {
        String exchangeName = "itcast.direct";
        String message = "hello! direct";
        rabbitTemplate.convertAndSend(exchangeName, "C++", message);
    }

d)運(yùn)行結(jié)果如下:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

1.3.5、SpringAMQP 實(shí)現(xiàn)主題交換機(jī)消息模型(Topic Exchange)

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

TopicExchange?與?DirectExchange 十分類似,區(qū)別在于routingKey必須是多個(gè)單詞的列表,并且以 . 分割。

Queue與Exchange指定BindingKey時(shí)可以使用以下通配符:

#:代指0個(gè)或多個(gè)單詞
*:代指一個(gè)單詞

例如: china.# 就相當(dāng)于 china.new 或者 china.difa 或者 china.alhglag ......等等一切情況

具體實(shí)現(xiàn)如下(這里不具體說明了,除了通配符差異,用法跟 Direct 消息隊(duì)列一模一樣~):

@Component
public class SpringRabbitListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg) {
        System.out.println("消費(fèi)者 1 收到 topic 消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.good"
    ))
    public void listenTopicQueue2(String msg) {
        System.out.println("消費(fèi)者 2 收到 topic 消息:" + msg);
    }

}
    @Test
    public void testTopicMessage() {
        String exchangeName = "itcast.topic";
        String message = "hello! topic";
        rabbitTemplate.convertAndSend(exchangeName, "china.good", message);
    }

運(yùn)行結(jié)果:

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq

spring消息隊(duì)列,RabbitMQ(從理論到根據(jù)源碼實(shí)現(xiàn)),rabbitmq文章來源地址http://www.zghlxwxcb.cn/news/detail-848877.html

到了這里,關(guān)于RabbitMQ - 基于 SpringAMQP 帶你實(shí)現(xiàn)五種消息隊(duì)列模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型

    MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型

    書接上文,展示一下五種模型我使用的是spring could 微服務(wù)的框架 文章說明: ? ? ? ? 本文章我會(huì)分享總結(jié)5種實(shí)用的rabbitMQ的實(shí)用模型 1、hello world簡(jiǎn)單模型 2、work queues工作隊(duì)列 3、Publish/Subscribe發(fā)布訂閱模型 4、Routing路由模型 5、Topics 主題模型 (贈(zèng)送) 6、消息轉(zhuǎn)換器 Rabbi

    2024年02月05日
    瀏覽(40)
  • SpringBoot實(shí)現(xiàn)RabbitMQ的簡(jiǎn)單隊(duì)列(SpringAMQP 實(shí)現(xiàn)簡(jiǎn)單隊(duì)列)

    SpringBoot實(shí)現(xiàn)RabbitMQ的簡(jiǎn)單隊(duì)列(SpringAMQP 實(shí)現(xiàn)簡(jiǎn)單隊(duì)列)

    SpringAMQP 是基于 RabbitMQ 封裝的一套模板,并且還利用 SpringBoot 對(duì)其實(shí)現(xiàn)了自動(dòng)裝配,使用起來非常方便。 SpringAmqp 的官方地址:https://spring.io/projects/spring-amqp 說明 : 1.Spring AMQP 是對(duì) Spring 基于 AMQP 的消息收發(fā)解決方案,它是一個(gè)抽象層,不依賴于特定的 AMQP Broker 實(shí)現(xiàn)和客戶端

    2024年04月13日
    瀏覽(20)
  • SpringCloud-實(shí)現(xiàn)基于RabbitMQ的消息隊(duì)列

    消息隊(duì)列是現(xiàn)代分布式系統(tǒng)中常用的通信機(jī)制,用于在不同的服務(wù)之間傳遞消息。在Spring Cloud框架中,我們可以利用RabbitMQ實(shí)現(xiàn)強(qiáng)大而可靠的消息隊(duì)列系統(tǒng)。本篇博客將詳細(xì)介紹如何在Spring Cloud項(xiàng)目中集成RabbitMQ,并創(chuàng)建一個(gè)簡(jiǎn)單的消息隊(duì)列。 這里是一個(gè)簡(jiǎn)單的RabbitMQ消息隊(duì)列

    2024年03月11日
    瀏覽(24)
  • 手寫消息隊(duì)列(基于RabbitMQ)

    手寫消息隊(duì)列(基于RabbitMQ)

    提到消息隊(duì)列是否喚醒了你腦海深處的記憶?回看前面的這篇文章:《Java 多線程系列Ⅳ(單例模式+阻塞式隊(duì)列+定時(shí)器+線程池)》,其中我們?cè)诮榻B阻塞隊(duì)列時(shí)說過,阻塞隊(duì)列最大的用途就是實(shí)現(xiàn) 生產(chǎn)者消費(fèi)者模型 。 我們知道對(duì)于生產(chǎn)者消費(fèi)者模型來說,它具有兩個(gè)十分

    2024年02月05日
    瀏覽(21)
  • RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    假設(shè)有一個(gè)業(yè)務(wù)場(chǎng)景:超過30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)? RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。 隊(duì)列有一個(gè)消息過期屬性。就像豐巢超過24小時(shí)就收費(fèi)一樣,通過設(shè)置這個(gè)屬性,超過了指定事件的消息將會(huì)被丟棄。 這個(gè)屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • 基于RabbitMQ的模擬消息隊(duì)列需求文檔

    基于RabbitMQ的模擬消息隊(duì)列需求文檔

    什么是消息隊(duì)列? 消息隊(duì)列就是,基于阻塞隊(duì)列,封裝成一個(gè)獨(dú)立的服務(wù)器程序,實(shí)現(xiàn)跨主機(jī)使用生產(chǎn)者-消費(fèi)者模型。生產(chǎn)者生產(chǎn)消息到消息隊(duì)列,消費(fèi)者從消息隊(duì)列消費(fèi)數(shù)據(jù)。 1.核心概念 生產(chǎn)者(Producer):生產(chǎn)消息的客戶端 消費(fèi)者 (Consumer) :消費(fèi)消息的客戶端 中間人

    2024年02月10日
    瀏覽(29)
  • 【圖解RabbitMQ-7】圖解RabbitMQ五種隊(duì)列模型(簡(jiǎn)單模型、工作模型、發(fā)布訂閱模型、路由模型、主題模型)及代碼實(shí)現(xiàn)

    【圖解RabbitMQ-7】圖解RabbitMQ五種隊(duì)列模型(簡(jiǎn)單模型、工作模型、發(fā)布訂閱模型、路由模型、主題模型)及代碼實(shí)現(xiàn)

    ?????作者名稱:DaenCode ??作者簡(jiǎn)介:CSDN實(shí)力新星,后端開發(fā)兩年經(jīng)驗(yàn),曾擔(dān)任甲方技術(shù)代表,業(yè)余獨(dú)自創(chuàng)辦智源恩創(chuàng)網(wǎng)絡(luò)科技工作室。會(huì)點(diǎn)點(diǎn)Java相關(guān)技術(shù)棧、帆軟報(bào)表、低代碼平臺(tái)快速開發(fā)。技術(shù)尚淺,閉關(guān)學(xué)習(xí)中······ ??人生感悟:嘗盡人生百味,方知世間冷暖。

    2024年02月07日
    瀏覽(26)
  • 基于RabbitMQ的模擬消息隊(duì)列之四——內(nèi)存管理

    基于RabbitMQ的模擬消息隊(duì)列之四——內(nèi)存管理

    針對(duì)交換機(jī)、隊(duì)列、綁定、消息、待確認(rèn)消息設(shè)計(jì)數(shù)據(jù)結(jié)構(gòu)。 交換機(jī)集合 exchangeMap 數(shù)據(jù)結(jié)構(gòu):ConcurrentHashMap key:交換機(jī)name value:交換機(jī)對(duì)象 隊(duì)列集合 queueMap 數(shù)據(jù)結(jié)構(gòu): ConcurrentHashMap key:隊(duì)列name value:隊(duì)列對(duì)象 綁定集合 bindingsMap 數(shù)據(jù)結(jié)構(gòu): ConcurrentHashMap 嵌套 ConcurrentHashMap key

    2024年02月10日
    瀏覽(17)
  • 消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    1、延遲隊(duì)列概念 延時(shí)隊(duì)列內(nèi)部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時(shí)屬性 上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來說, 延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列。 延遲隊(duì)列使用場(chǎng)景: 訂單在十分鐘之內(nèi)未支付則

    2024年02月22日
    瀏覽(20)
  • 參考RabbitMQ實(shí)現(xiàn)一個(gè)消息隊(duì)列

    參考RabbitMQ實(shí)現(xiàn)一個(gè)消息隊(duì)列

    消息隊(duì)列的本質(zhì)就是阻塞隊(duì)列,它的最大用途就是用來實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型,從而實(shí)現(xiàn) 解耦合 以及 削峰填谷 。 在分布式系統(tǒng)中不再是單個(gè)服務(wù)器而是服務(wù)器“集群”,如果我們我們直接A服務(wù)器給B服務(wù)器發(fā)送請(qǐng)求,B服務(wù)器給A服務(wù)器返回響應(yīng),這樣的話我們AB的耦合較大

    2024年02月14日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包