国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

高級篇-rabbitmq的高級特性

這篇具有很好參考價值的文章主要介紹了高級篇-rabbitmq的高級特性。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

setreturncallback,rabbitmq,mq,rabbitmq

?setreturncallback,rabbitmq,mq,rabbitmq

?setreturncallback,rabbitmq,mq,rabbitmq

1.消息可靠性

三種丟失的情形:

setreturncallback,rabbitmq,mq,rabbitmq

1.1? 生產(chǎn)者確認(rèn)機(jī)制?

setreturncallback,rabbitmq,mq,rabbitmq

?啟動MQ

setreturncallback,rabbitmq,mq,rabbitmq

創(chuàng)建Queues:?

setreturncallback,rabbitmq,mq,rabbitmq兩種Callback:

1.ReturnCallback:全局callback?setreturncallback,rabbitmq,mq,rabbitmq

?2.ComfirmCallback:?發(fā)送信息時候設(shè)置

setreturncallback,rabbitmq,mq,rabbitmq?setreturncallback,rabbitmq,mq,rabbitmq

 @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.準(zhǔn)備消息
        String message = "hello, spring amqp!";
        // 2.準(zhǔn)備CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.準(zhǔn)備ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判斷結(jié)果
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投遞到交換機(jī)!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投遞到交換機(jī)失??!消息ID:{}", correlationData.getId());
                // 重發(fā)消息
            }
        }, ex -> {
            // 記錄日志
            log.error("消息發(fā)送失??!", ex);
            // 重發(fā)消息
        });
        // 3.發(fā)送消息
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }

?執(zhí)行成功:

setreturncallback,rabbitmq,mq,rabbitmq

?監(jiān)控頁面:

setreturncallback,rabbitmq,mq,rabbitmq

模擬失?。?/p>

?1.投遞到交互機(jī)失敗

setreturncallback,rabbitmq,mq,rabbitmq

2.投遞到交換機(jī)了,但是沒有進(jìn)入隊列?

setreturncallback,rabbitmq,mq,rabbitmq?setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

1.2 消息持久化?

注意: 生產(chǎn)者確認(rèn)只能保證數(shù)據(jù)放到隊列當(dāng)中,但是無法保證數(shù)據(jù)不丟失(比如所在的機(jī)器宕機(jī)了),
所以還需要保證數(shù)據(jù)的持久化

setreturncallback,rabbitmq,mq,rabbitmq

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct");
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }
}
@Test
public void testDurableMessage() {
   // 1.準(zhǔn)備消息 消息持久化
   Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
   // 2.發(fā)送消息
   rabbitTemplate.convertAndSend("simple.queue", message);
}

?注意:

    //交換機(jī)不傳值默認(rèn)就是持久化  
    //交換機(jī)、隊列、消息默認(rèn)都是持久化   
    @Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct");
    }

    public AbstractExchange(String name) {
        this(name, true, false);
    }

? 演示數(shù)據(jù)是否默認(rèn)持久化:?setreturncallback,rabbitmq,mq,rabbitmq

? ? ?重啟mq:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq?1. 交互機(jī)、隊列、消息都做持久化

? 2.消費者端關(guān)閉防止被消費

? 3.重啟mq后看隊列中數(shù)據(jù)是否還在(是否持久化)

setreturncallback,rabbitmq,mq,rabbitmq

?1.3? 消費者消息確認(rèn)

生產(chǎn)者確認(rèn):能確定消息投遞到隊列
消息持久化:能避免MQ宕機(jī)造成的消息丟失
生產(chǎn)者確認(rèn)和消息持久化能保證消息能投遞到消費者,但是無法保證消息被消費者消費(比如投遞消費者的
同時,消費者所在機(jī)器宕機(jī)了)

setreturncallback,rabbitmq,mq,rabbitmq

1.manual:不推薦 代碼侵入
try{
  //業(yè)務(wù)邏輯
  ack
} catch(ex){
  nack
}
2.auto:推薦 spring全權(quán)完成,不需要手動寫代碼
3.none:不推薦 投遞完成立馬刪除消息,是否成功都不管
@Slf4j
@Component
public class SpringRabbitListener { 
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消費者接收到simple.queue的消息:【" + msg + "】");
        //模擬出現(xiàn)異常情況
        System.out.println(1 / 0);
        log.info("消費者處理消息成功!");
    }
}

默認(rèn)為none:拋出異常后消息立即被刪除:setreturncallback,rabbitmq,mq,rabbitmq

?修改為auto模式:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

隊列返回nack會再去發(fā)送信息:?

setreturncallback,rabbitmq,mq,rabbitmq

1.4 失敗重試機(jī)制

setreturncallback,rabbitmq,mq,rabbitmq

?演示失敗重試機(jī)制:

listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true
          initial-interval: 1000
          multiplier: 3
          max-attempts: 4

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmqsetreturncallback,rabbitmq,mq,rabbitmq

?默認(rèn)重試到達(dá)最大次數(shù)后消息就丟棄:

? ? ? ?但是對于一些比較重要不能丟棄的消息需要使用以下策略:? ??setreturncallback,rabbitmq,mq,rabbitmq?

推薦使用第三種方案:將失敗的消息發(fā)送到失敗的交換機(jī)和失敗的隊列中,后面可以告知管理員然后重新
人工去處理

setreturncallback,rabbitmq,mq,rabbitmq

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

?演示:

發(fā)送消息:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq?setreturncallback,rabbitmq,mq,rabbitmq

?面試題:最后一分鐘的總結(jié)

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

?2. 死信交換機(jī)

?2.1? 初識死信交換機(jī)

setreturncallback,rabbitmq,mq,rabbitmq

1.發(fā)送信息到消費者默認(rèn)的retry重試機(jī)制,達(dá)到最大次數(shù)就會被reject
2.隊列中綁定一個死信交換機(jī),接收被reject的信息,然后發(fā)送到dl.queue
3.這樣就不擔(dān)心死信會丟失

對比消息失敗信息處理策略:

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

2.2? TTL?

?注意: 存活時間取消息所在隊列中存貨時間 、消息本身存活時間的以短的時間為準(zhǔn)

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
        log.info("消費者接收到了dl.queue的延遲消息");
    }
}

setreturncallback,rabbitmq,mq,rabbitmq

@Configuration
public class TTLMessageConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

?setreturncallback,rabbitmq,mq,rabbitmq

    @Test
    public void testTTLMessage() {
        // 1.準(zhǔn)備消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setExpiration("5000")
                .build();
        // 2.發(fā)送消息
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
        // 3.記錄日志
        log.info("消息已經(jīng)成功發(fā)送!");
    }

? 演示延時隊列:

? 1.啟動消費者?

? 2.發(fā)送消息:testTTLMessage()?

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

setreturncallback,rabbitmq,mq,rabbitmq

2.3 延遲隊列?

setreturncallback,rabbitmq,mq,rabbitmqp159 27:18?

setreturncallback,rabbitmq,mq,rabbitmq

?setreturncallback,rabbitmq,mq,rabbitmq

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消費者接收到了delay.queue的延遲消息");
    }
}

?setreturncallback,rabbitmq,mq,rabbitmq

?setreturncallback,rabbitmq,mq,rabbitmq

?文章來源地址http://www.zghlxwxcb.cn/news/detail-701346.html

    @Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.準(zhǔn)備消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.準(zhǔn)備CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.發(fā)送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("發(fā)送消息成功");
    }

演示延時隊列:

1.啟動消費者

setreturncallback,rabbitmq,mq,rabbitmq

?2.運行testSendDelayMessage

報錯原因:消息沒有做路由

setreturncallback,rabbitmq,mq,rabbitmq

?如何不報錯:添加延遲的判斷:?

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate對象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判斷是否是延遲消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一個延遲消息,忽略這個錯誤提示
                return;
            }
            // 記錄日志
            log.error("消息發(fā)送到隊列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的話,重發(fā)消息
        });
    }
}

setreturncallback,rabbitmq,mq,rabbitmq

?

到了這里,關(guān)于高級篇-rabbitmq的高級特性的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • RabbitMQ之高級特性

    RabbitMQ之高級特性

    提示:以下是本篇文章正文內(nèi)容,RabbitMQ 系列學(xué)習(xí)將會持續(xù)更新 官網(wǎng) :https://www.rabbitmq.com RabbitMQ 消息確定主要分為兩部分: 第一種是 消息發(fā)送確認(rèn) 。這種是用來確認(rèn)生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。 確認(rèn)發(fā)送的第一步是確認(rèn)是

    2023年04月10日
    瀏覽(14)
  • 學(xué)習(xí)RabbitMQ高級特性

    學(xué)習(xí)RabbitMQ高級特性

    了解熟悉RabbitMQ的高級特性 1、消息可靠性投遞 【confirm 確認(rèn)模式、return 退回模式】 2、Consumer ACK 【acknowledge】 3、消費端限流 【prefetch】 4、TTL過期時間 【time to live】 5、死信隊列 【Dead Letter Exchange】 6、延遲隊列 【rabbitmq-delayed-message-exchange】 7、優(yōu)先級隊列 【x-max-priority】

    2024年02月04日
    瀏覽(20)
  • RabbitMQ的高級特性及其特點

    RabbitMQ的高級特性及其特點

    1、應(yīng)用解耦 提高系統(tǒng)容錯性和可維護(hù)性 在訂單系統(tǒng)中,可以通過遠(yuǎn)程調(diào)用直接調(diào)用庫存系統(tǒng),支付系統(tǒng),物流系統(tǒng)。 但是這三個系統(tǒng)耦合度太高了,因為訂單系統(tǒng)下完訂單首先去庫存系統(tǒng)將庫存-1,然后將返回值返回給訂單系統(tǒng),然后通過訂單系統(tǒng)的返回結(jié)果來在支付系統(tǒng)

    2024年02月08日
    瀏覽(25)
  • RabbitMQ——高級特性(SpringBoot實現(xiàn))

    RabbitMQ——高級特性(SpringBoot實現(xiàn))

    本篇文章的內(nèi)容與我之前如下這篇文章一樣,只是使用技術(shù)不同,本篇文章使用SpringBoot實現(xiàn)RabbitMQ的高級特性! RabbitMQ——高級特性_小曹愛編程!的博客-CSDN博客 RabbitMQ——高級特性:1、RabbitMQ高級特性;2、RabbitMQ應(yīng)用問題;3、RabbitMQ集群搭建 https://blog.csdn.net/weixin_62993347/

    2023年04月21日
    瀏覽(16)
  • rabbitmq筆記-rabbitmq進(jìn)階-數(shù)據(jù)可靠性,rabbitmq高級特性

    消息何去何從 mandatory和immediate是channel.basicPublish方法的兩個參數(shù),都有消息傳遞過程中不可達(dá)目的地時將消息返回給生產(chǎn)者的功能。 mandatory參數(shù) true:交換器無法根據(jù)自身的類型 和路由鍵找到符合條件的隊列,rabbitmq調(diào)用Basic.Return命令將消息返回給生產(chǎn)者 生產(chǎn)者調(diào)用channel.

    2024年02月10日
    瀏覽(45)
  • 4.RabbitMQ高級特性 冪等 可靠消息 等等

    4.RabbitMQ高級特性 冪等 可靠消息 等等

    保障消息的成功發(fā)出 保障MQ節(jié)點的成功接收 發(fā)送端收到MQ節(jié)點(Broker)確認(rèn)應(yīng)答 完善的消息進(jìn)行補(bǔ)償機(jī)制 消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個應(yīng)答。 生產(chǎn)者進(jìn)行接收應(yīng)答,用來確定這條消息是否正常的發(fā)送到了Broker,這種方式也是

    2024年02月11日
    瀏覽(25)
  • RabbitMQ養(yǎng)成記 (10.高級特性:死信隊列,延遲隊列)

    RabbitMQ養(yǎng)成記 (10.高級特性:死信隊列,延遲隊列)

    這個概念 在其他MQ產(chǎn)品里面也是有的,只不過在Rabbitmq中稍微特殊一點 什么叫私信隊列呢? 就是當(dāng)消息成為 dead message之后,可以重新發(fā)到另外一臺交換機(jī),這個交換機(jī)就是DLX。 注意這里的有翻譯歧義, 這里的DLX 指的是 交換機(jī) ,而不是一個隊列。 隊列的消息長度 到達(dá)限制

    2024年02月05日
    瀏覽(17)
  • RabbitMQ高級特性2 、TTL、死信隊列和延遲隊列

    RabbitMQ高級特性2 、TTL、死信隊列和延遲隊列

    設(shè)置 消費者 測試 添加多條消息 拉取消息 每隔20秒拉取一次 一次拉取五條 然后在20秒內(nèi)一條一條消費 Time To Live(存活時間/過期時間)。 當(dāng)消息到達(dá)存活時間后,還沒有被消費,會被自動清除。 RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。 可

    2024年01月16日
    瀏覽(20)
  • RabbitMQ高級特性解析:消息投遞的可靠性保證與消費者ACK機(jī)制探究

    RabbitMQ高級特性解析:消息投遞的可靠性保證與消費者ACK機(jī)制探究

    學(xué)習(xí)RabbitMQ高級特性,涵蓋消息的持久化、確認(rèn)模式、退回模式以及消費者ACK機(jī)制等方面,助您構(gòu)建高可靠性的消息隊列系統(tǒng)。

    2024年01月16日
    瀏覽(17)
  • (四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列

    (四)RabbitMQ高級特性(消費端限流、利用限流實現(xiàn)不公平分發(fā)、消息存活時間、優(yōu)先級隊列

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 之前我們講過MQ可以對請求進(jìn)行“削峰填谷”,即通過消費端限流的方式限制消息的拉取速度,達(dá)到保護(hù)消費端的目的。 1、 生產(chǎn)者批量發(fā)送消息 2、消費端配置限流機(jī)制 3、消費者監(jiān)聽隊列 在RabbitMQ中,多個消費者監(jiān)聽同一條隊列,則隊列

    2024年02月15日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包