国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

提示:文章寫完后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔


SpringAMQP

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

1.SpringBoot 的支持

  • SpringBoot 已經(jīng)提供了對(duì) AMQP 協(xié)議完全支持的 spring-boot-starter-amqp 依賴,引入此依賴即可快速方便的在 SpringBoot 中使用 RabbitMQ。
https://spring.io/projects/spring-amqp

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

2.RabbitTemplate

  • RabbitTemplate 是 SpringBoot AMQP 提供的快速發(fā) RabbitMQ 消息的模板類,與 RestTemplate 有類似之處,意指方便、簡(jiǎn)單、快速的發(fā) RabbitMQ 消息。
@Slf4j
@Component
public class ClientReportTopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String ROUTING_KEY = "report";

    public void send(String param) {
        rabbitTemplate.send(TopicConst.CLIENT_REPORT_TOPIC, ROUTING_KEY, new Message(param.getBytes(), new MessageProperties()));
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。

convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,然后將其發(fā)送到指定的交換機(jī)和路由鍵中。

sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。

convertSendAndReceive:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息。

convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。

convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。

sendWithMessagePostProcessor:發(fā)送消息,并在發(fā)送之前進(jìn)行處理。

execute:執(zhí)行Rabbit操作并返回一個(gè)結(jié)果。

receive:從隊(duì)列接收一條消息。

receiveAndConvert:從隊(duì)列接收一條消息,并將其轉(zhuǎn)換為Java對(duì)象。

receiveAndReply:從隊(duì)列接收一條請(qǐng)求消息,并發(fā)送一個(gè)響應(yīng)消息。

convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。

convertSendAndReceiveAsType:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,發(fā)送請(qǐng)求消息,并接收響應(yīng)消息,并將響應(yīng)消息轉(zhuǎn)換為指定類型的Java對(duì)象。

convertSendAndReceiveAndReplyHeader:將Java對(duì)象轉(zhuǎn)換為請(qǐng)求消息,并發(fā)送請(qǐng)求消息。接收到請(qǐng)求消息后,將其轉(zhuǎn)換為響應(yīng)消息,并設(shè)置響應(yīng)消息的頭信息。

convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。

convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理。

convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型。

convertAndSend:將Java對(duì)象轉(zhuǎn)換為消息,并發(fā)送消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型和交換機(jī)。

send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。

send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。在發(fā)送之前,先對(duì)消息進(jìn)行處理。

send:將消息發(fā)送到指定的交換機(jī)和路由鍵中。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型。

sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。

sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理。

sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型。

sendAndReceive:發(fā)送一個(gè)請(qǐng)求消息并接收一個(gè)響應(yīng)消息。在發(fā)送之前,先對(duì)消息進(jìn)行處理,并指定響應(yīng)消息的類型和交換機(jī)。

setConnectionFactory:設(shè)置RabbitMQ連接工廠。

getConnectionFactory:獲取RabbitMQ連接工廠。

setExchange:設(shè)置默認(rèn)的交換機(jī)。

getExchange:獲取默認(rèn)的交換機(jī)。

setRoutingKey:設(shè)置默認(rèn)的路由鍵。

getRoutingKey:獲取默認(rèn)的路由鍵。

setQueue:設(shè)置默認(rèn)的隊(duì)列。

getQueue:獲取默認(rèn)的隊(duì)列。

setMandatory:設(shè)置消息是否強(qiáng)制路由到隊(duì)列。

isMandatory:檢查消息是否強(qiáng)制路由到隊(duì)列。

setReplyTimeout:設(shè)置接收響應(yīng)消息的超時(shí)時(shí)間。

getReplyTimeout:獲取接收響應(yīng)消息的超時(shí)時(shí)間。

setChannelTransacted:設(shè)置通道是否應(yīng)該在事務(wù)中使用。

isChannelTransacted:檢查通道是否應(yīng)該在事務(wù)中使用。

setConfirmCallback:設(shè)置確認(rèn)回調(diào)。

getConfirmCallback:獲取確認(rèn)回調(diào)。

setReturnCallback:設(shè)置返回回調(diào)。

getReturnCallback:獲取返回回調(diào)。

setBeforePublishPostProcessor:設(shè)置發(fā)布之前的后處理器。

getBeforePublishPostProcessor:獲取發(fā)布之前的后處理器。

setAfterReceivePostProcessor:設(shè)置接收后的后處理器。

getAfterReceivePostProcessor:獲取接收后的后處理器。

setUsePublisherConnection:設(shè)置是否應(yīng)該使用發(fā)布者連接。

isUsePublisherConnection:檢查是否應(yīng)該使用發(fā)布者連接。

setApplicationContext:設(shè)置應(yīng)用程序上下文。

3.@RabbitListener(終極監(jiān)聽(tīng)方案)

使用此方案做監(jiān)聽(tīng)消息功能,就可以把之前的 SimpleMessageListenerContainer 進(jìn)行監(jiān)聽(tīng)的方案舍棄掉了,就是這么的喜新厭舊,不過(guò)之前的 SimpleMessageListenerContainer 也不是一無(wú)是處,學(xué)過(guò)之后可以更好的理解內(nèi)部的一些邏輯。

@RabbitListener 的特點(diǎn):

  • RabbitListener 是 SpringBoot 架構(gòu)中監(jiān)聽(tīng)消息的終極方案。
  • RabbitListener 使用注解聲明,對(duì)業(yè)務(wù)代碼無(wú)侵入。
  • RabbitListener 可以在 SpringBoot 配置文件中進(jìn)行配置。

@RabbitListener 本身是 Java 中的注解,可以搭配其他注解一起使用:

  • @Exchange:自動(dòng)聲明 Exchange。
  • @Queue:自動(dòng)聲明隊(duì)列。
  • @QueueBinding:自動(dòng)聲明綁定關(guān)系。
package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage1(String msg){
        System.out.println("consume1接收到的消息:"+msg);
    }
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage2(String msg){
        System.out.println("consume2接收到的消息:"+msg);
    }
}

4.RabbitConfig—rabbitmq配置類

聲明式實(shí)現(xiàn)(推薦)

@Slf4j
@Configuration
public class RabbitConfig {

    public static final String EXCHANGE_NAME = "exchange.cat.dog";
    public static final String EXCHANGE_DLX  = "exchange.dlx";
    public static final String QUEUE_NAME    = "queue.cat";
    public static final String QUEUE_DLX     = "queue.dlx";
    public static final String KEY_NAME      = "key.yingduan";
    public static final String KEY_DLX       = "#";

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
        return connectionFactory;
    }

    @Bean
    RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    Exchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    Binding binding() {
        // 目的地名稱、目的地類型、綁定交換機(jī)、綁定 key、參數(shù)
        return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
    }

     //死信隊(duì)列機(jī)制  死信隊(duì)列需要在創(chuàng)建 Queue 時(shí)指定對(duì)應(yīng)屬性:
     @Bean
    Queue queue() {
          // 配置聲明隊(duì)列時(shí)使用的參數(shù)
          Map<String, Object> args = new HashMap<>(1);
          // 設(shè)置死信隊(duì)列指向的交換機(jī)
          args.put("x-dead-letter-exchange", EXCHANGE_DLX);
         return new Queue(QUEUE_NAME, true, false, false, args);
    }

}

注意,以上配置再啟動(dòng) SpringBoot 并不會(huì)立馬創(chuàng)建交換機(jī)、隊(duì)列、綁定,SpringBoot AMQP 有懶加載,需要等到使用 connection 時(shí)才會(huì)創(chuàng)建。什么是使用 connection 呢?

  • 比如創(chuàng)建 connection
@Bean
ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setUsername("admin");
  connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
  connectionFactory.createConnection();
  return connectionFactory;
}

  • 再比如監(jiān)聽(tīng)了隊(duì)列
@RabbitListener(queues = {"test"})
void test() {
  log.info("【測(cè)試監(jiān)聽(tīng)消息】");
}

SpringBoot集成RabbitMQ 案例

配置

導(dǎo)入maven坐標(biāo)
<!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
yml配置
spring:
  rabbitmq:
    addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672
    username: admin
    password: admin
    #開(kāi)啟消息確認(rèn)模式,新版本已經(jīng)棄用
    #publisher-confirms: true
    #開(kāi)啟消息送達(dá)提示
    publisher-returns: true
    # springboot.rabbitmq.publisher-confirm 新版本已被棄用,現(xiàn)在使用 spring.rabbitmq.publisher-confirm-type = correlated 實(shí)現(xiàn)相同效果
    publisher-confirm-type: correlated
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto #確認(rèn)模式
        prefetch: 1 #限制每次發(fā)送一條數(shù)據(jù)。
        concurrency: 3 #同一個(gè)隊(duì)列啟動(dòng)幾個(gè)消費(fèi)者
        max-concurrency: 3 #啟動(dòng)消費(fèi)者最大數(shù)量
        #重試策略相關(guān)配置
        retry:
          # 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開(kāi)啟并一直重試
          enabled: true
          # 最大重試次數(shù)
          max-attempts: 5
          # 重試間隔時(shí)間(毫秒)
          initial-interval: 3000

RabbitMQ 參數(shù)配置說(shuō)明

spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #賬號(hào)
    password: guest #密碼
    virtualHost:    #鏈接的虛擬主機(jī)
    addresses: 127.0.0.1:5672     #多個(gè)以逗號(hào)分隔,與host功能一樣。
    requestedHeartbeat: 60 #指定心跳超時(shí),單位秒,0為不指定;默認(rèn)60s
    publisherConfirms: true  #發(fā)布確認(rèn)機(jī)制是否啟用
    #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
    #publisher-confirm-type參數(shù)有三個(gè)可選值:
    #SIMPLE:會(huì)觸發(fā)回調(diào)方法,相當(dāng)于單個(gè)確認(rèn)(發(fā)一條確認(rèn)一條)。
    #CORRELATED:消息從生產(chǎn)者發(fā)送到交換機(jī)后觸發(fā)回調(diào)方法。
    #NONE(默認(rèn)):關(guān)閉發(fā)布確認(rèn)模式。
    #publisher-confirm-type: correlated #發(fā)布確認(rèn)機(jī)制是否啟用 高版本Springboot使用替換掉publisher-confirms:true
    publisherReturns: true #發(fā)布返回是否啟用
    connectionTimeout: #鏈接超時(shí)。單位ms。0表示無(wú)窮大不超時(shí)
    ### ssl相關(guān)
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路徑
      keyStoreType: #key store類型 默認(rèn)PKCS12
      keyStorePassword: #指定訪問(wèn)key store的密碼
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默認(rèn)JKS
      trustStorePassword: #訪問(wèn)密碼
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否開(kāi)啟hostname驗(yàn)證
    ### cache相關(guān)
    cache:
      channel: 
        size: #緩存中保持的channel數(shù)量
        checkoutTimeout: #當(dāng)緩存數(shù)量被設(shè)置時(shí),從緩存中獲取一個(gè)channel的超時(shí)時(shí)間,單位毫秒;如果為0,則總是創(chuàng)建一個(gè)新channel
      connection:
        mode: #連接工廠緩存模式:CHANNEL 和 CONNECTION
        size: #緩存的連接數(shù),只有是CONNECTION模式時(shí)生效
    ### listener
    listener:
       type: #兩種類型,SIMPLE,DIRECT
       ## simple類型
       simple:
         concurrency: #最小消費(fèi)者數(shù)量
         maxConcurrency: #最大的消費(fèi)者數(shù)量
         transactionSize: #指定一個(gè)事務(wù)處理的消息數(shù)量,最好是小于等于prefetch的數(shù)量
         missingQueuesFatal: #是否停止容器當(dāng)容器中的隊(duì)列不可用
         ## 與direct相同配置部分
         autoStartup: #是否自動(dòng)啟動(dòng)容器
         acknowledgeMode: #表示消息確認(rèn)方式,其有三種配置方式,分別是none、manual和auto;默認(rèn)auto
         prefetch: #指定一個(gè)請(qǐng)求能處理多少個(gè)消息,如果有事務(wù)的話,必須大于等于transaction數(shù)量
         defaultRequeueRejected: #決定被拒絕的消息是否重新入隊(duì);默認(rèn)是true(與參數(shù)acknowledge-mode有關(guān)系)
         idleEventInterval: #container events發(fā)布頻率,單位ms
         ##重試機(jī)制
         retry: 
           stateless: #有無(wú)狀態(tài)
           enabled:  #是否開(kāi)啟
           maxAttempts: #最大重試次數(shù),默認(rèn)3
           initialInterval: #重試間隔
           multiplier: #對(duì)于上一次重試的乘數(shù)
           maxInterval: #最大重試時(shí)間間隔
       direct:
         consumersPerQueue: #每個(gè)隊(duì)列消費(fèi)者數(shù)量
         missingQueuesFatal:
         #...其余配置看上方公共配置
     ## template相關(guān)
     template:
       mandatory: #是否啟用強(qiáng)制信息;默認(rèn)false
       receiveTimeout: #`receive()`接收方法超時(shí)時(shí)間
       replyTimeout: #`sendAndReceive()`超時(shí)時(shí)間
       exchange: #默認(rèn)的交換機(jī)
       routingKey: #默認(rèn)的路由
       defaultReceiveQueue: #默認(rèn)的接收隊(duì)列
       ## retry重試相關(guān)
       retry: 
         enabled: #是否開(kāi)啟
         maxAttempts: #最大重試次數(shù)
         initialInterval: #重試間隔
         multiplier: #失敗間隔乘數(shù)
         maxInterval: #最大間隔


1.基本消息隊(duì)列

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

1、創(chuàng)建隊(duì)列

  • 訪問(wèn)接口:http://localhost:15672,賬號(hào)密碼都為guest

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

2、發(fā)布消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads() {
        String queue="MqTest1";
        String message="message1";
        rabbitTemplate.convertAndSend(queue,message);
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

3、接受消息

package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage(String msg){
        System.out.println("接收到的消息:"+msg);
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

2.工作消息隊(duì)列(Work Queue)

  • 可以提高消息處理速度,避免隊(duì)列消息堆積

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

1、發(fā)布消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads() {
        String queue="MqTest1";
        String message="message1";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(queue,message);
        }
    }
 
}

2、接受消息

package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage1(String msg){
        System.out.println("consume1接收到的消息:"+msg);
    }
    @RabbitListener(queues = "MqTest1")
    public void listenSimpleQueueMessage2(String msg){
        System.out.println("consume2接收到的消息:"+msg);
    }
}

3、控制臺(tái)輸出結(jié)果

consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1

4、消息預(yù)取問(wèn)題

  • 但是此時(shí)有一個(gè)問(wèn)題就是消息預(yù)取,比如隊(duì)列有10條消息,兩個(gè)消費(fèi)者各自直接先預(yù)取5個(gè)消息,如果一個(gè)消費(fèi)者接受消息的速度慢,一個(gè)快,就會(huì)導(dǎo)致一個(gè)消費(fèi)者已經(jīng)完成工作,另一個(gè)還在慢慢處理,會(huì)造成消息堆積消費(fèi)者身上,要解決這個(gè)問(wèn)題需要在yml文件配置相關(guān)配置
  rabbitmq:
    host: 43.140.244.236
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能取一個(gè),處理完才能取下一個(gè)消息

3.發(fā)布訂閱模式之模式(Fanout)

exchange是交換機(jī),負(fù)責(zé)消息路由,但不存儲(chǔ)消息,路由失敗則消息丟失

生產(chǎn)者將消息發(fā)送到fanout交換器
  • fanout交換機(jī)非常簡(jiǎn)單。它只是將接收到的所有消息廣播給它所知道的所有隊(duì)列

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

1、Fanout配置類(@Bean聲明)

package com.rabbitmqdemoconsumer.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class FanountConfig {
    //交換機(jī)聲明
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("FanountExchange");
    }
    //聲明隊(duì)列1
    @Bean
    public Queue Fanount_Qeueue1(){
        return new Queue("Fanount_Qeueue1");
    }
    //聲明隊(duì)列2
    @Bean
    public Queue Fanount_Qeueue2(){
        return new Queue("Fanount_Qeueue2");
    }
    //綁定交換機(jī)和隊(duì)列
    @Bean
    public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

2、發(fā)送消息

首先發(fā)送10條消息,經(jīng)過(guò)交換機(jī)轉(zhuǎn)發(fā)到隊(duì)列
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="FanountExchange";
        String message="message";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

3、接受消息

 //監(jiān)聽(tīng)交換機(jī)Fanount_Qeueue1
    @RabbitListener(queues = "Fanount_Qeueue1")
    public void listenFanountQeueue1(String msg){
        System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
    }
    //監(jiān)聽(tīng)交換機(jī)Fanount_Qeueue2
    @RabbitListener(queues = "Fanount_Qeueue2")
    public void listenFanountQeueue2(String msg){
        System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
    }

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

4.路由模式(Direct)

  • 會(huì)將消息根據(jù)規(guī)則路由到指定的隊(duì)列
生產(chǎn)者將消息發(fā)送到direct交換器

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

1、聲明(基于@RabbitListener聲明)

package com.rabbitmqdemoconsumer.rabbitmq;
 
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class SpringRabbitLeistener {
 
    /**
     * 綁定交換機(jī)和隊(duì)列,并為key賦值
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "DirectQueue1"),
        exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("listenDirectQueue1接收到的消息:"+msg);
    }
 
 
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "DirectQueue2"),
        exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
        key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("listenDirectQueue2接收到的消息:"+msg);
    }
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

2、發(fā)送給blue

發(fā)送消息
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

 
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="DirectExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"blue",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

3、發(fā)送給red

發(fā)送消息

@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="DirectExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"red",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

5.主題模式(Topic)

生產(chǎn)者將消息發(fā)送到 topic交換器

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

Queue與Exchange指定BindingKey可以使用通配符:

#:代指0個(gè)或多個(gè)單詞

*:代指一個(gè)單詞

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

1、聲明

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "TopicQueue1"),
        exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
        key = {"china.#"}
    ))
public void listenTopicQueue1(String msg){
    System.out.println("listenTopicQueue1接收到的消息:"+msg);
}
 
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "TopicQueue2"),
    exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
    key = {"#.news"}
))
public void listenTopicQueue2(String msg){
    System.out.println("listenTopicQueue2接收到的消息:"+msg);
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot
RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

2、發(fā)送消息(測(cè)試1)

package com.rabbitmqdemo;
 
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="TopicExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"china.news",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot

3、發(fā)送消息(測(cè)試2)

package com.rabbitmqdemo;
 
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
 
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    void contextLoads2() {
        String exchange="TopicExchange";
        String message="HelloWorld";
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(exchange,"china.weather",message);
        }
    }
 
}

RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ),MQ消息隊(duì)列,java-rabbitmq,rabbitmq,spring boot文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-841962.html

到了這里,關(guān)于RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • MQ-消息隊(duì)列-RabbitMQ

    MQ-消息隊(duì)列-RabbitMQ

    MQ(Message Queue) 消息隊(duì)列 ,是基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)中“ 先進(jìn)先出 ”的一種 數(shù)據(jù)結(jié)構(gòu) 。指把要傳輸?shù)臄?shù)據(jù)(消息)放在隊(duì)列中,用隊(duì)列機(jī)制來(lái)實(shí)現(xiàn)消息傳遞——生產(chǎn)者產(chǎn)生消息并把消息放入隊(duì)列,然后由消費(fèi)者去處理。消費(fèi)者可以到指定隊(duì)列拉取消息,或者訂閱相應(yīng)的隊(duì)列,由

    2024年02月09日
    瀏覽(27)
  • RabbitMQ入門 消息隊(duì)列快速入門 SpringAMQP WorkQueue 隊(duì)列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    RabbitMQ入門 消息隊(duì)列快速入門 SpringAMQP WorkQueue 隊(duì)列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延遲。 1.

    2024年04月08日
    瀏覽(19)
  • SpringBoot實(shí)現(xiàn)RabbitMQ的簡(jiǎn)單隊(duì)列(SpringAMQP 實(shí)現(xiàn)簡(jiǎn)單隊(duì)列)

    SpringBoot實(shí)現(xiàn)RabbitMQ的簡(jiǎn)單隊(duì)列(SpringAMQP 實(shí)現(xiàn)簡(jiǎn)單隊(duì)列)

    SpringAMQP 是基于 RabbitMQ 封裝的一套模板,并且還利用 SpringBoot 對(duì)其實(shí)現(xiàn)了自動(dòng)裝配,使用起來(lái)非常方便。 SpringAmqp 的官方地址:https://spring.io/projects/spring-amqp 說(shuō)明 : 1.Spring AMQP 是對(duì) Spring 基于 AMQP 的消息收發(fā)解決方案,它是一個(gè)抽象層,不依賴于特定的 AMQP Broker 實(shí)現(xiàn)和客戶端

    2024年04月13日
    瀏覽(22)
  • MQ消息隊(duì)列(主要介紹RabbitMQ)

    MQ消息隊(duì)列(主要介紹RabbitMQ)

    消息隊(duì)列概念:是在消息的傳輸過(guò)程中保存消息的容器。 作用:異步處理、應(yīng)用解耦、流量控制..... RabbitMQ: ? ? SpringBoot繼承RabbitMQ步驟: ? ? ? ? 1.加入依賴 ? ? ? ? ?2.配置 ? ? ? ? 3.開(kāi)啟(如果不需要監(jiān)聽(tīng)消息也就是不消費(fèi)就不需要該注解開(kāi)啟) ? ? ? ? 4.創(chuàng)建隊(duì)列、

    2024年02月11日
    瀏覽(33)
  • 消息隊(duì)列-RabbitMQ:MQ作用分類、RabbitMQ核心概念及消息生產(chǎn)消費(fèi)調(diào)試

    消息隊(duì)列-RabbitMQ:MQ作用分類、RabbitMQ核心概念及消息生產(chǎn)消費(fèi)調(diào)試

    1)什么是 MQ MQ (message queue),從字面意思上看, 本質(zhì)是個(gè)隊(duì)列,F(xiàn)IFO 先入先出 ,只不過(guò)隊(duì)列中存放的內(nèi)容是 message 而已,還是一種 跨進(jìn)程的通信機(jī)制 , 用于上下游傳遞消息 。在互聯(lián)網(wǎng)架構(gòu)中,MQ 是一種非常常見(jiàn)的上下游 “ 邏輯解耦 + 物理解耦” 的消息通信服務(wù) 。 使用了

    2024年02月20日
    瀏覽(28)
  • RaabitMQ(三) - RabbitMQ隊(duì)列類型、死信消息與死信隊(duì)列、懶隊(duì)列、集群模式、MQ常見(jiàn)消息問(wèn)題

    RaabitMQ(三) - RabbitMQ隊(duì)列類型、死信消息與死信隊(duì)列、懶隊(duì)列、集群模式、MQ常見(jiàn)消息問(wèn)題

    這是RabbitMQ最為經(jīng)典的隊(duì)列類型。在單機(jī)環(huán)境中,擁有比較高的消息可靠性。 經(jīng)典隊(duì)列可以選擇是否持久化(Durability)以及是否自動(dòng)刪除(Auto delete)兩個(gè)屬性。 Durability有兩個(gè)選項(xiàng),Durable和Transient。 Durable表示隊(duì)列會(huì)將消息保存到硬盤,這樣消息的安全性更高。但是同時(shí),由于需

    2024年02月14日
    瀏覽(1048)
  • MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型

    MQ消息隊(duì)列,以及RabbitMQ詳細(xì)(中1)五種rabbitMQ實(shí)用模型

    書接上文,展示一下五種模型我使用的是spring could 微服務(wù)的框架 文章說(shuō)明: ? ? ? ? 本文章我會(huì)分享總結(jié)5種實(shí)用的rabbitMQ的實(shí)用模型 1、hello world簡(jiǎn)單模型 2、work queues工作隊(duì)列 3、Publish/Subscribe發(fā)布訂閱模型 4、Routing路由模型 5、Topics 主題模型 (贈(zèng)送) 6、消息轉(zhuǎn)換器 Rabbi

    2024年02月05日
    瀏覽(40)
  • mq 消息隊(duì)列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十幾年前,淘寶的notify,借鑒ActiveMQ。京東的ActiveMQ集群幾百臺(tái),后面改成JMQ。 Linkedin的kafka,因?yàn)槭莝cala,國(guó)內(nèi)很多人不熟。淘寶的人把kafka用java寫了一遍,取名metaq,后來(lái)再改名RocketMQ。 總的來(lái)說(shuō),三大原因,語(yǔ)言、潮流、生態(tài)。 MQ這種東西,當(dāng)你的消息量不大的時(shí)候,用啥

    2024年02月12日
    瀏覽(17)
  • .NetCore 使用 RabbitMQ (交換機(jī)/隊(duì)列/消息持久化+mq高級(jí)特性+死信隊(duì)列+延遲隊(duì)列)

    .NetCore 使用 RabbitMQ (交換機(jī)/隊(duì)列/消息持久化+mq高級(jí)特性+死信隊(duì)列+延遲隊(duì)列)

    目錄 一、安裝mq 二、實(shí)操 1、簡(jiǎn)單模式 2、工作模式 3、fanout扇形模式(發(fā)布訂閱) 4、direct路由模式也叫定向模式 5、topic主題模式也叫通配符模式(路由模式的一種) 6、header 參數(shù)匹配模式 7、延時(shí)隊(duì)列(插件方式實(shí)現(xiàn)) 參考資料: 1、我的環(huán)境是使用VMware安裝的Centos7系統(tǒng)。MQ部署

    2023年04月09日
    瀏覽(112)
  • SpringCloud實(shí)用篇4——MQ RabbitMQ SpringAMQP

    SpringCloud實(shí)用篇4——MQ RabbitMQ SpringAMQP

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延遲。 1.

    2024年02月13日
    瀏覽(17)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包