国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ如何避免丟失消息

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ如何避免丟失消息。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

消息丟失

消息從生產(chǎn)到消費(fèi),要經(jīng)歷三個(gè)階段,分別是生產(chǎn)、隊(duì)列轉(zhuǎn)發(fā)與消費(fèi),每個(gè)環(huán)節(jié)都可能丟失消息。
rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式
以下以RabbitMQ為例,來(lái)說(shuō)明各個(gè)階段會(huì)產(chǎn)生的問(wèn)題以及解決方式。在說(shuō)明之前,先回顧一下RabbitMQ的一個(gè)基本架構(gòu)圖
rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式

1. 生產(chǎn)者生產(chǎn)消息到RabbitMQ Server 消息丟失場(chǎng)景

1. 網(wǎng)絡(luò)問(wèn)題

外界環(huán)境問(wèn)題導(dǎo)致:發(fā)生網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障等造成RabbitMQ Server端收不到消息,因?yàn)樯a(chǎn)環(huán)境的網(wǎng)絡(luò)是很復(fù)雜的,網(wǎng)絡(luò)抖動(dòng),丟包現(xiàn)象很常見(jiàn),下面會(huì)講到針對(duì)這個(gè)問(wèn)題是如何解決的。

2. 代碼層面,配置層面,考慮不全導(dǎo)致消息丟失

一般情況下,生產(chǎn)者使用Confirm模式投遞消息,如果方案不夠嚴(yán)謹(jǐn),比如RabbitMQ Server 接收消息失敗后會(huì)發(fā)送nack消息通知生產(chǎn)者,生產(chǎn)者監(jiān)聽(tīng)消息失敗或者沒(méi)做任何事情,消息存在丟失風(fēng)險(xiǎn);
生產(chǎn)者發(fā)送消息到exchange后,發(fā)送的路由和queue沒(méi)有綁定,消息會(huì)存在丟失情況,下面會(huì)講到具體的例子,保證意外情況的發(fā)生,即使發(fā)生,也在可控范圍內(nèi)。

解決方案:開(kāi)啟confirm模式

首先生產(chǎn)者通過(guò)調(diào)用channel.confirmSelect方法將信道設(shè)置為confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一deliveryTag和multiple參數(shù))。

其實(shí)Confirm模式有三種方式實(shí)現(xiàn):
串行confirm模式:producer每發(fā)送一條消息后,調(diào)用waitForConfirms()方法,等待broker端confirm,如果服務(wù)器端返回false或者在超時(shí)時(shí)間內(nèi)未返回,客戶端進(jìn)行消息重傳。

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
    if (channel.waitForConfirms()) {
        System.out.println("發(fā)送成功");
    } else {
        //發(fā)送失敗這里可進(jìn)行消息重新投遞的邏輯
        System.out.println("發(fā)送失敗");
    }
}

批量confirm模式:producer每發(fā)送一批消息后,調(diào)用waitForConfirms()方法,等待broker端confirm。
問(wèn)題:一旦出現(xiàn)confirm返回false或者超時(shí)的情況時(shí),客戶端需要將這一批次的消息全部重發(fā),這會(huì)帶來(lái)明顯的重復(fù)消息數(shù)量,并且當(dāng)消息經(jīng)常丟失時(shí),批量confirm性能應(yīng)該是不升反降的。

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
}
if (channel.waitForConfirms()) {
    System.out.println("發(fā)送成功");
} else {
    System.out.println("發(fā)送失敗");
}

異步confirm模式:提供一個(gè)回調(diào)方法,broker confirm了一條或者多條消息后producer端會(huì)回調(diào)這個(gè)方法。 我們分別來(lái)看看這三種confirm模式。

    public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
        List<Object> list = new ArrayList<>();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put(DeviceConstant.COMMAND, DELETE);
        jsonObject.put(DeviceConstant.BODY, list );
        String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
        String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
        //rabbitTemplate.convertAndSend(topicExchange, routingKey, jsonObject.toJSONString());
        try {
            Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
            channel.confirmSelect();
            channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
            channel.addConfirmListener(new ConfirmListener() {
                //消息失敗處理
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
                    try {
                        Thread.sleep(3000l);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    //重發(fā)
                    channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
                }
                //消息成功處理
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
                }
            });
        } catch (Exception e) {
            log.error("sendQueue-ack-發(fā)送消息失敗:{}", ExceptionUtils.getStackTrace(e));
        }
    }

2. 隊(duì)列本身可能丟失消息

1. 消息未完全持久化,當(dāng)機(jī)器重啟后,消息會(huì)全部丟失,甚至Queue也不見(jiàn)了

僅僅持久化了Message,而Exchange,Queue沒(méi)有持久化,這個(gè)持久化是無(wú)效的。

解決方案:

交換機(jī)持久化:在聲明交換器時(shí)將 durable 設(shè)為 true。
// 參數(shù)1:交換機(jī)的名字
// 參數(shù)2:交換機(jī)的類(lèi)型,topic/direct/fanout/headers
// 參數(shù)3:是否持久化
channel.exchangeDeclare(exchangeName,exchangeType,true);
隊(duì)列持久化:在聲明隊(duì)列的時(shí)候把 durable 參數(shù)設(shè)置為true。

rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式

消息持久化:

想讓消息實(shí)現(xiàn)持久化需要在消息生產(chǎn)者推送消息的方法中修改參數(shù),MessageProperties.PERSISTENT_TEXT_PLAIN 添加這個(gè)屬性。
rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式

2. 單節(jié)點(diǎn)模式問(wèn)題,節(jié)點(diǎn)掛了,消息只存在當(dāng)前節(jié)點(diǎn)。硬盤(pán)壞了,那消息真的就無(wú)法恢復(fù)了

如果做了消息持久化方案,消息會(huì)持久化硬盤(pán),機(jī)器重啟后消息不會(huì)丟失;但是還有一個(gè)極端情況,這臺(tái)服務(wù)器磁盤(pán)突然壞了(公司遇到過(guò)磁盤(pán)問(wèn)題還是很多的),消息持久化不了,非高可用狀態(tài),這個(gè)模式生產(chǎn)環(huán)境慎重考慮。

3. 默認(rèn)的集群模式,消息只會(huì)存在與當(dāng)前節(jié)點(diǎn)中,并不會(huì)同步到其他節(jié)點(diǎn),其他節(jié)點(diǎn)也僅只會(huì)同步該節(jié)點(diǎn)的隊(duì)列結(jié)構(gòu)

rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式
上圖中的三個(gè)節(jié)點(diǎn)組成了一個(gè)RabbitMQ的集群。其中exchange是交換器,它的元數(shù)據(jù)信息(交換器名稱(chēng)、交換器屬性、綁定鍵等)在所有節(jié)點(diǎn)上都是一致的,而隊(duì)列中的實(shí)際消息數(shù)據(jù)則只會(huì)存在于所創(chuàng)建的那個(gè)節(jié)點(diǎn)上,其它節(jié)點(diǎn)只知道這個(gè)隊(duì)列的元數(shù)據(jù)信息和一個(gè)指向擁有這個(gè)消息的隊(duì)列的節(jié)點(diǎn)指針。
RabbitMQ集群會(huì)同步四種類(lèi)型的內(nèi)部元數(shù)據(jù):隊(duì)列元數(shù)據(jù)(隊(duì)列名和屬性)、交換器元數(shù)據(jù)(交換器名和屬性)、綁定鍵和虛擬機(jī)。在用戶訪問(wèn)其中任何一個(gè)rabbitmq節(jié)點(diǎn)時(shí)查詢(xún)到的queue、user、exchange和vhost等信息都是一致的。

那為什么普通集群只保持元數(shù)據(jù)同步,消息內(nèi)容卻沒(méi)同步呢?這里涉及到存儲(chǔ)空間和性能的問(wèn)題,如果保持每個(gè)節(jié)點(diǎn)都有一份消息,那會(huì)導(dǎo)致每個(gè)節(jié)點(diǎn)的空間都非常大,消息的積壓量會(huì)增加且無(wú)法通過(guò)擴(kuò)容節(jié)點(diǎn)解決積壓?jiǎn)栴}。另外如果要使每個(gè)節(jié)點(diǎn)存儲(chǔ)一份消息,對(duì)于持久化的消息而言,內(nèi)存和磁盤(pán)同步復(fù)制機(jī)制會(huì)導(dǎo)致性能受到很大影響。

工作原理

rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式
上圖中的三個(gè)節(jié)點(diǎn),其中節(jié)點(diǎn)1是數(shù)據(jù)節(jié)點(diǎn)(即實(shí)際存儲(chǔ)消息內(nèi)容的節(jié)點(diǎn))。
如果客戶端(生產(chǎn)者或消費(fèi)者)與節(jié)點(diǎn)1建立了連接,那么關(guān)于消息的收發(fā)就只在節(jié)點(diǎn)1上進(jìn)行(可以理解為簡(jiǎn)單的單機(jī)模式);

如果客戶端(消費(fèi)者)是與節(jié)點(diǎn)2或者節(jié)點(diǎn)3建立的連接,此時(shí)由于數(shù)據(jù)在節(jié)點(diǎn)1上,那么節(jié)點(diǎn)2或節(jié)點(diǎn)3只會(huì)起到一個(gè)消息轉(zhuǎn)發(fā)的作用,例如此客戶端是消費(fèi)者,那么消息將由節(jié)點(diǎn)2或節(jié)點(diǎn)3從節(jié)點(diǎn)1中拉取,再經(jīng)自身節(jié)點(diǎn)路由給消費(fèi)者端;
如果客戶端(生產(chǎn)者),那么消息先發(fā)給節(jié)點(diǎn)2或3,再路由到節(jié)點(diǎn)1的隊(duì)列中存儲(chǔ)。

一個(gè)節(jié)點(diǎn)可以是磁盤(pán)節(jié)點(diǎn)和內(nèi)存節(jié)點(diǎn),磁盤(pán)節(jié)點(diǎn)將元數(shù)據(jù)存儲(chǔ)在磁盤(pán),內(nèi)存節(jié)點(diǎn)將元數(shù)據(jù)存儲(chǔ)在內(nèi)存。
這里需要注意的是,內(nèi)存節(jié)點(diǎn)只是將元數(shù)據(jù)(比如隊(duì)列名和屬性、交換器名和屬性和虛擬機(jī)等)存儲(chǔ)在內(nèi)存,因此在對(duì)資源管理(創(chuàng)建和刪除隊(duì)列、交換器和虛擬機(jī)等)時(shí)的性能有所提升,但是對(duì)發(fā)布和訂閱的消息速率并沒(méi)有提升。
RabbitMQ要求集群中至少有一個(gè)磁盤(pán)節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)加入和離開(kāi)集群時(shí),必須通知磁盤(pán)節(jié)點(diǎn)(如果集群中唯一的磁盤(pán)節(jié)點(diǎn)崩潰了,則不能進(jìn)行創(chuàng)建隊(duì)列、創(chuàng)建交換器、創(chuàng)建綁定、添加用戶、更改權(quán)限、添加和刪除集群節(jié)點(diǎn))。如果唯一磁盤(pán)的磁盤(pán)節(jié)點(diǎn)崩潰,集群是可以保持運(yùn)行的,但不能更改任何東西。因此建議在集群中設(shè)置兩個(gè)磁盤(pán)節(jié)點(diǎn),只要一個(gè)即可正常操作??傊跓o(wú)法得知它們?nèi)绾问褂貌拍鼙WC最佳時(shí)建議最好都用磁盤(pán)節(jié)點(diǎn)。

總結(jié):普通集群模式并不能保證服務(wù)的高可用,因?yàn)槠渌?jié)點(diǎn)只復(fù)制了隊(duì)列和交換器等元數(shù)據(jù)信息,并沒(méi)有將真實(shí)的消息內(nèi)容復(fù)制到自身節(jié)點(diǎn)。該部署模式只解決了單節(jié)點(diǎn)的壓力問(wèn)題,但是當(dāng)數(shù)據(jù)節(jié)點(diǎn)宕機(jī)之后便無(wú)法提供服務(wù)了,消息的路由線路受到了阻隔,客戶端則無(wú)法繼續(xù)與服務(wù)交互。為了解決這個(gè)問(wèn)題,就需要此消息數(shù)據(jù)也能被復(fù)制到集群的其它節(jié)點(diǎn)中,因此rabbitmq引入了鏡像部署模式。

解決方案:鏡像部署,消息會(huì)同步到其他節(jié)點(diǎn)上,可以設(shè)置同步的節(jié)點(diǎn)個(gè)數(shù),但吞吐量會(huì)下降。

rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式
Rabbitmq的鏡像集群實(shí)際上是在普通集群的基礎(chǔ)上增加了策略,它需要先按照普通集群的方式進(jìn)行部署,部署完成之后再通過(guò)創(chuàng)建鏡像隊(duì)列的策略實(shí)現(xiàn)主備節(jié)點(diǎn)消息同步。也就是說(shuō),每個(gè)備用節(jié)點(diǎn)都有和主節(jié)點(diǎn)一樣的隊(duì)列,這個(gè)隊(duì)列是由主節(jié)點(diǎn)通過(guò)創(chuàng)建鏡像隊(duì)列所產(chǎn)生的,且這些備用節(jié)點(diǎn)能及時(shí)的同步主節(jié)點(diǎn)中隊(duì)列的入隊(duì)消息。當(dāng)消息設(shè)置了持久化時(shí),每個(gè)節(jié)點(diǎn)都有屬于自己的本地消息持久化存儲(chǔ)機(jī)制。當(dāng)消息入隊(duì)和出隊(duì)時(shí),所有關(guān)于對(duì)主節(jié)點(diǎn)的操作都會(huì)同步給備用節(jié)點(diǎn)用來(lái)更新。此集群模式在主節(jié)點(diǎn)宕機(jī)之后備用節(jié)點(diǎn)所保留的消息與主節(jié)點(diǎn)完全一致,即可實(shí)現(xiàn)高可用。

工作原理

rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式
上圖就是鏡像集群模式的實(shí)現(xiàn)流程,其中有三個(gè)節(jié)點(diǎn)(主節(jié)點(diǎn)、備節(jié)點(diǎn)1、備節(jié)點(diǎn)2)和三個(gè)鏡像隊(duì)列queue(其中備節(jié)點(diǎn)上的queue是由主節(jié)點(diǎn)鏡像生成的)。要注意的是,這里的主節(jié)點(diǎn)和備節(jié)點(diǎn)是針對(duì)某個(gè)隊(duì)列而言的,并不能認(rèn)為一個(gè)節(jié)點(diǎn)作為了所有隊(duì)列的主節(jié)點(diǎn),因?yàn)樵谡麄€(gè)鏡像集群模式下,會(huì)存在多個(gè)節(jié)點(diǎn)和多個(gè)隊(duì)列,這時(shí)候任何一個(gè)節(jié)點(diǎn)都能作為某一個(gè)隊(duì)列的鏡像主節(jié)點(diǎn),其它節(jié)點(diǎn)則成了鏡像備節(jié)點(diǎn)(例如:有A、B、C三個(gè)節(jié)點(diǎn)和Q1、Q2、Q3三個(gè)隊(duì)列,如果A作為Q1的鏡像主節(jié)點(diǎn),那么B和C就作為了Q1的鏡像備節(jié)點(diǎn),在此基礎(chǔ)上,如果B作為了Q2的鏡像主節(jié)點(diǎn),那么A和C就是Q2的鏡像備節(jié)點(diǎn))。

每一個(gè)隊(duì)列都是由兩部分組成的,一個(gè)是queue,用來(lái)接收消息和發(fā)布消息,另外還有一個(gè)BackingQueue,它是用來(lái)做本地消息持久化處理。客戶端發(fā)送給主節(jié)點(diǎn)隊(duì)列的消息和ack應(yīng)答都將會(huì)同步到其它備節(jié)點(diǎn)上。

所有關(guān)于鏡像主隊(duì)列(mirror_queue_master)的操作,都會(huì)通過(guò)組播GM的方式同步到其它備用節(jié)點(diǎn)上,這里的GM負(fù)責(zé)消息的廣播,mirror_queue_slave則負(fù)責(zé)回調(diào)處理(更新本次同步內(nèi)容),因此當(dāng)消息發(fā)送給備用節(jié)點(diǎn)時(shí),則由mirror_queue_slave來(lái)做實(shí)際處理,將消息存儲(chǔ)在queue中,如果是持久化消息則同時(shí)存儲(chǔ)在BackingQueue中。master上的回調(diào)則由coordinator來(lái)處理(發(fā)布本次同步內(nèi)容)。在主節(jié)點(diǎn)中,BackingQueue的存儲(chǔ)則是由Queue進(jìn)行調(diào)用。對(duì)于生產(chǎn)者而言,消息發(fā)送給queue之后,接著調(diào)用mirror_queue_master進(jìn)行持久化處理,之后再通過(guò)GM廣播發(fā)送本次同步消息給備用節(jié)點(diǎn),備用節(jié)點(diǎn)通過(guò)回調(diào)mirror_queue_slave同步本次消息到queue和BackingQueue;對(duì)于消費(fèi)者而言,從queue中獲取消息之后,消息隊(duì)列會(huì)等待消費(fèi)者的ack應(yīng)答,ack應(yīng)答收到之后刪除queue和BackingQueue中的該條消息,并將本次ack內(nèi)容通過(guò)GM廣播發(fā)送給備用節(jié)點(diǎn)同步本次操作。如果slave宕機(jī)了,那對(duì)于客戶端的服務(wù)提供將不會(huì)有任何影響。如果master宕機(jī)了,則其它備用節(jié)點(diǎn)就提升為master繼續(xù)服務(wù)消息不會(huì)丟失。那這其中多個(gè)備用節(jié)點(diǎn)是如何選擇其中一個(gè)來(lái)作為master的呢?這里通過(guò)選取出“最年長(zhǎng)的”節(jié)點(diǎn)作為master,因?yàn)檫@個(gè)備用節(jié)點(diǎn)相對(duì)于其它節(jié)點(diǎn)而言是同步時(shí)間最長(zhǎng)、同步狀態(tài)最好的一個(gè)節(jié)點(diǎn),但如果存在沒(méi)有任何一個(gè)slave與master完全同步的情況,那么master中未同步的消息將會(huì)丟失。

GM

GM模塊實(shí)現(xiàn)的一種可靠的組播通訊協(xié)議,該協(xié)議能夠保證組播消息的原子性,即保證組中活著的節(jié)點(diǎn)要么都收到消息要么都收不到。
它的實(shí)現(xiàn)大致為:將所有的節(jié)點(diǎn)形成一個(gè)循環(huán)鏈表,每個(gè)節(jié)點(diǎn)都會(huì)監(jiān)控位于自己左右兩邊的節(jié)點(diǎn),當(dāng)有節(jié)點(diǎn)新增時(shí),相鄰的節(jié)點(diǎn)保證當(dāng)前廣播的消息會(huì)復(fù)制到新的節(jié)點(diǎn)上;當(dāng)有節(jié)點(diǎn)失效時(shí),相鄰的節(jié)點(diǎn)會(huì)接管保證本次廣播的消息會(huì)復(fù)制到下一個(gè)節(jié)點(diǎn)。在master節(jié)點(diǎn)和slave節(jié)點(diǎn)上的這些gm形成一個(gè)group,group(gm_group)的信息會(huì)記錄在mnesia中。不同的鏡像隊(duì)列形成不同的group。消息從master節(jié)點(diǎn)對(duì)應(yīng)的gm發(fā)出后,順著鏈表依次傳送到所有的節(jié)點(diǎn),由于所有節(jié)點(diǎn)組成一個(gè)循環(huán)鏈表,master節(jié)點(diǎn)對(duì)應(yīng)的gm最終會(huì)收到自己發(fā)送的消息,這個(gè)時(shí)候master節(jié)點(diǎn)就知道消息已經(jīng)復(fù)制到所有的slave節(jié)點(diǎn)了。另外需要注意的是,每一個(gè)新節(jié)點(diǎn)的加入都會(huì)先清空這個(gè)節(jié)點(diǎn)原有數(shù)據(jù),下圖是新節(jié)點(diǎn)加入集群的一個(gè)簡(jiǎn)單模型:
rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式
消息的同步:
將新節(jié)點(diǎn)加入已存在的鏡像隊(duì)列,在默認(rèn)情況下ha-sync-mode=manual,鏡像隊(duì)列中的消息不會(huì)主動(dòng)同步到新節(jié)點(diǎn),除非顯式調(diào)用同步命令。當(dāng)調(diào)用同步命令后,隊(duì)列開(kāi)始阻塞,無(wú)法對(duì)其進(jìn)行操作,直到同步完畢。

總結(jié)

鏡像集群模式通過(guò)從主節(jié)點(diǎn)拷貝消息的方式使所有節(jié)點(diǎn)都能保留一份數(shù)據(jù),一旦主節(jié)點(diǎn)崩潰,備節(jié)點(diǎn)就能完成替換從而繼續(xù)對(duì)外提供服務(wù)。這解決了節(jié)點(diǎn)宕機(jī)帶來(lái)的困擾,提高了服務(wù)穩(wěn)定性,但是它并不能實(shí)現(xiàn)負(fù)載均衡,因?yàn)槊總€(gè)操作都要在所有節(jié)點(diǎn)做一遍,這無(wú)疑降低了系統(tǒng)性能。再者當(dāng)消息大量入隊(duì)時(shí),集群內(nèi)部的網(wǎng)絡(luò)帶寬會(huì)因此時(shí)的同步通訊被大大消耗掉,因此對(duì)于可靠性要求高、性能要求不高且消息量并不多的場(chǎng)景比較適用。如果對(duì)高可用和負(fù)載均衡都有要求的場(chǎng)景則需要結(jié)合HAProxy(實(shí)現(xiàn)節(jié)點(diǎn)間負(fù)載均衡)和keepalived(實(shí)現(xiàn)HAproxy的主備模式)中間件搭配使用,下面我們將對(duì)這種場(chǎng)景的部署進(jìn)行全流程概述。

3. 消費(fèi)端可能丟失消息

消費(fèi)端采用自動(dòng)ack機(jī)制,還沒(méi)有處理完畢,消費(fèi)端宕機(jī)。

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成了部分突然它掛掉了,會(huì)導(dǎo)致消息丟失。因?yàn)镽abbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消息標(biāo)記為刪除。

解決方案:改為手動(dòng)ack,當(dāng)消息正確處理完成后,再通知mq。消費(fèi)端處理消息異常后,回傳nack,這樣mq會(huì)把這條消息投遞到另外一個(gè)消費(fèi)端上。

消息應(yīng)答的方法
  1. Channel.basicAck(long deliveryTag, boolean multiple):用于肯定確認(rèn)。RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了

deliveryTag:該消息的index
multiple:是否批量.。true:將一次性ack所有小于deliveryTag的消息。

multiple參數(shù)解析
true 代表批量應(yīng)答
比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8
那么此時(shí) 5-8 的消息都會(huì)被確認(rèn)收到消息應(yīng)答
false 同上面相比
只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答
rabbitmq 保證數(shù)據(jù)不丟失,面試題-隊(duì)列,java-rabbitmq,rabbitmq,分布式

  1. Channel.void basicNack(long deliveryTag, boolean multiple, boolean requeue) :用于否定確認(rèn)

deliveryTag:該消息的index。
multiple:是否批量。true:將一次性拒絕所有小于deliveryTag的消息。
requeue:被拒絕的是否重新入隊(duì)列。

  1. Channel.basicReject(long deliveryTag, boolean requeue):用于否定確認(rèn) (推薦使用)

deliveryTag:該消息的index。
requeue:被拒絕的是否重新入隊(duì)列。

basicNack()和basicReject()的區(qū)別在于:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。

demo
    @RabbitHandler
    @RabbitListener(queues = RabbitMqConstant.xxx , concurrency = "1-1")
    public void receiveQueueCommonLocal(Channel channel, Message message) {
        String messageBody = new String(message.getBody());
        //System.out.println("messageBody===>"+messageBody);
        try {
            //todo 業(yè)務(wù)邏輯
            /*手動(dòng)確認(rèn)成功
             * 參數(shù):
             * deliveryTag:該消息的index
             * multiple:是否批量處理.true:將一次性ack所有小于deliveryTag的消息
             * **/
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", ExceptionUtil.getMessage(e), messageBody);
            try {
                //手動(dòng)確認(rèn)回滾 拒絕deliveryTag對(duì)應(yīng)的消息,第二個(gè)參數(shù)是否requeue,true則重新入隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列。
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }

文章來(lái)源:https://blog.51cto.com/u_15840568/5784352
https://zhuanlan.zhihu.com/p/79545722
集群:https://blog.csdn.net/weixin_43498985/article/details/122185972
消費(fèi)者ack:https://blog.csdn.net/m0_64337991/article/details/122755297
https://zhuanlan.zhihu.com/p/483289106?utm_id=0文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-690488.html

到了這里,關(guān)于RabbitMQ如何避免丟失消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ如何保證消息不丟失呢?

    RabbitMQ如何保證消息不丟失呢?

    RabbitMQ 是一個(gè)流行的消息隊(duì)列系統(tǒng),用于在分布式應(yīng)用程序之間傳遞消息。要確保消息不會(huì)丟失,可以采取以下一些措施: 持久化消息: RabbitMQ 允許你將消息標(biāo)記為持久化的。這意味著消息將被寫(xiě)入磁盤(pán),即使 RabbitMQ 服務(wù)器崩潰,也能夠在恢復(fù)后重新發(fā)送消息。要使消息持

    2024年02月07日
    瀏覽(24)
  • rabbitMQ如何保證數(shù)據(jù)不丟失

    Q: 當(dāng)訂單服務(wù)發(fā)送一條消息到rabbitMQ, rabbitMQ成功接收到了消息并保存在內(nèi)存中, 但是在倉(cāng)儲(chǔ)服務(wù)沒(méi)有拿走此消息之前, rabbitMQ宕機(jī)了. 怎么辦? A:此問(wèn)題需要考慮消息持久化(durable機(jī)制), 通過(guò)設(shè)置隊(duì)列的durable參數(shù)為true, 則當(dāng)rabbitMQ重啟之后, 會(huì)恢復(fù)之前的隊(duì)列. 它的工作原理是rab

    2024年02月15日
    瀏覽(20)
  • RabbitMQ 保證消息不丟失的幾種手段

    RabbitMQ 保證消息不丟失的幾種手段

    在使用消息隊(duì)列時(shí),面對(duì)復(fù)雜的網(wǎng)絡(luò)狀況,我們必須要考慮如何確保消息能夠正常消費(fèi)。在分析如何保證消息不丟失的問(wèn)題之前,我們需要對(duì)癥下藥,什么樣的情況會(huì)導(dǎo)致消息丟失。 在弄清消息丟失的情況之前,我們先看看一條消息從產(chǎn)生到最終消費(fèi)會(huì)經(jīng)歷哪些過(guò)程。 上面

    2024年02月08日
    瀏覽(24)
  • 消息中間件之八股面試回答篇:一、問(wèn)題概覽+MQ的應(yīng)用場(chǎng)景+RabbitMQ如何保證消息不丟失(生產(chǎn)者確認(rèn)機(jī)制、持久化、消費(fèi)者確認(rèn)機(jī)制)+回答模板

    消息中間件之八股面試回答篇:一、問(wèn)題概覽+MQ的應(yīng)用場(chǎng)景+RabbitMQ如何保證消息不丟失(生產(chǎn)者確認(rèn)機(jī)制、持久化、消費(fèi)者確認(rèn)機(jī)制)+回答模板

    目前主流的消息隊(duì)列技術(shù)(MQ技術(shù))分為RabbitMQ和Kafka,其中深藍(lán)色為只要是MQ,一般都會(huì)問(wèn)到的問(wèn)題。淺藍(lán)色是針對(duì)RabbitMQ的特性的問(wèn)題。藍(lán)紫色為針對(duì)Kafka的特性的問(wèn)題。 MQ主要提供的功能為:異步 解耦 削峰 。 展開(kāi)來(lái)講就是 異步發(fā)送(驗(yàn)證碼、短信、郵件…) MYSQL和Redi

    2024年01月24日
    瀏覽(56)
  • RabbitMQ消息丟失、消息重復(fù)消費(fèi)、消息順序性無(wú)法保證、消息積壓、一致性問(wèn)題、系統(tǒng)可用性降低等這些常見(jiàn)問(wèn)題怎么解決

    該文章專(zhuān)注于面試,面試只要回答關(guān)鍵點(diǎn)即可,不需要對(duì)框架有非常深入的回答,如果你想應(yīng)付面試,是足夠了,抓住關(guān)鍵點(diǎn) 1. 消息丟失 問(wèn)題 :在生產(chǎn)者發(fā)送消息到MQ、MQ內(nèi)部處理、消費(fèi)者接收消息的任一環(huán)節(jié)都可能導(dǎo)致消息丟失。 解決方案 : 生產(chǎn)者確認(rèn)機(jī)制 :確保消息

    2024年04月25日
    瀏覽(28)
  • 【RabbitMQ】RabbitMQ如何確認(rèn)消息被消費(fèi)、以及保證消息的冪等

    【RabbitMQ】RabbitMQ如何確認(rèn)消息被消費(fèi)、以及保證消息的冪等

    目錄 一、如何保證消息被消費(fèi) 二、如何保證消息冪等性 RabbitMQ提供了消息補(bǔ)償機(jī)制來(lái)保證消息被消費(fèi),當(dāng)一條消費(fèi)被發(fā)送后,到達(dá)隊(duì)列后發(fā)給消費(fèi)者。消費(fèi)者消費(fèi)成功后會(huì)給MQ服務(wù)器的隊(duì)列發(fā)送一個(gè)確認(rèn)消息,此時(shí)會(huì)有一個(gè)回調(diào)檢測(cè)服務(wù)監(jiān)聽(tīng)該接收確認(rèn)消息的隊(duì)列,然將消費(fèi)

    2024年02月16日
    瀏覽(24)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場(chǎng)景 1.2 MQ導(dǎo)致消息丟失 1.3 消費(fèi)者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機(jī)制 2.2 生產(chǎn)者確認(rèn)機(jī)制 2.3 實(shí)現(xiàn)生產(chǎn)者確認(rèn) 2.3.1 配置yml開(kāi)啟生產(chǎn)者確認(rèn) 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(25)
  • 如何保證RabbitMQ消息的順序性

    針對(duì)以上問(wèn)題,一個(gè)解決思路是:保證消息的唯一性,就算是多次傳輸,不要讓消息的多次消費(fèi)帶來(lái)影響;保證消息等冪性;比如:在寫(xiě)入消息隊(duì)列的數(shù)據(jù)做唯一標(biāo)示,消費(fèi)消 息時(shí),根據(jù)唯一標(biāo)識(shí)判斷是否消費(fèi)過(guò);假設(shè)你有個(gè)系統(tǒng),消費(fèi)一條消息就往數(shù)據(jù)庫(kù)里插入一條數(shù)據(jù),

    2024年02月07日
    瀏覽(21)
  • 如何保證 RabbitMQ 的消息可靠性?

    如何保證 RabbitMQ 的消息可靠性?

    項(xiàng)目開(kāi)發(fā)中經(jīng)常會(huì)使用消息隊(duì)列來(lái) 完成異步處理、應(yīng)用解耦、流量控制等功能 。雖然消息隊(duì)列的出現(xiàn)解決了一些場(chǎng)景下的問(wèn)題,但是同時(shí)也引出了一些問(wèn)題,其中使用消息隊(duì)列時(shí)如何保證消息的可靠性就是一個(gè)常見(jiàn)的問(wèn)題。 如果在項(xiàng)目中遇到需要保證消息一定被消費(fèi)的場(chǎng)景

    2024年02月07日
    瀏覽(27)
  • RabbitMQ如何保證消息的發(fā)送和接收

    一、RabbitMQ如何保證消息的發(fā)送和接收 1.ConfirmCallback方法 ConfirmCallback是一個(gè)回調(diào)接口,消息發(fā)送到broker后觸發(fā)回調(diào),確認(rèn)消息是否到達(dá)broker服務(wù)器,也就是只確認(rèn)消息是否正確到達(dá)Exchange交換機(jī)中。 2.ReturnCallback方法 通過(guò)實(shí)現(xiàn)ReturnCallback接口,啟動(dòng)消息失敗返回,此接口是在交

    2024年02月15日
    瀏覽(23)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包