国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springboot RabbitMQ客戶端連接故障恢復(fù)

這篇具有很好參考價(jià)值的文章主要介紹了springboot RabbitMQ客戶端連接故障恢復(fù)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

最近做RabbitMQ故障演練發(fā)現(xiàn)RabbitMQ服務(wù)器停止后,基于springboot的消費(fèi)端不可以自動(dòng)的恢復(fù),隊(duì)列的消費(fèi)者消失,消息一直積壓到隊(duì)列中,這種情況肯定是不可接收的;通過研究源代碼找到了解決方案。

一、添加自動(dòng)恢復(fù)配置automaticRecovery
 CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactoryConfigurer.configure(factory);

//設(shè)置TCP連接超時(shí)時(shí)間,默認(rèn):60000ms
factory.getRabbitConnectionFactory().setConnectionTimeout(properties.getConnectionTimeout());
//啟用或禁用連接自動(dòng)恢復(fù),默認(rèn):false
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(properties.isAutomaticRecovery());
//設(shè)置連接恢復(fù)時(shí)間間隔,默認(rèn):5000ms
factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(properties.getNetworkRecoveryInterval());
//啟用或禁用拓?fù)浠謴?fù),默認(rèn):true【拓?fù)浠謴?fù)功能可以幫助消費(fèi)者重新聲明之前定義的隊(duì)列、交換機(jī)和綁定等拓?fù)浣Y(jié)構(gòu)】
factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(properties.isTopologyRecovery());
//替換默認(rèn)異常處理DefaultExceptionHandler
factory.getRabbitConnectionFactory().setExceptionHandler(new DefaultMqExceptionHandler());
//添加連接監(jiān)聽器
factory.addConnectionListener(new DefaultMqConnectionListener(factory));

通過上述配置如果RabbitMQ服務(wù)器發(fā)生故障,則會(huì)自動(dòng)重啟恢復(fù)連接及隊(duì)列的消費(fèi)者,如果恢復(fù)失敗則會(huì)間隔5000ms再次重試;在這里提一個(gè)問題,如果服務(wù)重試一直失敗,重試的上限是多少?帶著這個(gè)問題我們分析下源碼。

二、RabbitMQ客戶端實(shí)現(xiàn)連接的自動(dòng)恢復(fù)功能

AutorecoveringConnection#beginAutomaticRecovery是在 RabbitMQ 客戶端庫層面實(shí)現(xiàn)的連接的自動(dòng)恢復(fù)功能。當(dāng) RabbitMQ 連接出現(xiàn)故障時(shí),它會(huì)嘗試重新建立連接,以確保消息傳遞的可靠性。

  private synchronized void beginAutomaticRecovery() throws InterruptedException {
        //獲取故障恢復(fù)連接的間隔時(shí)間,實(shí)際是設(shè)置的:networkRecoveryInterval
        final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
        if (delay > 0)  {
           //等待指定的間隔時(shí)間
            this.wait(delay);
        }
				//調(diào)用恢復(fù)通知監(jiān)聽器
        this.notifyRecoveryListenersStarted();
				//獲取恢復(fù)建立的連接對象
        final RecoveryAwareAMQConnection newConn = this.recoverConnection();
        //如果為null則直接返回
        if (newConn == null) {
            return;
        }
    //連接已經(jīng)恢復(fù)建立,恢復(fù)監(jiān)聽器、channel等資源
        LOGGER.debug("Connection {} has recovered", newConn);
        this.addAutomaticRecoveryListener(newConn);
	    this.recoverShutdownListeners(newConn);
	    this.recoverBlockedListeners(newConn);
	    this.recoverChannels(newConn);
	    // don't assign new delegate connection until channel recovery is complete
	    this.delegate = newConn;
     //判斷是否恢復(fù)拓?fù)浣Y(jié)構(gòu),如果開啟則開啟拓?fù)浣Y(jié)構(gòu)恢復(fù)
	    if (this.params.isTopologyRecoveryEnabled()) {
	        notifyTopologyRecoveryListenersStarted();
	        recoverTopology(params.getTopologyRecoveryExecutor());
	    }
		this.notifyRecoveryListenersComplete();
    }

addAutomaticRecoveryListener自動(dòng)恢復(fù)監(jiān)聽器

private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {
    final AutorecoveringConnection c = this;
    // this listener will run after shutdown listeners,
    // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
    RecoveryCanBeginListener starter = cause -> {
        try {
            if (shouldTriggerConnectionRecovery(cause)) {
                //開始自動(dòng)回復(fù)
                c.beginAutomaticRecovery();
            }
        } catch (Exception e) {
            newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
        }
    };
    synchronized (this) {
        newConn.addRecoveryCanBeginListener(starter);
    }
}

init初始化

public void init() throws IOException, TimeoutException {
    //建立連接,否則拋出異常
    this.delegate = this.cf.newConnection();
    //自動(dòng)回復(fù)監(jiān)聽器
    this.addAutomaticRecoveryListener(delegate);
}
三、消費(fèi)端實(shí)現(xiàn)消息的消費(fèi)和處理

SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run是應(yīng)用程序?qū)用鎸?shí)現(xiàn)消息的消費(fèi)和處理,它負(fù)責(zé)從RabbitMQ中接收消息并進(jìn)行相應(yīng)的邏輯處理:

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
  ...

  try {
    //消費(fèi)端初始化方法
    initialize();
    //當(dāng)消費(fèi)端是活躍狀態(tài),或者隊(duì)列非空,或者消費(fèi)端未被關(guān)閉則進(jìn)入主循環(huán)
    while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
      mainLoop();
    }
  }
  catch (InterruptedException e) {
    ...
    }
}

消費(fèi)端initialize初始化方法:

	private void initialize() throws Throwable { // NOSONAR
			try {
				redeclareElementsIfNecessary();
        //啟動(dòng)消費(fèi)端初始化
				this.consumer.start();
				this.start.countDown();
			}
			catch (QueuesNotAvailableException e) {
				if (isMissingQueuesFatal()) {
					throw e;
				}
				else {
					this.start.countDown();
          //消費(fèi)端啟動(dòng)異常等待處理
					handleStartupFailure(this.consumer.getBackOffExecution());
					throw e;
				}
			}
			catch (FatalListenerStartupException ex) {
				if (isPossibleAuthenticationFailureFatal()) {
					throw ex;
				}
				else {
					Throwable possibleAuthException = ex.getCause().getCause();
					if (!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
						throw ex;
					}
					else {
						this.start.countDown();
            //消費(fèi)端啟動(dòng)異常等待處理
						handleStartupFailure(this.consumer.getBackOffExecution());
						throw possibleAuthException;
					}
				}
			}
			catch (Throwable t) { //NOSONAR
				this.start.countDown();
        //消費(fèi)端啟動(dòng)異常等待處理
				handleStartupFailure(this.consumer.getBackOffExecution());
				throw t;
			}

			if (getTransactionManager() != null) {
				/*
				 * Register the consumer's channel so it will be used by the transaction manager
				 * if it's an instance of RabbitTransactionManager.
				 */
				ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
			}
		}

消費(fèi)端異常等待處理處理:

protected void handleStartupFailure(BackOffExecution backOffExecution) {
   //獲取等待時(shí)間間隔,參考FixedBackOff類實(shí)現(xiàn)
		long recoveryInterval = backOffExecution.nextBackOff();
		if (BackOffExecution.STOP == recoveryInterval) {
			synchronized (this) {
				if (isActive()) {
					logger.warn("stopping container - restart recovery attempts exhausted");
					stop();
				}
			}
			return;
		}
		try {
			if (logger.isDebugEnabled() && isActive()) {
				logger.debug("Recovering consumer in " + recoveryInterval + " ms.");
			}
      //當(dāng)前時(shí)間加上等待時(shí)間
			long timeout = System.currentTimeMillis() + recoveryInterval;
      //如果當(dāng)前時(shí)間小于等待時(shí)間,則休眠200毫秒,再次嘗試
			while (isActive() && System.currentTimeMillis() < timeout) {
				Thread.sleep(RECOVERY_LOOP_WAIT_TIME);
			}
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException("Irrecoverable interruption on consumer restart", e);
		}
	}

FixedBackOff回退等待時(shí)間類實(shí)現(xiàn):

public class FixedBackOff implements BackOff {
   // 默認(rèn)恢復(fù)重試間隔
    public static final long DEFAULT_INTERVAL = 5000L;
   //最大重試次數(shù),可以認(rèn)為無限大
    public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;
   // 默認(rèn)恢復(fù)重試間隔
    private long interval = 5000L;
   //最大重試次數(shù),可以認(rèn)為無限大
    private long maxAttempts = Long.MAX_VALUE;

    public FixedBackOff() {
    }

    public FixedBackOff(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }

    public void setInterval(long interval) {
        this.interval = interval;
    }

    public long getInterval() {
        return this.interval;
    }

    public void setMaxAttempts(long maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    public long getMaxAttempts() {
        return this.maxAttempts;
    }

    public BackOffExecution start() {
        return new FixedBackOffExecution();
    }

    private class FixedBackOffExecution implements BackOffExecution {
        private long currentAttempts;

        private FixedBackOffExecution() {
            this.currentAttempts = 0L;
        }
				//獲取下一次嘗試的時(shí)間間隔,可以認(rèn)為一直都是5000ms
        public long nextBackOff() {
            ++this.currentAttempts;
            return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;
        }

        public String toString() {
            String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE ? "unlimited" : String.valueOf(FixedBackOff.this.maxAttempts);
            return "FixedBackOff{interval=" + FixedBackOff.this.interval + ", currentAttempts=" + this.currentAttempts + ", maxAttempts=" + attemptValue + '}';
        }
    }
}

總結(jié):綜上源碼分析可知消費(fèi)端故障恢復(fù)重試等待時(shí)間是5000ms,重試次數(shù)可以認(rèn)為是無限制(Long最大值)

mainloop主循環(huán)邏輯:

		private void mainLoop() throws Exception { // NOSONAR Exception
			try {
				if (SimpleMessageListenerContainer.this.stopNow.get()) {
					this.consumer.forceCloseAndClearQueue();
					return;
				}
        //接收客戶端發(fā)送過來的消息,至少獲取一條
				boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
				if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
					checkAdjust(receivedOk);
				}
				long idleEventInterval = getIdleEventInterval();
				if (idleEventInterval > 0) {
					if (receivedOk) {
						updateLastReceive();
					}
					else {
						long now = System.currentTimeMillis();
						long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
						long lastReceive = getLastReceive();
						if (now > lastReceive + idleEventInterval
								&& now > lastAlertAt + idleEventInterval
								&& SimpleMessageListenerContainer.this.lastNoMessageAlert
								.compareAndSet(lastAlertAt, now)) {
							publishIdleContainerEvent(now - lastReceive);
						}
					}
				}
			}
			catch (ListenerExecutionFailedException ex) {
				// Continue to process, otherwise re-throw
				if (ex.getCause() instanceof NoSuchMethodException) {
					throw new FatalListenerExecutionException("Invalid listener", ex);
				}
			}
			catch (AmqpRejectAndDontRequeueException rejectEx) {
				/*
				 *  These will normally be wrapped by an LEFE if thrown by the
				 *  listener, but we will also honor it if thrown by an
				 *  error handler.
				 */
			}
		}

receiveAndExecute接收和處理消息:

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR

		PlatformTransactionManager transactionManager = getTransactionManager();
		if (transactionManager != null) {
			try {
				if (this.transactionTemplate == null) {
					this.transactionTemplate =
							new TransactionTemplate(transactionManager, getTransactionAttribute());
				}
				return this.transactionTemplate
						.execute(status -> { // NOSONAR null never returned
							RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
									new RabbitResourceHolder(consumer.getChannel(), false),
									getConnectionFactory(), true);
							// unbound in ResourceHolderSynchronization.beforeCompletion()
							try {
                //接收處理消息
								return doReceiveAndExecute(consumer);
							}
							catch (RuntimeException e1) {
								prepareHolderForRollback(resourceHolder, e1);
								throw e1;
							}
							catch (Exception e2) {
								throw new WrappedTransactionException(e2);
							}
						});
			}
			catch (WrappedTransactionException e) { // NOSONAR exception flow control
				throw (Exception) e.getCause();
			}
		}
		//接收處理消息
		return doReceiveAndExecute(consumer);

	}

調(diào)用具體的消息監(jiān)聽器消費(fèi)消息:

	private void doExecuteListener(Channel channel, Object data) {
		if (data instanceof Message) {
			Message message = (Message) data;
			if (this.afterReceivePostProcessors != null) {
				for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
					message = processor.postProcessMessage(message);
					if (message == null) {
						throw new ImmediateAcknowledgeAmqpException(
								"Message Post Processor returned 'null', discarding message");
					}
				}
			}
			if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {
				this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));
			}
			else {
				invokeListener(channel, message);
			}
		}
		else {
			invokeListener(channel, data);
		}
	}

GitHub代碼:https://github.com/mingyang66/spring-parent文章來源地址http://www.zghlxwxcb.cn/news/detail-672700.html

到了這里,關(guān)于springboot RabbitMQ客戶端連接故障恢復(fù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包