應用場景:
異步處理。把消息放入消息中間件中,等到需要的時候再去處理。
流量削峰 例如秒殺活動,在短時間內訪問量急劇增加,使用消息隊列,當消息隊列滿了就拒絕響應,跳轉到錯誤頁面,這樣就可以使得系統(tǒng)不會因為超負載而崩潰
安裝rabbitMQ
#拉取鏡像 docker pull rabbitmq:3.8-management #創(chuàng)建容器啟動 docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management
管理后臺:http://IP:15672
-
搭建rabbit_util 模塊
-
引入依賴
<dependencies> <!--rabbitmq消息隊列--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> </dependencies>
-
添加service---就是對RabbitTemplate的一個封裝,可以不封裝直接使用RabbitTemplate
-
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitService { // 引入操作rabbitmq 的模板 @Autowired private RabbitTemplate rabbitTemplate; /** * 發(fā)送消息 * @param exchange 交換機 * @param routingKey 路由鍵 * @param message 消息 * @return */ public boolean sendMessage(String exchange,String routingKey, Object message){ // 調用發(fā)送數(shù)據(jù)的方法 rabbitTemplate.convertAndSend(exchange,routingKey,message); return true; } /** * 發(fā)送延遲消息的方法 * @param exchange 交換機 * @param routingKey 路由鍵 * @param message 消息內容 * @param delayTime 延遲時間 * @return */ public boolean sendDelayMessage(String exchange,String routingKey, Object message, int delayTime){ // 在發(fā)送消息的時候設置延遲時間 rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 設置一個延遲時間 message.getMessageProperties().setDelay(delayTime*1000); return message; } }); return true; } }
?配置mq消息轉換器
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
說明:默認是字符串轉換器文章來源:http://www.zghlxwxcb.cn/news/detail-775880.html
添加消息的確認配置文章來源地址http://www.zghlxwxcb.cn/news/detail-775880.html
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {
// 我們發(fā)送消息使用的是 private RabbitTemplate rabbitTemplate; 對象
// 如果不做設置的話 當前的rabbitTemplate 與當前的配置類沒有任何關系!
@Autowired
private RabbitTemplate rabbitTemplate;
// 設置 表示修飾一個非靜態(tài)的void方法,在服務器加載Servlet的時候運行。并且只執(zhí)行一次!
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
/**
* 表示消息是否正確發(fā)送到了交換機上
* @param correlationData 消息的載體
* @param ack 判斷是否發(fā)送到交換機上
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息發(fā)送成功!");
}else {
System.out.println("消息發(fā)送失敗!"+cause);
}
}
/**
* 消息如果沒有正確發(fā)送到隊列中,則會走這個方法!如果消息被正常處理,則這個方法不會走!
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主體: " + new String(message.getBody()));
System.out.println("應答碼: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("消息使用的交換器 exchange : " + exchange);
System.out.println("消息使用的路由鍵 routing : " + routingKey);
}
}
到了這里,關于整合MQ-----RabbitMQ的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!