国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

@RabbitListener 消息隊列 消息序列化

這篇具有很好參考價值的文章主要介紹了@RabbitListener 消息隊列 消息序列化。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

MessageConvert

涉及網絡傳輸的應用序列化不可避免,發(fā)送端以某種規(guī)則將消息轉成 byte 數組進行發(fā)送,接收端則以約定的規(guī)則進行 byte[] 數組的解析。RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸的內容,RabbitMQ 抽象出一個 MessageConvert 接口處理消息的序列化,其實現有 SimpleMessageConverter(默認)、Jackson2JsonMessageConverter 等

  • 當調用了 convertAndSend 方法時會使用 MessageConvert 進行消息的序列化
  • SimpleMessageConverter 對于要發(fā)送的消息體 body 為 byte[] 時不進行處理,如果是 String 則轉成字節(jié)數組,如果是 Java 對象,則使用 jdk 序列化將消息轉成字節(jié)數組,轉出來的結果較大,含class類名,類相應方法等信息。因此性能較差
  • 當使用 RabbitMQ 作為中間件時,數據量比較大,此時就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {

    public static final String WINCALLCDR_QUEUE = "WINCHANCDR_QUEUE";

    //生產者
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //發(fā)送消息進行序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    //消費者
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory mqConnectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(mqConnectionFactory);
        //--加上這句  自定義MessageConverter
        factory.setMessageConverter(new RabbitMessageConverter());
        //反序列化
        //factory.setMessageConverter(new Jackson2JsonMessageConverter());

        //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ack
        return factory;
    }
}

自定義MessageConverter

在一些場景下我們希望在消息發(fā)送到MQ之前或者接受消息前對消息做一些自定義處理,這個時候就需要自定義MessageConverter了

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class RabbitMessageConverter implements MessageConverter {

    /**
     * 發(fā)送消息時轉換
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("=======toMessage=========");
        return new Message(object.toString().getBytes(), messageProperties);
    }

    /**
     * 接受消息時轉換
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return new String(message.getBody());
    }
}

@RabbitListener 用法

使用 @RabbitListener 注解標記方法,當監(jiān)聽到隊列 debug 中有消息時則會進行接收并處理。@RabbitListener注解指定目標方法來作為消費消息的方法,通過注解參數指定所監(jiān)聽的隊列或者Binding。使用@RabbitListener可以設置一個自己明確默認值的RabbitListenerContainerFactory對象??梢栽谂渲梦募性O置RabbitListenerAnnotationBeanPostProcessor并通過<rabbit:annotation-driven/>來設置@RabbitListener的執(zhí)行,當然也可以通過@EnableRabbit注解來啟用@RabbitListener。

注意
消息處理方法參數是由 MessageConverter 轉化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實例中去設置(默認 Spring 使用的實現是 SimpleRabbitListenerContainerFactory)

消息的 content_type 屬性表示消息 body 數據以什么數據格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應類型接收消息 body 內容,但若方法參數類型不正確會拋異常

配置消費者

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.icsoc.axt.job.config.RabbitMQConfig;
import net.icsoc.axt.job.dto.WinCallCdrDTO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Component
@Slf4j
public class CallListener {
    @Autowired
    RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void convertAndSendOrder() {
         //創(chuàng)建生產數據
         String jsonStr ="{user_id:234}"
         rabbitTemplate.convertAndSend("exchange.topic", "routingKey.aa", jsonStr);
    }

    @RabbitListener(queues = RabbitMQConfig.WINCALLCDR_QUEUE, containerFactory = "rabbitListenerContainerFactory")
    public void winCallCdr(String messsageBody) {

        //log.info("winCallCdr消費者收到消息  : " + messsageBody);
        WinCallCdrDTO winCallCdrDTO = JSON.parseObject(messsageBody, WinCallCdrDTO.class);
        try {
            exectueSaveWinCallCdrData2Db(winCallCdrDTO);
            log.info("winCallCdr成功消費消息 {}", winCallCdrDTO.getCallId());
        } catch (DataAccessException e) {
            log.error("消費winCallCdr異常 {} {}", messsageBody, e);
        }
    }
}

和 @RabbitHandler 搭配使用

@RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換后的參數類型

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {

    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

@Payload 與 @Headers

使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body:"+body);
    System.out.println("Headers:"+headers);
}

也可以獲取單個 Header 屬性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body:"+body);
    System.out.println("token:"+token);
}

通過 @RabbitListener 注解聲明 Binding

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
        value = @Queue(value = "consumer_queue",durable = "true"),
        key = "key.#"
))
public void processMessage1(Message message) {
    System.out.println(message);
}

自動確認

生產者產生10筆消息,自動確認模式下,消息處理成功,消費者才會去獲取下一筆消息;消息處理拋出異常,那么將會消息重回隊列。自動確認分四種情況(第一就是正常消費,其他三種為異常情況)

  • 消息成功被消費,沒有拋出異常,則自動確認,回復ack。不涉及requeue,畢竟已經成功了。requeue是對被拒絕的消息生效。
  • 當拋出ImmediateAcknowledgeAmqpException異常的時候,則視為成功消費,確認該消息。
  • 當拋出AmqpRejectAndDontRequeueException異常的時候,則消息會被拒絕,且requeue = false(該異常會在重試超過限制后拋出)
  • 拋出其他的異常,消息會被拒絕,且requeue = true

手動確認

常用API

  • channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);? ?ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
  • channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);? Reject表示拒絕消息。requeue:false表示被拒絕的消息是丟棄;true表示重回隊列
  • channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);? ?nack表示拒絕消息。multiple表示拒絕指定了delivery_tag的所有未確認的消息,requeue表示不是重回隊列

當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會立馬又接收到這條消息進行處理,接著拋出異常,進行 回滾,如此反復進行。這種情況會導致消息隊列處理出現阻塞,消息堆積,導致正常消息也無法運行。

消息重發(fā)送到隊尾

可能會出現堆積

    //消費者處理消息緩慢
    @RabbitListener(queues = {"kinson1"})
    public void receiver3(Message msg, Channel channel) throws IOException {
        try {
            //打印數據
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【開始】:{}",message);
            if("0".equals(message)){
                throw new RuntimeException("0的消息消費異常");
            }
            log.info("【結束】:{}", message);
            //ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            //捕獲異常后,重新發(fā)送到指定隊列,自動ack不拋出異常即為ack
            channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                    msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    msg.getBody());
        }
    }

?如何處理異常消息

如果一個消息體本身有誤,會導致該消息體,一直無法進行處理,而服務器中刷出大量無用日志。解決這個問題可以采取兩種方案:

1.一種是對于日常細致處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。對于可以恢復的異常我們采取第三條中的解決方案,對于不可以處理的異常,我們采用記錄日志,直接丟棄該消息方案。

2.另一種是我們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,多次處理仍不能成功時,處理次數到達我們設置的值時,我們就丟棄該消息,但需要記錄詳細的日志。

將業(yè)務隊列綁定死信隊列,當消息被丟棄后,進入到死信隊列(代碼修復后監(jiān)聽死信隊列補償消息)??梢员苊馕覀兪謩拥幕謴拖ⅰ?/p>

@Component
@Slf4j
public class CustomerRev {

    @RabbitListener(queues = {"kinson1"})
    public void receiver3(Message msg, Channel channel) throws IOException {
        try {
            //打印數據
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【開始】:{}",message);
            if("0".equals(message)){
                throw new RuntimeException("0的消息消費異常");
            }
            log.info("【結束】:{}", message);
        } catch (Exception e) {
            //捕獲異常后,重新發(fā)送到指定隊列,自動確認不拋出異常即為ack
            Integer retryCount;
            Map<String, Object> headers = msg.getMessageProperties().getHeaders();
            if(!headers.containsKey("retry-count")){
                retryCount=0;
            }else {
                retryCount = (Integer)headers.get("retry-count");
            }
            //判斷是否滿足最大重試次數(重試3次)
            if(retryCount++<3) {
                headers.put("retry-count",retryCount);
                //重新發(fā)送到MQ中
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
                channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                        msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
                        msg.getBody());
            }
        }
    }
}

重試機制如何合理配置

重試機制能保證某些場景下消息能被消費掉。適合重試場景:大部分屬于讀取,如調用第三方接口、網絡波動問題、暫時調用不了、網絡連接等。重試并不是RabbitMQ重新發(fā)送了消息,僅僅是消費者內部進行的重試,換句話說就是重試跟mq沒有任何關系。

采坑:不管消息被消費了之后是手動確認還是自動確認,代碼中不能使用try/catch捕獲異常,否則重試機制失效。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          # 開啟消費者重試機制(默認就是true,false則取消重試機制)
          enabled: true
          # 最大重試次數
          max-attempts: 5
          # 重試間距(單位:秒)
          initial-interval: 2s

以上配置消息會重試5次,如果一直失敗,RabbitMQ放棄消費了文章來源地址http://www.zghlxwxcb.cn/news/detail-730835.html

到了這里,關于@RabbitListener 消息隊列 消息序列化的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • @RabbitListener詳解

    @RabbitListener 是用于在 Spring AMQP 中消息監(jiān)聽的注解。它允許在 Spring 應用程序中聲明消息監(jiān)聽器。在使用 @RabbitListener 注解的方法上,可以接收來自 RabbitMQ 隊列的消息。這些方法可以通過使用 @RabbitHandler 注解標記,并在方法中聲明一個參數來表示要接收的消息。 例如:

    2024年02月14日
    瀏覽(15)
  • @RabbitListener的作用詳解

    @RabbitListener用于在Spring Boot應用程序中創(chuàng)建消費者端接收和處理消息的方法。它是基于Spring AMQP和RabbitMQ實現的,可以用于消費者端消費RabbitMQ隊列中的消息。 具體來說,@RabbitListener的作用是: 聲明該方法是一個RabbitMQ消息監(jiān)聽器,用于接收指定隊列中的消息。 自動創(chuàng)建和配置

    2024年02月13日
    瀏覽(13)
  • rabbitmq整合springboot:ChannelAwareMessageListener和@RabbitListener的使用

    Springboot中使用Rabbimq監(jiān)聽隊列中有兩種方式,一種是@RabbitListener注解的方式,一種是實現springboot:ChannelAwareMessageListener接口的方式 前者使用如下: 消費者: 生產者: 后者使用方式: 配置文件:

    2024年02月12日
    瀏覽(21)
  • Spring Boot 中的 @RabbitListener 注解是什么,原理,如何使用

    Spring Boot 中的 @RabbitListener 注解是什么,原理,如何使用

    在 RabbitMQ 中,消息的接收需要通過監(jiān)聽隊列來實現。在 Spring Boot 應用程序中,可以使用 @RabbitListener 注解來監(jiān)聽隊列,并在接收到消息時執(zhí)行指定的方法。本文將介紹 @RabbitListener 注解的原理、使用方法和常見應用場景。 @RabbitListener 注解是 Spring AMQP 框架中的一個關鍵組件,

    2024年02月09日
    瀏覽(95)
  • 【序列化與反序列化】關于序列化與反序列化MessagePack的實踐

    【序列化與反序列化】關于序列化與反序列化MessagePack的實踐

    在進行序列化操作之前,我們還對系統(tǒng)進行壓測,通過 jvisualvm 分析cpu,線程,垃圾回收情況等;運用火焰圖 async-profiler 分析系統(tǒng)性能,找出程序中占用CPU資源時間最長的代碼塊。 代碼放置GitHub:https://github.com/nateshao/leetcode/tree/main/source-code/src/main/java/com/nateshao/source/code/ser

    2024年02月11日
    瀏覽(27)
  • 【網絡】序列化反序列化

    【網絡】序列化反序列化

    在前文《網絡編程套接字》中,我們實現了服務器與客戶端之間的字符串通信,這是非常簡單的通信,在實際使用的過程中,網絡需要傳輸的不僅僅是字符串,更多的是結構化的數據(類似于 class , struct 類似的數據)。 那么我們應該怎么發(fā)送這些結構化的數據呢? 如果我們

    2024年02月05日
    瀏覽(29)
  • 序列化,反序列化之實例

    序列化,反序列化之實例

    介紹文章 __construct() 當一個對象創(chuàng)建時自動調用 __destruct() 當對象被銷毀時自動調用 (php絕大多數情況下會自動調用銷毀對象) __sleep() 使**用serialize()函數時觸發(fā) __wakeup 使用unserialse()**函數時會自動調用 __toString 當一個對象被當作一個字符串被調用 __call() 在對象上下文中調用不

    2024年02月14日
    瀏覽(28)
  • Qt 對象序列化/反序列化

    閱讀本文大概需要 3 分鐘 日常開發(fā)過程中,避免不了對象序列化和反序列化,如果你使用 Qt 進行開發(fā),那么有一種方法實現起來非常簡單和容易。 我們知道 Qt 的元對象系統(tǒng)非常強大,基于此屬性我們可以實現對象的序列化和反序列化操作。 比如有一個學生類,包含以下幾

    2024年02月13日
    瀏覽(27)
  • 協議,序列化,反序列化,Json

    協議,序列化,反序列化,Json

    協議究竟是什么呢?首先得知道主機之間的網絡通信交互的是什么數據,像平時使用聊天APP聊天可以清楚,用戶看到的不僅僅是聊天的文字,還能夠看到用戶的頭像昵稱等其他屬性。也就可以證明網絡通信不僅僅是交互字符串那么簡單。事實上網絡通信還可能會通過一個結構

    2024年02月13日
    瀏覽(25)
  • 【網絡】協議定制+序列化/反序列化

    【網絡】協議定制+序列化/反序列化

    如果光看定義很難理解序列化的意義,那么我們可以從另一個角度來推導出什么是序列化, 那么究竟序列化的目的是什么? 其實序列化最終的目的是為了對象可以 跨平臺存儲,和進行網絡傳輸 。而我們進行跨平臺存儲和網絡傳輸的方式就是IO,而我們的IO支持的數據格式就是

    2024年02月08日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包