国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ入門(mén)指南

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ入門(mén)指南。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

RabbitMQ 的作用

  • 提供了系統(tǒng)之間的異步調(diào)用,比如一個(gè)支付功能,用戶在支付完成之后,會(huì)去數(shù)據(jù)庫(kù)中執(zhí)行后續(xù)操作,然后更新支付狀態(tài),會(huì)生成訂單信息,如果后續(xù)還需要添加功能,就需要去業(yè)務(wù)邏輯中修改代碼,這樣就會(huì)出現(xiàn)業(yè)務(wù)耦合。同時(shí)想要執(zhí)行后續(xù)操作,需要等待支付功能完成,在此等待過(guò)程中會(huì)耗費(fèi)時(shí)間,CPU空轉(zhuǎn),性能比較差。當(dāng)業(yè)務(wù)中有操作失敗,就會(huì)將全部操作回滾。如果下一個(gè)操作依賴上一個(gè)操作時(shí)就需要用到同步操作。但是后續(xù)的很多業(yè)務(wù)操作只需要知道支付成功之后就去執(zhí)行,不需要等待其他業(yè)務(wù)執(zhí)行完成之后再去執(zhí)行。同步操作的時(shí)效性強(qiáng),但是拓展性差,并且性能下降還會(huì)出現(xiàn)級(jí)聯(lián)失敗等問(wèn)題。
  • 異步調(diào)用的方式就是基于消息通知的方式,其中有三個(gè)角色:消息發(fā)送者、消費(fèi)代理、消息接收者。微信消息發(fā)送、送外賣。支付服務(wù)就不在同步調(diào)用業(yè)務(wù)關(guān)聯(lián)度低的服務(wù),而是發(fā)送消息通知Broker,這樣做具有以下優(yōu)勢(shì)
    • 解除耦合,拓展性強(qiáng)。
    • 無(wú)需等待,性能好。
    • 故障隔離:當(dāng)某一個(gè)業(yè)務(wù)接收服務(wù)宕機(jī),其他的服務(wù)可以正常執(zhí)行,這個(gè)服務(wù)重連之后只需要去MQ中去獲取數(shù)據(jù)就行。
    • 緩存消息,削峰填谷作用:當(dāng)突然有大量的支付請(qǐng)求過(guò)來(lái)后,不會(huì)第一時(shí)間去沖擊數(shù)據(jù)庫(kù),而是存放在MQ中,根據(jù)業(yè)務(wù)處理的速度自己去取,業(yè)務(wù)服務(wù)壓力就很小。
  • 異步調(diào)用的問(wèn)題:
    • 不能立刻得到調(diào)用結(jié)果,時(shí)效性差。
    • 不確定下游業(yè)務(wù)是否執(zhí)行成功。
    • 業(yè)務(wù)安全依賴于Broker的可靠性。

RabbitMQ入門(mén)指南,消息隊(duì)列,java-rabbitmq,rabbitmq,java

為什么使用RabbitMQ

MQ就是MessageQueue,存放消息的隊(duì)列,也就是異步調(diào)用中的Broker。

在日常開(kāi)發(fā)過(guò)程中,常見(jiàn)的消息隊(duì)列有四種,RabbitMQ、ActiveMQ、RocketMQ、Kafka。 這四中的對(duì)比性下圖可以看到,其中RabbitMQ是Rabbit公司專門(mén)研究的,相較于其他消息中間件它支持SMTP協(xié)議,并且它的消息延遲更是達(dá)到了恐怖的微秒級(jí)。當(dāng)然它的消息可靠性以及可用性也是非常高的,所以一般項(xiàng)目開(kāi)發(fā)沒(méi)有特殊要求都是使用的是RabbitMQ。

RabbitMQ入門(mén)指南,消息隊(duì)列,java-rabbitmq,rabbitmq,java

數(shù)據(jù)隔離

交換機(jī)和隊(duì)列都有自己的VirtualHost,不同的VirtualHost都有自己不同的交換機(jī)和隊(duì)列。一個(gè)MQ中可以有多個(gè)VirtualHost,在發(fā)消息的時(shí)候去連接對(duì)應(yīng)的VirtualHost就行。每個(gè)user可以去操作自己創(chuàng)建的的VirtualHost,查看的話時(shí)根據(jù)管理員創(chuàng)建user時(shí)分配的權(quán)限決定。

SpringAMQP

		<!--RabbitMQ-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		
spring:	 
  rabbitmq:
    host:  # 主機(jī)名
    port: 5672 # 端口
    virtual-host: / # 虛擬主機(jī)
    username:  # 用戶名
    password:  # 密碼

加入RabbitMQ依賴,在yml文件中配置,然后通過(guò)RabbitTemple向隊(duì)列中發(fā)送消息。

work模式

默認(rèn)情況下,RabbitMQ會(huì)將消息依次輪詢投遞給綁定在隊(duì)列上的每一個(gè)消費(fèi)者。并沒(méi)有考慮到消費(fèi)者是否已經(jīng)處理完消息,這種情況可能出現(xiàn)的問(wèn)題就是當(dāng)我們不知道消費(fèi)者消費(fèi)能力的時(shí)候容易出現(xiàn)消息堆積。比如此時(shí)有兩個(gè)消費(fèi)者,消費(fèi)者a一秒鐘可以處理50條數(shù)據(jù),消費(fèi)者b一秒鐘只能處理5條數(shù)據(jù),此時(shí)有1000條消息發(fā)送到隊(duì)列中,一次輪詢綁定消費(fèi)者,每一個(gè)消費(fèi)者綁定了500個(gè)數(shù)據(jù),但是消費(fèi)者a10秒鐘就處理完成,此時(shí)消費(fèi)者b還在處理消息,a此時(shí)就空閑著,可用性比較低。

因此我們需要在yml中設(shè)置prefetch值為1,確保同一時(shí)刻最多投遞給消費(fèi)者1條消息,處理完之后才能獲取下一條消息。

  Rabbitmq:
	listener:
      simple:
        prefetch: 1

交換機(jī)

交換機(jī)主要分為三種類型:Fanout(廣播)、Direct(定向)、Topic(話題)。

  • Fanout:Fanout Exchange會(huì)將接收到的消息廣播到每一個(gè)跟其綁定的queue中,所以也叫廣播模式。
  • Direct:Direct Exchange會(huì)將接收到的消息根據(jù)規(guī)則路由到指定的Queue,因此稱為定向路由。
    • 每一個(gè)Queue都與Exchange設(shè)置一個(gè)BindingKey。
    • 發(fā)布者發(fā)布消息時(shí),指定消息的RoutingKey。
    • Exchange將消息路由到BindingKey與消息BindingKey一致的隊(duì)列。
  • Topic:TopicExchange與DirectExchange類似,特殊之處在于
    • routingKey可以是多個(gè)單詞的列表,并且以 . 分割。
    • Queue與Exchange指定BinddingKey時(shí)可以使用通配符:
      • #:代指0個(gè)或多個(gè)單詞。
      • *:代指一個(gè)單詞。

如何聲明隊(duì)列和交換機(jī)

1. Spring AMQP提供了幾個(gè)類,用來(lái)聲明隊(duì)列、交換機(jī)以及其綁定關(guān)系。

  • Queue:用于聲明隊(duì)列,可以用工廠類QueueBuilder構(gòu)建。
  • Exchange:用于聲明交換機(jī),可以用工廠類ExchangeBuilder構(gòu)建。
  • Binding:用于聲明隊(duì)列和交換機(jī)的綁定關(guān)系,可以用工廠類BindingBuilder構(gòu)建。
  	@Bean
    public FanoutExchange fanoutExchange(){
//        ExchangeBuilder.fanoutExchange("").build();
        return new FanoutExchange("shuqg.fanout2");
    }

    @Bean
    public Queue fanoutQueue3(){
//        QueueBuilder.durable("").build();
        return new Queue("shuqg.queue3");
    }

    @Bean
    public Binding fanoutBinging3(Queue fanoutqueue3, FanoutExchange fanoutExchange){
        // 如果需要綁定bindingkey在后面.with("")
        return BindingBuilder.bind(fanoutqueue3).to(fanoutExchange);
    }

2. 基于注解聲明

 	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2", durable = "true"),
            exchange = @Exchange(name = "shuqg.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) throws InterruptedException {
        System.out.println("消費(fèi)者2 收到了 direct.queue2的消息:【" +msg+ "】");
        Thread.sleep(200);
    }

消息轉(zhuǎn)換器

  • Spring對(duì)消息處理默認(rèn)實(shí)現(xiàn)的是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

  • 但是存在以下問(wèn)題,JDK的序列化有安全風(fēng)險(xiǎn)、JDK序列化的消息太大、JDK序列化的消息可讀性差。

    • 在傳輸put類型消息的時(shí)候,RabbitMQ默認(rèn)會(huì)將消息序列化轉(zhuǎn)換為字節(jié)碼,但是可讀性非常差,原本非常短的一個(gè)消息變得非常大,并且有亂碼風(fēng)險(xiǎn)。
  • 建議采用JSON序列化代替默認(rèn)序列化,在SpringAMQP中有JSON的接口,只不過(guò)沒(méi)有生效,我們只需要引入JSON依賴。

		<!--JSON-->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
		</dependency>

然后在publisher和consumer中都要配置MessageConverter

	@Bean
	public MessageConverter jacksonMessageConvertor(){
		return new Jackson2JsonMessageConverter();
	}

生產(chǎn)者重連

有時(shí)候由于網(wǎng)絡(luò)波動(dòng),可能會(huì)出現(xiàn)客戶端連接MQ失敗的情況。我們可以通過(guò)配置開(kāi)啟失敗后的重連機(jī)制。 當(dāng)網(wǎng)絡(luò)不穩(wěn)定時(shí),使用重試機(jī)制可以有效提高消息發(fā)送成功概率。不過(guò)SpringAMQP消息重試機(jī)制是阻塞式的重試,也就是多次重試等待過(guò)程中,線程是被阻塞的,會(huì)影響業(yè)務(wù)性能。如果對(duì)業(yè)務(wù)性能有要求,建議禁用重試機(jī)制,如果要使用就合理配置等待時(shí)長(zhǎng)和重試次數(shù),也可以考慮使用異步線程來(lái)執(zhí)行發(fā)送消息的代碼。

  rabbitmq:
	template:
      retry:
        enabled: true # 開(kāi)啟超時(shí)重試機(jī)制
        initial-interval: 1000ms # 失敗后的初始等待時(shí)間
        multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multiplier
        max-attempts: 3 # 最大重試次數(shù)

生產(chǎn)者確認(rèn)

RabbitMQ有Publisher Confirm和Publisher Return兩種確認(rèn)機(jī)制。開(kāi)啟確認(rèn)機(jī)制后,在MQ成功發(fā)送消息后返回確認(rèn)消息給生產(chǎn)者。

  • 消息投遞到了交換機(jī),但是路由失敗。此時(shí)會(huì)通過(guò)PublisherReturn返回路由異常的原因,然后返回ACK,告知投遞成功,此時(shí)消息成功發(fā)送到了交換機(jī)中,但是路由失敗的原因可能是交換機(jī)沒(méi)有關(guān)聯(lián)隊(duì)列或者交換機(jī)沒(méi)有BindingKey與隊(duì)列相匹配。
  • 如果臨時(shí)消息(未開(kāi)啟持久化non durable)投遞到了MQ,并且入隊(duì)成功,返回ack,表示投遞成功。
  • 如果持久消息(開(kāi)啟了持久化durable)投遞到了MQ,并且入隊(duì)完成持久化,返回ack,表示投遞成功。
  • 其他情況都會(huì)返回nack,出現(xiàn)nack可能的情況有
    • 如果消息投遞到交換機(jī)失敗,會(huì)通過(guò)Publisher Confirm返回nack,表示消息投遞失敗,這種情況一般很少發(fā)生,如果發(fā)生就要不是代碼寫(xiě)的有問(wèn)題,要不就是交換機(jī)的配置有問(wèn)題。
    • 消息投遞到隊(duì)列時(shí)隊(duì)列已滿。

配置生產(chǎn)者確認(rèn)機(jī)制

  rabbitmq:
	publisher-confirm-type: correlated # 開(kāi)啟publisher-confirm機(jī)制,并設(shè)置confirm類型
	# 這里有三種參數(shù),默認(rèn)none關(guān)閉,其次simple是同步阻塞等待MQ回執(zhí)消息,然后是correlated是MQ異步回調(diào)方式返回回執(zhí)消息。
    publisher-returns: true # 開(kāi)啟publisher return機(jī)制

每一個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目啟動(dòng)過(guò)程中配置

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 設(shè)置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",
                    replyCode, replyText,exchange, routingKey, message.toString());
        });
    }
}

生產(chǎn)者發(fā)送消息

    @Test
    void testConfirmCallback() throws InterruptedException {
        // 創(chuàng)建cd
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        // 添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息回調(diào)失敗", ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback回執(zhí)");
                if (result.isAck()) {
                    // 消息發(fā)送成功
                    log.debug("消息發(fā)送成功,返回ack");
                }else{
                    // 消息發(fā)送失敗
                    log.error("消息發(fā)送失敗,返回nack,原因{}", result.getReason());
                }
            }
        });
        rabbitTemplate.convertAndSend("shuqg.direct", "red","hello", cd);
        Thread.sleep(2000);
    }

生產(chǎn)者確認(rèn)需要額外的網(wǎng)絡(luò)和系統(tǒng)資源開(kāi)銷,盡量不要使用。如果一定要使用,無(wú)需開(kāi)啟Publisher-Return機(jī)制,因?yàn)橐话懵酚墒∈亲约簶I(yè)務(wù)的問(wèn)題。對(duì)于nack消息可以設(shè)置有限的重試次數(shù),依然失敗則記錄異常消息到日志中。

MQ持久化

RabbitMQ如何保證消息可靠性

  • 首先通過(guò)配置可以讓交換機(jī)、隊(duì)列、以及發(fā)送的消息都持久化。這樣隊(duì)列中的消息會(huì)持久化到磁盤(pán),MQ重啟消息依然存在。
  • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后會(huì)稱為隊(duì)列的默認(rèn)模式。LazyQueue會(huì)將所有消息都持久化
  • 開(kāi)啟持久化和生產(chǎn)者確認(rèn)時(shí),RabbitMQ只有在消息持久化完成后才會(huì)給生產(chǎn)者返回ACK回執(zhí)

在默認(rèn)情況下,RabbitMQ會(huì)將接收到的消息保存到內(nèi)存中以降低消息收發(fā)的延遲。這樣會(huì)導(dǎo)致兩個(gè)問(wèn)題:

  • 一旦MQ宕機(jī),內(nèi)存中的消息會(huì)丟失。
  • 內(nèi)存空間有限,當(dāng)消費(fèi)者故障或處理過(guò)慢,會(huì)導(dǎo)致消息積壓,引發(fā)MQ阻塞。
  • 可以通過(guò)數(shù)據(jù)持久化(mq3.6以前)和Lazy Queue(mq3.6以后)去解決這兩個(gè)問(wèn)題。

數(shù)據(jù)持久化

  • 交換機(jī)持久化:創(chuàng)建交換機(jī)的時(shí)候如果勾選Transient就是臨時(shí)的,但是我們平時(shí)都是勾選Durable是要保證交換機(jī)持久化的,mq重啟之后交換機(jī)不會(huì)消失。
  • 隊(duì)列持久化:創(chuàng)建隊(duì)列的時(shí)候也是同理,默認(rèn)的是Druable持久化的,不然mq重啟之后隊(duì)列就會(huì)丟失。在Spring中創(chuàng)建交換機(jī)和隊(duì)列的時(shí)候默認(rèn)就是持久化的。

LazyQueue

從mq3.6之后開(kāi)始增加了LazyQueue的概念,也就是惰性隊(duì)列。惰性隊(duì)列有以下特點(diǎn):

  • 接收到消息之后直接存入磁盤(pán)而非內(nèi)存(內(nèi)存中只保留最近的消息,默認(rèn)2048條)
  • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存。
  • 支持?jǐn)?shù)百萬(wàn)條的數(shù)據(jù)存儲(chǔ)。
  • 在3.12版本之后,所有的隊(duì)列都是LazyQueue模式,無(wú)法更改。

消費(fèi)者的可靠性

1. 消費(fèi)者確認(rèn)機(jī)制

為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement),當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)。

  • ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息。
  • nack:消息處理失敗,RabbitMQ重新發(fā)送消息。
  • reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息。

開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由Spring確認(rèn)消息處理成功后返回ack。開(kāi)啟消費(fèi)者確認(rèn)機(jī)制,RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,當(dāng)消費(fèi)者處理消息之后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)之后才會(huì)去刪除該消息。 SpringAMQP中允許配置三種確認(rèn)模式:

  rabbitmq:
	listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none, manual手動(dòng), auto自動(dòng)
  • none:默認(rèn)情況,不處理,即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用。
  • manual:手動(dòng)模式,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack或reject,存在業(yè)務(wù)入侵,但是更靈活。
  • auto(一般選擇這種):自動(dòng)模式,由Spring監(jiān)聽(tīng)listener代碼是否出現(xiàn)異常,當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack.當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
    • 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack。
    • 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject。
  • 當(dāng)消費(fèi)者異常返回時(shí),我們可以開(kāi)啟消費(fèi)者失敗重試機(jī)制,利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,設(shè)置重試次數(shù),多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。

2. 消費(fèi)失敗問(wèn)題

當(dāng)消費(fèi)者出現(xiàn)異常后,會(huì)不斷requeue(重新入隊(duì)到隊(duì)列),再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無(wú)限循環(huán),導(dǎo)致mq消息處理飆升,帶來(lái)不必要的壓力。 我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。

  • 消息失敗后處理策略:在開(kāi)始重試模式后,默認(rèn)情況下報(bào)錯(cuò)三次,也就是重試三次就會(huì)放棄,此時(shí)需要使用MessageRecoverer接口來(lái)處理,包含三種實(shí)現(xiàn)
    • RejectAndDontRequeueRecoverer:重試耗盡后,直接丟棄消息。默認(rèn)的就是這種方式。
    • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)。
    • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定交換機(jī)。
      • 首先將失敗處理策略改為第三種RepublishMessageRecoverer。
      • 然后定義接收失敗消息的交換機(jī)、隊(duì)列及其綁定關(guān)系。
      • 然后定義RepublishMessageRecoverer。

3. 業(yè)務(wù)冪等性

冪等性是一個(gè)數(shù)學(xué)概念,就是f(x) = f(f(x)),在程序開(kāi)發(fā)中,指的是同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。重復(fù)消費(fèi)問(wèn)題。

  • 查詢刪除這些業(yè)務(wù)天生就是冪等的,新增修改這些業(yè)務(wù)就不是冪等的。

  • 給每一個(gè)消息都設(shè)置一個(gè)唯一id,利用id判斷是否重復(fù)消費(fèi)

    • 每一個(gè)消息都生成一個(gè)唯一id,與消息一起投遞給消費(fèi)者。

    • 消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息id保存到數(shù)據(jù)庫(kù)中。

    • 如果下次又收到相同的消息,去數(shù)據(jù)庫(kù)查詢判斷是否存在,存在則為重復(fù)消息,放棄處理。

    • 使用自帶的Jackson2JsonMessageConverter,可以實(shí)現(xiàn)自動(dòng)生成唯一id,當(dāng)將CreateMessageIds設(shè)置為true,底層會(huì)自動(dòng)創(chuàng)建唯一id,并返回。

    • 	@Bean
       	public MessageConverter jacksonMessageConverter(){
             // 定義消息轉(zhuǎn)換器
       		Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
             // 配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于id判斷是否重復(fù)消息
       		jjmc.setCreateMessageIds(true);
       		return jjmc;
       	}
      
  • 業(yè)務(wù)判斷

    • 結(jié)合業(yè)務(wù)邏輯,基于業(yè)務(wù)本身做判斷。以支付修改訂單業(yè)務(wù)為例,我們要在支付后修改訂單狀態(tài)為已支付,應(yīng)該在修改訂單狀態(tài)前先查詢訂單狀態(tài),判斷狀態(tài)是否未支付。只有未支付訂單才需要修改,其他狀態(tài)不做處理。

如何保證支付服務(wù)與交易服務(wù)之間的訂單狀態(tài)一致性

使用MQ完成訂單狀態(tài)同步->為了保證mq可靠,使用了生產(chǎn)者確認(rèn),消費(fèi)者確認(rèn),生產(chǎn)者重試,同時(shí)開(kāi)啟mq持久化,最后做了冪等性判斷。

如何保證消息不丟失

  • 可能導(dǎo)致消息丟失的場(chǎng)景:生產(chǎn)者發(fā)送消息沒(méi)有到達(dá)交換機(jī)或者沒(méi)有到達(dá)隊(duì)列,MQ宕機(jī),消費(fèi)者服務(wù)宕機(jī)。

  • 開(kāi)啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列。RabbitMQ中提供了一個(gè)確認(rèn)機(jī)制用來(lái)避免消息發(fā)送到MQ過(guò)程中丟失,消息發(fā)送到MQ之后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。

    • 如果消息發(fā)送到交換機(jī)失敗,交換機(jī)會(huì)返回一個(gè)nack,如果是發(fā)送到MQ失敗會(huì)返回一個(gè)ack。
    • 消息失敗之后,回調(diào)方法重新發(fā)送消息,如果還是失敗,可以記錄到日志中通過(guò)查看日志進(jìn)行補(bǔ)充,或者將失敗的消息記錄到數(shù)據(jù)庫(kù)中,做一個(gè)定時(shí)發(fā)送任務(wù),發(fā)送成功之后刪除數(shù)據(jù)庫(kù)中的數(shù)據(jù)。
  • 開(kāi)啟消息持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失。MQ默認(rèn)是在內(nèi)存中存儲(chǔ)消息,開(kāi)啟持久化功能可以將數(shù)據(jù)存儲(chǔ)在磁盤(pán)上,即使MQ宕機(jī)或重啟也不會(huì)丟失數(shù)據(jù)。

    • 持久化交換機(jī)
    • 持久化隊(duì)列
    • 持久化消息
  • 開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由Spring確認(rèn)消息處理成功后返回ack。

  • 開(kāi)啟消費(fèi)者確認(rèn)機(jī)制,RabbitMQ支持消費(fèi)者確認(rèn)機(jī)制,當(dāng)消費(fèi)者處理消息之后可以向MQ發(fā)送ack回執(zhí),MQ收到ack回執(zhí)之后才會(huì)去刪除該消息。SpringAMQP中允許配置三種確認(rèn)模式:

      rabbitmq:
    	listener:
          simple:
            prefetch: 1
            acknowledge-mode: none # none, manual手動(dòng), auto自動(dòng)
    
    • none:默認(rèn)情況,不處理,即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用。
    • manual:手動(dòng)模式,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack或reject,存在業(yè)務(wù)入侵,但是更靈活。
    • auto(一般選擇這種):自動(dòng)模式,由Spring監(jiān)聽(tīng)listener代碼是否出現(xiàn)異常,當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack.當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
      • 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack。
      • 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject。
    • 當(dāng)消費(fèi)者異常返回時(shí),我們可以開(kāi)啟消費(fèi)者失敗重試機(jī)制,利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,設(shè)置重試次數(shù),多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理。

消息重復(fù)消費(fèi)問(wèn)題

  • 重復(fù)消費(fèi)發(fā)生的地方:在消費(fèi)者消費(fèi)隊(duì)列中的消息的時(shí)候會(huì)向隊(duì)列中返回ack,此時(shí)如果因?yàn)榫W(wǎng)絡(luò)問(wèn)題或者隊(duì)列宕機(jī),沒(méi)有收到消費(fèi)者的ack,重連之后會(huì)重試機(jī)制導(dǎo)致重復(fù)消費(fèi)問(wèn)題。
  • 解決方法:每條消息設(shè)置一個(gè)唯一的標(biāo)識(shí)id,當(dāng)消費(fèi)者接收到消息時(shí)去校驗(yàn)這個(gè)業(yè)務(wù)id是否存在,根據(jù)這個(gè)id去表中查詢,如果id不存在則正常去接收消息,如果id已經(jīng)存在了就證明這個(gè)消息已經(jīng)消費(fèi)過(guò)了,就不需要去消費(fèi)了,這樣就解決了重復(fù)消費(fèi)的問(wèn)題。
  • 冪等方案:分布式鎖,數(shù)據(jù)庫(kù)鎖(悲觀鎖、樂(lè)觀鎖),但是加鎖的化性能會(huì)大大降低,如果數(shù)據(jù)庫(kù)中有唯一標(biāo)識(shí)id,則優(yōu)先采用第一種方案。

RabbitMQ中死信交換機(jī)?延遲隊(duì)列了解哪些?

  • 一般使用在下單的時(shí)候,當(dāng)下單之后當(dāng)下單之后會(huì)有一個(gè)過(guò)期的時(shí)間,當(dāng)在指定時(shí)間內(nèi)未支付,就會(huì)將這個(gè)訂單銷毀。如果使用定時(shí)任務(wù),設(shè)置key value在redis中設(shè)置過(guò)期時(shí)間,我們需要定時(shí)去查詢數(shù)據(jù)庫(kù)中用戶支付狀態(tài),如果到達(dá)過(guò)期時(shí)間還沒(méi)有支付,就會(huì)刪除訂單表,這個(gè)時(shí)候,如果設(shè)置時(shí)間間隔較短,對(duì)數(shù)據(jù)庫(kù)的壓力會(huì)非常巨大,但是如果設(shè)置間隔時(shí)間較長(zhǎng),就會(huì)導(dǎo)致時(shí)效性較差。

  • 延遲隊(duì)列就是進(jìn)入隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列,我們當(dāng)時(shí)的某一個(gè)業(yè)務(wù)使用到了延遲隊(duì)列(超時(shí)訂單、限時(shí)優(yōu)惠、定時(shí)發(fā)布。。)

  • 其中延遲隊(duì)列就用到了死信交換機(jī)和TTL實(shí)現(xiàn)的。

  • 當(dāng)隊(duì)列中的消息滿足下面情況之一,就可以成為死信

    • 消息消費(fèi)失敗,返回nack,并且請(qǐng)求參數(shù)為false。
    • 消息超時(shí)未消費(fèi)(設(shè)置TTL)。設(shè)置TTL一般有兩種方式(哪個(gè)存活時(shí)間短以哪個(gè)為準(zhǔn))
      • 消息所在隊(duì)列設(shè)置了存活時(shí)間。
      • 消息本身設(shè)置了存活時(shí)間。
    • 要傳遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信。
  • 一般死信消息是會(huì)被直接丟棄的,但是我們可以給該隊(duì)列配置一個(gè)dead-letter-exchange屬性,指定一個(gè)交換機(jī),隊(duì)列中的死信就會(huì)投遞到該交換機(jī)中,這個(gè)交換機(jī)就是死信交換機(jī)。這個(gè)交換機(jī)也可以綁定一個(gè)隊(duì)列,死信消息可以直接從交換機(jī)投遞到該隊(duì)列中,其他消費(fèi)者可以去消費(fèi)該隊(duì)列中的消息。

  • RabbitMQ中有一個(gè)延遲隊(duì)列插件實(shí)現(xiàn)延遲隊(duì)列DelayExchange

    • 聲明一個(gè)交換機(jī),添加delayed屬性為true,這個(gè)就是一個(gè)可以實(shí)現(xiàn)延遲隊(duì)列的交換機(jī)。
    • 發(fā)送消息時(shí),通過(guò)消息頭x-delay,設(shè)置消息存活時(shí)間。

消息堆積問(wèn)題怎么解決

產(chǎn)生消息堆積的情況,當(dāng)生產(chǎn)者發(fā)送消息的速度超過(guò)了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信??赡軙?huì)被丟棄,這就是消息堆積。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-832794.html

  • 增加更多消費(fèi)者,提高消費(fèi)速度。
  • 在消費(fèi)者內(nèi)開(kāi)啟線程池加快消息處理速度。消費(fèi)者只負(fù)責(zé)去接收消息,所有的處理消息,處理業(yè)務(wù)邏輯都交給線程池去處理,但是線程池的作用是最大程度的利用CPU的資源,需要根據(jù)硬件配置去設(shè)置線程池。
  • 擴(kuò)大隊(duì)列容積,提高堆積上限,采用惰性隊(duì)列,在聲明隊(duì)列的時(shí)候可以設(shè)置屬性x-queue-mode為lazy,即為惰性隊(duì)列。使用惰性隊(duì)列的好處是:
    • 接收到消息后直接存入磁盤(pán)而非內(nèi)存,消息的上限比較高。
    • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤(pán)中讀取并加載到內(nèi)存。
    • 支持?jǐn)?shù)百萬(wàn)條消息存儲(chǔ)。
    • 性能比較穩(wěn)定,但基于磁盤(pán)存儲(chǔ),受限于磁盤(pán)IO,時(shí)效性會(huì)降低。

RabbitMQ高可用機(jī)制

  • 普通集群,又叫標(biāo)準(zhǔn)集群,這個(gè)集群中每一個(gè)節(jié)點(diǎn)都有同一個(gè)交換機(jī)的信息,每個(gè)節(jié)點(diǎn)都有不同的隊(duì)列,但是其他節(jié)點(diǎn)會(huì)有隊(duì)列的引用信息。
    • 會(huì)在集群的各個(gè)節(jié)點(diǎn)間共享部分?jǐn)?shù)據(jù),包含交換機(jī)、隊(duì)列元信息。但是不包含隊(duì)列中的消息。
    • 當(dāng)訪問(wèn)集群中某節(jié)點(diǎn)時(shí),如果隊(duì)列不在該節(jié)點(diǎn),會(huì)從數(shù)據(jù)所在節(jié)點(diǎn)傳遞到當(dāng)前節(jié)點(diǎn)并返回。
    • 隊(duì)列所在節(jié)點(diǎn)宕機(jī),隊(duì)列中的消息就會(huì)丟失。
  • 鏡像集群,本質(zhì)是主從模式
    • 交換機(jī)、隊(duì)列、隊(duì)列中的消息會(huì)在各個(gè)mq鏡像節(jié)點(diǎn)之間同步備份。
    • 創(chuàng)建隊(duì)列的節(jié)點(diǎn)是該隊(duì)列的主節(jié)點(diǎn),備份到其他節(jié)點(diǎn)的該隊(duì)列是該隊(duì)列的鏡像節(jié)點(diǎn)。
    • 鏡像隊(duì)列結(jié)構(gòu)是一主多從(從就是鏡像),所有操作都是主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn)。
    • 主節(jié)點(diǎn)宕機(jī)后,鏡像節(jié)點(diǎn)會(huì)替代成為新的主節(jié)點(diǎn)(如果在主從同步前主節(jié)點(diǎn)就已經(jīng)宕機(jī),可能會(huì)出現(xiàn)數(shù)據(jù)丟失)
  • 如果擔(dān)心出現(xiàn)數(shù)據(jù)丟失我們可以采用仲裁隊(duì)列替代鏡像隊(duì)列,與鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步,主從協(xié)議基于Raft協(xié)議,是強(qiáng)一致性的,并且使用起來(lái)也非常簡(jiǎn)單,不需要額外的配置,在聲明隊(duì)列的時(shí)候只需要指定這個(gè)是仲裁隊(duì)列即可。

到了這里,關(guān)于RabbitMQ入門(mén)指南的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • (一)「消息隊(duì)列」之 RabbitMQ 入門(mén)

    (一)「消息隊(duì)列」之 RabbitMQ 入門(mén)

    想要實(shí)現(xiàn)兩個(gè)應(yīng)用程序之間的通信,我們可以借助“消息隊(duì)列”技術(shù)。本文將介紹使用 C# 語(yǔ)言在 .NET 下實(shí)現(xiàn) RabbitMQ 消息隊(duì)列;當(dāng)然無(wú)論是哪種編程語(yǔ)言,要使用消息隊(duì)列,都需要完成以下兩個(gè)基本步驟: 下載并安裝相應(yīng)的消息隊(duì)列服務(wù)器軟件,并根據(jù)需要進(jìn)行配置; 在您的

    2024年02月04日
    瀏覽(21)
  • 消息隊(duì)列(一)-- RabbitMQ入門(mén)(1)

    消息隊(duì)列(一)-- RabbitMQ入門(mén)(1)

    核心思想:接收并轉(zhuǎn)發(fā)消息。可以把它想象成一個(gè)郵局。 producer:生產(chǎn)者 queue:隊(duì)列 consumer:消費(fèi)者 什么是消息隊(duì)列 MQ(Message Queue):本質(zhì)是隊(duì)列,F(xiàn)IFO先入先出,只不過(guò)隊(duì)列中存放的內(nèi)容是 message 而已,還是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中,

    2024年02月15日
    瀏覽(19)
  • 消息隊(duì)列(一)-- RabbitMQ入門(mén)(4)

    消息隊(duì)列(一)-- RabbitMQ入門(mén)(4)

    冪等性 消息重復(fù)消費(fèi) 消費(fèi)者在消費(fèi)MQ 中的消息時(shí),MQ 已經(jīng)把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷,故MQ 未收到確認(rèn)消息,該消息會(huì)重新發(fā)給其他消費(fèi)者,或網(wǎng)絡(luò)重新連接后再次發(fā)給該消費(fèi)者,但是實(shí)際上該消息已被消費(fèi)過(guò)了,造成消費(fèi)者重復(fù)消費(fèi)同一條消

    2024年02月16日
    瀏覽(14)
  • Message queue 消息隊(duì)列--RabbitMQ 【基礎(chǔ)入門(mén)】

    Message queue 消息隊(duì)列--RabbitMQ 【基礎(chǔ)入門(mén)】

    ? ? ? ? ? ? ? ? ? ? ????????????歡迎來(lái)到我的CSDN主頁(yè)!???? ? ? ? ? ? ? ? ? ? ? ??我是平頂山大師,一個(gè)在CSDN分享筆記的博主。???? ? ? ??推薦給大家我的博客專欄《Message queue 消息隊(duì)列--RabbitMQ 【基礎(chǔ)入門(mén)】》。???? ? ? ? ? ? ? ? ? ? ? ??如果感覺(jué)還

    2024年01月20日
    瀏覽(25)
  • RabbitMQ入門(mén) 消息隊(duì)列快速入門(mén) SpringAMQP WorkQueue 隊(duì)列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    RabbitMQ入門(mén) 消息隊(duì)列快速入門(mén) SpringAMQP WorkQueue 隊(duì)列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延遲。 1.

    2024年04月08日
    瀏覽(19)
  • RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    假設(shè)有一個(gè)業(yè)務(wù)場(chǎng)景:超過(guò)30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)? RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。 隊(duì)列有一個(gè)消息過(guò)期屬性。就像豐巢超過(guò)24小時(shí)就收費(fèi)一樣,通過(guò)設(shè)置這個(gè)屬性,超過(guò)了指定事件的消息將會(huì)被丟棄。 這個(gè)屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • RabbitMq消息模型-隊(duì)列消息

    RabbitMq消息模型-隊(duì)列消息

    基本模型(SimpleQueue)、工作模型(WorkQueue) 隊(duì)列消息特點(diǎn): 消息不會(huì)丟失 并且 有先進(jìn)先出的順序。 消息接收是有順序的,不是隨機(jī)的,僅有一個(gè)消費(fèi)者能拿到數(shù)據(jù),而且不同消費(fèi)者拿不到同一份數(shù)據(jù)。 基本模型: SimpleQueue 在上圖的模型中,有以下幾個(gè)概念: P:為生產(chǎn)

    2024年02月09日
    瀏覽(30)
  • 【RabbitMQ】消息隊(duì)列-RabbitMQ篇章

    【RabbitMQ】消息隊(duì)列-RabbitMQ篇章

    RabbitMQ是一個(gè)開(kāi)源的 遵循AMQP協(xié)議 實(shí)現(xiàn)的基于Erlang語(yǔ)言編寫(xiě),支持多種客戶端(語(yǔ)言)。用于在分布式系統(tǒng)中 存儲(chǔ)消息,轉(zhuǎn)發(fā)消息 ,具有 高可用 , 高可擴(kuò)性 , 易用性 等特征。 1.1、RabbitMQ—使用場(chǎng)景 一般場(chǎng)景 像一般的下訂單業(yè)務(wù)如下圖: 將訂單信息寫(xiě)入數(shù)據(jù)庫(kù)成功后,發(fā)

    2024年02月12日
    瀏覽(20)
  • 【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    【RabbitMQ筆記10】消息隊(duì)列RabbitMQ之死信隊(duì)列的介紹

    這篇文章,主要介紹消息隊(duì)列RabbitMQ之死信隊(duì)列。 目錄 一、RabbitMQ死信隊(duì)列 1.1、什么是死信隊(duì)列 1.2、設(shè)置過(guò)期時(shí)間TTL 1.3、配置死信交換機(jī)和死信隊(duì)列(代碼配置) (1)設(shè)置隊(duì)列過(guò)期時(shí)間 (2)設(shè)置單條消息過(guò)期時(shí)間 (3)隊(duì)列設(shè)置死信交換機(jī) (4)配置的基本思路 1.4、配置

    2024年02月16日
    瀏覽(95)
  • 消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    1、延遲隊(duì)列概念 延時(shí)隊(duì)列內(nèi)部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時(shí)屬性 上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō), 延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。 延遲隊(duì)列使用場(chǎng)景: 訂單在十分鐘之內(nèi)未支付則

    2024年02月22日
    瀏覽(20)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包