延遲隊列:其實就是死信隊列中消息過期的特殊情況
延遲隊列應用場景:
可以用死信隊列來實現(xiàn),不過死信隊列要等上一個消息消費成功,才會進行下一個消息的消費,這時候就需要用到延遲插件了,不過要線在docker上裝一個插件
安裝過程(Linux【Docker】)
前置條件是在Docker中部署過RabbitMq。
1、打開你的遠程工具,首先查看docker中已有的容器,主要是為了查看rabbitmq的容器ID
2、將本地下載好的壓縮包傳到服務器某文件夾下,然后將其復制到Docker中的RabbitMq容器中的plugins文件夾下。
docker cp /home/rabbitmq_delayed_message_exchange-3.8.0.ez a687ef46141b:/plugins
3、進入容器查看該目錄下是否有該壓縮包。
進入容器命令:(通過容器號或者容器名)
docker exec -it a687ef46141b bash
4、同樣在容器中的命令行執(zhí)行一下命令添加插件。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
5、退出容器,重啟該容器。
6、在管理端即同樣可以看到新增了一種交換機模式。
總結:以上就是RabbitMQ的延遲插件的安裝過程!
基于插件的延遲隊列DEMO
成功安裝RabbitMQ的延遲插件之后,我們就可以嘗試寫一個延遲隊列來驗證一下是否可以解決上述問題。
首先我們的測試環(huán)境是在一個Springboot的框架下完成!
1、最先寫配置類
/**
* 定義延遲交換機
*/
@Configuration
public class RabbitMQDelayedConfig {
//隊列
private static final String DELAYQUEUE = "delayedqueue";
//交換機
private static final String DELAYEXCHANGE = "delayedExchange";
@Bean
public Queue delayqueue(){return new Queue(DELAYQUEUE);}
//自定義延遲交換機
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
/**
* 1、交換機名稱
* 2、交換機類型
* 3、是否需要持久化
* 4、是否需要自動刪除
* 5、其他參數(shù)
*/
return new CustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);
}
//綁定隊列和延遲交換機
@Bean
public Binding delaybinding(){
return BindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();
}
}
2、先寫生產(chǎn)者
/**
* 基于插件的延遲隊列
* 消息生產(chǎn)者
*/
@Service
@Slf4j
public class DelayMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendmsg(String message,Integer delaytime){
log.info("當前時間:{},發(fā)送時長{}信息給延遲隊列:{}",new Date().toString(),delaytime,message);
rabbitTemplate.convertAndSend("delayedExchange","sectest",message,msg->{
//設置發(fā)送消息的延長時間 單位:ms
msg.getMessageProperties().setDelay(delaytime);
return msg;
});
}
}
3、再寫消費者
/**
* 基于插件的延遲隊列
* 消息的消費者
*/
@Service
@Slf4j
public class DelayMQReceiver {
@RabbitListener(queues = "delayedqueue")
public void receivemsg(Message messages){
String msg = new String(messages.getBody());
log.info("當前時間:{},接收時長信息給延遲隊列:{}",new Date().toString(),msg);
}
}
4、進行測試
將模擬請求放在了一個簡易的網(wǎng)頁上,點擊后輸出如下結果,證明當先發(fā)送了20s延時的消息,再發(fā)送2s延時的消息,在2s后消息正常被消費,基于插件的延遲隊列完美解決了問題。
文章來源:http://www.zghlxwxcb.cn/news/detail-685131.html
**【思考】:**如果在實際業(yè)務場景中使用延遲隊列的話,那就需要服務端在消息被消費之后主動告訴前端消費的結果,如果是這樣的話,那么Ajxs的通信方式是單雙工通信,只能前端主動訪問后端并返回結果,后端無法主動發(fā)送消息,應該使用Websocket來進行通信才可,websocket是長連接,不同于http的短連接,可以實現(xiàn)全雙工通信,前后端都可以主動發(fā)送消息。文章來源地址http://www.zghlxwxcb.cn/news/detail-685131.html
到了這里,關于RabbitMQ+springboot用延遲插件實現(xiàn)延遲消息的發(fā)送的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!