MessageConvert
涉及網絡傳輸的應用序列化不可避免,發(fā)送端以某種規(guī)則將消息轉成 byte 數組進行發(fā)送,接收端則以約定的規(guī)則進行 byte[] 數組的解析。RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸的內容,RabbitMQ 抽象出一個 MessageConvert 接口處理消息的序列化,其實現有 SimpleMessageConverter(默認)、Jackson2JsonMessageConverter 等
- 當調用了 convertAndSend 方法時會使用 MessageConvert 進行消息的序列化
- SimpleMessageConverter 對于要發(fā)送的消息體 body 為 byte[] 時不進行處理,如果是 String 則轉成字節(jié)數組,如果是 Java 對象,則使用 jdk 序列化將消息轉成字節(jié)數組,轉出來的結果較大,含class類名,類相應方法等信息。因此性能較差
- 當使用 RabbitMQ 作為中間件時,數據量比較大,此時就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {
public static final String WINCALLCDR_QUEUE = "WINCHANCDR_QUEUE";
//生產者
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//發(fā)送消息進行序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
//消費者
@Bean("rabbitListenerContainerFactory")
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory mqConnectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(mqConnectionFactory);
//--加上這句 自定義MessageConverter
factory.setMessageConverter(new RabbitMessageConverter());
//反序列化
//factory.setMessageConverter(new Jackson2JsonMessageConverter());
//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ack
return factory;
}
}
自定義MessageConverter
在一些場景下我們希望在消息發(fā)送到MQ之前或者接受消息前對消息做一些自定義處理,這個時候就需要自定義MessageConverter了
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class RabbitMessageConverter implements MessageConverter {
/**
* 發(fā)送消息時轉換
*/
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("=======toMessage=========");
return new Message(object.toString().getBytes(), messageProperties);
}
/**
* 接受消息時轉換
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return new String(message.getBody());
}
}
@RabbitListener 用法
使用 @RabbitListener 注解標記方法,當監(jiān)聽到隊列 debug 中有消息時則會進行接收并處理。@RabbitListener注解指定目標方法來作為消費消息的方法,通過注解參數指定所監(jiān)聽的隊列或者Binding。使用@RabbitListener可以設置一個自己明確默認值的RabbitListenerContainerFactory對象??梢栽谂渲梦募性O置RabbitListenerAnnotationBeanPostProcessor并通過<rabbit:annotation-driven/>來設置@RabbitListener的執(zhí)行,當然也可以通過@EnableRabbit注解來啟用@RabbitListener。
注意
消息處理方法參數是由 MessageConverter 轉化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實例中去設置(默認 Spring 使用的實現是 SimpleRabbitListenerContainerFactory)
消息的 content_type 屬性表示消息 body 數據以什么數據格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應類型接收消息 body 內容,但若方法參數類型不正確會拋異常
配置消費者
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import net.icsoc.axt.job.config.RabbitMQConfig;
import net.icsoc.axt.job.dto.WinCallCdrDTO;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class CallListener {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void convertAndSendOrder() {
//創(chuàng)建生產數據
String jsonStr ="{user_id:234}"
rabbitTemplate.convertAndSend("exchange.topic", "routingKey.aa", jsonStr);
}
@RabbitListener(queues = RabbitMQConfig.WINCALLCDR_QUEUE, containerFactory = "rabbitListenerContainerFactory")
public void winCallCdr(String messsageBody) {
//log.info("winCallCdr消費者收到消息 : " + messsageBody);
WinCallCdrDTO winCallCdrDTO = JSON.parseObject(messsageBody, WinCallCdrDTO.class);
try {
exectueSaveWinCallCdrData2Db(winCallCdrDTO);
log.info("winCallCdr成功消費消息 {}", winCallCdrDTO.getCallId());
} catch (DataAccessException e) {
log.error("消費winCallCdr異常 {} {}", messsageBody, e);
}
}
}
和 @RabbitHandler 搭配使用
@RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換后的參數類型
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
@RabbitHandler
public void processMessage2(byte[] message) {
System.out.println(new String(message));
}
}
@Payload 與 @Headers
使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}
也可以獲取單個 Header 屬性
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}
通過 @RabbitListener 注解聲明 Binding
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
value = @Queue(value = "consumer_queue",durable = "true"),
key = "key.#"
))
public void processMessage1(Message message) {
System.out.println(message);
}
自動確認
生產者產生10筆消息,自動確認模式下,消息處理成功,消費者才會去獲取下一筆消息;消息處理拋出異常,那么將會消息重回隊列。自動確認分四種情況(第一就是正常消費,其他三種為異常情況)
- 消息成功被消費,沒有拋出異常,則自動確認,回復ack。不涉及requeue,畢竟已經成功了。requeue是對被拒絕的消息生效。
- 當拋出ImmediateAcknowledgeAmqpException異常的時候,則視為成功消費,確認該消息。
- 當拋出AmqpRejectAndDontRequeueException異常的時候,則消息會被拒絕,且requeue = false(該異常會在重試超過限制后拋出)
- 拋出其他的異常,消息會被拒絕,且requeue = true
手動確認
常用API
- channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);? ?ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
- channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);? Reject表示拒絕消息。requeue:false表示被拒絕的消息是丟棄;true表示重回隊列
- channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);? ?nack表示拒絕消息。multiple表示拒絕指定了delivery_tag的所有未確認的消息,requeue表示不是重回隊列
當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會立馬又接收到這條消息進行處理,接著拋出異常,進行 回滾,如此反復進行。這種情況會導致消息隊列處理出現阻塞,消息堆積,導致正常消息也無法運行。
消息重發(fā)送到隊尾
可能會出現堆積
//消費者處理消息緩慢
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) throws IOException {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消費異常");
}
log.info("【結束】:{}", message);
//ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//捕獲異常后,重新發(fā)送到指定隊列,自動ack不拋出異常即為ack
channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBody());
}
}
?如何處理異常消息
如果一個消息體本身有誤,會導致該消息體,一直無法進行處理,而服務器中刷出大量無用日志。解決這個問題可以采取兩種方案:
1.一種是對于日常細致處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。對于可以恢復的異常我們采取第三條中的解決方案,對于不可以處理的異常,我們采用記錄日志,直接丟棄該消息方案。
2.另一種是我們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,多次處理仍不能成功時,處理次數到達我們設置的值時,我們就丟棄該消息,但需要記錄詳細的日志。
將業(yè)務隊列綁定死信隊列,當消息被丟棄后,進入到死信隊列(代碼修復后監(jiān)聽死信隊列補償消息)??梢员苊馕覀兪謩拥幕謴拖ⅰ?/p>
@Component
@Slf4j
public class CustomerRev {
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) throws IOException {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消費異常");
}
log.info("【結束】:{}", message);
} catch (Exception e) {
//捕獲異常后,重新發(fā)送到指定隊列,自動確認不拋出異常即為ack
Integer retryCount;
Map<String, Object> headers = msg.getMessageProperties().getHeaders();
if(!headers.containsKey("retry-count")){
retryCount=0;
}else {
retryCount = (Integer)headers.get("retry-count");
}
//判斷是否滿足最大重試次數(重試3次)
if(retryCount++<3) {
headers.put("retry-count",retryCount);
//重新發(fā)送到MQ中
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
msg.getBody());
}
}
}
}
重試機制如何合理配置
重試機制能保證某些場景下消息能被消費掉。適合重試場景:大部分屬于讀取,如調用第三方接口、網絡波動問題、暫時調用不了、網絡連接等。重試并不是RabbitMQ重新發(fā)送了消息,僅僅是消費者內部進行的重試,換句話說就是重試跟mq沒有任何關系。
采坑:不管消息被消費了之后是手動確認還是自動確認,代碼中不能使用try/catch捕獲異常,否則重試機制失效。文章來源:http://www.zghlxwxcb.cn/news/detail-730835.html
spring:
rabbitmq:
listener:
simple:
retry:
# 開啟消費者重試機制(默認就是true,false則取消重試機制)
enabled: true
# 最大重試次數
max-attempts: 5
# 重試間距(單位:秒)
initial-interval: 2s
以上配置消息會重試5次,如果一直失敗,RabbitMQ放棄消費了文章來源地址http://www.zghlxwxcb.cn/news/detail-730835.html
到了這里,關于@RabbitListener 消息隊列 消息序列化的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!