@Configuration
@ConditionalOnClass(SimpleRabbitListenerContainerFactory.class)
public class ConsumerConfig {
@Value("${rabbit.batch.num:100}")
private int batchNum;
@Bean("batchQueueRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBatchListener(true);
factory.setConsumerBatchEnabled(true);
factory.setBatchSize(batchNum);
factory.setConcurrentConsumers(5); // 設(shè)置并發(fā)消費(fèi)者數(shù)量為 5
factory.setMaxConcurrentConsumers(10); // 設(shè)置最大并發(fā)消費(fèi)者數(shù)量為 10
return factory;
}
}
concurrentConsumers 和 maxConcurrentConsumers 屬性的具體含義如下:
concurrentConsumers:指定同時(shí)運(yùn)行的消費(fèi)者數(shù)量,默認(rèn)為1。
maxConcurrentConsumers:指定允許的最大并發(fā)消費(fèi)者數(shù)量,默認(rèn)為1。
因此,在上述示例中,設(shè)置了 concurrentConsumers 為 5,maxConcurrentConsumers 為 10,意味著 RabbitMQ 容器將維持一個(gè)初始的消費(fèi)者池大小為 5,并在需要時(shí)最多擴(kuò)展到 10 個(gè)并發(fā)消費(fèi)者。
通過以上修改,你就可以在 batchQueueRabbitListenerContainerFactory 中控制消費(fèi)者的并發(fā)數(shù)量了。根據(jù)你的實(shí)際需求,可以調(diào)整并發(fā)消費(fèi)者的數(shù)量以滿足系統(tǒng)性能和資源的要求。
需要注意的是,這種設(shè)置會影響到特定隊(duì)列的消費(fèi)者并發(fā)數(shù)量,而不會影響其他隊(duì)列的消費(fèi)者。因?yàn)槟闶轻槍μ囟ǖ腷atchQueueRabbitListenerContainerFactory進(jìn)行配置,所以只會影響使用該工廠的隊(duì)列。
如果你想配置多個(gè)工廠,可以繼續(xù)添加其他的@Bean方法。
例如,你可以添加另一個(gè)SimpleRabbitListenerContainerFactory bean,命名為anotherQueueRabbitListenerContainerFactory,并配置相應(yīng)的屬性:
@Bean("anotherQueueRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory anotherQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 配置其他屬性
return factory;
}
通過這種方式,你可以定義多個(gè)SimpleRabbitListenerContainerFactory bean,并分別配置每個(gè)工廠需要的屬性。然后在需要使用特定工廠的@RabbitListener注解中,通過containerFactory屬性指定使用哪個(gè)工廠。
舉個(gè)例子,如果你希望某個(gè)隊(duì)列使用anotherQueueRabbitListenerContainerFactory工廠進(jìn)行監(jiān)聽,可以這樣設(shè)置:文章來源:http://www.zghlxwxcb.cn/news/detail-754044.html
@RabbitListener(queues = "another.queue", containerFactory = "anotherQueueRabbitListenerContainerFactory")
public void onAnotherMessage(Message message) {
// 處理消息
}
通過這種方式,你可以根據(jù)需要定義多個(gè)工廠,并將它們分配給不同的隊(duì)列進(jìn)行監(jiān)聽。
以下是rabbitMq監(jiān)聽消息代碼:文章來源地址http://www.zghlxwxcb.cn/news/detail-754044.html
@RabbitListener(queues = "test.queue", containerFactory = "batchQueueRabbitListenerContainerFactory")
@RabbitHandler
public void onReportMessage(List<Message> messages) {
List<Map<String, Object>> list = messages.stream().map(message -> (Map<String, Object>) message.getPayload()).collect(Collectors.toList());
log.info("report收到數(shù)據(jù):{}", JSON.toJSONString(list));
service.handler(list);
}
到了這里,關(guān)于rabbitMq 針對于當(dāng)前監(jiān)聽的隊(duì)列,來控制消費(fèi)者并發(fā)數(shù)量,不影響其他隊(duì)列,代碼示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!