RabbitMQ–基礎(chǔ)–8.1–消息確認(rèn)機(jī)制–接受確認(rèn)機(jī)制(ACK)
代碼位置
https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03
1、場(chǎng)景和問(wèn)題
1.1、需求
消費(fèi)者收到Queue中的消息,但沒(méi)有處理完成就宕機(jī)的情況,這種情況下就可能會(huì)導(dǎo)致消息丟失。
為了避免這種情況發(fā)生,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknowledgment)后才將該消息從Queue中移除。
如果RabbitMQ沒(méi)有收到回執(zhí)并檢測(cè)到消費(fèi)者的RabbitMQ連接斷開(kāi),則RabbitMQ會(huì)將該消息發(fā)送給其他消費(fèi)者(如果存在多個(gè)消費(fèi)者)進(jìn)行處理。
這里不存在Timeout概念,一個(gè)消費(fèi)者處理消息時(shí)間再長(zhǎng)也不會(huì)導(dǎo)致該消息被發(fā)送給其他消費(fèi)者,除非它的RabbitMQ連接斷開(kāi)。
1.2、消息確認(rèn)消息引發(fā)的問(wèn)題
如果我們的開(kāi)發(fā)人員在處理完業(yè)務(wù)邏輯后,忘記發(fā)送回執(zhí)給RabbitMQ,這將會(huì)導(dǎo)致嚴(yán)重的問(wèn)題,Queue中堆積的消息會(huì)越來(lái)越多,消費(fèi)者重啟后會(huì)重復(fù)消費(fèi)這些消息并重復(fù)執(zhí)行業(yè)務(wù)邏輯。
2、channel.basicConsume(queueName,autoAck,callback)方法
2.1、參數(shù)
- queueName:隊(duì)列名稱
- autoAck:設(shè)置是否自動(dòng)確認(rèn)
- true:自動(dòng)確認(rèn),消息一旦被消費(fèi)者接收,隊(duì)列中的消息就會(huì)被刪除
- false:手動(dòng)確認(rèn)
- callback:設(shè)置消費(fèi)者的回調(diào)函數(shù),用來(lái)處理 RabbitMQ 推送過(guò)來(lái)的消息
3、消息確認(rèn)機(jī)制 ACK
- 為了保證消息從隊(duì)列可靠地達(dá)到消費(fèi)者
- 當(dāng)消費(fèi)者獲取消息后,會(huì)向 RabbitMQ 發(fā)送回執(zhí) ACK,告知消息已經(jīng)被接收。
- ACK分兩種情況
- 自動(dòng) ACK
- 手動(dòng) ACK
3.1、自動(dòng) ACK
- autoAck=false。
- RabbitMQ 會(huì)自動(dòng)把發(fā)送出去的消息設(shè)置為確認(rèn),然后從內(nèi)存或磁盤中刪除,而不管消費(fèi)者是否真正地消費(fèi)了這些消息
3.2、手動(dòng) ACK
- autoAck=false。
- RabbitMQ 會(huì)等待消費(fèi)者顯示地回復(fù)確認(rèn)信號(hào)后才從內(nèi)存或磁盤中移去消息
3.2.1、好處
- autoAck=false,消費(fèi)者就有足夠的時(shí)間處理消息,不用擔(dān)心處理消息過(guò)程中,消費(fèi)者進(jìn)程掛掉后消息丟失的問(wèn)題。因?yàn)椋琑abbitMQ 會(huì)一直等待持有消息,直到消費(fèi)者顯示調(diào)用 Basic.Ack 命令為止。
3.2.2、原理
- autoAck=false,隊(duì)列中的消息分成了兩部分
- 等待投遞給消費(fèi)者的消息
- 已經(jīng)投遞給消費(fèi)者,但還沒(méi)有收到消費(fèi)者確認(rèn)信號(hào)的消息。
- 如果RabbitMQ一直沒(méi)有收到消費(fèi)者的確認(rèn)信號(hào),并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開(kāi)連接,則RabbitMQ會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者。這樣就保證消息不丟失了。
3.3、使用場(chǎng)景
- 如果消息不太重要,丟失也沒(méi)有影響,那么autoAck=ture。
- 如果消息非常重要,不容丟失,那么autoAck=ture。
4、代碼實(shí)現(xiàn)(手動(dòng) ACK)
4.1、代碼結(jié)構(gòu)
4.2、生產(chǎn)者
package com.example.rabbitmq03.business.test7;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 簡(jiǎn)單模式
public static void main(String[] args) {
// 1. 獲取連接
Connection connection = null;
try {
connection = RabbitMqUtil.getConnection("生產(chǎn)者");
} catch (Exception e) {
System.out.println("獲取連接時(shí),出現(xiàn)異常");
}
Channel channel = null;
try {
// 2. 通過(guò)連接獲取通道 Channel
channel = connection.createChannel();
String queueName = "code_simple_queue1";
// 3. 通過(guò)通道創(chuàng)建聲明隊(duì)列
channel.queueDeclare(queueName, false, false, false, null);
// 4. 準(zhǔn)備消息內(nèi)容
String message = "你好";
// 5. 發(fā)送消息給隊(duì)列 Queue
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息發(fā)送完成~~~發(fā)送的消息為:" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
RabbitMqUtil.close(connection, channel);
} catch (Exception e) {
System.out.println("關(guān)閉時(shí),出現(xiàn)異常");
}
}
}
}
4.3、消費(fèi)者
修改的地方
package com.example.rabbitmq03.business.test7;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.*;
public class Consumer {
public static void main(String[] args) throws Exception{
// 獲取連接
Connection connection = RabbitMqUtil.getConnection("消費(fèi)者");
final Channel channel = connection.createChannel();
String queueName = "code_simple_queue1";
// 定義消費(fèi)者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息id,mq 在 channel 中用來(lái)標(biāo)識(shí)消息的 id,可用于確認(rèn)消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 消息體
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
/**
* @param1:deliveryTag:用來(lái)標(biāo)識(shí)消息的id
* @param2:multiple:是否批量。true:將一次性 ACK 所有小于 deliveryTag 的消息
*/
// 手動(dòng)確認(rèn)
channel.basicAck(deliveryTag, false);
}
};
// 監(jiān)聽(tīng)隊(duì)列 手動(dòng) ACK
channel.basicConsume(queueName, false, consumer);
System.out.println("開(kāi)始接收消息~~~");
System.in.read();
// 關(guān)閉信道、連接
RabbitMqUtil.close(connection, channel);
}
}
4.4、測(cè)試
5、自動(dòng) ACK 帶來(lái)的問(wèn)題
5.1、執(zhí)行生產(chǎn)者,產(chǎn)生一條記錄
5.2、設(shè)置修改消費(fèi)者為自動(dòng)ACK
package com.example.rabbitmq03.business.test7;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.*;
public class Consumer {
public static void main(String[] args) throws Exception {
// 獲取連接
Connection connection = RabbitMqUtil.getConnection("消費(fèi)者");
final Channel channel = connection.createChannel();
String queueName = "code_simple_queue1";
// 定義消費(fèi)者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
int result = 1 / 0;
// body 消息體
String msg = new String(body, "utf-8");
System.out.println("收到消息:" + msg);
}
};
// 監(jiān)聽(tīng)隊(duì)列 自動(dòng) ACK
channel.basicConsume(queueName, true, consumer);
System.out.println("開(kāi)始接收消息~~~");
System.in.read();
// 關(guān)閉信道、連接
RabbitMqUtil.close(connection, channel);
}
}
5.3、執(zhí)行消費(fèi)者
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-499046.html
消費(fèi)者代碼報(bào)錯(cuò),沒(méi)有收到消息,但是隊(duì)列的消息少了,原因就是,MQ將異常內(nèi)部消化了。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-499046.html
到了這里,關(guān)于RabbitMQ--基礎(chǔ)--8.1--消息確認(rèn)機(jī)制--接受確認(rèn)機(jī)制(ACK)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!