目錄
開(kāi)始語(yǔ)
??簡(jiǎn)述
???模式NONE
application配置
生產(chǎn)者
消費(fèi)者
結(jié)果驗(yàn)證
???模式AUTO
application配置
生產(chǎn)者
消費(fèi)者
結(jié)果驗(yàn)證
???模式ACK(重點(diǎn))
application配置
生產(chǎn)者
消費(fèi)者
結(jié)果驗(yàn)證
???生產(chǎn)者確認(rèn)機(jī)制
yml添加配置
修改生產(chǎn)者代碼
結(jié)果驗(yàn)證
結(jié)束語(yǔ)
開(kāi)始語(yǔ)
一位普通的程序員,慢慢在努力變強(qiáng)!
在此文章之前,想學(xué)習(xí)前面部分的請(qǐng)看下列列表
RabbitMQ部署方式(第一節(jié))??
SpringBoot集成RabbitMQ(第二節(jié))??
??簡(jiǎn)述
ACK模式代表的是mq的確認(rèn)機(jī)制,簡(jiǎn)單來(lái)講就是【生產(chǎn)者】在發(fā)送消息的時(shí)候,發(fā)送成功mq有一個(gè)消息收到confirm回調(diào)機(jī)制,發(fā)送失敗有一個(gè)return回調(diào)機(jī)制, 【消費(fèi)者】在接收消息后,在執(zhí)行消費(fèi)完消息需要有一個(gè)確認(rèn)機(jī)制,要告訴mq,這個(gè)消息我消費(fèi)成功了,請(qǐng)將隊(duì)列中的消息刪除,如果是失敗了,你也進(jìn)行確認(rèn)、或者拒絕要告訴mq,不然消息會(huì)一直存在于隊(duì)列中
在RabbitMQ消費(fèi)者中一共有三種模式:
NODE:對(duì)于消息的成功和失敗都不管,MQ隊(duì)列中都會(huì)將消息刪除。(不安全)
AUTO:自動(dòng)確認(rèn)模式,對(duì)于消息消費(fèi)成功,MQ隊(duì)列中的消息將會(huì)自動(dòng)刪除,消費(fèi)失敗則會(huì)一直對(duì)消息進(jìn)行消費(fèi),有沒(méi)有解決方案,當(dāng)然是有的,文章中會(huì)注明(不穩(wěn)定,如果消費(fèi)者不能保證百分百消息成功,auto模式還是不建議使用)
MANUAL:此模式就是對(duì)AUTO模式下新增了一個(gè)確認(rèn)機(jī)制,消費(fèi)者對(duì)消息的消費(fèi)成功和失敗都需要給出一個(gè)消費(fèi)確認(rèn)的標(biāo)識(shí)和動(dòng)作!
開(kāi)啟生產(chǎn)者確認(rèn)機(jī)制:
# 消息發(fā)送交換機(jī),開(kāi)啟確認(rèn)回調(diào)模式
publisher-confirm-type: correlated
# 消息發(fā)送交換機(jī),開(kāi)啟確認(rèn)機(jī)制,并且返回回調(diào)
publisher-returns: true
???模式NONE
application配置
spring:
application:
name: rabbitmq-deadLetter
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 20000
listener:
simple:
acknowledge-mode: none
生產(chǎn)者
@Test
public void workTest() throws InterruptedException {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("work", "[workTest] send 消息發(fā)送" + i);
}
Thread.sleep(10000);
System.out.println("模式:"+rabbitProperties.getListener().getSimple().getAcknowledgeMode());
}
?消費(fèi)者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消費(fèi)者監(jiān)聽(tīng) (第二種模型:工作模式)
*
* @author 猿仁
* @data 2023-01-31 09:38
*/
@Component
@Slf4j
public class WorkCustomer {
/**
* 消費(fèi)者1
*
* @param data Body響應(yīng)內(nèi)容
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue("work"))
public void work1(String data) {
log.info("[work1]消費(fèi)者消費(fèi)成功success:{}",data);
}
/**
* 消費(fèi)者2
*
* @param data Body響應(yīng)內(nèi)容
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue("work"))
public void work2(String data) {
// log.info("[work2]消費(fèi)者消費(fèi)成功success:{}",data);
try {
int a = 0 / 0;
}catch (Exception e){
log.error("[work2]消費(fèi)者消費(fèi)失敗fail:{}",data);
throw e;
}
}
}
結(jié)果驗(yàn)證
結(jié)果解析:
從上面途中可以看出,此模式針對(duì)成功和失敗的消息都是直接成功,只要消費(fèi)者接受到消息(不管消費(fèi)者是否有異常),都當(dāng)作是消費(fèi)成功處理。
優(yōu)點(diǎn)L:此模式很難被阻塞,消費(fèi)能力不足,多開(kāi)幾個(gè)消費(fèi)來(lái)消費(fèi)即可。
缺點(diǎn):失敗的消息被丟棄了,在現(xiàn)實(shí)開(kāi)發(fā)中不允許丟棄消息的(比如:發(fā)貨隊(duì)列,某個(gè)賬號(hào)在通知進(jìn)銷(xiāo)存系統(tǒng)進(jìn)行發(fā)貨時(shí)消息發(fā)送了,但是處理異常直接丟失了,此時(shí)用戶(hù)不知道自己的貨還沒(méi)有發(fā)出,沒(méi)有沒(méi)有短信和物流,貨未發(fā)就沒(méi)有接下來(lái)的一系列消息)
???模式AUTO
application配置
spring:
application:
name: rabbitmq-deadLetter
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 20000
listener:
simple:
acknowledge-mode: auto # 此處開(kāi)啟了,沒(méi)有設(shè)置死信,過(guò)期時(shí)間,最好設(shè)置一下異常不會(huì)回歸隊(duì)列的配置,不然會(huì)出現(xiàn)一直重新消費(fèi)的問(wèn)題。配置如下:
default-requeue-rejected: false # 是否將失敗消息回歸隊(duì)列
生產(chǎn)者
/**
* auto模式
*/
@Test
public void autoTest() throws InterruptedException {
// 開(kāi)啟ack模式 完全消費(fèi),隊(duì)列中無(wú)消息
rabbitTemplate.convertAndSend("auto_no_err", "測(cè)試auto消費(fèi)者模式");
// 未開(kāi)啟ack模式 結(jié)果是隊(duì)列中還存在一條等待被消費(fèi)的消息
rabbitTemplate.convertAndSend("auto_err", "測(cè)試auto消費(fèi)者模式");
Thread.sleep(3000);
}
消費(fèi)者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* none消費(fèi)者模式
*
* @author tianyu.Ge
* @date 2023/2/6 12:55
*/
@Component
@Slf4j
public class AutoCustomer {
/**
* @param data Body響應(yīng)內(nèi)容
* @param headers 請(qǐng)求頭
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "auto_no_err", durable = "true", autoDelete = "false"))
public void autoNoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) {
System.out.println("獲取到了通道[autoNoErr]的數(shù)據(jù)!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
}
/**
* @param data Body響應(yīng)內(nèi)容
* @param headers 請(qǐng)求頭
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "auto_err", durable = "true", autoDelete = "false"))
public void autoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) {
int a = 0 / 0;
System.out.println("獲取到了通道[autoErr]的數(shù)據(jù)!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
}
}
結(jié)果驗(yàn)證
?
結(jié)果分析:
生產(chǎn)者發(fā)送了兩條消息,一條正常,一條異常,那么在開(kāi)啟了default-requeue-rejected: false這個(gè)回歸隊(duì)列配置,消息只要是消費(fèi)者接收了,就當(dāng)做消費(fèi)成功,不關(guān)心你消費(fèi)者是否在消費(fèi)的途中出現(xiàn)異常,隊(duì)列都將會(huì)刪除隊(duì)列中對(duì)應(yīng)的消息。 如果沒(méi)有配置default-requeue-rejected那么消費(fèi)者出現(xiàn)異常,消息會(huì)重回隊(duì)列,然后由消費(fèi)者重新進(jìn)行消費(fèi),導(dǎo)致一直重復(fù)消費(fèi)!
???模式ACK(重點(diǎn))
?application配置
spring:
application:
name: rabbitmq-ack
rabbitmq:
host: tianyu.com.cn
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 2000000
listener:
direct:
# 采用手動(dòng)應(yīng)答
acknowledge-mode: manual
simple:
# 指定最大的消費(fèi)者數(shù)量
max-concurrency: 50
# 指定最小的消費(fèi)者數(shù)量
concurrency: 1
# 采用手動(dòng)應(yīng)答
acknowledge-mode: manual
retry:
# 是否開(kāi)啟重試機(jī)制
enabled: true
# 默認(rèn)是3,是一共三次,而不是重試三次,三次包含了第一執(zhí)行,所以只重試了兩次
max-attempts: 3
# 重試間隔時(shí)間。毫秒
initial-interval: 2000
default-requeue-rejected: false
生產(chǎn)者
/**
* ack模式
*/
@Test
public void ackTest() throws InterruptedException {
// 開(kāi)啟ack模式 完全消費(fèi),隊(duì)列中無(wú)消息
rabbitTemplate.convertAndSend("ack_no_err", "測(cè)試ack消費(fèi)者模式");
// 未開(kāi)啟ack模式 結(jié)果是隊(duì)列中還存在一條等待被消費(fèi)的消息
rabbitTemplate.convertAndSend("ack_err", "測(cè)試ack消費(fèi)者模式");
Thread.sleep(6000);
System.out.println("一共執(zhí)行"+ AckCustomer.count +"次!");
}
消費(fèi)者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 手動(dòng)確認(rèn)模式 消費(fèi)者模式
*
* @author tianyu.Ge
* @date 2023/2/6 12:55
*/
@Component
@Slf4j
public class AckCustomer {
// 重試的次數(shù)
public static int count = 0;
/**
* @param data Body響應(yīng)內(nèi)容
* @param headers 請(qǐng)求頭
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "ack_no_err", durable = "true", autoDelete = "false"))
public void ackNoErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) throws IOException {
System.out.println("獲取到了通道[ackNoErr]的數(shù)據(jù)!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* @param data Body響應(yīng)內(nèi)容
* @param headers 請(qǐng)求頭
* @param channel 通道
* @param message 消息
* @return void
* @author 猿仁
* @date 2023/1/31 9:38
*/
@RabbitListener(queuesToDeclare = @Queue(value = "ack_err", durable = "true", autoDelete = "false"))
public void ackErr(@Payload String data, @Headers Map<String, Object> headers, Channel channel, Message message) throws IOException {
try {
++count;
int a = 0 / 0;
/**
* 參數(shù)1:消息標(biāo)簽
* 參數(shù)2:是否批量確認(rèn),屬于一個(gè)隊(duì)列中的消息,全部確認(rèn),false:只確認(rèn)當(dāng)前消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e) {
log.error("[ackErr]消費(fèi)者出現(xiàn)異常:{}",e.getMessage());
/**
* 【否認(rèn)策略】
*
* 參數(shù)1:消息標(biāo)簽
* 參數(shù)2:是否批量處理 true:批量 (通道中A\B\C\D接收到第一個(gè)那么后面的不管成沒(méi)成功都會(huì)被應(yīng)答,不安全,只有在確保通道中的消息百分百消費(fèi)成功時(shí)才可使用),false:只確認(rèn)當(dāng)前消息
* 參數(shù)3:被拒絕的消息是否回歸隊(duì)列 true:回歸,false:丟棄 【注意】:如果只有一個(gè)消費(fèi)者的話(huà),true將導(dǎo)致無(wú)限循壞, 應(yīng)該改為false:并且通知mq丟棄或者不處理
*/
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
if(count == 3){
// 當(dāng)執(zhí)行最后一次的時(shí)候,失敗了,那么直接丟棄,從隊(duì)列中刪除
log.info("[ackErr]一共執(zhí)行{}次,還是失敗,開(kāi)啟確認(rèn)失敗!",count);
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
throw e;
}
System.out.println("獲取到了通道[ackErr]的數(shù)據(jù)!" + data);
log.info("Payload: {}", data);
log.info("Headers: {}", headers);
log.info("Channel: {}", channel);
log.info("Message: {}", message);
}
}
結(jié)果驗(yàn)證
結(jié)果分析:
開(kāi)啟ack模式,那么不管消費(fèi)者是否執(zhí)行成功或者失敗,都需要給予一個(gè)消息的確認(rèn),成功就確認(rèn)成功,失敗就確認(rèn)失敗,如果沒(méi)有確認(rèn),那么消息一直會(huì)存在隊(duì)列,并且沒(méi)有確認(rèn)ack的消息會(huì)再u(mài)nacked中顯示數(shù)量。
???生產(chǎn)者確認(rèn)機(jī)制
yml添加配置
spring: application: name: rabbitmq-ack rabbitmq: # 消息發(fā)送交換機(jī),開(kāi)啟確認(rèn)回調(diào)模式 publisher-confirm-type: correlated # 消息發(fā)送交換機(jī),開(kāi)啟確認(rèn)機(jī)制,并且返回回調(diào) publisher-returns: true # template: # # 指定消息在沒(méi)有被隊(duì)列接收時(shí)是否強(qiáng)行退回還是直接丟棄:ReturnCallback.returnedMessage消息未送達(dá)回調(diào)(true) # mandatory: true
修改生產(chǎn)者代碼
/**
* ack模式之生產(chǎn)者確認(rèn)機(jī)制
*/
@Test
public void ackPublisherTest() throws InterruptedException {
// 生產(chǎn)者消息確認(rèn)機(jī)制開(kāi)啟
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
if (ack) {
System.out.println("ConfirmCallback:correlationData: " + correlationData);
System.out.println("ConfirmCallback:correlationData.body: " + new String(correlationData.getReturnedMessage().getBody()));
System.out.println("ConfirmCallback:ack: " + ack);
System.out.println("ConfirmCallback:cause: " + cause);
}else {
System.out.println("沒(méi)有ack,又是怎樣的,猿友們有空可以研究研究");
}
});
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
System.out.println("ReturnCallback: " + message);
System.out.println("ReturnCallback: " + replyCode);
System.out.println("ReturnCallback: " + replyText);
System.out.println("ReturnCallback: " + exchange);
System.out.println("ReturnCallback: " + routingKey);
});
rabbitTemplate.setMandatory(true);
// 開(kāi)啟ack模式 完全消費(fèi),隊(duì)列中無(wú)消息
CorrelationData correlationData = new CorrelationData();
correlationData.setReturnedMessage(new Message("測(cè)試ack消費(fèi)者模式".getBytes(), new MessageProperties()));
correlationData.setId("A");
rabbitTemplate.convertAndSend("ack_no_err", "測(cè)試ack消費(fèi)者模式", (Message message) -> {
// 可以配置一些request請(qǐng)求參數(shù)
// message.getMessageProperties().setHeader("token", "123-213-we-123-sd-ad2-");
//message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
return message;
}, correlationData);
// 未開(kāi)啟ack模式 結(jié)果是隊(duì)列中還存在一條等待被消費(fèi)的消息
CorrelationData correlationData1 = new CorrelationData();
correlationData1.setReturnedMessage(new Message("測(cè)試ack消費(fèi)者模式".getBytes(), new MessageProperties()));
correlationData1.setId("B");
rabbitTemplate.convertAndSend("ack_err", "測(cè)試ack消費(fèi)者模式", (Message message) -> {
return message;
}, correlationData1);
// 沒(méi)有的隊(duì)列名稱(chēng) 這條將會(huì)被ReturnCallback監(jiān)聽(tīng)到,因?yàn)闆](méi)有隊(duì)列ack_err_123,消息不可達(dá)
rabbitTemplate.convertAndSend("ack_err_123", "測(cè)試ack消費(fèi)者模式");
Thread.sleep(6000);
System.out.println("一共執(zhí)行" + AckCustomer.count + "次!");
}
結(jié)果驗(yàn)證
?
結(jié)果分析:
開(kāi)啟消息回調(diào),那么生產(chǎn)者在發(fā)送消息的時(shí)候就可以捕捉消息是否發(fā)送成功,發(fā)送成功會(huì)進(jìn)入ConfirmCallback回調(diào)代碼塊,消息發(fā)送失敗會(huì)進(jìn)入?ReturnCallback代碼塊。到這里就完成了消息的確認(rèn)機(jī)制,保證了消息可靠性!
結(jié)束語(yǔ)
溫馨提示:如有問(wèn)題,可在下方留言,作者看到了會(huì)第一時(shí)間回復(fù)!文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-765073.html
本章節(jié)完成了,各位正在努力的程序員們,如果你們覺(jué)得本文章對(duì)您有用的話(huà),你學(xué)到了一些東西,希望猿友們點(diǎn)個(gè)贊+關(guān)注,支持一下猿仁!
持續(xù)更新中…文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-765073.html
到了這里,關(guān)于SpringBoot集成RabbitMQ之ACK確認(rèn)機(jī)制(第三節(jié))的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!