国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制

這篇具有很好參考價(jià)值的文章主要介紹了8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。


在 RabbitMQ之生產(chǎn)者發(fā)布確認(rèn)原理章節(jié)已經(jīng)介紹了rabbitmq生產(chǎn)者是如何對(duì)消息進(jìn)行發(fā)布確認(rèn)保證消息不丟失的。本章節(jié)繼續(xù)看下springboot整合rabbitmq后是如何保證消息不丟失的。

1. 消息需要發(fā)布確認(rèn)的原因

消息正常是通過(guò)生產(chǎn)者生產(chǎn)消息傳遞到交換機(jī),然后經(jīng)過(guò)交換機(jī)路由到消息隊(duì)列中,最后消費(fèi)者消費(fèi),如下圖所示
8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制
上述為正常情況,但也有情況會(huì)導(dǎo)致消息丟失的情況:第一種情況,交換機(jī)重啟,當(dāng)消息經(jīng)過(guò)消費(fèi)者投遞后,恰巧交換機(jī)正在重啟,會(huì)導(dǎo)致生產(chǎn)者投遞消息失敗,從而導(dǎo)致消息丟失;第二種情況,交換機(jī)沒(méi)問(wèn)題,消息投遞到交換機(jī)后,交換機(jī)路由到消息隊(duì)列過(guò)程中出了問(wèn)題,比如routingKey錯(cuò)誤導(dǎo)致路由不到隊(duì)列中。
針對(duì)上述兩種導(dǎo)致消息丟失的情況,下面采用消息確認(rèn)發(fā)布機(jī)制,分別采取消息正確投遞到交換機(jī)后回調(diào)接口來(lái)確認(rèn)消息正確被投遞,消息經(jīng)交換機(jī)正確路由到隊(duì)列中回調(diào)接口來(lái)確認(rèn)消息正確被路由。

2. 消息發(fā)送交換機(jī)后回調(diào)接口ConfirmCallback ,保證消息在發(fā)送交換機(jī)處不丟失

當(dāng)消息經(jīng)生產(chǎn)者投遞到交換機(jī)后,為避免消息丟失,需要回調(diào)RabbitTemplate.ConfirmCallback接口,回調(diào)接口后,尤其是要對(duì)投遞失敗的消息進(jìn)行處理或者記錄下來(lái)保證消息不丟失。該接口不管消息投遞到交換機(jī)成功或者失敗都會(huì)進(jìn)行回調(diào),未避免消息丟失,可以選擇在回調(diào)接口中只處理或者登記投遞失敗的消息,達(dá)到消息不丟失的目的。

下面通過(guò)案例演示生產(chǎn)者投遞消息到交換機(jī)后回調(diào)ConfirmCallback接口情況。

8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制
`

2.1 在application.properties中添加spring.rabbitmq.publisher-confirm-type=correlated配置,開(kāi)啟回調(diào)機(jī)制

spring.rabbitmq.host=192.168.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

spring.rabbitmq.publisher-confirm-type=correlated

2.2 聲明交換機(jī)和隊(duì)列并進(jìn)行綁定
聲明confirm_exchange交換機(jī),聲明confirm_queue隊(duì)列,并通過(guò)routingKey=confirm綁定交換機(jī)和隊(duì)列。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    public static final String ROUTING_KEY = "confirm";

    /*聲明交換機(jī)*/
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    /*聲明隊(duì)列*/
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /*綁定交換機(jī)和隊(duì)列*/
    @Bean
    public Binding exchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }
}

2.3 實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口
當(dāng)消息由生產(chǎn)者發(fā)到交換機(jī)后會(huì)回調(diào)該接口中的confirm方法

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback {
	/* correlationData 內(nèi)含消息內(nèi)容
	 * ack 交換機(jī)接受成功或者失敗。 true表示交換機(jī)接受消息成功, false表示交換機(jī)接受失敗
	 * cause 表示失敗原因
	*/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("hello world");
        String id = correlationData.getId();
        String message = new String(correlationData.getReturnedMessage().getBody());
        if (ack){
            log.info("交換機(jī)收到消息id為{}, 消息內(nèi)容為{}", id, message);
        }else {
            log.info("交換機(jī)未收到消息id為{}, 消息內(nèi)容為{}, 原因?yàn)閧}", id, message, cause);
        }
    }
}

2.4 創(chuàng)建生產(chǎn)者
創(chuàng)建生產(chǎn)者生產(chǎn)消息,本案例中生產(chǎn)者發(fā)送了2個(gè)消息,分別為hello rabbitmq 1hello rabbitmq 2。
在創(chuàng)建生產(chǎn)者時(shí)要設(shè)置一下需要回調(diào)的接口ExchangeCallback ,在設(shè)置回調(diào)接口時(shí)用了java的@PostConstruct注解,該注解作用用來(lái)指定bean初始化的順序,Constructor(構(gòu)造方法) -> @Autowired(依賴(lài)注入) -> @PostConstruct(注釋的方法)。也就是說(shuō)在初始化Producer3對(duì)象時(shí),先調(diào)了Producer3的默認(rèn)無(wú)參構(gòu)造函數(shù);然后執(zhí)行Autowired注解部分,從Spring IOC容器中尋找rabbitTemplate和exchangeCallback對(duì)象分別注入到Producer3的rabbitTemplate和exchangeCallback屬性中;最后執(zhí)行PostConstruct注解部分,把exchangeCallback設(shè)置到rabbitTemplate中。

在發(fā)送hello rabbitmq 2消息時(shí)故意把routingKey寫(xiě)錯(cuò)導(dǎo)致hello rabbitmq 2消息不能從交換機(jī)發(fā)送到隊(duì)列中,為下一節(jié)做鋪墊。

import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class Producer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ExchangeCallback exchangeCallback;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(exchangeCallback);
    }

    public void produceMessage(){
        String message = "hello rabbitmq 1";
        CorrelationData correlationData1 = new CorrelationData("1");
        correlationData1.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
        message = "hello rabbitmq 2";
        CorrelationData correlationData2 = new CorrelationData("2");
        correlationData2.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
    }
}

2.5 啟動(dòng)測(cè)試

@SpringBootApplication
@Slf4j
public class SpringbootDemo {
    public static void main(String[] args) {
        ConfigurableApplicationContext app = SpringApplication.run(SpringbootDemo.class, args);
        Producer3 producer = app.getBean(Producer3.class);
        producer.produceMessage();
    }
}

啟動(dòng)上面測(cè)試案例,生產(chǎn)者發(fā)送了id為1的hello rabbitmq 1消息和id為2的hello rabbitmq 2,交換機(jī)收到消息也回調(diào)了ExchangeCallback 接口對(duì)2條消息都進(jìn)行了確認(rèn),尤其是對(duì)于失敗的消息要在此步保存失敗的消息,避免消息在交換機(jī)這一步丟失。

2022-08-04 00:43:28.817  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:43:28.823  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-04 00:43:28.827  INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:43:28.828  INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2

但上面還有一個(gè)問(wèn)題,雖然回調(diào)ExchangeCallback接口,可以保證消息到交換機(jī)一步不會(huì)丟失,但如果交換機(jī)到隊(duì)列的過(guò)程中出現(xiàn)了問(wèn)題,消息一樣會(huì)丟失。比如上面生產(chǎn)者把routingKey寫(xiě)錯(cuò)了,就會(huì)導(dǎo)致hello rabbitmq 2消息從交換機(jī)路由不到隊(duì)列中。下面創(chuàng)建消費(fèi)者程序,看消費(fèi)者消費(fèi)confirm_queue隊(duì)列中消息情況

import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer3 {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void consuemrMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消費(fèi)者消費(fèi)消息{}", msg);
    }
}

然后重新啟動(dòng)測(cè)試代碼,輸出如下,可以看出消費(fèi)端只消費(fèi)了hello rabbitmq 1,而hello rabbitmq 2消息則丟失了。

2022-08-04 00:45:28.817  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:45:28.823  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-04 00:45:28.827  INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-04 00:45:28.828  INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-04 00:45:28.831  INFO 13512 --- [ntContainer#2-1] com.lzj.consumer.Consumer3   			   : 消費(fèi)者消費(fèi)消息hello rabbitmq 1

3. 消息經(jīng)交換機(jī)路由到隊(duì)列后回調(diào)接口ReturnCallback ,保證消息在發(fā)送隊(duì)列處不丟失

為解決上一節(jié)中,消息由交換機(jī)和消息隊(duì)列中異常,導(dǎo)致消息丟失問(wèn)題,解決辦法就是在添加消息從交換機(jī)路由到隊(duì)列中失敗后回調(diào)的接口,在回調(diào)接口中把失敗的消息保存下來(lái)就可以避免消息丟失了。

在回調(diào)接口之前還需為RabbitMQ設(shè)置Mandatory標(biāo)志,只有當(dāng)該標(biāo)志為true時(shí),消息由交換機(jī)到隊(duì)列失敗后才會(huì)回調(diào)接口;如果該標(biāo)志設(shè)置false時(shí),消息由交換機(jī)路由到隊(duì)列失敗后自動(dòng)丟棄消息,會(huì)導(dǎo)致消息丟失,這也是默認(rèn)設(shè)置,所以如需保證消息不丟失,要打開(kāi)Mandatory標(biāo)志。

下面繼續(xù)進(jìn)行上面案例,在上面案例的基礎(chǔ)上添加ReturnCallback接口實(shí)現(xiàn)

package com.lzj.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息 {} 經(jīng)交換機(jī) {} 通過(guò)routingKey={} 路由到隊(duì)列失敗,失敗code為:{}, 失敗原因?yàn)椋簕}",
                new String(message.getBody()), exchange, routingKey, replyCode, replyText);
    }
}

然后修改上面的生產(chǎn)者Producer3 ,只需修改2點(diǎn):第一點(diǎn),為RabbitTemplate 設(shè)置Mandatory標(biāo)志;第二點(diǎn),把ReturnCallback的實(shí)現(xiàn)加入監(jiān)聽(tīng)。

import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import com.lzj.config.QueueCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class Producer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ExchangeCallback exchangeCallback;

    @Autowired
    private QueueCallback queueCallback;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(exchangeCallback);
        /**
		* true:交換機(jī)無(wú)法將消息進(jìn)行路由時(shí),會(huì)將該消息返回給生產(chǎn)者
		* false:如果發(fā)現(xiàn)消息無(wú)法進(jìn)行路由,則直接丟棄
		*/
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(queueCallback);
    }

    public void produceMessage(){
        String message = "hello rabbitmq 1";
        CorrelationData correlationData1 = new CorrelationData("1");
        correlationData1.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
        log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData1.getId(), message);
        message = "hello rabbitmq 2";
        CorrelationData correlationData2 = new CorrelationData("2");
        correlationData2.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
        log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData2.getId(), message);
    }
}

下面重新啟動(dòng)上面的測(cè)試案例,可以得出如下測(cè)試結(jié)果,生產(chǎn)者生產(chǎn)的hello rabbitmq 1hello rabbitmq 2消息都已發(fā)到交換機(jī),也都被交換機(jī)確認(rèn)了,但hello rabbitmq 2被交換機(jī)路由到隊(duì)列時(shí)由于routingKey錯(cuò)誤導(dǎo)致路由失敗,已在ReturnCallback接口回調(diào)中被記錄下來(lái),最終正確被路由到隊(duì)列中的消息只有hello rabbitmq 1,從打印日志看也只有hello rabbitmq 1被消費(fèi)了。

2022-08-05 00:39:29.670  INFO 6832 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-05 00:39:29.683  INFO 6832 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-05 00:39:29.702  INFO 6832 --- [nectionFactory3] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-05 00:39:29.704  INFO 6832 --- [nectionFactory1] com.lzj.config.QueueCallback             : 消息 hello rabbitmq 2 經(jīng)交換機(jī) confirm_exchange 通過(guò)routingKey=error_routingkey 路由到隊(duì)列失敗,失敗code為:312, 失敗原因?yàn)椋篘O_ROUTE
2022-08-05 00:39:29.705  INFO 6832 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-05 00:39:29.707  INFO 6832 --- [ntContainer#2-1] com.lzj.consumer.Consumer3               : 消費(fèi)者消費(fèi)消息hello rabbitmq 1

結(jié)論:從上面2個(gè)案例可以看出,通過(guò)ConfirmCallback和ReturnCallback接口的回調(diào),保證了消息在交換機(jī)和隊(duì)列處消息不丟失。

4. 備份交換機(jī)

備份交換機(jī)目的是解決消息交換機(jī)路由到隊(duì)列中失敗時(shí)保證消息不丟失的一種方法,與上面第3節(jié)中回調(diào)ReturnCallback達(dá)到的目的一致,但不管是回調(diào)ConfirmCallback還是回調(diào)ReturnCallback都增加了生產(chǎn)者的負(fù)擔(dān),當(dāng)消息異常時(shí)會(huì)增加生產(chǎn)者的處理時(shí)間,導(dǎo)致生產(chǎn)端生產(chǎn)消息的吞吐量下降。另外對(duì)異常的消息也不可以用死信隊(duì)列(死信隊(duì)列詳解參考此篇文章),因?yàn)橄⒃谕哆f到交換機(jī)和隊(duì)列過(guò)程中異常時(shí)無(wú)法被投遞到死信交換機(jī),因?yàn)椴粷M足死信隊(duì)列的3個(gè)條件(參見(jiàn)死信隊(duì)列篇章)。因此備份交換機(jī)就是一種處理交換機(jī)路由隊(duì)列過(guò)程中失敗時(shí)保證消息不丟失的不錯(cuò)選擇。

以下圖為例,消息正常是有Producer生產(chǎn)者到confirm_exchange交換機(jī),再由confirm_exchange交換機(jī)路由到confirm_queue隊(duì)列中,最后被confirm_consumer消費(fèi)掉。如果消息從Producer到confirm_exchange過(guò)程中出現(xiàn)了異常會(huì)回調(diào)ConfirmCallback接口保證消息不丟失。如果消息在confirm_exchange交換機(jī)到confirm_queue出現(xiàn)了異常,在前面案例中是回調(diào)ReturnCallback接口保證消息不丟失的,而此處是通過(guò)備用交換機(jī)實(shí)現(xiàn),異常消息會(huì)被投遞到back_exchange交換機(jī)中,該交換機(jī)可以是fanout類(lèi)型交換機(jī),可以把消息分別路由到backup_queue隊(duì)列和warn_queue隊(duì)列中,backup_queue隊(duì)列中消息由backup_consumer消費(fèi)者消費(fèi),warn_queue隊(duì)列中消息由warn_consumer消費(fèi),其中backup_consumer用于備份失敗的消息,比如失敗的消息存儲(chǔ)到數(shù)據(jù)庫(kù)中,warn_consumer用于告警運(yùn)維或開(kāi)發(fā)人員有失敗的消息。
8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制
1. 首先修改上述案例中ConfirmConfig類(lèi),用于聲明confirm_exchange、backup_exchange交換機(jī),聲明confirm_queue、backup_queue、warn_queue隊(duì)列,并對(duì)交換機(jī)和隊(duì)列進(jìn)行綁定。

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    public static final String WARN_QUEUE_NAME = "warn_queue";

    /*聲明確認(rèn)交換機(jī)*/
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                              .durable(true)
                              .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)     //設(shè)置備份交換機(jī)
                              .build();
    }

    /*聲明確認(rèn)隊(duì)列*/
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /*綁定確認(rèn)交換機(jī)和確認(rèn)隊(duì)列*/
    @Bean
    public Binding confirmExchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                               @Qualifier("confirmQueue") Queue confirmQueue){
       return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm");
    }

    /*聲明備份交換機(jī)*/
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    /*聲明備份隊(duì)列*/
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    /*綁定備份交換機(jī)和備份隊(duì)列*/
    @Bean
    public Binding backupExchangeBindingBackupQueue(@Qualifier("backupExchange")FanoutExchange backupExchange,
                                                    @Qualifier("backupQueue")Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    /*聲明警告隊(duì)列*/
    @Bean("warnQueue")
    public Queue warnQueue(){
        return QueueBuilder.durable(WARN_QUEUE_NAME).build();
    }

    /*綁定備份交換機(jī)和警告隊(duì)列*/
    @Bean
    public Binding backupExchangeBindingWarnQueue(@Qualifier("backupExchange")FanoutExchange backupExchange,
                                                  @Qualifier("warnQueue")Queue warnQueue){
        return BindingBuilder.bind(warnQueue).to(backupExchange);
    }
}

2. 創(chuàng)建ConfirmConsumer消費(fèi)者用于消費(fèi)confirm_queue中的消息

import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void consuemrMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消費(fèi)者消費(fèi)消息{}", msg);
    }
}

3. 創(chuàng)建BackupConsumer消費(fèi)者用于消費(fèi)backup_queue中的消息

import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class BackupConsumer {

    @RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE_NAME)
    public void backupMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消息{} 備份到數(shù)據(jù)庫(kù)", msg);
    }
}
  1. 創(chuàng)建WarnConsumer消費(fèi)者用于消費(fèi)warn_queue中消息
import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WarnConsumer {

    @RabbitListener(queues = ConfirmConfig.WARN_QUEUE_NAME)
    public void warnMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消息{} 消費(fèi)失敗,請(qǐng)盡快處理", msg);
    }
}

5. 修改上述案例中的生產(chǎn)者Producer3中發(fā)消息時(shí)的routingKey為confirm,修改后代碼如下

import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import com.lzj.config.QueueCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class Producer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ExchangeCallback exchangeCallback;

    @Autowired
    private QueueCallback queueCallback;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(exchangeCallback);
        /**
         * true:
         * 交換機(jī)無(wú)法將消息進(jìn)行路由時(shí),會(huì)將該消息返回給生產(chǎn)者
         * false:
         * 如果發(fā)現(xiàn)消息無(wú)法進(jìn)行路由,則直接丟棄
         */
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(queueCallback);
    }

    public void produceMessage(){
        String message = "hello rabbitmq 1";
        CorrelationData correlationData1 = new CorrelationData("1");
        correlationData1.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "confirm", message, correlationData1);
        log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData1.getId(), message);
        message = "hello rabbitmq 2";
        CorrelationData correlationData2 = new CorrelationData("2");
        correlationData2.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
        log.info("生產(chǎn)者發(fā)送id為{}, 消息內(nèi)容為{}", correlationData2.getId(), message);
    }
}

然后重新啟動(dòng)測(cè)試程序進(jìn)行測(cè)試,輸出代碼如下所示,生產(chǎn)者生產(chǎn)了hello rabbitmq 1hello rabbitmq 2消息,并且2條消息偶成功發(fā)到了confirm_exchange交換機(jī)中,由于hello rabbitmq 2的routingKey寫(xiě)的有問(wèn)題,導(dǎo)致hello rabbit 1消息被confirm_consumer正常消費(fèi),而hello rabbit 2則被投遞到了backup_exchange備份交換機(jī)中,然后改消息被backup_consumer消費(fèi)者保存,并同時(shí)被warn_consumer發(fā)出警告。

2022-08-18 00:56:29.549  INFO 7088 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-18 00:56:29.559  INFO 7088 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為1, 消息內(nèi)容為hello rabbitmq 1
2022-08-18 00:56:29.563  INFO 7088 --- [           main] com.lzj.producer.Producer3               : 生產(chǎn)者發(fā)送id為2, 消息內(nèi)容為hello rabbitmq 2
2022-08-18 00:56:29.578  INFO 7088 --- [ntContainer#0-1] com.lzj.consumer.BackupConsumer          : 消息hello rabbitmq 2 備份到數(shù)據(jù)庫(kù)
2022-08-18 00:56:29.578  INFO 7088 --- [ntContainer#1-1] com.lzj.consumer.ConfirmConsumer         : 消費(fèi)者消費(fèi)消息hello rabbitmq 1
2022-08-18 00:56:29.580  INFO 7088 --- [ntContainer#4-1] com.lzj.consumer.WarnConsumer            : 消息hello rabbitmq 2 消費(fèi)失敗,請(qǐng)盡快處理
2022-08-18 00:56:29.584  INFO 7088 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交換機(jī)收到消息id為2, 消息內(nèi)容為hello rabbitmq 2

可見(jiàn),備份隊(duì)列f方式保證了消息不丟失。如果同時(shí)設(shè)置了備份隊(duì)列以及第二個(gè)案例中的ReturnCallback回調(diào)接口,失敗消息會(huì)先進(jìn)備份隊(duì)列,因?yàn)閭浞蓐?duì)列優(yōu)先級(jí)高于回調(diào)RetrunCallback接口。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-424314.html

到了這里,關(guān)于8. springboot + rabbitmq 消息發(fā)布確認(rèn)機(jī)制的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • rabbitmq消息確認(rèn)機(jī)制

    rabbitmq消息確認(rèn)機(jī)制

    (1) publish === broker 只要broker收到消息,就會(huì)執(zhí)行 confirmCallback (2) exchange === queue 如果exchange有消息沒(méi)有成功發(fā)送至queue,就會(huì)執(zhí)行RuturnCallback,例:routing key錯(cuò)誤導(dǎo)致發(fā)送消息到隊(duì)列失敗 (3)RabbitmqConfig (1) queue === consumer 默認(rèn)是ack,consumer只要拿到消息就會(huì)自動(dòng)確認(rèn),服務(wù)端

    2024年02月13日
    瀏覽(20)
  • RabbitMQ 消息確認(rèn)機(jī)制

    為了保證消息從隊(duì)列可靠的到達(dá)消費(fèi)者,RabbitMQ 提供了消息確認(rèn)機(jī)制(Message Acknowledgement)。消費(fèi)者在訂閱隊(duì)列時(shí),可以指定 autoAck 參數(shù),當(dāng) autoAck 參數(shù)等于 false 時(shí),RabbitMQ 會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號(hào)后才從內(nèi)存(或者磁盤(pán))中移除消息(實(shí)際上是先打上刪除標(biāo)記,之

    2024年02月15日
    瀏覽(23)
  • RabbitMq 消息確認(rèn)機(jī)制詳解

    RabbitMq 消息確認(rèn)機(jī)制詳解

    目錄 1.消息可靠性 1.1.生產(chǎn)者消息確認(rèn) 1.1.1.修改配置 1.1.2.定義Return回調(diào) 1.1.3.定義ConfirmCallback 1.2.消息持久化 1.2.1.交換機(jī)持久化 1.2.2.隊(duì)列持久化 1.2.3.消息持久化 1.3.消費(fèi)者消息確認(rèn) 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消費(fèi)失敗重試機(jī)制 1.4.1.本地重試 1.4.2.失敗策略 1.5.總結(jié)

    2024年01月21日
    瀏覽(17)
  • RabbitMQ 發(fā)布確認(rèn)機(jī)制

    RabbitMQ 發(fā)布確認(rèn)機(jī)制

    發(fā)布確認(rèn)模式是避免消息由生產(chǎn)者到RabbitMQ消息丟失的一種手段 ??生產(chǎn)者通過(guò)調(diào)用channel.confirmSelect方法將信道設(shè)置為confirm模式,之后RabbitMQ會(huì)返回Confirm.Select-OK命令表示同意生產(chǎn)者將當(dāng)前信道設(shè)置為confirm模式。 ??confirm模式下的信道所發(fā)送的消息都將被應(yīng)帶ack或者nack一次

    2024年02月13日
    瀏覽(21)
  • RabbitMQ--基礎(chǔ)--8.1--消息確認(rèn)機(jī)制--接受確認(rèn)機(jī)制(ACK)

    RabbitMQ--基礎(chǔ)--8.1--消息確認(rèn)機(jī)制--接受確認(rèn)機(jī)制(ACK)

    代碼位置 消費(fèi)者收到Queue中的消息,但沒(méi)有處理完成就宕機(jī)的情況,這種情況下就可能會(huì)導(dǎo)致消息丟失。 為了避免這種情況發(fā)生,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknowledgment)后才將該消息從Queue中移除。 如果RabbitMQ沒(méi)有收

    2024年02月10日
    瀏覽(18)
  • Rabbitmq入門(mén)與應(yīng)用(六)-rabbitmq的消息確認(rèn)機(jī)制

    Rabbitmq入門(mén)與應(yīng)用(六)-rabbitmq的消息確認(rèn)機(jī)制

    確認(rèn)消息是否發(fā)送給交換機(jī) 配置 編碼RabbitTemplate.ConfirmCallback ConfirmCallback 是一個(gè)回調(diào)接口,消息發(fā)送到 Broker 后觸發(fā)回調(diào),確認(rèn)消息是否到達(dá) Broker 服務(wù)器, 也就是只確認(rèn)是否正確到達(dá) Exchange 中。 在配置類(lèi)中編碼確認(rèn)回調(diào)函數(shù)。tips: 設(shè)置 rabbitTemplate.setMandatory(true); 配置類(lèi)

    2024年02月20日
    瀏覽(15)
  • RabbitMQ的幾種消息確認(rèn)機(jī)制詳細(xì)介紹

    RabbitMQ的幾種消息確認(rèn)機(jī)制詳細(xì)介紹

    前言:大家好,我是小威,24屆畢業(yè)生,在一家滿意的公司實(shí)習(xí)。本篇文章將詳細(xì)介紹RabbitMQ的幾種消息確認(rèn)機(jī)制。 如果文章有什么需要改進(jìn)的地方還請(qǐng)大佬不吝賜教 ????。 小威在此先感謝各位大佬啦~~???? ??個(gè)人主頁(yè):小威要向諸佬學(xué)習(xí)呀 ??個(gè)人簡(jiǎn)介:大家好,我是

    2023年04月25日
    瀏覽(21)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生產(chǎn)者和消費(fèi)者消息的確認(rèn),消息的持久化以及消費(fèi)失敗的重試機(jī)制_rabbitmq 生產(chǎn)者消息確認(rèn)

    【RabbitMQ】RabbitMQ 消息的可靠性 —— 生產(chǎn)者和消費(fèi)者消息的確認(rèn),消息的持久化以及消費(fèi)失敗的重試機(jī)制_rabbitmq 生產(chǎn)者消息確認(rèn)

    先自我介紹一下,小編浙江大學(xué)畢業(yè),去過(guò)華為、字節(jié)跳動(dòng)等大廠,目前阿里P7 深知大多數(shù)程序員,想要提升技能,往往是自己摸索成長(zhǎng),但自己不成體系的自學(xué)效果低效又漫長(zhǎng),而且極易碰到天花板技術(shù)停滯不前! 因此收集整理了一份《2024年最新大數(shù)據(jù)全套學(xué)習(xí)資料》,

    2024年04月26日
    瀏覽(66)
  • RabbitMQ:第一章:6 種工作模式以及消息確認(rèn)機(jī)制

    RabbitMQ:第一章:6 種工作模式以及消息確認(rèn)機(jī)制

    } System.out.println(“發(fā)送數(shù)據(jù)成功”); channel.close(); connection.close(); } } 消費(fèi)者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消費(fèi)者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    瀏覽(18)
  • RabbitMQ消息可靠性投遞與ACK確認(rèn)機(jī)制

    RabbitMQ消息可靠性投遞與ACK確認(rèn)機(jī)制

    什么是消息的可靠性投遞 保證消息百分百發(fā)送到消息隊(duì)列中去 保證MQ節(jié)點(diǎn)成功接收消息 消息發(fā)送端需要接收到MQ服務(wù)端接收到消息的確認(rèn)應(yīng)答 完善的消息補(bǔ)償機(jī)制,發(fā)送失敗的消息可以再感知并二次處理 RabbitMQ消息投遞路徑 生產(chǎn)者–交換機(jī)–隊(duì)列–消費(fèi)者 通過(guò)兩個(gè)節(jié)點(diǎn)控制

    2024年02月20日
    瀏覽(28)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包