????????我們在消費RabbitMQ消息的過程中,有時候可能會想先暫停消費一段時間,然后過段時間再啟動消費者,這個需求怎么實現(xiàn)呢?我們可以借助RabbitListenerEndpointRegistry這個類來實現(xiàn),它的全類名是org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry,通過這個類可以實現(xiàn)全部隊列消息的啟動、停止消費,也可以實現(xiàn)指定隊列消息的啟動、停止消費。具體的原因感興趣的話可以參考一下我前面的這篇博客(17)不重啟服務(wù)動態(tài)調(diào)整RabbitMQ消費者數(shù)量,里面有相應(yīng)的源碼分析。
停止、啟動全部隊列消費
????????RabbitListenerEndpointRegistry類提供了start()方法和stop()方法,可以看到底層都是通過調(diào)用getListenerContainers()獲取到所有隊列的消費監(jiān)聽容器列表,然后遍歷挨個調(diào)用對應(yīng)的start()方法和stop()方法。
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
@Override
public void stop() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
listenerContainer.stop();
}
}
? ? ? ? 我們只需要獲取到RabbitListenerEndpointRegistry對象,然后調(diào)用其start()方法和stop()方法即可實現(xiàn)啟動/停止所有隊列消費。
? ? ? ? 實現(xiàn)代碼如下所示:
@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@RequestMapping(value = "/startStopAllConsumer")
@ApiOperation(value = "啟動/暫停全部隊列消息消費")
public Response startStopAllConsumer(@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {
log.info("啟動/暫停全部隊列消息消費,consumeSwitch:{}",consumeSwitch);
if(consumeSwitch){
rabbitListenerEndpointRegistry.start();
}else {
rabbitListenerEndpointRegistry.stop();
}
return Response.success();
}
????????傳入開關(guān)參數(shù)為false,會停止所有隊列消費者消費,調(diào)用后控制臺看到如下日志
2023-09-04 19:43:11.480 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO ?c.b.t.m.p.w.PayCashierMockController:67 - 啟動/暫停全部隊列消息消費,consumeSwitch:false
2023-09-04 19:43:11.556 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO ?o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
2023-09-04 19:43:12.352 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO ?o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
可以看到消息監(jiān)聽容器關(guān)閉的日志,然后再傳入開關(guān)參數(shù)為true,調(diào)用后會啟動所有隊列消息消費。
停止、啟動指定隊列消費
? ? ? ? 上面提到了RabbitListenerEndpointRegistry.getListenerContainers()可以獲取到所有隊列的消費監(jiān)聽容器列表,我們可以使用MessageListenerContainer中獲取消費的隊列名進(jìn)行判斷,以實現(xiàn)指定隊列的停止、啟動消費。
????????實現(xiàn)代碼如下所示:
@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@RequestMapping(value = "/startStopConsumer")
@ApiOperation(value = "啟動/暫停指定隊列消息消費")
public Response startStopConsumer(@RequestParam(value = "queueName", required = false) String queueName,
@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {
log.info("啟動/暫停指定隊列消息消費,consumeSwitch:{},queueName:{}",consumeSwitch,queueName);
//獲取所有消息監(jiān)聽容器
Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
for (MessageListenerContainer container : listenerContainers) {
SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
//消息監(jiān)聽容器要消費的隊列名稱集合
List<String> queueNamesList = Arrays.asList(con.getQueueNames());
//判斷容器中的隊列名稱是否包含需要調(diào)整的隊列名參數(shù)
if (queueNamesList.contains(queueName)) {
if(consumeSwitch){
con.start();
}else{
con.stop();
}
}
}
return Response.success();
}
傳入開關(guān)參數(shù)為false,停止pay_work_notify隊列消費者消費,調(diào)用后控制臺看到如下日志文章來源:http://www.zghlxwxcb.cn/news/detail-707658.html
2023-09-04 19:51:37.130 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO ?c.b.t.m.p.w.PayCashierMockController:80 - 啟動/暫停指定隊列消息消費,consumeSwitch:false,queueName:pay_work_notify
2023-09-04 19:51:37.200 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO ?o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
2023-09-04 19:51:37.903 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO ?o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
可以看到消息監(jiān)聽容器關(guān)閉的日志,然后再傳入開關(guān)參數(shù)為true,調(diào)用后會啟動pay_work_notify隊列消息消費。文章來源地址http://www.zghlxwxcb.cn/news/detail-707658.html
到了這里,關(guān)于(18)不重啟服務(wù)動態(tài)停止、啟動RabbitMQ消費者的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!