国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

rabbitmq消息可靠性之消息回調(diào)機制

這篇具有很好參考價值的文章主要介紹了rabbitmq消息可靠性之消息回調(diào)機制。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

rabbitmq消息可靠性之消息回調(diào)機制

rabbitmq回調(diào)機制,java-rabbitmq,rabbitmq,分布式

rabbitmq在消息的發(fā)送與接收中,會經(jīng)過上面的流程,這些流程中每一步都有可能導(dǎo)致消息丟失,或者消費失敗甚至直接是服務(wù)器宕機等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機制

  • 生產(chǎn)者確認機制

  • 消息持久化存儲

  • 消費者確認機制

  • 失敗重試機制

本文主要講解生產(chǎn)者確認機制,也是rabbitmq提供的消息回調(diào)機制,這個機制可以解決生產(chǎn)者發(fā)送消息到交換機和交換機路由到隊列過程中的消息丟失問題

這種機制必須給每個消息指定一個唯一ID,消息發(fā)送到rabbitmq之后會返回結(jié)果給生產(chǎn)者,表示消息是否發(fā)送成功,返回結(jié)果有以下兩種

  • publisher-confirm:發(fā)送者確認:消息成功投遞到交換機,返回 ack;消息未投遞到交換機,返回 nack

  • publisher-return:發(fā)送者回執(zhí):消息成功投遞到交換機,但是沒有路由到隊列。返回 ack,及路由失敗原因

spring:
  rabbitmq:
    # rabbitMQ的ip地址
    host: 127.0.0.1
    # 端口
    port: 5672
    # 集群模式配置
    # addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
    username: admin
    password: 123456
    virtual-host: /
    # 消費者確認機制相關(guān)配置 
    # 開啟publisher-confirm,
    # 這里支持兩種類型:simple:同步等待confirm結(jié)果,直到超時;# correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback
    publisher-returns: true
    # 定義消息路由失敗時的策略。true,則調(diào)用ReturnCallback;false:則直接丟棄消息
    template:
      mandatory: true

然后定義 ReturnCallback 回調(diào),每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置

package com.gitee.small.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {

    //綁定鍵
    public final static String DOG = "topic.dog";
    public final static String CAT = "topic.cat";


    /**
     * Queue構(gòu)造函數(shù)參數(shù)說明
     * new Queue(SMS_QUEUE, true);
     * 1. 隊列名
     * 2. 是否持久化 true:持久化 false:不持久化
     */


    @Bean
    public Queue firstQueue() {
        return new Queue(DOG);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(CAT);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    /**
     * 將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.dog
     * 這樣只要是消息攜帶的路由鍵是topic.dog,才會分發(fā)到該隊列
     */
    @Bean(name = "binding.dog")
    public Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG);
    }

    /**
     * 將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#
     * 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊列
     */
    @Bean(name = "binding.cat")
    public Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發(fā)消息
        });
    }
}

接著定義 ConfirmCallback,ConfirmCallback 可以在發(fā)送消息時指定,因為每個業(yè)務(wù)處理 confirm 成功或失敗的邏輯不一定相同,上面已經(jīng)定義好exchange 和 queue,新建RabbitMqTest測試類

package smallJ;

import com.gitee.small.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class RabbitMqTest {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() throws InterruptedException {
        // 1.準備CorrelationData
        // 1.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 1.2.準備ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判斷結(jié)果
            if (result.isAck()) {
                // ACK
                log.info("消息成功投遞到交換機!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投遞到交換機失??!消息ID:{},原因:{}", correlationData.getId(), result.getReason());
                // 重發(fā)消息
            }
        }, ex -> {
            // 記錄日志
            log.error("消息發(fā)送異常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
            // 可以重發(fā)消息
        });
        rabbitTemplate.convertAndSend("topicExchange", "topic.dog", "路由模式測試-dog", correlationData);
        // 程序休眠兩秒等待回調(diào)
        Thread.sleep(2000);
    }
}

加兩個監(jiān)聽器進行測試

package com.gitee.small.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TopicRabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.dog"),
            exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC)
    ))
    public void process(String msg) {
        log.info("dog-收到消息:{}", msg);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.cat"),
            exchange = @Exchange(value = "bindingExchangeMessage2", type = ExchangeTypes.TOPIC)
    ))
    public void  process2(String msg){
        log.info("cat-收到消息:{}", msg);
    }
}

測試結(jié)果如下文章來源地址http://www.zghlxwxcb.cn/news/detail-717511.html

smallJ.RabbitMqTest   : 消息成功投遞到交換機!消息ID: 83f057fa-042d-4f56-872d-9d31a0444b82
c.g.small.rabbitmq.TopicRabbitReceiver   : dog-收到消息:路由模式測試-dog
c.g.small.rabbitmq.TopicRabbitReceiver   : cat-收到消息:路由模式測試-dog

到了這里,關(guān)于rabbitmq消息可靠性之消息回調(diào)機制的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ --- 消息可靠性

    RabbitMQ --- 消息可靠性

    消息隊列在使用過程中,面臨著很多實際問題需要思考: ?? ? 消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 co

    2024年02月14日
    瀏覽(29)
  • RabbitMQ消息的可靠性

    面試題: Rabbitmq怎么保證消息的可靠性? 1.消費端消息可靠性保證: 消息確認(Acknowledgements) : 消費者在接收到消息后,默認情況下RabbitMQ會自動確認消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費者在處理完消息后手動發(fā)送確認(basicAck)。如果消費

    2024年04月14日
    瀏覽(25)
  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(31)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場景 1.2 MQ導(dǎo)致消息丟失 1.3 消費者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機制 2.2 生產(chǎn)者確認機制 2.3 實現(xiàn)生產(chǎn)者確認 2.3.1 配置yml開啟生產(chǎn)者確認 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(25)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費者接收,會經(jīng)理多個過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時丟失: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 針對這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • RabbitMQ高級篇---消息可靠性

    RabbitMQ高級篇---消息可靠性

    1、消息可靠性: 消息從發(fā)送到消費者接受,會經(jīng)歷多個過程,每個消息傳遞的過程都可能導(dǎo)致消息的丟失: 常見的丟失原因: 發(fā)送時消息丟失原因: 生產(chǎn)者發(fā)送的消息未送達exchange 消息到達exchange后未到達queue MQ宕機,queue將消息丟失 consumer接收到消息后未消費就宕機 Rab

    2024年01月20日
    瀏覽(30)
  • RabbitMQ之消息的可靠性傳遞

    提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動添加 RabbitMQ之消息的可靠性傳遞 提示:寫完文章后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 提示:這里可以添加本文要記錄的大概內(nèi)容: 在當今的信息化時代,消息傳遞在企業(yè)級應(yīng)用和分布式

    2024年01月19日
    瀏覽(22)
  • RabbitMQ消息可靠性問題及解決

    RabbitMQ消息可靠性問題及解決

    說明:在RabbitMQ消息傳遞過程中,有以下問題: 消息沒發(fā)到交換機 消息沒發(fā)到隊列 MQ宕機,消息在隊列中丟失 消息者接收到消息后,未能正常消費(程序報錯),此時消息已在隊列中移除 針對以上問題,提供以下解決方案: 消息確認:確認消息是否發(fā)送到交換機、隊列;

    2024年02月16日
    瀏覽(25)
  • 如何保證 RabbitMQ 的消息可靠性?

    如何保證 RabbitMQ 的消息可靠性?

    項目開發(fā)中經(jīng)常會使用消息隊列來 完成異步處理、應(yīng)用解耦、流量控制等功能 。雖然消息隊列的出現(xiàn)解決了一些場景下的問題,但是同時也引出了一些問題,其中使用消息隊列時如何保證消息的可靠性就是一個常見的問題。 如果在項目中遇到需要保證消息一定被消費的場景

    2024年02月07日
    瀏覽(27)
  • 【RabbitMQ】之消息的可靠性方案

    【RabbitMQ】之消息的可靠性方案

    一、數(shù)據(jù)丟失場景 二、數(shù)據(jù)可靠性方案 1、生產(chǎn)者丟失消息解決方案 2、MQ 隊列丟失消息解決方案 3、消費者丟失消息解決方案 MQ 消息數(shù)據(jù)完整的鏈路為 :從 Producer 發(fā)送消息到 RabbitMQ 服務(wù)器中,再由 Broker 服務(wù)的 Exchange 根據(jù) Routing_Key 路由到指定的 Queue 隊列中,最后投送到消

    2024年02月14日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包