国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、RabbitMQ消息可靠性概述

1、引出問題

我們先看一串代碼,并思考一下為什么要先入庫然后發(fā)MQ:

public int add(Merchant merchant) {
    int k = merchantMapper.add(merchant);
    System.out.println("aaa : "+merchant.getId());
    JSONObject title = new JSONObject();
    String jsonBody = JSONObject.toJSONString(merchant);
    title.put("type","add");
    title.put("desc","新增商戶");
    title.put("content",jsonBody);
    rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());
    return k;
}

如果先發(fā)MQ的話,如果入庫失敗,就會(huì)導(dǎo)致MQ消息無法回滾了。今天我們就好好聊一聊RabbitMQ消息可靠投遞的問題。

2、RabbitMQ消息可靠性保證的四個(gè)環(huán)節(jié)

RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞
① 消息從生產(chǎn)者發(fā)送到Broker
生產(chǎn)者把消息發(fā)送到Broker之后,如何知道自己的消息有沒有被Broker成功接收?如果Broker不給應(yīng)答,生產(chǎn)者發(fā)送的消息也無法知道成功還是失敗。

② 消息從Exchange路由到Queue
Exchange交換機(jī)維護(hù)一個(gè)與Queue的綁定列表,它的職責(zé)是分發(fā)消息。如果交換機(jī)處理消息的分發(fā)出現(xiàn)問題怎么辦?

③ 消息存儲(chǔ)在Queue
RabbitMQ的隊(duì)列有自己的數(shù)據(jù)庫(Mnesia),它是真正用來存儲(chǔ)消息的。如果還沒有消費(fèi)者來消費(fèi),消息要一直存儲(chǔ)在隊(duì)列中。隊(duì)列中的消息有沒有丟失的可能?怎么保證消息在隊(duì)列中穩(wěn)定的存儲(chǔ)呢?

④ 消費(fèi)者訂閱Queue并消費(fèi)消息
隊(duì)列是先進(jìn)先出的,消息投遞給消費(fèi)者是一條一條投遞的,如何能保證消費(fèi)者正確地消費(fèi)了消息?

二、保證生產(chǎn)者消息發(fā)送到RabbitMQ服務(wù)器

生產(chǎn)者發(fā)送RabbitMQ消息時(shí),如果遇到了網(wǎng)絡(luò)問題或者Broker的問題(硬盤故障、磁盤寫滿等),就會(huì)導(dǎo)致消息發(fā)送失敗,生產(chǎn)者無法確定Broker是否正確地接收到消息。

在RabbitMQ中提供了兩種生產(chǎn)者確認(rèn)機(jī)制,也就是說生產(chǎn)者發(fā)送消息給RabbitMQ時(shí),服務(wù)端會(huì)通過某種方式返回一個(gè)應(yīng)答,只要生產(chǎn)者收到了這個(gè)應(yīng)答,就知道消息發(fā)送成功了。

1、服務(wù)端確認(rèn):Transaction模式

在事務(wù)模式里面,只有收到了服務(wù)端的Commit-OK指令,才能提交成功。所以可以解決生產(chǎn)者和服務(wù)端確認(rèn)的問題。但是事務(wù)模式有一個(gè)缺點(diǎn),它是阻塞的,一條消息沒有發(fā)送完畢,不能發(fā)送下一條消息,它會(huì)榨干RabbitMQ服務(wù)器的性能。所以不建議在生產(chǎn)環(huán)境使用。
RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生產(chǎn)者,測(cè)試事務(wù)模式。發(fā)送消息的效率比較低,建議使用Confirm模式
 * 參考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class TransactionProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ";
        // 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
        	// 開啟事務(wù)
            channel.txSelect();
            // 發(fā)送消息,發(fā)布了4條,但只確認(rèn)了3條
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit(); // 提交
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            int i =1/0;
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            System.out.println("消息發(fā)送成功");
        } catch (Exception e) {
            channel.txRollback(); // 回滾
            System.out.println("消息已經(jīng)回滾");
        }

        channel.close();
        conn.close();
    }
}

(2)springbootAPI

rabbitTemplate.setChannelTransacted(true);

2、服務(wù)端確認(rèn):Confirm模式

Confirm模式有三種,單個(gè)確認(rèn)、批量確認(rèn)、異步確認(rèn)。

RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 普通確認(rèn)模式
 */
public class NormalConfirmProducer {

    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Normal Confirm";
        // 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 開啟發(fā)送方確認(rèn)模式
        channel.confirmSelect();

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // 普通Confirm,發(fā)送一條,確認(rèn)一條
        if (channel.waitForConfirms()) {
            System.out.println("消息發(fā)送成功" );
        }else{
            System.out.println("消息發(fā)送失敗");
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生產(chǎn)者,測(cè)試批量Confirm模式
 */
public class BatchConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Batch Confirm";
        // 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
            // 開啟confirm模式
            channel.confirmSelect();
            for (int i = 0; i < 5; i++) {
                // 發(fā)送消息
                // String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            }
            // 批量確認(rèn)結(jié)果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被確認(rèn)了
            // 比如5條消息可能只收到1個(gè)ACK,也可能收到2個(gè)(抓包才看得到)
            // 直到所有信息都發(fā)布,只要有一個(gè)未被Broker確認(rèn)就會(huì)IOException
            channel.waitForConfirmsOrDie();
            System.out.println("消息發(fā)送完畢,批量確認(rèn)成功");
        } catch (Exception e) {
            // 發(fā)生異常,可能需要對(duì)所有消息進(jìn)行重發(fā)
            e.printStackTrace();
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * 消息生產(chǎn)者,測(cè)試異步Confirm模式
 *  參考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class AsyncConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立連接
        Connection conn = factory.newConnection();
        // 創(chuàng)建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, Async Confirm";
        // 聲明隊(duì)列(默認(rèn)交換機(jī)AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 用來維護(hù)未確認(rèn)消息的deliveryTag
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 這里不會(huì)打印所有響應(yīng)的ACK;ACK可能有多個(gè),有可能一次確認(rèn)多條,也有可能一次確認(rèn)一條
        // 異步監(jiān)聽確認(rèn)和未確認(rèn)的消息
        // 如果要重復(fù)運(yùn)行,先停掉之前的生產(chǎn)者,清空隊(duì)列
        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Broker未確認(rèn)消息,標(biāo)識(shí):" + deliveryTag);
                if (multiple) {
                    // headSet表示后面參數(shù)之前的所有元素,全部刪除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
                // 這里添加重發(fā)的方法
            }
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 如果true表示批量執(zhí)行了deliveryTag這個(gè)值以前(小于deliveryTag的)的所有消息,如果為false的話表示單條確認(rèn)
                System.out.println(String.format("Broker已確認(rèn)消息,標(biāo)識(shí):%d,多個(gè)消息:%b", deliveryTag, multiple));
                if (multiple) {
                    // headSet表示后面參數(shù)之前的所有元素,全部刪除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    // 只移除一個(gè)元素
                    confirmSet.remove(deliveryTag);
                }
                System.out.println("未確認(rèn)的消息:"+confirmSet);
            }
        });

        // 開啟發(fā)送方確認(rèn)模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // 發(fā)送消息
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            confirmSet.add(nextSeqNo);
        }
        System.out.println("所有消息:"+confirmSet);

        // 這里注釋掉的原因是如果先關(guān)閉了,可能收不到后面的ACK
        //channel.close();
        //conn.close();
    }
}

(2)springbootAPI

Confirm模式是在Channel上開啟的,RabbitTemplate對(duì)Channel進(jìn)行了封裝。

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("--------收到服務(wù)端異步確認(rèn)--------");
        System.out.println("received ack: "+ack);
        System.out.println("cause: "+cause);
        System.out.println("correlationId: "+correlationData.getId());
        if (!ack) {
            System.out.println("發(fā)送消息失?。? + cause);
            throw new RuntimeException("發(fā)送異常:" + cause);
            // 做一些回滾、重試、記錄處理
        } else {
			System.out.println("消息確認(rèn)成功");
		}
    }
});

三、保證消息從交換機(jī)路由到隊(duì)列

如果routingkey錯(cuò)誤,或者隊(duì)列不存在(但是生產(chǎn)環(huán)境基本不會(huì)出現(xiàn)這么低級(jí)的錯(cuò)誤),就會(huì)導(dǎo)致消息從交換機(jī)無法路由到隊(duì)列。

我們有兩種方式處理無法路由的消息,一種是服務(wù)端回發(fā)給生產(chǎn)者,一種是讓交換機(jī)路由到另一個(gè)備份的交換機(jī)。

1、消息回發(fā)

(1)JavaAPI

channel.addReturnListener(new ReturnListener() {
	   public void handleReturn(int replyCode,
	                            String replyText,
	                            String exchange,
	                            String routingKey,
	                            AMQP.BasicProperties properties,
	                            byte[] body)
	           throws IOException {
	       System.out.println("=========監(jiān)聽器收到了無法路由,被返回的消息============");
	       System.out.println("replyText:"+replyText);
	       System.out.println("exchange:"+exchange);
	       System.out.println("routingKey:"+routingKey);
	       System.out.println("message:"+new String(body));
	   }
});

(2)SpringbootAPI

Springboot消息回發(fā)是使用mandatory參數(shù)和ReturnListener(在SPringAMQP中是ReturnCallback)

// 為RabbitTemplate設(shè)置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            System.out.println("--------收到無法路由回發(fā)的消息--------");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
            System.out.println("properties:" + message.getMessageProperties());
            System.out.println("body:" + new String(message.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
});

2、消息路由到備份交換機(jī)

在創(chuàng)建交換機(jī)時(shí),從屬性中指定備份交換機(jī)。

(1)JavaAPI

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
        contentEncoding("UTF-8").build();

// 備份交換機(jī)
channel.exchangeDeclare("ALTERNATE_EXCHANGE","topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE","ALTERNATE_EXCHANGE","#");

// 在聲明交換機(jī)的時(shí)候指定備份交換機(jī)
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);


// 發(fā)送到了默認(rèn)的交換機(jī)上,由于沒有任何隊(duì)列使用這個(gè)關(guān)鍵字跟交換機(jī)綁定,所以會(huì)被退回
// 第三個(gè)參數(shù)是設(shè)置的mandatory,如果mandatory是false,消息也會(huì)被直接丟棄
channel.basicPublish("TEST_EXCHANGE","error.routingKey",true, properties,"只為更好的你".getBytes());

四、保證消息在隊(duì)列中可靠存儲(chǔ)

如果消息一直沒有被消費(fèi)者消費(fèi),消息會(huì)一直存儲(chǔ)在隊(duì)列中,并且隊(duì)列中的數(shù)據(jù)存儲(chǔ)在數(shù)據(jù)庫中。

如果RabbitMQ服務(wù)或者硬件發(fā)生了故障,比如系統(tǒng)宕機(jī)、重啟、關(guān)閉等等,可能會(huì)導(dǎo)致內(nèi)存中的消息丟失,所以我們要把消息本身和元數(shù)據(jù)(隊(duì)列、交換機(jī)、綁定)都保存到磁盤中。

1、隊(duì)列持久化

聲明隊(duì)列時(shí)可以設(shè)置幾個(gè)重要的參數(shù):

// 聲明隊(duì)列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// springboot中
@Bean("reliableQueue")
public Queue queue() {
	// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    return new Queue("RELIABLE_QUEUE", true, false, false, new HashMap<>());
}

參數(shù)解析:
durable:沒有持久化的隊(duì)列,保存在內(nèi)存中,服務(wù)重啟后隊(duì)列和消息都會(huì)丟失。設(shè)置為true可以保證消息持久化
exclusive:排他性隊(duì)列的特點(diǎn)是:只對(duì)首次聲明它的連接(Connection)可見;會(huì)在其連接斷開的時(shí)候自動(dòng)刪除。
autoDelete:沒有消費(fèi)者連接的時(shí)候,自動(dòng)刪除。

2、交換機(jī)持久化

聲明交換機(jī)時(shí)可以設(shè)置這幾個(gè)重要的參數(shù),和隊(duì)列類似:

// 聲明交換機(jī)
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

// Springboot中
@Bean("directExchange")
public DirectExchange exchange() {
	// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    return new DirectExchange("RELIABLE_EXCHANGE", true, false, new HashMap<>());
}

3、消息持久化

在生產(chǎn)者發(fā)送消息時(shí),可以指定消息的配置:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)   // 2代表持久化
        .contentEncoding("UTF-8")  // 編碼
        .expiration("10000")  // TTL,過期時(shí)間
        .headers(headers) // 自定義屬性
        .priority(5) // 優(yōu)先級(jí),默認(rèn)為5,配合隊(duì)列的 x-max-priority 屬性使用
        .messageId(String.valueOf(UUID.randomUUID()))
        .build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

// springboot中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("一條正常的消息".getBytes(), messageProperties);

4、搭建集群

如果只有一個(gè)RabbitMQ節(jié)點(diǎn),即使交換機(jī)、隊(duì)列、消息做了持久化,如果服務(wù)崩潰或者硬件故障,RabbitMQ的服務(wù)一樣是不可以的。

所以為了提高M(jìn)Q服務(wù)的可用性,保證消息的傳輸,我們需要有多個(gè)RabbitMQ的節(jié)點(diǎn)。

RabbitMQ集群搭建與高可用實(shí)現(xiàn)

五、保證消費(fèi)者成功消費(fèi)消息

如果消費(fèi)者收到消息后沒來得及處理或者發(fā)生了一場(chǎng),就會(huì)導(dǎo)致消費(fèi)失敗。RabbitMQ提供了消費(fèi)者的消息確認(rèn)機(jī)制(message acknowledgement),消費(fèi)者可以自動(dòng)或者手動(dòng)地發(fā)送ACK給服務(wù)端。

如果消費(fèi)者拿到消息沒有ACK會(huì)怎么樣?
沒有收到ACK的消息,消費(fèi)者斷開連接之后,RabbitMQ會(huì)把這條消息發(fā)送給其他消費(fèi)者。如果沒有其他消費(fèi)者,消費(fèi)者重啟后會(huì)重新消費(fèi)這條消息,重復(fù)執(zhí)行業(yè)務(wù)邏輯。

1、自動(dòng)ACK

默認(rèn)就是自動(dòng)ACK。消費(fèi)者收到消息的時(shí)候就會(huì)自動(dòng)發(fā)送ACK,而不是方法執(zhí)行成功的時(shí)候發(fā)送ACK。這種情況RabbitMQ只會(huì)關(guān)心消費(fèi)者是否接收到了消息,對(duì)消息的處理結(jié)果是不關(guān)心的,所以通常來說我們會(huì)選擇手動(dòng)ACK。

// 創(chuàng)建消費(fèi)者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");
        System.out.println("consumerTag : " + consumerTag );
        System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
    }
};
// 開始獲取消息 autoAck參數(shù)為true代表自動(dòng)ACK
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);

同樣的,在Springboot中如果沒有特殊配置,默認(rèn)的就是自動(dòng)ACK。

2、手動(dòng)ACK

// 創(chuàng)建消費(fèi)者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");

        if (msg.contains("拒收")){
            // 拒絕消息
            // requeue:是否重新入隊(duì)列,true:是,會(huì)再發(fā)給其他消費(fèi)者;false:直接丟棄,相當(dāng)于告訴隊(duì)列可以直接刪除掉
            // TODO 如果只有這一個(gè)消費(fèi)者,requeue 為true 的時(shí)候會(huì)造成消息重復(fù)消費(fèi)
            // basicReject(long deliveryTag, boolean requeue)
            channel.basicReject(envelope.getDeliveryTag(), false);
        } else if (msg.contains("異常")){
            // 批量拒絕
            // requeue:是否重新入隊(duì)列
            // TODO 如果只有這一個(gè)消費(fèi)者,requeue 為true 的時(shí)候會(huì)造成消息重復(fù)消費(fèi)
            // basicNack(long deliveryTag, boolean multiple, boolean requeue)
            channel.basicNack(envelope.getDeliveryTag(), true, false);
        } else {
            // 手工應(yīng)答
            // 如果不應(yīng)答,隊(duì)列中的消息會(huì)一直存在,重新連接的時(shí)候會(huì)重復(fù)消費(fèi)
            // basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(envelope.getDeliveryTag(), true);
        }
    }
};

// 開始獲取消息,注意這里開啟了手工應(yīng)答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

在Springboot中,可以這樣配置應(yīng)答方式:

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

或者在SimpleMessageListenerContainer或者SimpleRabbitListenerContainerFactory中這樣設(shè)置:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

消費(fèi)者中獲取到Channel對(duì)象,就可以進(jìn)行應(yīng)答了:

@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.queue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msgContent,Channel channel, Message message) throws IOException {
        System.out.println("Second Queue received msg : " + msgContent );
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

注意這個(gè)枚舉值:
NONE:自動(dòng)ACK
MANUAL:手動(dòng)ACK
AUTO:如果方法未拋出異常,則發(fā)送ACK。如果方法拋出異常,并且不是AmqpRejectAndDontRequeueException則發(fā)送nack,并且重新入隊(duì)。如果拋出異常是AmqpRejectAndDontRequeueException則發(fā)送nack并不會(huì)重新入隊(duì)。

注意!拒絕消息時(shí)如果requeue參數(shù)設(shè)置為true,可以把這條消息重新存入隊(duì)列,以便發(fā)給下一個(gè)消費(fèi)者處理,如果只有一個(gè)消費(fèi)者時(shí),這種方式可能會(huì)出現(xiàn)無限循環(huán)重復(fù)消費(fèi)的情況。

六、保證消息一致之消費(fèi)者回調(diào)

消費(fèi)者成功消費(fèi)了消息之后,如何告知生產(chǎn)者該消息已經(jīng)被成功消費(fèi)了?

雖然說使用MQ的目的之一是解耦,但是某些一致性要求很高的場(chǎng)景,比如說金融業(yè)務(wù),還是很有必要通知生產(chǎn)者的。

1、調(diào)用生產(chǎn)者API

例如,提單系統(tǒng)給其他系統(tǒng)發(fā)送了保險(xiǎn)消息后,其他系統(tǒng)必須在處理完消息之后調(diào)用提單系統(tǒng)提供的API,來修改提單系統(tǒng)中這筆數(shù)據(jù)的狀態(tài)。只要API沒有被調(diào)用,數(shù)據(jù)狀態(tài)沒有被修改,提單系統(tǒng)就認(rèn)為下游系統(tǒng)沒有收到這條消息。

但是這種方式又從解耦的狀態(tài)變成了耦合狀態(tài),還是需要根據(jù)實(shí)際情況來判斷是否要采用這種方式。

2、發(fā)送響應(yīng)消息給生產(chǎn)者

例如商業(yè)銀行與人民銀行二代支付系統(tǒng)(使用IBM MQ),無論是人行收到了商業(yè)銀行的消息,還是商業(yè)銀行收到了人行的消息,都必須發(fā)送一條響應(yīng)消息(叫做回執(zhí)報(bào)文)。

整個(gè)通信的流程設(shè)計(jì)非常復(fù)雜,但是對(duì)于金融場(chǎng)景下的消息可靠性保證,是很有用的。
RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞

七、保證消息一致性之補(bǔ)償機(jī)制

如果消費(fèi)者消費(fèi)了消息之后,一直遲遲沒有通知給生產(chǎn)者,我們?nèi)绾沃老M(fèi)者已經(jīng)成功消費(fèi)了消息?
就像是銀行A給銀行B發(fā)起了一筆轉(zhuǎn)賬,銀行B一直沒有給銀行A回調(diào),此時(shí)我們?nèi)绾未_定銀行B確定是已經(jīng)處理了該消息?

此時(shí)生產(chǎn)者與消費(fèi)者之間應(yīng)該約定一個(gè)超時(shí)時(shí)間,對(duì)于超出這個(gè)時(shí)間沒有得到響應(yīng)的消息,才確定為消費(fèi)失敗,比如說5分鐘。

1、消息結(jié)果查證

假如說生產(chǎn)者5分鐘內(nèi)沒有收到消費(fèi)者消費(fèi)成功的回調(diào),生產(chǎn)者可以主動(dòng)發(fā)起一次結(jié)果查證,通過業(yè)務(wù)要素或者唯一流水號(hào),查證該業(yè)務(wù)在消費(fèi)者是否正常消費(fèi)。

2、消息重發(fā)

假如說消息一直沒有結(jié)果,就需要考慮消息重發(fā)了。

可以啟動(dòng)一個(gè)定時(shí)任務(wù),比如30秒跑一次,查詢業(yè)務(wù)表業(yè)務(wù)狀態(tài)是中間狀態(tài)的記錄,查詢出來,構(gòu)建MQ消息,重新發(fā)送。也可以單獨(dú)設(shè)計(jì)一張消息表,把本系統(tǒng)所有發(fā)送出去的消息全部異步登記,狀態(tài)是未回復(fù)的消息,進(jìn)行重發(fā)(這種方式會(huì)對(duì)數(shù)據(jù)庫性能造成一定損耗)。

重發(fā)機(jī)制也不可能一直重復(fù)發(fā),如果消費(fèi)者確實(shí)是有bug或者其他問題,如果一直重復(fù)發(fā)送會(huì)導(dǎo)致死循環(huán)了。我們可以設(shè)置一個(gè)衰減機(jī)制,第一次間隔一分鐘,第二次間隔2分鐘,最終發(fā)送三次,三次過后如果還沒有收到回復(fù),就需要將消息設(shè)置為特殊狀態(tài),進(jìn)行人工干預(yù)。

3、消息冪等性

消息重發(fā)就意味著要處理消息的冪等。

什么是服務(wù)的冪等?為什么要實(shí)現(xiàn)冪等?
接口的冪等性——詳細(xì)談?wù)劷涌诘膬绲燃唇鉀Q方案

4、最終一致性

在一些一致性很強(qiáng)的接口調(diào)用中,比如說轉(zhuǎn)賬操作,通常會(huì)在一天的業(yè)務(wù)結(jié)束之后,第二天營(yíng)業(yè)之前,生產(chǎn)者和消費(fèi)者之間會(huì)進(jìn)行一次消息對(duì)賬。生成一個(gè)對(duì)賬文件,兩者分別解析對(duì)方的文件進(jìn)行對(duì)賬,如果確實(shí)有消息不一致的情況,會(huì)通過短信或者郵件的方式通知業(yè)務(wù)人員進(jìn)行手動(dòng)處理,要么把錢退回,要么把錢補(bǔ)上。

八、消息的順序性

在RabbitMQ中,一個(gè)隊(duì)列有多個(gè)消費(fèi)者時(shí),由于不同的消費(fèi)者消費(fèi)消息的速度是不一樣的,順序無法保證。只有一個(gè)隊(duì)列僅有一個(gè)消費(fèi)者的情況才能保證順序消費(fèi)(不同的業(yè)務(wù)消息發(fā)送到不同的專用隊(duì)列)。

除非負(fù)載的場(chǎng)景,不要用多個(gè)消費(fèi)者消費(fèi)消息,可以保證消息的順序性。文章來源地址http://www.zghlxwxcb.cn/news/detail-504697.html

到了這里,關(guān)于RabbitMQ保證消息的可靠投遞,Java實(shí)現(xiàn)RabbitMQ消息的可靠投遞,Springboot實(shí)現(xiàn)RabbitMQ消息的可靠投遞的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ-保證消息可靠性

    RabbitMQ-保證消息可靠性

    消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對(duì)這些問題,RabbitMQ分別給出了

    2024年02月07日
    瀏覽(31)
  • RabbitMQ如何保證消息可靠性

    RabbitMQ如何保證消息可靠性

    目錄 1、RabbitMQ消息丟失的可能性 1.1 生產(chǎn)者消息丟失場(chǎng)景 1.2 MQ導(dǎo)致消息丟失 1.3 消費(fèi)者丟失 2、如何保證生產(chǎn)者消息的可靠性 2.1 生產(chǎn)者重試機(jī)制 2.2 生產(chǎn)者確認(rèn)機(jī)制 2.3 實(shí)現(xiàn)生產(chǎn)者確認(rèn) 2.3.1 配置yml開啟生產(chǎn)者確認(rèn) 2.3.2 定義ReturnCallback 2.3.3 定義ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    瀏覽(25)
  • RabbitMQ保證消息的可靠性

    RabbitMQ保證消息的可靠性

    消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收到消息后未消費(fèi)就宕機(jī) 針對(duì)這些問題,RabbitMQ分別給出了

    2024年02月19日
    瀏覽(23)
  • 如何保證 RabbitMQ 的消息可靠性?

    如何保證 RabbitMQ 的消息可靠性?

    項(xiàng)目開發(fā)中經(jīng)常會(huì)使用消息隊(duì)列來 完成異步處理、應(yīng)用解耦、流量控制等功能 。雖然消息隊(duì)列的出現(xiàn)解決了一些場(chǎng)景下的問題,但是同時(shí)也引出了一些問題,其中使用消息隊(duì)列時(shí)如何保證消息的可靠性就是一個(gè)常見的問題。 如果在項(xiàng)目中遇到需要保證消息一定被消費(fèi)的場(chǎng)景

    2024年02月07日
    瀏覽(27)
  • rabbitmq如何保證消息的可靠性

    RabbitMQ可以通過以下方式來保證消息的可靠性: 在發(fā)布消息時(shí),可以設(shè)置消息的delivery mode為2,這樣消息會(huì)被持久化存儲(chǔ)在磁盤上,即使RabbitMQ服務(wù)器重啟,消息也不會(huì)丟失。 可以創(chuàng)建持久化的隊(duì)列,這樣即使RabbitMQ服務(wù)器重啟,隊(duì)列也不會(huì)丟失。 在消費(fèi)者端,可以 設(shè)置手動(dòng)

    2024年01月23日
    瀏覽(27)
  • RabbitMQ 能保證消息可靠性嗎

    RabbitMQ 能保證消息可靠性嗎

    手把手教你,本地RabbitMQ服務(wù)搭建(windows) 消息隊(duì)列選型——為什么選擇RabbitMQ RabbitMQ靈活運(yùn)用,怎么理解五種消息模型 推或拉? RabbitMQ 消費(fèi)模式該如何選擇 死信是什么,如何運(yùn)用RabbitMQ的死信機(jī)制? 前面我們?cè)谧鯩Q組件選型時(shí),提到了rabbitMQ的消息可靠性,那么它到底可靠

    2024年02月16日
    瀏覽(30)
  • Rabbitmq怎么保證消息的可靠性?

    Rabbitmq怎么保證消息的可靠性?

    一、消費(fèi)端消息可靠性保證 : 消息確認(rèn)(Acknowledgements) : 消費(fèi)者在接收到消息后,默認(rèn)情況下RabbitMQ會(huì)自動(dòng)確認(rèn)消息(autoAck=true)。為保證消息可靠性,可以設(shè)置autoAck=false,使得消費(fèi)者在處理完消息后手動(dòng)發(fā)送確認(rèn)(basicAck)。如果消費(fèi)者在處理過程中發(fā)生異?;蛘呶赐瓿?/p>

    2024年04月14日
    瀏覽(25)
  • rabbitmq如何保證消息的可靠性傳輸(簡(jiǎn)述版本)?

    我需要從三點(diǎn)去考慮, 生產(chǎn)者弄丟了數(shù)據(jù),生產(chǎn)者將消息發(fā)送的Exchange并且路由到隊(duì)列 隊(duì)列需要將消息給它持久化 消費(fèi)者要成功消費(fèi)隊(duì)列中的消息 RabbitMQ提供了confirm機(jī)制,保證了消息消息發(fā)送的Exchange交換機(jī),那么還提供了return機(jī)制,可以保證消息從exchange路由到隊(duì)列中,如

    2024年02月13日
    瀏覽(26)
  • RabbitMQ如何保證消息的可靠性6000字詳解

    RabbitMQ如何保證消息的可靠性6000字詳解

    RabbitMQ通過生產(chǎn)者、消費(fèi)者以及MQ Broker達(dá)到了解耦的特點(diǎn),實(shí)現(xiàn)了異步通訊等一些優(yōu)點(diǎn),但是在消息的傳遞中引入了MQ Broker必然會(huì)帶來一些其他問題,比如如何保證消息在傳輸過程中可靠性(即不讓數(shù)據(jù)丟失,發(fā)送一次消息就會(huì)被消費(fèi)一次)?這篇博客將詳細(xì)從生產(chǎn)者,MQ B

    2024年02月16日
    瀏覽(26)
  • RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))

    RabbitMQ可靠性消息發(fā)送(java實(shí)現(xiàn))

    本博客屬于 《RabbitMQ基礎(chǔ)組件封裝—整體結(jié)構(gòu)》的子博客 step1:消息落庫,業(yè)務(wù)數(shù)據(jù)存庫的同時(shí),也要將消息記錄存入數(shù)據(jù)庫,二者要保證原子性; step2:Producer發(fā)送消息到MQ Broker; step3:Producer收到 broker 返回的確認(rèn)消息; step4:更改消息記錄庫的狀態(tài)(定義三種狀態(tài):0待確

    2024年02月04日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包