国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Rabbitmq - rabbitmq Listener監(jiān)聽(tīng)

這篇具有很好參考價(jià)值的文章主要介紹了Rabbitmq - rabbitmq Listener監(jiān)聽(tīng)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Listener監(jiān)聽(tīng)

Listener的yml配置參數(shù)形式如下:

   listener:
     simple:
       prefetch: 1 # 一次拉取的數(shù)量
       concurrency: 5 # 消費(fèi)端的監(jiān)聽(tīng)個(gè)數(shù)(即@RabbitListener開(kāi)啟幾個(gè)線程去處理數(shù)據(jù)。)
       max-concurrency: 10 # 消費(fèi)端的監(jiān)聽(tīng)最大個(gè)數(shù)
       acknowledge-mode: manual
       retry:
         multiplier: 1
         max-attempts: 3
         enabled: true
     direct:
       retry:
         enabled: true
         max-attempts: 3
       acknowledge-mode: manual
       auto-startup: true
     type: simple

在消費(fèi)端,配置prefetch和concurrency參數(shù)便可以實(shí)現(xiàn)消費(fèi)端MQ并發(fā)處理消息,下面詳細(xì)敘述下listener下的幾個(gè)參數(shù)的意思

listener.type 表示監(jiān)聽(tīng)的類(lèi)型 主要有兩種 simple和direct 對(duì)于的監(jiān)聽(tīng)容器是 
SimpleMessageListenerContainer和DirectMessageListenerContainer

listener.simple.prefetch: 
每個(gè)customer會(huì)在MQ預(yù)取一些消息放入內(nèi)存的LinkedBlockingQueue中進(jìn)行消費(fèi),這個(gè)值越高,消息傳遞的越快,
但非順序處理消息的風(fēng)險(xiǎn)更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。
不過(guò)在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內(nèi)存中大量堆積,消耗大量?jī)?nèi)存;以及對(duì)于一些嚴(yán)格要求順序的消息,prefetch的值應(yīng)當(dāng)設(shè)置為1。
acknowledge-mode:表示消費(fèi)端收到消息后的確認(rèn)方式。
有三種確認(rèn)方式:
自動(dòng)確認(rèn):acknowledge="none"
手動(dòng)確認(rèn):acknowledge="manual"
根據(jù)異常情況確認(rèn):acknowledge="auto"

其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。
如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。
concurrency:
concurrency =1,即每個(gè)Listener容器將開(kāi)啟一個(gè)線程去處理消息。
可以在 @RabbitListener(concurrency = "3")直接配置當(dāng)前監(jiān)聽(tīng)器啟動(dòng)線程,如果在Listener配置了exclusive參數(shù),即確定此容器中的單個(gè)customer是否具有對(duì)隊(duì)列的獨(dú)占訪問(wèn)權(quán)限。如果為true,則容器的并發(fā)性必須為1。
如果配置了exclusive=true,但是concurrency>1則會(huì)拋錯(cuò)
prefetch和concurrency
若一個(gè)消費(fèi)者配置prefetch=10,concurrency=2,即會(huì)開(kāi)啟2個(gè)線程去消費(fèi)消息,每個(gè)線程都會(huì)抓取10個(gè)線程到內(nèi)存中(注意不是兩個(gè)線程去共享內(nèi)存中抓取的消息)

 max-concurrency:: 表示最大能啟動(dòng)的線程數(shù)
retry:
表示消息被消費(fèi)者拒收后重試發(fā)送或者因?yàn)楫惓T蛳⒅卦嚢l(fā)送
multiplier:重試間隔乘法策略
max-attempts: 是最大重試次數(shù),默認(rèn)是三
enabled: 是否開(kāi)啟重試
initial-interval: 初始化重試次數(shù)間隔
max-interval 最大重試間隔
stateless:重試是無(wú)狀態(tài)的還是有狀態(tài)的。
autoStartup:
當(dāng)autoStartup為false的時(shí)候,監(jiān)聽(tīng)容器就不會(huì)自動(dòng)啟動(dòng),然后我們可以通過(guò)使用單個(gè)容器的ID,調(diào)用RabbitListenerEndpointRegistry類(lèi)的getListenerContainer(String id)方法來(lái)獲得對(duì)單個(gè)容器的引用,并執(zhí)行strat方法,啟動(dòng)容器。

舉一個(gè)例子:


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "${rabbitmq.queue.routing.beijing}", durable = "true", autoDelete = "false"),
                    exchange = @Exchange(
                            value = "${rabbitmq.exchange.routing}",
                            durable = "true",
                            type = ExchangeTypes.TOPIC),
                    key = "china.#")}, concurrency = "3", exclusive = false,id = "autoStart",autoStartup = "false")
    public void receive(@Payload String msg, Message message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println(String.format("mesage:%s", message.getBody()));
        System.out.println("路由監(jiān)聽(tīng)接受到發(fā)送者發(fā)送的信息:" + msg);
        // 確認(rèn)消息
//        channel.basicAck(deliveryTag, false);
    }

手動(dòng)開(kāi)啟/關(guān)閉容器

public void stopContainer(String containerId){
        //得到容器的對(duì)象
        MessageListenerContainer container = registry.getListenerContainer(containerId);
        //判斷容器狀態(tài)
        if(container.isRunning()){
            //開(kāi)啟容器
            container.stop();
            System.out.println("關(guān)閉容器");
        }

    }

    public void startContainer(String containerId){
        //得到容器的對(duì)象
        MessageListenerContainer container = registry.getListenerContainer(containerId);
        //判斷容器狀態(tài)
        if(!container.isRunning()){
            //開(kāi)啟容器
            container.start();
            System.out.println("開(kāi)啟容器");
        }

    }

Rabbitmq listener監(jiān)聽(tīng)Message消息,其中Message主要包含兩部分

public class Message implements Serializable {
	private final MessageProperties messageProperties;

	private final byte[] body;
	
	public Message(byte[] body, MessageProperties messageProperties) { //NOSONAR
		this.body = body; //NOSONAR
		this.messageProperties = messageProperties;
	}
}
MessageProperties // 消息屬性

byte[] body // 消息內(nèi)容

當(dāng)監(jiān)聽(tīng)者監(jiān)聽(tīng)到隊(duì)列中有消息時(shí)則會(huì)進(jìn)行接收并處理,MessageConvert 會(huì)直接轉(zhuǎn)換成消息類(lèi)型,并綁定在對(duì)應(yīng)被注解的方法中。默認(rèn)實(shí)現(xiàn)類(lèi)為SimpleMessageConverter文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-543506.html

public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		Object content = null;
		MessageProperties properties = message.getMessageProperties();
		if (properties != null) {
			String contentType = properties.getContentType();
			if (contentType != null && contentType.startsWith("text")) {
				String encoding = properties.getContentEncoding();
				if (encoding == null) {
					encoding = this.defaultCharset;
				}
				try {
					content = new String(message.getBody(), encoding);
				}
				catch (UnsupportedEncodingException e) {
					throw new MessageConversionException(
							"failed to convert text-based Message content", e);
				}
			}
			else if (contentType != null &&
					contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
				try {
					content = SerializationUtils.deserialize(
							createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
				}
				catch (IOException | IllegalArgumentException | IllegalStateException e) {
					throw new MessageConversionException(
							"failed to convert serialized Message content", e);
				}
			}
		}
		if (content == null) {
			content = message.getBody();
		}
		return content;
	}

	/**
	 * Creates an AMQP Message from the provided Object.
	 */
	@Override
	protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		byte[] bytes = null;
		if (object instanceof byte[]) {
			bytes = (byte[]) object;
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
		}
		else if (object instanceof String) {
			try {
				bytes = ((String) object).getBytes(this.defaultCharset);
			}
			catch (UnsupportedEncodingException e) {
				throw new MessageConversionException(
						"failed to convert to Message content", e);
			}
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
			messageProperties.setContentEncoding(this.defaultCharset);
		}
		else if (object instanceof Serializable) {
			try {
				bytes = SerializationUtils.serialize(object);
			}
			catch (IllegalArgumentException e) {
				throw new MessageConversionException(
						"failed to convert to serialized Message content", e);
			}
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
		}
		if (bytes != null) {
			messageProperties.setContentLength(bytes.length);
			return new Message(bytes, messageProperties);
		}
		throw new IllegalArgumentException(getClass().getSimpleName()
				+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
	}

}

到了這里,關(guān)于Rabbitmq - rabbitmq Listener監(jiān)聽(tīng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ默認(rèn)監(jiān)聽(tīng)的ip地址

    RabbitMQ默認(rèn)監(jiān)聽(tīng)的ip地址

    RabbitMQ 默認(rèn)監(jiān)聽(tīng)所有可用 ip 地址,當(dāng)Rabbitmq 所在的服務(wù)端節(jié)點(diǎn)上存在多 ip 時(shí),只要客戶端能與服務(wù)端任一 ip 通信,即可向 RabbitMQ 發(fā)送消息

    2024年02月11日
    瀏覽(21)
  • SpringBoot整合Canal+RabbitMQ監(jiān)聽(tīng)數(shù)據(jù)變更

    需求 步驟 環(huán)境搭建 整合SpringBoot Canal實(shí)現(xiàn)客戶端 Canal整合RabbitMQ SpringBoot整合RabbitMQ ? 我想要在SpringBoot中采用一種與業(yè)務(wù)代碼解耦合的方式,來(lái)實(shí)現(xiàn)數(shù)據(jù)的變更記錄,記錄的內(nèi)容是新數(shù)據(jù),如果是更新操作還得有舊數(shù)據(jù)內(nèi)容。 經(jīng)過(guò)調(diào)研發(fā)現(xiàn),使用Canal來(lái)監(jiān)聽(tīng)MySQL的binlog變化可

    2024年02月11日
    瀏覽(16)
  • linux部署rabbitmq開(kāi)啟mqtt插件由于監(jiān)聽(tīng)1883端口導(dǎo)致重啟rabbitmq失敗的解決方法

    linux部署rabbitmq開(kāi)啟mqtt插件由于監(jiān)聽(tīng)1883端口導(dǎo)致重啟rabbitmq失敗的解決方法

    第一步:部署rabbitmq 部署rabbitmq請(qǐng)移步(在這里可以找到erlang和rabbitmq適配的版本并下載安裝包): 通過(guò)移步的地址中執(zhí)行以下步驟 1. 安裝erlang環(huán)境 2. 下載完rabbitmq的安裝包并執(zhí)行命令 yum localinstall 安裝包的名稱 3. 開(kāi)啟rabbitmq插件 rabbitmq-plugins enable rabbitmq_management rabbitmq_man

    2024年02月09日
    瀏覽(35)
  • springboot-rabbitmq 實(shí)現(xiàn)動(dòng)態(tài)配置監(jiān)聽(tīng)容器

    springboot-rabbitmq 實(shí)現(xiàn)動(dòng)態(tài)配置監(jiān)聽(tīng)容器

    1.1.1從factories我們可以看到mq的啟動(dòng)配置類(lèi) 1.1.2然后我們找到 RabbitAutoConfiguration ,發(fā)現(xiàn)它引入了 RabbitAnnotationDrivenConfiguration 這個(gè)配置類(lèi) 1.1.3進(jìn)入 RabbitAnnotationDrivenConfiguration 滑到最低部看到這里引入了 @EnableRabbit 這個(gè)注解,找個(gè)注解里面又引出 RabbitBootstrapConfiguration 這個(gè)配置類(lèi)

    2023年04月09日
    瀏覽(25)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 實(shí)現(xiàn)數(shù)據(jù)監(jiān)聽(tīng)

    SpringCloud 整合 Canal+RabbitMQ+Redis 實(shí)現(xiàn)數(shù)據(jù)監(jiān)聽(tīng)

    Canal 指的是阿里巴巴開(kāi)源的數(shù)據(jù)同步工具,用于數(shù)據(jù)庫(kù)的實(shí)時(shí)增量數(shù)據(jù)訂閱和消費(fèi)。它可以針對(duì) MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的異構(gòu)數(shù)據(jù)同步等情況進(jìn)行實(shí)時(shí)增量數(shù)據(jù)同步。 當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步數(shù)據(jù)庫(kù)

    2024年02月03日
    瀏覽(19)
  • Spring項(xiàng)目配置文件中RabbitMQ監(jiān)聽(tīng)器各個(gè)參數(shù)的作用

    spring.rabbitmq.listener.simple.concurrency :設(shè)置監(jiān)聽(tīng)器容器的并發(fā)消費(fèi)者數(shù)量,默認(rèn)為1,即單線程消費(fèi)。 spring.rabbitmq.listener.simple.max-concurrency :設(shè)置監(jiān)聽(tīng)器容器的最大并發(fā)消費(fèi)者數(shù)量。 spring.rabbitmq.listener.simple.prefetch :設(shè)置每個(gè)消費(fèi)者從RabbitMQ服務(wù)器獲取的消息數(shù)量,即每次從隊(duì)列

    2024年02月16日
    瀏覽(35)
  • RabbitMQ之動(dòng)態(tài)創(chuàng)建隊(duì)列與綁定交換機(jī)和監(jiān)聽(tīng)器

    RabbitMQ之動(dòng)態(tài)創(chuàng)建隊(duì)列與綁定交換機(jī)和監(jiān)聽(tīng)器

    為什么需要?jiǎng)討B(tài)創(chuàng)建隊(duì)列與綁定交換機(jī)?我在寫(xiě)項(xiàng)目的時(shí)候遇到這么個(gè)問(wèn)題,我數(shù)據(jù)庫(kù)中存在一個(gè)字段messageType指定為消息類(lèi)型,消息類(lèi)型存在三種,一種是通知類(lèi),一種是驗(yàn)證碼類(lèi),一種是活動(dòng)類(lèi)。并且對(duì)應(yīng)的,要將消息進(jìn)行不同渠道的分發(fā),還存在一個(gè)channelType,而他又存

    2024年02月03日
    瀏覽(17)
  • SpringBoot 整合RabbitMq 自定義消息監(jiān)聽(tīng)容器來(lái)實(shí)現(xiàn)消息批量處理

    SpringBoot 整合RabbitMq 自定義消息監(jiān)聽(tīng)容器來(lái)實(shí)現(xiàn)消息批量處理

    RabbitMQ是一種常用的消息隊(duì)列,Spring Boot對(duì)其進(jìn)行了深度的整合,可以快速地實(shí)現(xiàn)消息的發(fā)送和接收。在RabbitMQ中,消息的發(fā)送和接收都是異步的,因此需要使用監(jiān)聽(tīng)器來(lái)監(jiān)聽(tīng)消息的到來(lái)。Spring Boot中提供了默認(rèn)的監(jiān)聽(tīng)器容器,但是有時(shí)候我們需要自定義監(jiān)聽(tīng)器容器,來(lái)滿足一

    2024年02月16日
    瀏覽(17)
  • 207、SpringBoot 整合 RabbitMQ 實(shí)現(xiàn)消息的發(fā)送 與 接收(監(jiān)聽(tīng)器)

    207、SpringBoot 整合 RabbitMQ 實(shí)現(xiàn)消息的發(fā)送 與 接收(監(jiān)聽(tīng)器)

    1、ContentUtil 先定義常量 2、RabbitMQConfig 創(chuàng)建隊(duì)列的兩種方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 類(lèi)型的Bean,RabbitMQ將會(huì)自動(dòng)為該Bean創(chuàng)建對(duì)應(yīng)的隊(duì)列。 就是在配置類(lèi)中創(chuàng)建一個(gè)生成消息隊(duì)列的@Bean。 問(wèn)題: 用 @Configuration 注解聲明為配置類(lèi),但是項(xiàng)目啟動(dòng)

    2024年02月06日
    瀏覽(25)
  • Canal同步Mysql實(shí)時(shí)操作日志至RabbitMQ,并實(shí)現(xiàn)監(jiān)聽(tīng)及解析處理

    Canal同步Mysql實(shí)時(shí)操作日志至RabbitMQ,并實(shí)現(xiàn)監(jiān)聽(tīng)及解析處理

    關(guān)于Canal的介紹及原理不在此贅述,可自行查閱。筆者在使用Canal同步Mysql實(shí)時(shí)操作記錄至RabbitMQ的過(guò)程中,也翻閱了一些大牛們的文章,可能是我使用的Canal版本與文中版本不一致,出現(xiàn)了一些問(wèn)題,在此總結(jié)記錄一下可行的方案。 注:本文使用的Canal為 v1.1.7 先查看目標(biāo)數(shù)據(jù)

    2024年04月10日
    瀏覽(48)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包