Listener監(jiān)聽(tīng)
Listener的yml配置參數(shù)形式如下:
listener:
simple:
prefetch: 1 # 一次拉取的數(shù)量
concurrency: 5 # 消費(fèi)端的監(jiān)聽(tīng)個(gè)數(shù)(即@RabbitListener開(kāi)啟幾個(gè)線程去處理數(shù)據(jù)。)
max-concurrency: 10 # 消費(fèi)端的監(jiān)聽(tīng)最大個(gè)數(shù)
acknowledge-mode: manual
retry:
multiplier: 1
max-attempts: 3
enabled: true
direct:
retry:
enabled: true
max-attempts: 3
acknowledge-mode: manual
auto-startup: true
type: simple
在消費(fèi)端,配置prefetch和concurrency參數(shù)便可以實(shí)現(xiàn)消費(fèi)端MQ并發(fā)處理消息,下面詳細(xì)敘述下listener下的幾個(gè)參數(shù)的意思
listener.type 表示監(jiān)聽(tīng)的類(lèi)型 主要有兩種 simple和direct 對(duì)于的監(jiān)聽(tīng)容器是
SimpleMessageListenerContainer和DirectMessageListenerContainer
listener.simple.prefetch:
每個(gè)customer會(huì)在MQ預(yù)取一些消息放入內(nèi)存的LinkedBlockingQueue中進(jìn)行消費(fèi),這個(gè)值越高,消息傳遞的越快,
但非順序處理消息的風(fēng)險(xiǎn)更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。
不過(guò)在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內(nèi)存中大量堆積,消耗大量?jī)?nèi)存;以及對(duì)于一些嚴(yán)格要求順序的消息,prefetch的值應(yīng)當(dāng)設(shè)置為1。
acknowledge-mode:表示消費(fèi)端收到消息后的確認(rèn)方式。
有三種確認(rèn)方式:
自動(dòng)確認(rèn):acknowledge="none"
手動(dòng)確認(rèn):acknowledge="manual"
根據(jù)異常情況確認(rèn):acknowledge="auto"
其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。
如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。
concurrency:
concurrency =1,即每個(gè)Listener容器將開(kāi)啟一個(gè)線程去處理消息。
可以在 @RabbitListener(concurrency = "3")直接配置當(dāng)前監(jiān)聽(tīng)器啟動(dòng)線程,如果在Listener配置了exclusive參數(shù),即確定此容器中的單個(gè)customer是否具有對(duì)隊(duì)列的獨(dú)占訪問(wèn)權(quán)限。如果為true,則容器的并發(fā)性必須為1。
如果配置了exclusive=true,但是concurrency>1則會(huì)拋錯(cuò)
prefetch和concurrency
若一個(gè)消費(fèi)者配置prefetch=10,concurrency=2,即會(huì)開(kāi)啟2個(gè)線程去消費(fèi)消息,每個(gè)線程都會(huì)抓取10個(gè)線程到內(nèi)存中(注意不是兩個(gè)線程去共享內(nèi)存中抓取的消息)
max-concurrency:: 表示最大能啟動(dòng)的線程數(shù)
retry:
表示消息被消費(fèi)者拒收后重試發(fā)送或者因?yàn)楫惓T蛳⒅卦嚢l(fā)送
multiplier:重試間隔乘法策略
max-attempts: 是最大重試次數(shù),默認(rèn)是三
enabled: 是否開(kāi)啟重試
initial-interval: 初始化重試次數(shù)間隔
max-interval 最大重試間隔
stateless:重試是無(wú)狀態(tài)的還是有狀態(tài)的。
autoStartup:
當(dāng)autoStartup為false的時(shí)候,監(jiān)聽(tīng)容器就不會(huì)自動(dòng)啟動(dòng),然后我們可以通過(guò)使用單個(gè)容器的ID,調(diào)用RabbitListenerEndpointRegistry類(lèi)的getListenerContainer(String id)方法來(lái)獲得對(duì)單個(gè)容器的引用,并執(zhí)行strat方法,啟動(dòng)容器。
舉一個(gè)例子:
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "${rabbitmq.queue.routing.beijing}", durable = "true", autoDelete = "false"),
exchange = @Exchange(
value = "${rabbitmq.exchange.routing}",
durable = "true",
type = ExchangeTypes.TOPIC),
key = "china.#")}, concurrency = "3", exclusive = false,id = "autoStart",autoStartup = "false")
public void receive(@Payload String msg, Message message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println(String.format("mesage:%s", message.getBody()));
System.out.println("路由監(jiān)聽(tīng)接受到發(fā)送者發(fā)送的信息:" + msg);
// 確認(rèn)消息
// channel.basicAck(deliveryTag, false);
}
手動(dòng)開(kāi)啟/關(guān)閉容器
public void stopContainer(String containerId){
//得到容器的對(duì)象
MessageListenerContainer container = registry.getListenerContainer(containerId);
//判斷容器狀態(tài)
if(container.isRunning()){
//開(kāi)啟容器
container.stop();
System.out.println("關(guān)閉容器");
}
}
public void startContainer(String containerId){
//得到容器的對(duì)象
MessageListenerContainer container = registry.getListenerContainer(containerId);
//判斷容器狀態(tài)
if(!container.isRunning()){
//開(kāi)啟容器
container.start();
System.out.println("開(kāi)啟容器");
}
}
Rabbitmq listener監(jiān)聽(tīng)Message消息,其中Message主要包含兩部分文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-543506.html
public class Message implements Serializable {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) { //NOSONAR
this.body = body; //NOSONAR
this.messageProperties = messageProperties;
}
}
MessageProperties // 消息屬性
byte[] body // 消息內(nèi)容
當(dāng)監(jiān)聽(tīng)者監(jiān)聽(tīng)到隊(duì)列中有消息時(shí)則會(huì)進(jìn)行接收并處理,MessageConvert 會(huì)直接轉(zhuǎn)換成消息類(lèi)型,并綁定在對(duì)應(yīng)被注解的方法中。默認(rèn)實(shí)現(xiàn)類(lèi)為SimpleMessageConverter文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-543506.html
public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert text-based Message content", e);
}
}
else if (contentType != null &&
contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
try {
content = SerializationUtils.deserialize(
createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
}
catch (IOException | IllegalArgumentException | IllegalStateException e) {
throw new MessageConversionException(
"failed to convert serialized Message content", e);
}
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
/**
* Creates an AMQP Message from the provided Object.
*/
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
}
到了這里,關(guān)于Rabbitmq - rabbitmq Listener監(jiān)聽(tīng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!