国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

這篇具有很好參考價值的文章主要介紹了springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

目錄

1.生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn)

2.交換機(jī)給隊列發(fā)消息時候的消息確認(rèn)

3.備用隊列

3.消費(fèi)者手動ack

?文章來源地址http://www.zghlxwxcb.cn/news/detail-484008.html

rabbitmq的發(fā)布確認(rèn)方式,可以有效的保證我們的數(shù)據(jù)不丟失。
?

消息正常發(fā)送的流程是:生產(chǎn)者發(fā)送消息到交換機(jī),然后交換機(jī)通過路由鍵把消息發(fā)送給對應(yīng)的隊列,然后消費(fèi)者監(jiān)聽隊列消費(fèi)消息

但是如果生產(chǎn)者發(fā)送的消息,交換機(jī)收不到呢,又或者交換機(jī)通過路由鍵給對應(yīng)的隊列發(fā)消息時,路由鍵不存在呢,這些就是消息發(fā)布確認(rèn)所要解決的問題

?

消息的發(fā)布確認(rèn)分別有:

  • 生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn)
  • 以及交換機(jī)發(fā)消息給隊列的消息確認(rèn)

先在application.properties配置文件中加上以下代碼:

# 確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
spring.rabbitmq.publisher-confirm-type= correlated
# 確認(rèn)消息已發(fā)送到隊列
spring.rabbitmq.publisher-returns= true

# 確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)
spring.rabbitmq.publisher-confirm-type= correlated

這個意思是開啟confirm模式,這樣的話,當(dāng)生產(chǎn)者發(fā)送消息的時候,無論交換機(jī)是否收到,都會觸發(fā)回調(diào)方法

1.生產(chǎn)者發(fā)消息到交換機(jī)時候的消息確認(rèn)

?寫一個容器:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

// ConfirmCallback:消息只要發(fā)出,無論交換機(jī)有沒有接到消息,都會觸發(fā)ConfirmCallback類的confirm方法
// ConfirmCallback是有個內(nèi)部類

@Component
public class messageConfirm implements RabbitTemplate.ConfirmCallback{


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init()
    {
        rabbitTemplate.setConfirmCallback(this);
       
    }

    /**
     *
     * @param correlationData correlationData是發(fā)送消息時候攜帶的消息
     * @param ack 如果為true,表示交換機(jī)接收到消息了
     * @param message 異常消息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String message) {

        if (ack)
        {
            System.out.println("交換機(jī)收到消息成功:" + correlationData.getId());
        }else {
            System.out.println("交換機(jī)收到消息失?。? + correlationData.getId() + "原因:" + message);
        }

    }
   
}

RabbitTemplate.ConfirmCallback是一個內(nèi)部接口類,只要生產(chǎn)者往交換機(jī)發(fā)送消息,都會該觸發(fā)ConfirmCallback類的confirm方法

注意:
? ? ? ? 因為RabbitTemplate.ConfirmCallback是一個內(nèi)部類,所以我們要通過? ? @PostConstruct注解,把當(dāng)前類賦值給ConfirmCallback

配置類:

package com.example.rabbitmq.發(fā)布確認(rèn);


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class messageConfrimConfig {

    @Bean
    public DirectExchange getConfrimTopic()
    {
        // 創(chuàng)建一個直接交換機(jī)
        return ExchangeBuilder.directExchange("ljl-ConfrimTopic").build();
    }


    @Bean
    public Queue getConfrimQueue()
    {
        return new Queue("ljl-ConfrimQueue");
    }


    @Bean
    public Binding TopicConfrimBinding()
    {
        return BindingBuilder.bind(getConfrimQueue()).to(getConfrimTopic()).with("messageConfirm");
    }

}

消費(fèi)者:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class clientConfirm {

    @RabbitListener(queues = "ljl-ConfrimQueue")
    @RabbitHandler
    public void ConfrimQueue(Message message) {
        System.out.println("正常隊列正常接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

生產(chǎn)者:

@RestController
public class testConfirmController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendMessageConfirm")
    public String sendMessageConfirm()
    {
        HashMap<String, Object> mapExchange = new HashMap<>();
        mapExchange.put("message","測試交換機(jī)的發(fā)布確認(rèn)消息");


        // 關(guān)聯(lián)數(shù)據(jù)的一個類,交換機(jī)無論有沒有收到生產(chǎn)者發(fā)送的消息,都會返回這個對象
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        // 這個是正常發(fā)送的,交換機(jī)的名稱,跟路由鍵的名稱都是存在的
        rabbitTemplate.convertAndSend("ljl-ConfrimTopic","messageConfirm",JSONObject.toJSONString(mapExchange),correlationData);
        return "成功";
    }


}

直接運(yùn)行項目代碼:http://localhost:8080/sendMessageConfirm
springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

?可以看到消息正常發(fā)送,正常消費(fèi),然后交換機(jī)回調(diào)方法

?

當(dāng)交換機(jī)不存在的時候:
springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失
一樣會觸發(fā)回調(diào)方法,然后打印錯誤消息?

?

2.交換機(jī)給隊列發(fā)消息時候的消息確認(rèn)

????????寫一個容器,實現(xiàn) RabbitTemplate.ReturnCallback 接口,重寫 returnedMessage 方法,這個方法是當(dāng)交換機(jī)推送消息給隊列的時候,路由鍵不存在就觸發(fā)的方法
????????注意:
? ? ? ????????? 因為RabbitTemplate.ReturnCallback是一個內(nèi)部類,所以我們要通過? ? @PostConstruct注解,把當(dāng)前類賦值給ReturnCallback

寫一個容器類:

package com.example.rabbitmq.發(fā)布確認(rèn);


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

// ConfirmCallback:消息只要發(fā)出,無論交換機(jī)有沒有接到消息,都會觸發(fā)ConfirmCallback類的confirm方法
// ConfirmCallback是個內(nèi)部類

// ReturnCallback是個內(nèi)部類
// ReturnCallback:但不可路由的時候,觸發(fā)回調(diào)方法
@Component
public class messageConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init()
    {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     *
     * @param correlationData correlationData是發(fā)送消息時候攜帶的消息
     * @param ack 如果為true,表示交換機(jī)接收到消息了
     * @param message 異常消息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String message) {

        if (ack)
        {
            System.out.println("交換機(jī)收到消息成功:" + correlationData.getId());
        }else {
            System.out.println("交換機(jī)收到消息失?。? + correlationData.getId() + "原因:" + message);
        }

    }

    // 當(dāng)routingkey不存在的時候,會觸發(fā)該方法
    /**
     *
     * @param message 消息主體
     * @param code 錯誤碼
     * @param text 錯誤消息
     * @param exchange 推送該消息的交換機(jī)
     * @param routingkey 推送消息時的routingkey
     */
    @Override
    public void returnedMessage(Message message, int code, String text, String exchange, String routingkey) {
        System.out.println("交換機(jī)推送消息到隊列失敗,推送的消息是:" + new String(message.getBody()) + "錯誤原因:" + text);
    }
}

生產(chǎn)者:

       // 這個是正常發(fā)送到交換機(jī)的,但是路由建的名稱不存在
        rabbitTemplate.convertAndSend("ljl-ConfrimTopic","messageConfirmAnomaly",JSONObject.toJSONString(mapExchange),correlationData);


springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

?運(yùn)行代碼看效果:
springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

?

3.備用隊列

????????當(dāng)消息不可路由的時候,mq會觸發(fā)returncallback接口的回調(diào)方法,把不可路由的消息回調(diào)回來,但是這有個問題,就是消息雖然回調(diào)過來了,但是并沒有消費(fèi)者去把不可路由的消息給消費(fèi)掉,所以這個時候就要加一個備用隊列和一個報警隊列,報警隊列的作用是用來通知管理員,有什么消息被回退了....然后備用隊列是把消息給保存起來,需要的時候就從備用隊列中取數(shù)據(jù)出來使用
? ? ? ? 注意:當(dāng)我們設(shè)置了備用隊列的時候,returncallback接口的回調(diào)方法將不會被觸發(fā),

但是當(dāng)消息不可路由,而且備用隊列也不能使用的時候,才會觸發(fā)returncallback接口的回調(diào)方法,也就是說,觸發(fā)回調(diào)方法在最終條件是消息無法被任何一個隊列接受,在mq丟棄前才會觸發(fā)回調(diào)方法

配置類(加入備用交換機(jī),備用隊列,報警隊列,然后使用的是扇形交換機(jī)):

alternate-exchange 參數(shù):設(shè)置備用交換機(jī),當(dāng)消息不可路由的時候就會把消息推送到該交換機(jī)上
@Configuration
public class messageConfrimConfig {

    @Bean
    public DirectExchange getConfrimTopic()
    {
        // 創(chuàng)建一個直接交換機(jī)
//        return ExchangeBuilder.directExchange("ljl-ConfrimTopic").build();

//       alternate-exchange 參數(shù):設(shè)置備用交換機(jī),當(dāng)消息不可路由的時候就會把消息推送到該交換機(jī)上
        return ExchangeBuilder.directExchange("ljl-ConfrimTopic").withArgument("alternate-exchange","ljl-standbyFanoutExchange").build();
    }


    @Bean
    public Queue getConfrimQueue()
    {
        return new Queue("ljl-ConfrimQueue");
    }


    @Bean
    public Binding TopicConfrimBinding()
    {
        return BindingBuilder.bind(getConfrimQueue()).to(getConfrimTopic()).with("messageConfirm");
    }


    // 備用交換機(jī),備用隊列,報警隊列
    @Bean
    public FanoutExchange standbyFanoutExchange()
    {
        // 備用交換機(jī)
        return new FanoutExchange("ljl-standbyFanoutExchange");
    }

    @Bean
    public Queue getstandbyQueue()
    {
        // 備用隊列
        return new Queue("ljl-standbyQueue");
    }

    @Bean
    public Queue getalarmQueue()
    {
        // 報警隊列
        return new Queue("ljl-alarmQueue");
    }

    // 設(shè)置備用隊列和備用交換機(jī)的綁定關(guān)系
    @Bean
    public Binding standbyExchagneBinding()
    {
        return BindingBuilder.bind(getstandbyQueue()).to(standbyFanoutExchange());
    }

    // 設(shè)置報警隊列和備用交換機(jī)的綁定關(guān)系
    @Bean
    public Binding alarmExchagneBinding()
    {
        return BindingBuilder.bind(getalarmQueue()).to(standbyFanoutExchange());
    }

}

在消費(fèi)者上:
?

@Component
public class clientConfirm {

    @RabbitListener(queues = "ljl-ConfrimQueue")
    @RabbitHandler
    public void ConfrimQueue(Message message) {
        System.out.println("正常隊列正常接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queues = "ljl-alarmQueue")
    @RabbitHandler
    public void alarmQueue(Message message) {
        System.out.println("報警隊列接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

執(zhí)行代碼看效果(此時的生產(chǎn)者發(fā)送給mq的路由鍵還是不存在的):

springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

?這個時候會發(fā)現(xiàn)我們設(shè)置的備用交換機(jī)沒有起到效果,這是因為我們在修改參數(shù)的時候
springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

在mq中并沒有起到效果,在是因為原本‘ljl-ConfrimTopic' 交換機(jī)已經(jīng)存在,寫的參數(shù)并不會覆蓋之前的,我們需要把這個交換機(jī)給刪掉,然后再執(zhí)行一起看下效果:
springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

?

報警交換機(jī)的作用生效了,不可路由的時候不會觸發(fā)?returncallback接口的回調(diào)方

?

3.消費(fèi)者手動ack

在配置文件中加入:

#開啟手動ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#設(shè)置消費(fèi)者每一次拉取的條數(shù)
spring.rabbitmq.listener.simple.prefetch= 5

消費(fèi)者:
springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失

?在消費(fèi)者的方法上,加上這個類(Channel channel),然后這個類有幾個方法:

1.消費(fèi)者正常消費(fèi)完成該消息,手動返回ack,然后隊列把消息移除掉:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
參數(shù)1:message.getMessageProperties().getDeliveryTag()表示的是這條消息在隊列中的一個標(biāo)志,刪除的時候也是根據(jù)這個標(biāo)志來進(jìn)行刪除
參數(shù)2:是否要批量確認(rèn),這個意思是:是否把小于等于message.getMessageProperties().getDeliveryTag()值的消息批量確認(rèn)

2.消費(fèi)者在消費(fèi)消息的過程中,出現(xiàn)了異常,那么就可以使用channel.basicNack方法

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
參數(shù)1:標(biāo)志
參數(shù)2:是否批量.true:將一次性拒絕所有小于deliveryTag的消息。
參數(shù)3:是否重新進(jìn)入隊列?

出現(xiàn)異常的時候,可以的用參數(shù)3來指定該條消息是否重新入隊,然后參數(shù)2來控制這個操作是否批量操作

對于手動ack以及消息阻塞的一些總結(jié):
? ? ? ? 假設(shè)生產(chǎn)者發(fā)送了一百條消息
????????現(xiàn)在只有一個消費(fèi)者,然后設(shè)置消費(fèi)者每一次拉取10條消息來消費(fèi)(默認(rèn)好像200多條),這個時候的正常流程就是消費(fèi)者拉取一批消息,然后正常消費(fèi),通過返回ack,接著拉取消息來進(jìn)行下一批消費(fèi),假如出現(xiàn)異常那就需要使用basicNack方法來判斷是否要重新入隊,但是異常消息入隊后,被消費(fèi)者重新消費(fèi),還是會出現(xiàn)異常,這個時候就會一直循環(huán),造成消息堆積
????????兩個消費(fèi)者:假設(shè)其中一個消費(fèi)者A可以正常消費(fèi)消息并正常返回ack,而另外一個消費(fèi)者B會中會出現(xiàn)異常,使用basicNack方法讓消息重新入隊,然后重新入隊的消息有可能會被消費(fèi)者A獲取,然后正常消費(fèi)并正常手動返回ack

? ? ? ? 面試題:如何rabbitmq確保消息不丟失/消息的可靠性
? ? ? ? 在生產(chǎn)者生成消息的時候,去開啟confirm模式,寫一個容器類去實行confirmcallback接口,這樣交換機(jī)是否成功收到消息都會觸發(fā)回調(diào)方法,然后在聲明交換機(jī),聲明隊列,以及發(fā)送消息的時候,做持久化處理,然后開啟消息回退模式,寫一個容器類去實現(xiàn)returncallback接口,這樣當(dāng)交換機(jī)推送消息給隊列時,如果失敗會觸發(fā)回調(diào)方法,在消費(fèi)者這邊,開啟手動ack模式,確保消息正常執(zhí)行完畢,然后還可以去配置備用隊列跟死信隊列,這樣就可以基本上確保mq的消息不會丟失了

? ? ? ?

? ? ? ? 以上就是總體的解答思路,大家用自己的話來總結(jié)就行

?

到了這里,關(guān)于springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動返回ack,設(shè)置備用隊列,以及面試題:rabbitmq確保消息不丟失的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包