国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMq(七) -- 常見問(wèn)題:冪等性問(wèn)題(消息重復(fù)消費(fèi))、消息丟失

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMq(七) -- 常見問(wèn)題:冪等性問(wèn)題(消息重復(fù)消費(fèi))、消息丟失。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1. 冪等性

用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用。 舉個(gè)最簡(jiǎn)單的例子,那就是支付,用戶購(gòu)買商品后支付,支付扣款成功,但是返回結(jié)果的時(shí)候網(wǎng)絡(luò)異常, 此時(shí)錢已經(jīng)扣了,用戶再次點(diǎn)擊按鈕,此時(shí)會(huì)進(jìn)行第二次扣款,返回結(jié)果成功,用戶查詢余額發(fā)現(xiàn)多扣錢 了,流水記錄也變成了兩條。在以前的單應(yīng)用系統(tǒng)中,我們只需要把數(shù)據(jù)操作放入事務(wù)中即可,發(fā)生錯(cuò)誤立即回滾,但是再響應(yīng)客戶端的時(shí)候也有可能出現(xiàn)網(wǎng)絡(luò)中斷或者異常等等。

消息冪等性,其實(shí)就是保證同一個(gè)消息不被消費(fèi)者重復(fù)消費(fèi)兩次

1.1 消息重復(fù)消費(fèi)&重復(fù)投遞

  • 重復(fù)投遞:
    • 生產(chǎn)在往MQ發(fā)送消息時(shí),MQ收到消息并持久化到本地后,進(jìn)行發(fā)布確認(rèn)告訴生產(chǎn)者,消息已經(jīng)被持久化的過(guò)程中出現(xiàn)網(wǎng)絡(luò)中斷,生產(chǎn)者沒(méi)有收到消息發(fā)布確認(rèn)的消息,故而重新發(fā)送一條消息
  • 重復(fù)消費(fèi):
    • 消費(fèi)者在消費(fèi) MQ 中的消息時(shí),MQ 已把消息發(fā)送給消費(fèi)者,消費(fèi)者在給 MQ 返回 ack 時(shí)網(wǎng)絡(luò)中斷, 故 MQ 未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息。

1.2 解決思路

MQ 消費(fèi)者的冪等性的解決一般使用全局 ID 或者寫個(gè)唯一標(biāo)識(shí),比如時(shí)間戳或者 UUID ,訂單消費(fèi)者消費(fèi) MQ 中的消息也可利用 MQ 的該 id 來(lái)判斷,或者可按自己的規(guī)則生成一個(gè)全局唯一 id,每次消費(fèi)消息時(shí)用該 id 先判斷該消息是否已消費(fèi)過(guò)。

1.3 消費(fèi)端的冪等性保障

在海量訂單生成的業(yè)務(wù)高峰期,生產(chǎn)端有可能就會(huì)重復(fù)發(fā)生了消息,這時(shí)候消費(fèi)端就要實(shí)現(xiàn)冪等性, 這就意味著我們的消息永遠(yuǎn)不會(huì)被消費(fèi)多次,即使我們收到了一樣的消息。

業(yè)界主流的冪等性有兩種操作:

  • 方式1: 消息全局 ID 或者寫個(gè)唯一標(biāo)識(shí)(如時(shí)間戳、UUID 等) :每次消費(fèi)消息之前根據(jù)消息 id 去判斷該消息是否已消費(fèi)過(guò),如果已經(jīng)消費(fèi)過(guò),則不處理這條消息,否則正常消費(fèi)消息,并且進(jìn)行入庫(kù)操作。(消息全局 ID 作為數(shù)據(jù)庫(kù)表的主鍵,防止重復(fù))。
    • 這里可以結(jié)合業(yè)務(wù),根據(jù)業(yè)務(wù)的唯一ID+消息的業(yè)務(wù)需求,拼接成唯一ID。在插入的時(shí)候通過(guò)主鍵校驗(yàn)來(lái)避免重復(fù)投遞,在消費(fèi)的時(shí)候通過(guò)狀態(tài)判斷來(lái)避免重復(fù)消費(fèi)
  • 方式2: 利用 Redis 的 setnx 命令:給消息分配一個(gè)全局 ID,消費(fèi)該消息時(shí),先去 Redis 中查詢有沒(méi)消費(fèi)記錄,無(wú)則以鍵值對(duì)形式寫入 Redis ,有則不消費(fèi)該消息。

1.4 唯一 ID 代碼演示

1.4.1 配置:
spring.rabbitmq.host=192.168.0.68
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
# 開啟消息發(fā)布確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirm-type=correlated
# 發(fā)布消息返回監(jiān)聽回調(diào)
spring.rabbitmq.publisher-returns=true
# 指定消息確認(rèn)模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 未正確路由的消息發(fā)送到備份隊(duì)列
# 使用備份交換機(jī)模式,mandatory 將無(wú)效,即就算 mandatory設(shè) 置為 false,路由失敗的消息同樣會(huì)被投遞到綁定的備份交換機(jī)
spring.rabbitmq.template.mandatory=true
1.4.2 隊(duì)列和交換機(jī)配置:
@Configuration
public class RevisitConfig {

    /**
     * 創(chuàng)建 direct 隊(duì)列
     * */
    @Bean
    Queue DirectQueue01() {
        return new Queue("DirectQueue-01",true);
    }

    /**
     * 創(chuàng)建 direct 交換機(jī)
     * */
    @Bean
    DirectExchange DirectExchange01() {
        return new DirectExchange("DirectExchange-01");
    }

    /**
     * 綁定 direct 隊(duì)列和交換機(jī)
     * */
    @Bean
    Binding bindingDirect01() {
        return BindingBuilder.bind(DirectQueue01()).to(DirectExchange01()).with("DirectRouting01");
    }
}
1.4.3 自定義消息應(yīng)答回調(diào)方法
@Component
@Slf4j
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //依賴注入 rabbitTemplate 之后再設(shè)置它的回調(diào)對(duì)象
    // 此注解會(huì)在其他注解執(zhí)行完成后再執(zhí)行,所以rabbitTemplate先注入,再執(zhí)行此初始化方法
    @PostConstruct
    public void init() {
        // 設(shè)置rabbitTemplate的ConfirmCallBack為我們重寫后的類
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 交換機(jī)不管是否收到消息都會(huì)執(zhí)行的一個(gè)回調(diào)方法
     *
     * @param correlationData 消息相關(guān)數(shù)據(jù)
     * @param ack             交換機(jī)是否收到消息
     * @param cause           未收到消息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交換機(jī)已經(jīng)收到 id 為:{}的消息", id);
        } else {
            log.info("交換機(jī)還未收到 id 為:{}消息,原因:{}", id, cause);
        }
    }

    // 確認(rèn)消息是否從交換機(jī)成功到達(dá)隊(duì)列中,失敗將會(huì)執(zhí)行,成功則不執(zhí)行
    @Override
    public void returnedMessage(Message message, int replayCode, String replayText, String exchange, String routingKey) {
        log.info("消息{},被交換機(jī){}退回,退回原因:{},路由key:", new String(message.getBody()), exchange, replayText, routingKey);
    }
}
1.4.4 數(shù)據(jù)庫(kù)對(duì)象相關(guān)配置:

數(shù)據(jù)庫(kù)腳本:

CREATE TABLE `message_idempotent` (
  `message_id` varchar(50) NOT NULL COMMENT '消息ID',
  `message_content` varchar(2000) DEFAULT NULL COMMENT '消息內(nèi)容',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

對(duì)象:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageIdempotent extends Model<MessageIdempotent> {

    @TableId("message_id")
    private String messageId;

    @TableField("message_content")
    private String messageContent;
}

mapper:

@Mapper
public interface MessageIdempotentMapper extends BaseMapper<MessageIdempotent> {
}
1.4.5 生產(chǎn)者編寫:
/**
* 消息冪等性
* */
@GetMapping("/sendMessage")
public void sendMessage(String msg, String id) {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(id);
    messageProperties.setContentType("text/plain");
    messageProperties.setContentEncoding("utf-8");
    Message message = new Message(msg.getBytes(), messageProperties);
    log.info("生產(chǎn)消息:" + message.toString());
    // 消息發(fā)送確認(rèn)回調(diào)
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("DirectExchange-01", "DirectRouting01", message, correlationData);
}

訪問(wèn)接口:

http://localhost:8091/shiro/revisit/sendMessage?msg=你好啊&id=1
http://localhost:8091/shiro/revisit/sendMessage?msg=&id=1

日志:(此處有confirmCallback未回調(diào)問(wèn)題待解決,按道理打印完生產(chǎn)消息后應(yīng)該打?。航粨Q機(jī)已經(jīng)收到 id 為:{}的消息)

2023-04-10 14:31:12.859  INFO 19232 --- [nio-8091-exec-1] c.y.t.r.TestRevisit.RevisitController    : 生產(chǎn)消息:(Body:'你好啊' MessageProperties [headers={}, messageId=1, contentType=text/plain, contentEncoding=utf-8, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2023-04-10 14:31:29.002  INFO 19232 --- [nio-8091-exec-2] c.y.t.r.TestRevisit.RevisitController    : 生產(chǎn)消息:(Body:'' MessageProperties [headers={}, messageId=1, contentType=text/plain, contentEncoding=utf-8, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

客戶端中:
rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)

1.4.6 消費(fèi)者編寫:
@RabbitListener(queues = "DirectQueue-01")
public void receiveMessage02(Message message, Channel channel) throws IOException {
    String messageId = message.getMessageProperties().getMessageId();
    String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);

    MessageIdempotent messageIdempotent = new MessageIdempotent();
    messageIdempotent.setMessageId(messageId);
    messageIdempotent.setMessageContent(messageContent);

    try {
        if (messageIdempotentMapper.insert(messageIdempotent) <= 0) {
            log.info("DirectQueue-01-消費(fèi)者收到消息,消息ID:" + messageId + " 消息內(nèi)容:" + messageContent);
            // 消息確認(rèn)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            log.info("消息 " + messageId + " 已經(jīng)消費(fèi)過(guò)!");
        }
    } catch (Exception e) {
        log.info("消息 " + messageId + " 已經(jīng)消費(fèi)過(guò)!");
    }
}

結(jié)果:

2023-04-10 14:47:06.738  INFO 25416 --- [ntContainer#6-1] c.y.t.r.TestRevisit.RevisitConsumer      : DirectQueue-01-消費(fèi)者收到消息,消息ID:1 消息內(nèi)容:你好啊
2023-04-10 14:47:06.745  INFO 25416 --- [ntContainer#6-1] c.y.t.r.TestRevisit.RevisitConsumer      : 消息 1 已經(jīng)消費(fèi)過(guò)!

數(shù)據(jù)庫(kù)中:
rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)
隊(duì)列中:
rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)

1.5 note Redis 原子性

利用 redis 執(zhí)行 setnx 命令,天然具有冪等性,從而實(shí)現(xiàn)不重復(fù)消費(fèi)。利用redis的操作的好處是緩存更快。

代碼這里不再演示,無(wú)非是一個(gè)插入數(shù)據(jù)庫(kù),一個(gè)setnx進(jìn)redis。

2. 消息丟失

2.1 消息丟失的場(chǎng)景

rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)

  • 第一種:生產(chǎn)者弄丟了數(shù)據(jù)。生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了,因?yàn)榫W(wǎng)絡(luò)問(wèn)題啥的,都有可能。
  • 第二種:RabbitMQ 弄丟了數(shù)據(jù)。MQ還沒(méi)有持久化自己掛了
  • 第三種:消費(fèi)端弄丟了數(shù)據(jù)。剛消費(fèi)到,還沒(méi)處理,結(jié)果進(jìn)程掛了,比如重啟了。

2.2 RabbitMQ消息丟失解決方案

rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)

2.2.1 針對(duì)生產(chǎn)者
1. 方案1 :開啟RabbitMQ事務(wù)

可以選擇用 RabbitMQ 提供的事務(wù)功能,就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開啟 RabbitMQ 事務(wù)channel.txSelect,然后發(fā)送消息,如果消息沒(méi)有成功被 RabbitMQ 接收到,那么生產(chǎn)者會(huì)收到異常報(bào)錯(cuò),此時(shí)就可以回滾事務(wù)channel.txRollback,然后重試發(fā)送消息;如果收到了消息,那么可以提交事務(wù)channel.txCommit。

// 開啟事務(wù)
channel.txSelect
try {
      // 這里發(fā)送消息
} catch (Exception e) {
      channel.txRollback

// 這里再次重發(fā)這條消息

}
// 提交事務(wù)
channel.txCommit

缺點(diǎn)
RabbitMQ 事務(wù)機(jī)制是同步的,你提交一個(gè)事務(wù)之后會(huì)阻塞在那兒,采用這種方式基本上吞吐量會(huì)下來(lái),因?yàn)樘男阅堋?/p>

2. 方案2: 使用confirm機(jī)制

事務(wù)機(jī)制和 confirm 機(jī)制最大的不同在于,事務(wù)機(jī)制是同步的,你提交一個(gè)事務(wù)之后會(huì)阻塞在那兒,但是 confirm 機(jī)制是異步的

在生產(chǎn)者開啟了confirm模式之后,每次寫的消息都會(huì)分配一個(gè)唯一的id,然后如果寫入了rabbitmq之中,rabbitmq會(huì)給你回傳一個(gè)ack消息,告訴你這個(gè)消息發(fā)送OK了;如果rabbitmq沒(méi)能處理這個(gè)消息,會(huì)回調(diào)你一個(gè)nack接口,告訴你這個(gè)消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個(gè)機(jī)制知道自己在內(nèi)存里維護(hù)每個(gè)消息的id,如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào),那么你可以進(jìn)行重發(fā)。

即第一節(jié)MyCallback中:

/**
 * 交換機(jī)不管是否收到消息都會(huì)執(zhí)行的一個(gè)回調(diào)方法
 *
 * @param correlationData 消息相關(guān)數(shù)據(jù)
 * @param ack             交換機(jī)是否收到消息
 * @param cause           未收到消息的原因
 */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    String id = correlationData != null ? correlationData.getId() : "";
    if (ack) {
        log.info("交換機(jī)已經(jīng)收到 id 為:{}的消息", id);
    } else {
        log.info("交換機(jī)還未收到 id 為:{}消息,原因:{}", id, cause);
    }
}
2.2.2 針對(duì)RabbitMQ

說(shuō)三點(diǎn):

  1. 要保證rabbitMQ不丟失消息,那么就需要開啟rabbitMQ的持久化機(jī)制,即把消息持久化到硬盤上,這樣即使rabbitMQ掛掉在重啟后仍然可以從硬盤讀取消息;
  2. 如果rabbitMQ單點(diǎn)故障怎么辦,這種情況倒不會(huì)造成消息丟失,這里就要提到rabbitMQ的3種安裝模式,單機(jī)模式、普通集群模式、鏡像集群模式,這里要保證rabbitMQ的高可用就要配合HAPROXY做鏡像集群模式
  3. 如果硬盤壞掉怎么保證消息不丟失
1. 消息持久化

RabbitMQ 的消息默認(rèn)存放在內(nèi)存上面,如果不特別聲明設(shè)置,消息不會(huì)持久化保存到硬盤上面的,如果節(jié)點(diǎn)重啟或者意外crash掉,消息就會(huì)丟失。

所以就要對(duì)消息進(jìn)行持久化處理。如何持久化,下面具體說(shuō)明下:

要想做到消息持久化,必須滿足以下三個(gè)條件,缺一不可。

  1. Exchange 設(shè)置持久化
  2. Queue 設(shè)置持久化
  3. Message持久化發(fā)送:發(fā)送消息設(shè)置發(fā)送模式deliveryMode=2,代表持久化消息
2. 設(shè)置集群鏡像模式

我們先來(lái)介紹下RabbitMQ三種部署模式:

  1. 單節(jié)點(diǎn)模式:最簡(jiǎn)單的情況,非集群模式,節(jié)點(diǎn)掛了,消息就不能用了。業(yè)務(wù)可能癱瘓,只能等待。
  2. 普通模式:消息只會(huì)存在與當(dāng)前節(jié)點(diǎn)中,并不會(huì)同步到其他節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)宕機(jī),有影響的業(yè)務(wù)會(huì)癱瘓,只能等待節(jié)點(diǎn)恢復(fù)重啟可用(必須持久化消息情況下)。
  3. 鏡像模式:消息會(huì)同步到其他節(jié)點(diǎn)上,可以設(shè)置同步的節(jié)點(diǎn)個(gè)數(shù),但吞吐量會(huì)下降。屬于RabbitMQ的HA方案

為什么設(shè)置鏡像模式集群,因?yàn)殛?duì)列的內(nèi)容僅僅存在某一個(gè)節(jié)點(diǎn)上面,不會(huì)存在所有節(jié)點(diǎn)上面,所有節(jié)點(diǎn)僅僅存放消息結(jié)構(gòu)和元數(shù)據(jù)。下面自己畫了一張圖介紹普通集群丟失消息情況:
rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)
如果想解決上面途中問(wèn)題,保證消息不丟失,需要采用HA 鏡像模式隊(duì)列。

下面介紹下三種HA策略模式:

  1. 同步至所有的
  2. 同步最多N個(gè)機(jī)器
  3. 只同步至符合指定名稱的nodes

命令處理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

  1. 為每個(gè)以“rock.wechat”開頭的隊(duì)列設(shè)置所有節(jié)點(diǎn)的鏡像,并且設(shè)置為自動(dòng)同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
  1. 為每個(gè)以“rock.wechat.”開頭的隊(duì)列設(shè)置兩個(gè)節(jié)點(diǎn)的鏡像,并且設(shè)置為自動(dòng)同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  1. 為每個(gè)以“node.”開頭的隊(duì)列分配指定的節(jié)點(diǎn)做鏡像
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 鏡像隊(duì)列有一個(gè)很大的缺點(diǎn)就是: 系統(tǒng)的吞吐量會(huì)有所下降

3. 消息補(bǔ)償機(jī)制

為什么還要消息補(bǔ)償機(jī)制呢?難道消息還會(huì)丟失,沒(méi)錯(cuò),系統(tǒng)是在一個(gè)復(fù)雜的環(huán)境,不要想的太簡(jiǎn)單了,雖然以上的三種方案,基本可以保證消息的高可用不丟失的問(wèn)題,

但是作為有追求的程序員來(lái)講,要絕對(duì)保證我的系統(tǒng)的穩(wěn)定性,有一種危機(jī)意識(shí)。

比如:持久化的消息,保存到硬盤過(guò)程中,當(dāng)前隊(duì)列節(jié)點(diǎn)掛了,存儲(chǔ)節(jié)點(diǎn)硬盤又壞了,消息丟了,怎么辦?

  1. 生產(chǎn)端首先將業(yè)務(wù)數(shù)據(jù)以及消息數(shù)據(jù)入庫(kù),需要在同一個(gè)事務(wù)中,消息數(shù)據(jù)入庫(kù)失敗,則整體回滾。字段包括:消息id,消息狀態(tài),重試次數(shù),創(chuàng)建時(shí)間等
  2. 根據(jù)消息表中消息狀態(tài),失敗則進(jìn)行消息補(bǔ)償措施,重新發(fā)送消息處理。
    rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)
2.2.3 針對(duì)消費(fèi)者
1. 方案一:ACK確認(rèn)機(jī)制

多個(gè)消費(fèi)者同時(shí)收取消息,比如消息接收到一半的時(shí)候,一個(gè)消費(fèi)者死掉了(邏輯復(fù)雜時(shí)間太長(zhǎng),超時(shí)了或者消費(fèi)被停機(jī)或者網(wǎng)絡(luò)斷開鏈接),如何保證消息不丟?

使用rabbitmq提供的ack機(jī)制,服務(wù)端首先關(guān)閉rabbitmq的自動(dòng)ack,然后每次在確保處理完這個(gè)消息之后,在代碼里手動(dòng)調(diào)用ack。這樣就可以避免消息還沒(méi)有處理完就ack。才把消息從內(nèi)存刪除。

這樣就解決了,即使一個(gè)消費(fèi)者出了問(wèn)題,但不會(huì)同步消息給服務(wù)端,會(huì)有其他的消費(fèi)端去消費(fèi),保證了消息不丟的case。

2.3 總結(jié):

rabbitmq 重復(fù)投遞,rabbitmq,java-rabbitmq,rabbitmq,網(wǎng)絡(luò)

如果需要保證消息在整條鏈路中不丟失,那就需要生產(chǎn)端、mq自身與消費(fèi)端共同去保障。

生產(chǎn)端:對(duì)生產(chǎn)的消息進(jìn)行狀態(tài)標(biāo)記,開啟confirm機(jī)制,依據(jù)mq的響應(yīng)來(lái)更新消息狀態(tài),使用定時(shí)任務(wù)重新投遞超時(shí)的消息,多次投遞失敗進(jìn)行報(bào)警。

mq自身:開啟持久化,并在落盤后再進(jìn)行ack。如果是鏡像部署模式,需要在同步到多個(gè)副本之后再進(jìn)行ack。

消費(fèi)端:開啟手動(dòng)ack模式,在業(yè)務(wù)處理完成后再進(jìn)行ack,并且需要保證冪等。

通過(guò)以上的處理,理論上不存在消息丟失的情況,但是系統(tǒng)的吞吐量以及性能有所下降。

在實(shí)際開發(fā)中,需要考慮消息丟失的影響程度,來(lái)做出對(duì)可靠性以及性能之間的權(quán)衡。

3. 消息積壓:

所謂消息積壓一般是由于消費(fèi)端消費(fèi)的速度遠(yuǎn)小于生產(chǎn)者發(fā)消息的速度,導(dǎo)致大量消息在 RabbitMQ 的隊(duì)列中無(wú)法消費(fèi)。

其實(shí)這玩意我也不知道為什么面試這么喜歡問(wèn)…既然消費(fèi)者速度跟不上生產(chǎn)者,那么提高消費(fèi)者的速度就行了呀!個(gè)人認(rèn)為有以下幾種思路:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-745496.html

  • 對(duì)生產(chǎn)者發(fā)消息接口進(jìn)行適當(dāng)限流(不太推薦,影響用戶體驗(yàn))
  • 多部署幾臺(tái)消費(fèi)者實(shí)例(推薦)
  • 適當(dāng)增加 prefetch 的數(shù)量,讓消費(fèi)端一次多接受一些消息(推薦,可以和第二種方案一起用)

4. 消息消費(fèi)順序性問(wèn)題:

到了這里,關(guān)于RabbitMq(七) -- 常見問(wèn)題:冪等性問(wèn)題(消息重復(fù)消費(fèi))、消息丟失的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ常見問(wèn)題之消息堆積

    RabbitMQ常見問(wèn)題之消息堆積

    當(dāng)生產(chǎn)者發(fā)送消息的速度超過(guò)了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。最 早接收到的消息,可能就會(huì)成為死信,會(huì)被丟棄,這就是消息堆積問(wèn)題。 解決消息堆積有三種種思路: 增加 更多消費(fèi)者 ,提高消費(fèi)速度 在消費(fèi)者內(nèi)開啟 線程

    2024年01月18日
    瀏覽(23)
  • RabbitMQ常見問(wèn)題之延遲消息

    RabbitMQ常見問(wèn)題之延遲消息

    當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信( dead letter ): 消費(fèi)者使用 basic.reject 或 basic.nack 聲明消費(fèi)失敗,并且消息的 requeue 參數(shù)設(shè)置為 false 消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi) 要投遞的隊(duì)列消息堆積滿了,最早的消息可能成為死信 如果該隊(duì)列配置了 dead

    2024年01月18日
    瀏覽(20)
  • RabbitMQ常見問(wèn)題之消息可靠性

    RabbitMQ常見問(wèn)題之消息可靠性

    MQ 的消息可靠性,將從以下四個(gè)方面展開并實(shí)踐: 生產(chǎn)者消息確認(rèn) 消息持久化 消費(fèi)者消息確認(rèn) 消費(fèi)失敗重試機(jī)制 對(duì)于 publisher ,如果 message 到達(dá) exchange 與否, rabbitmq 提供 publiser-comfirm 機(jī)制,如果 message 達(dá)到 exchange 但是是否到達(dá) queue , rabbitmq 提供 publisher-return 機(jī)制。這兩

    2024年01月18日
    瀏覽(97)
  • RaabitMQ(三) - RabbitMQ隊(duì)列類型、死信消息與死信隊(duì)列、懶隊(duì)列、集群模式、MQ常見消息問(wèn)題

    RaabitMQ(三) - RabbitMQ隊(duì)列類型、死信消息與死信隊(duì)列、懶隊(duì)列、集群模式、MQ常見消息問(wèn)題

    這是RabbitMQ最為經(jīng)典的隊(duì)列類型。在單機(jī)環(huán)境中,擁有比較高的消息可靠性。 經(jīng)典隊(duì)列可以選擇是否持久化(Durability)以及是否自動(dòng)刪除(Auto delete)兩個(gè)屬性。 Durability有兩個(gè)選項(xiàng),Durable和Transient。 Durable表示隊(duì)列會(huì)將消息保存到硬盤,這樣消息的安全性更高。但是同時(shí),由于需

    2024年02月14日
    瀏覽(1047)
  • Kafka如何保證消息的消費(fèi)順序【全局有序、局部有序】、Kafka如何保證消息不被重復(fù)消費(fèi)、Kafka為什么這么快?【重點(diǎn)】、Kafka常見問(wèn)題匯總【史上最全】

    Kafka如何保證消息的消費(fèi)順序【全局有序、局部有序】、Kafka如何保證消息不被重復(fù)消費(fèi)、Kafka為什么這么快?【重點(diǎn)】、Kafka常見問(wèn)題匯總【史上最全】

    目錄 Kafka消息生產(chǎn) 一個(gè)Topic對(duì)應(yīng)一個(gè)Partition 一個(gè)Topic對(duì)應(yīng)多個(gè)Partition Kafka消息的順序性保證(Producer、Consumer) 全局有序 局部有序? max.in.flight.requests.per.connection參數(shù)詳解 Kafka的多副本機(jī)制 Kafka的follower從leader同步數(shù)據(jù)的流程 Kafka的follower為什么不能用于消息消費(fèi) Kafka的多分區(qū)

    2024年04月11日
    瀏覽(24)
  • 【重復(fù)處理】CRUD接口冪等性處理

    非并發(fā)情況下,查詢業(yè)務(wù)單號(hào)有沒(méi)有操作過(guò),沒(méi)有則執(zhí)行操作 針對(duì)第一次執(zhí)行業(yè)務(wù)時(shí)間,有大量并發(fā)情況下,整個(gè)操作過(guò)程加鎖,通過(guò)分布式鎖來(lái)加鎖 Select操作:不會(huì)對(duì)業(yè)務(wù)數(shù)據(jù)有影響,天然冪等 Delete操作:第一次已經(jīng)刪除,第二次刪除也不會(huì)有影響 根據(jù)唯一的業(yè)務(wù)號(hào)刪除

    2024年02月12日
    瀏覽(29)
  • 【安全】Java冪等性校驗(yàn)解決重復(fù)點(diǎn)擊(6種實(shí)現(xiàn)方式)

    【安全】Java冪等性校驗(yàn)解決重復(fù)點(diǎn)擊(6種實(shí)現(xiàn)方式)

    1.1 什么是冪等? 冪等 是一個(gè)數(shù)學(xué)與計(jì)算機(jī)科學(xué)概念,英文 idempotent [a??demp?t?nt]。 在數(shù)學(xué)中,冪等用函數(shù)表達(dá)式就是: f(x) = f(f(x)) 。比如 求絕對(duì)值 的函數(shù),就是冪等的,abs(x) = abs(abs(x))。 計(jì)算機(jī)科學(xué)中,冪等表示 一次和多次請(qǐng)求某一個(gè)資源應(yīng)該具有同樣的作用 。 滿足冪

    2024年02月05日
    瀏覽(21)
  • RabbitMQ-業(yè)務(wù)的冪等性

    生產(chǎn)者和消費(fèi)者都需要添加配置類: 消費(fèi)者拿到id之后,保存到數(shù)據(jù)庫(kù),后續(xù)消費(fèi)時(shí),需要查數(shù)據(jù)庫(kù)進(jìn)行比較,因此這種方案的缺點(diǎn)就是有業(yè)務(wù)的入侵,對(duì)性有一定的影響。 (1)查詢和刪除操作本身就是冪等性操作。 (2)可以使用分布式鎖,對(duì)單據(jù)id鎖定,防止多次提交,

    2024年01月21日
    瀏覽(20)
  • RabbitMQ如何保證冪等性

    RabbitMQ如何保證冪等性

    一、簡(jiǎn)介 冪等性是分布式中比較重要的一個(gè)概念,是指在多作業(yè)操作時(shí)候避免造成重復(fù)影響,其實(shí)就是保證同一個(gè)消息不被消費(fèi)者重復(fù)消費(fèi)兩次,但是可能存在網(wǎng)絡(luò)波動(dòng)等問(wèn)題,生產(chǎn)者無(wú)法接受消費(fèi)者發(fā)送的ack信息,因此這條消息將會(huì)被重復(fù)發(fā)送給其他消費(fèi)者進(jìn)行消費(fèi),實(shí)際

    2024年02月15日
    瀏覽(22)
  • 【RabbitMQ教程】第八章 —— RabbitMQ - 冪等性、優(yōu)先級(jí)、惰性

    【RabbitMQ教程】第八章 —— RabbitMQ - 冪等性、優(yōu)先級(jí)、惰性

    ?????????????????????????????????????????????????????????????????? ?? 【 R a b b i t M Q 教程】第八章—— R a b b i t M Q ? 冪等性、優(yōu)先級(jí)、惰性 color{#FF1493}{【RabbitMQ教程】第八章 —— RabbitMQ - 冪等性、優(yōu)先級(jí)、惰性} 【 R abbi tMQ 教程】第八章

    2024年02月09日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包