国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring Boot整合RabbitMQ之路由模式(Direct)

這篇具有很好參考價值的文章主要介紹了Spring Boot整合RabbitMQ之路由模式(Direct)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

RabbitMQ中的路由模式(Direct模式)應該是在實際工作中運用的比較多的一種模式了,這個模式和發(fā)布與訂閱模式的區(qū)別在于路由模式需要有一個routingKey,在配置上,交換機類型需要注入DirectExchange類型的交換機bean對象。在交換機和隊列的綁定過程中,綁定關系需要在綁定一個路由key。由于在實際的工作中不大可能會用自動確認的模式,所以我們在整合路由模式的過程中,依然采用發(fā)送消息雙確認機制和消費端手動確認的機制來保證消息的準確送達與消息防丟失。

1. 添加配置

在配置文件中,配置rabbitmq的相關賬號信息,開啟消息發(fā)送回調機制,配置文件其實和發(fā)布訂閱模式是一樣的。配置詳情如下:

server:
  port: 10001

spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 發(fā)送者開啟 return 確認機制
    publisher-returns: true
    # 發(fā)送者開啟 confirm 確認機制
    publisher-confirm-type: correlated

2. 創(chuàng)建配置類

    創(chuàng)建配置類RabbitMQConfig,用于聲明交換機、隊列,建立隊列和交換機的綁定關系,注入RabbitTemplate的bean對象。配置類詳情如下:
package com.study.rabbitmq.config;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "direct-order-exchange";
    public static final String SMS_QUEUE = "sms-direct-queue";
    public static final String EMAIL_QUEUE = "email-direct-queue";
    public static final String WECHAT_QUEUE = "wechat-direct-queue";

    /**
     * 1.
     * 聲明交換機
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        /**
         * directExchange的參數(shù)說明:
         * 1. 交換機名稱
         * 2. 是否持久化 true:持久化,交換機一直保留 false:不持久化,用完就刪除
         * 3. 是否自動刪除 false:不自動刪除 true:自動刪除
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 聲明隊列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue構造函數(shù)參數(shù)說明
         * 1. 隊列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 隊列與交換機綁定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
    }

    /**
     * 將自定義的RabbitTemplate對象注入bean容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //設置開啟消息推送結果回調
        rabbitTemplate.setMandatory(true);
        //設置ConfirmCallback回調
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("==============ConfirmCallback start ===============");
                log.info("回調數(shù)據(jù):{}", correlationData);
                log.info("確認結果:{}", ack);
                log.info("返回原因:{}", cause);
                log.info("==============ConfirmCallback end =================");
            }
        });
        //設置ReturnCallback回調
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("==============ReturnCallback start ===============");
                log.info("發(fā)送消息:{}", JSONUtil.toJsonStr(message));
                log.info("結果狀態(tài)碼:{}", replyCode);
                log.info("結果狀態(tài)信息:{}", replyText);
                log.info("交換機:{}", exchange);
                log.info("路由key:{}", routingKey);
                log.info("==============ReturnCallback end =================");
            }
        });
        return rabbitTemplate;
    }
}

3. 消費者配置

    在消費者項目的配置文件中開啟手動確認,配置詳情如下:
server:
  port: 10002

spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # 表示消費者消費成功消息以后需要手工的進行簽收(ack確認),默認為 auto
        acknowledge-mode: manual

4. 創(chuàng)建消費者

分別創(chuàng)建三個消費者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer來監(jiān)聽對應的隊列,有消息后進行消費,三個消費者大同小異,分別如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:54
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //監(jiān)聽隊列
public class DirectEmailConsumer {

    //標記消費者邏輯執(zhí)行方法
    @RabbitHandler
    public void emailMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Email direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重復處理失敗,拒絕再次接收...");
                //basicReject: 拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似 false表示消息不再重新進入隊列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                log.error("消息即將再次返回隊列處理...");
                // basicNack:表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //監(jiān)聽隊列
public class DirectSmsConsumer {

    @RabbitHandler
    public void smsMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("sms direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重復處理失敗,拒絕再次接收...");
                //basicReject: 拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似 false表示消息不再重新進入隊列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                log.error("消息即將再次返回隊列處理...");
                // basicNack:表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author chaoxian.wu
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //監(jiān)聽隊列
public class DirectWechatConsumer {

    @RabbitHandler
    public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("wechat direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重復處理失敗,拒絕再次接收...");
                //basicReject: 拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似 false表示消息不再重新進入隊列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                log.error("消息即將再次返回隊列處理...");
                // basicNack:表示失敗確認,一般在消費消息業(yè)務異常時用到此方法,可以將消息重新投遞入隊列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

以上就是全部的代碼部分,接下來我們在進入測試,看看實際效果如何,先發(fā)布一個routingKey=sms的消息,查看是不是只有對應的一個隊列中接收到消息,消息發(fā)送詳情:

package com.study.rabbitmq;

import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        for (long i = 1; i < 2; i++) {
            //交換機名稱
            String exchangeName = "direct-order-exchange";
            //路由key
            String routingKey = "sms";
            Order order = buildOrder(i);
            orderService.createOrder(order, routingKey, exchangeName);
        }
    }

    private Order buildOrder(long id) {
        Order order = new Order();
        order.setRequestId(id);
        order.setUserId(id);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setAmount(10L);
        order.setGoodsNum(1);
        order.setTotalAmount(10L);
        return order;
    }
}

我們登錄rabbitmq管理后臺查看下,只有sms-direct-queue這個隊列有一條消息,效果如下:

Spring Boot整合RabbitMQ之路由模式(Direct),消息隊列rabbitmq,java-rabbitmq,spring boot,rabbitmq

我們啟動消費者,看下是不是只有監(jiān)聽了sms-direct-queue這個隊列的消費者有消費日志,效果如下:

Spring Boot整合RabbitMQ之路由模式(Direct),消息隊列rabbitmq,java-rabbitmq,spring boot,rabbitmq

再發(fā)一條routingKey=email的消息,消費的日志,效果圖示如下

Spring Boot整合RabbitMQ之路由模式(Direct),消息隊列rabbitmq,java-rabbitmq,spring boot,rabbitmq

到此其實已經(jīng)springboot整合rabbitmq的路由模式結束了,這種模式在工作中還是比較常見的,我們演示的是單點的效果,實際工作中,不大可能會使用服務單點部署,現(xiàn)在都講究服務的高可用,就得服務集群部署,又會涉及到消息重復消費的問題需要處理,我個人覺得,遇到重復消費問題,我第一時間想到的就是分布式鎖,哈哈~。但是鎖什么呢?肯定是消息中的具備唯一性的屬性。來達到防止消息的重復消費。

整個過程中,其實還存在一個小問題沒有驗證,就是ReturnCallback回調機制沒有觸發(fā),因為這個得發(fā)生在交換機將消息發(fā)送到隊列的時候失敗才會觸發(fā),那么我們就發(fā)送一個不存在的routingKey就可以觸發(fā)了,我們發(fā)送一個routingKey=duanxin的消息,這個肯定不會發(fā)送成功,我們通過斷點來看看效果,效果如下:

Spring Boot整合RabbitMQ之路由模式(Direct),消息隊列rabbitmq,java-rabbitmq,spring boot,rabbitmq

然后我們常見的就全部整合完成了,當然,開啟了雙確認機制,雖然我們可以檢測到消息投送的結果,然后可以針對投送失敗的結果進行預警。但是開啟了這個操作,就必然會對消息的處理效率產(chǎn)生影響。所以還得根據(jù)實際業(yè)務場景而定是否需要使用這個確認機制。文章來源地址http://www.zghlxwxcb.cn/news/detail-681763.html

到了這里,關于Spring Boot整合RabbitMQ之路由模式(Direct)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Spring Boot 整合 RabbitMQ 實現(xiàn)延遲消息

    Spring Boot 整合 RabbitMQ 實現(xiàn)延遲消息

    消息隊列(Message Queuing,簡寫為 MQ)最初是為了解決金融行業(yè)的特定業(yè)務需求而產(chǎn)生的。慢慢的,MQ 被應用到了更多的領域,然而商業(yè) MQ 高昂的價格讓很多初創(chuàng)公司望而卻步,于是 AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)應運而生。 隨著 AMQP 草案的發(fā)布,兩個月

    2024年04月08日
    瀏覽(29)
  • 第二章 Spring Boot 整合 Kafka消息隊列 生產(chǎn)者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全認證 第二章??Spring Boot 整合 Kafka消息隊列?生產(chǎn)者 第三章??Spring Boot 整合 Kafka消息隊列?消息者 ????????Kafka 是一個消息隊列產(chǎn)品,基于Topic partitions的設計,能達到非常高的消息發(fā)送處理性能。本文主是基于Spirng Boot封裝了Apache 的

    2024年01月25日
    瀏覽(18)
  • Spring整合RabbitMQ-配制文件方式-3-消息拉模式

    拉消息的消費者 spring-rabbit.xml 當啟動消費者后,便可獲取到發(fā)送至隊列的消息 檢查隊列的消息的情況: 經(jīng)過檢查確認,發(fā)現(xiàn)消息已經(jīng)被消費了。 至此拉模式的消費者完成。

    2024年02月09日
    瀏覽(19)
  • Spring Boot整合RabbitMQ之發(fā)布與訂閱模式

    Spring Boot整合RabbitMQ之發(fā)布與訂閱模式

    RabbitMQ的模式中,常用的模式有:簡單模式,發(fā)布與訂閱模式,工作模式,路由模式,主題模式。簡單模式不太會運用到工作中,我們可以使用 RabbitMQ 的發(fā)布訂閱模式,實現(xiàn): 用戶發(fā)布動態(tài),其“粉絲”收到其發(fā)布動態(tài)的消息 用戶下訂單,庫存模塊、支付模塊等收到消息并

    2024年02月12日
    瀏覽(22)
  • RabbitMQ入門 消息隊列快速入門 SpringAMQP WorkQueue 隊列和交換機 Fanout Direct exchange RAbbitMQ單體部署

    RabbitMQ入門 消息隊列快速入門 SpringAMQP WorkQueue 隊列和交換機 Fanout Direct exchange RAbbitMQ單體部署

    微服務間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實時響應。 異步通訊:就像發(fā)郵件,不需要馬上回復。 兩種方式各有優(yōu)劣,打電話可以立即得到響應,但是你卻不能跟多個人同時通話。發(fā)送郵件可以同時與多個人收發(fā)郵件,但是往往響應會有延遲。 1.

    2024年04月08日
    瀏覽(19)
  • RabbitMQ詳解(三):消息模式(fanout、direct、topic、work)

    RabbitMQ詳解(三):消息模式(fanout、direct、topic、work)

    參考官網(wǎng):https://www.rabbitmq.com/getstarted.html 簡單模式 Simple, 參考RabbitMQ詳解(二):消息模式 Simple(簡單)模式 簡單模式是最簡單的消息模式,它包含一個生產(chǎn)者、一個消費者和一個隊列。生產(chǎn)者向隊列里發(fā)送消息,消費者從隊列中獲取消息并消費。 發(fā)布訂閱模式 fanout 同時向

    2024年02月10日
    瀏覽(20)
  • 利用消息中間件RabbitMQ創(chuàng)建隊列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機來完成消息的發(fā)送和監(jiān)聽接收(完整版)

    利用消息中間件RabbitMQ創(chuàng)建隊列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機來完成消息的發(fā)送和監(jiān)聽接收(完整版)

    目錄 一、前期項目環(huán)境準備 1.1父項目以及子項目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout)?交換機實現(xiàn)消息的發(fā)送和接收 2.1編寫子項目consumer(消費者,接收消息)的代碼實現(xiàn)扇出(Fanout)交換機接收消息 2.1.1consumer子項目結構 2.1.2FanoutConfig類的實現(xiàn)扇出(Fanout)交

    2024年02月05日
    瀏覽(95)
  • 消息隊列-RabbitMQ:Exchanges、綁定 bindings以及3大常用交換機(Fanout exchange、Direct exchange、Topics exchange)

    消息隊列-RabbitMQ:Exchanges、綁定 bindings以及3大常用交換機(Fanout exchange、Direct exchange、Topics exchange)

    RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會直接發(fā)送到隊列 。實際上, 通常生產(chǎn)者甚至都不知道這些消息傳遞傳遞到了哪些隊列中 。 相反, 生產(chǎn)者只能將消息發(fā)送到交換機 (exchange) , 交換機工作 的內容非常簡單, 一方面它接收來自生產(chǎn)者的消息 , 另一

    2024年04月08日
    瀏覽(21)
  • 消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

    消息隊列-RabbitMQ:延遲隊列、rabbitmq 插件方式實現(xiàn)延遲隊列、整合SpringBoot

    1、延遲隊列概念 延時隊列內部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時屬性 上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說, 延時隊列就是用來存放需要在指定時間被處理的元素的隊列。 延遲隊列使用場景: 訂單在十分鐘之內未支付則

    2024年02月22日
    瀏覽(20)
  • 消息隊列之RabbitMQ工作模式

    提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動添加 消息隊列之RabbitMQ工作模式 提示:寫完文章后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內容: 在這篇博客中,我將深入探討 RabbitMQ 的工作模式,帶你

    2024年01月18日
    瀏覽(114)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包