国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!

這篇具有很好參考價(jià)值的文章主要介紹了一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq

一、前言

數(shù)據(jù)重復(fù)這個(gè)問(wèn)題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。
一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq
通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。

整理下消息重復(fù)的幾個(gè)場(chǎng)景:
  1. 生產(chǎn)端: 遇到異常,基本解決措施都是 重試 。
  • 場(chǎng)景一:leader分區(qū)不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區(qū)。
  • 場(chǎng)景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。
  • 場(chǎng)景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋 NetworkException 異常,等待網(wǎng)絡(luò)恢復(fù)。
  1. 消費(fèi)端:poll一批數(shù)據(jù),處理完畢還沒(méi)提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì)poll上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

怎么解決?
先來(lái)了解下消息的三種投遞語(yǔ)義:

  • 最多一次( at most once): 消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如:mqttQoS = 0
  • 至少一次( at least once): 消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如:mqtt QoS = 1
  • 精確一次( exactly once): 消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如:mqtt QoS = 2。

了解了這三種語(yǔ)義,再來(lái)看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:

  1. Kafka 冪等性 Producer 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話)
  2. Kafka 事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等 Producer 的局限性。
  3. 消費(fèi)端冪等:保證消費(fèi)端接收消息冪等。蔸底方案。
Kafka 冪等性 Producer

冪等性指 :無(wú)論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可

Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設(shè)置冪等
props.put("acks", "all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
  1. 設(shè)置冪等,啟動(dòng)冪等。
  2. 配置 acks,注意:一定要設(shè)置 acks=all,否則會(huì)拋異常。
  3. 配置 max.in.flight.requests.per.connection 需要 <= 5 ,否則會(huì)拋異常 OutOfOrderSequenceException。
  • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
  • Kafka >= 1.1, max.in.flight.request.per.connection <= 5

為了更好理解,需要了解下Kafka 冪等機(jī)制:
一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq

  1. Producer 每次啟動(dòng)后,會(huì)向 Broker 申請(qǐng)一個(gè)全局唯一的 pid。(重啟后 pid 會(huì)變化,這也是弊端之一)
  2. Sequence Numbe:針對(duì)每個(gè) <Topic, Partition> 都對(duì)應(yīng)一個(gè)從0開(kāi)始單調(diào)遞增的 Sequence,同時(shí) Broker端會(huì)緩存這個(gè) seq num
  3. 判斷是否重復(fù): 拿 <pid, seq num>Broker 里對(duì)應(yīng)的隊(duì)列 ProducerStateEntry.Queue(默認(rèn)隊(duì)列長(zhǎng)度為 5)查詢是否存在
  • 如果 nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。
  • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。
  • 反之,要么重復(fù),要么丟消息,均拒絕。

一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq
這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題:

  1. 消息重復(fù): 場(chǎng)景 Broker 保存消息后還沒(méi)發(fā)送 ack 就宕機(jī)了,這時(shí)候Producer就會(huì)重試,這就造成消息重復(fù)。
  2. 消息亂序: 避免場(chǎng)景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。

那什么時(shí)候該使用冪等:
3. 如果已經(jīng)使用 acks=all,使用冪等也可以。
4. 如果已經(jīng)使用 acks=0 或者 acks=1,說(shuō)明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。

Kafka 事務(wù)

使用 Kafka 事務(wù)解決冪等的弊端:?jiǎn)螘?huì)話且單分區(qū)冪等。
Tips: 這塊篇幅較長(zhǎng),這先稍微提及下使用,之后另起一篇。

事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端

Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設(shè)置冪等
props.put("acks", "all"); // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數(shù)
props.put("transactional.id", "my-transactional-id"); // 4. 設(shè)定事務(wù) id

Producer<String, String> producer = new KafkaProducer<String, String>(props);

// 初始化事務(wù)
producer.initTransactions();

try{
    // 開(kāi)始事務(wù)
    producer.beginTransaction();

    // 發(fā)送數(shù)據(jù)
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
 
    // 數(shù)據(jù)發(fā)送及 Offset 發(fā)送均成功的情況下,提交事務(wù)
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 數(shù)據(jù)發(fā)送或者 Offset 發(fā)送出現(xiàn)異常時(shí),終止事務(wù)
    producer.abortTransaction();
} finally {
    // 關(guān)閉 Producer 和 Consumer
    producer.close();
    consumer.close();
}

這里消費(fèi)端 Consumer 需要設(shè)置下配置:isolation.level 參數(shù)

  • read_uncommitted: 這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫(xiě)入的任何消息,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫(xiě)入的消息都可以讀取。如果你用了事務(wù)型 Producer,那么對(duì)應(yīng)的 Consumer 就不要使用這個(gè)值。
  • read_committed: 表明 Consumer 只會(huì)讀取事務(wù)型 Producer 成功提交事務(wù)寫(xiě)入的消息。當(dāng)然了,它也能看到非事務(wù)型 Producer 寫(xiě)入的所有消息。
消費(fèi)端冪等

“如何解決消息重復(fù)?” 這個(gè)問(wèn)題,其實(shí)換一種說(shuō)法:就是如何解決消費(fèi)端冪等性問(wèn)題。
只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。

典型的方案是使用:消息表,來(lái)去重:
一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq

  • 上述例子中,消費(fèi)端拉取到一條消息后,開(kāi)啟事務(wù),將消息Id 新增到本地消息表中,同時(shí)更新訂單信息。
  • 如果消息重復(fù),則新增操作insert會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。

二、案例:Kafka 冪等性 Producer 使用

環(huán)境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

準(zhǔn)備工作如下:

  1. Zookeeper:本地使用 Docker 啟動(dòng)
$ docker run -d --name zookeeper -p 2181:2181 zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
  1. Kafka:版本 2.7.1,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng))
  2. 啟動(dòng)生產(chǎn)者:Kafka 源碼中 exmaple
  3. 啟動(dòng)消息者:可以用 Kafka 提供的腳本
# 舉個(gè)栗子:topic 需要自己去修改
$ cd ./kafka-2.7.1-src/bin
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

創(chuàng)建 topic 1副本,2 分區(qū)

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

生產(chǎn)者代碼:
一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord
            = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static void main(String[] args) {

        final Properties props = new Properties();

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}

啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:
一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq
啟動(dòng)消費(fèi)者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic

一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq

修改配置 acks
啟用冪等的情況下,調(diào)整 acks 配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:

  • 修改配置 acks = 1
  • 修改配置 acks = 0

會(huì)直接報(bào)錯(cuò):

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
Otherwise we cannot guarantee idempotence.

一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq
修改配置 max.in.flight.requests.per.connection

啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:
max.in.flight.requests.per.connection > 5 會(huì)怎樣?
一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq
當(dāng)然會(huì)報(bào)錯(cuò):

Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.

一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!,隨筆,日常開(kāi)發(fā)問(wèn)題集錦,kafka,linq文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-818720.html

到了這里,關(guān)于一碰就頭疼的 Kafka 消息重復(fù)問(wèn)題,立馬解決!的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka數(shù)據(jù)重復(fù)問(wèn)題解決方案

    通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。 冪等性指: 冪等性使用示例: 為了更好理解,需要了解下Kafka冪等機(jī)制 這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題: 那什么時(shí)候該使用冪等: 事務(wù)使用示例:分為生產(chǎn)端 和

    2024年02月07日
    瀏覽(20)
  • 【RabbitMQ | 第六篇】消息重復(fù)消費(fèi)問(wèn)題及解決方案

    【RabbitMQ | 第六篇】消息重復(fù)消費(fèi)問(wèn)題及解決方案

    什么是 消息重復(fù)消費(fèi) ?首先我們來(lái)看一下消息的傳輸流程。消息生產(chǎn)者–MQ–消息消費(fèi)者;消息生產(chǎn)者發(fā)送消息到MQ服務(wù)器,MQ服務(wù)器存儲(chǔ)消息,消息消費(fèi)者監(jiān)聽(tīng)MQ的消息,發(fā)現(xiàn)有消息就消費(fèi)消息。 所以消息重復(fù)也就出現(xiàn)在 兩個(gè)階段 1 :生產(chǎn)者多發(fā)送了消息給MQ; 2 :MQ的一條

    2024年04月26日
    瀏覽(25)
  • 阿里三面:MQ 消息丟失、重復(fù)、積壓?jiǎn)栴},如何解決?

    阿里三面:MQ 消息丟失、重復(fù)、積壓?jiǎn)栴},如何解決?

    作者:美得讓人心動(dòng) 來(lái)源:https://blog.csdn.net/gu131007416553/article/details/120934738 面試官在面試候選人時(shí),如果發(fā)現(xiàn)候選人的簡(jiǎn)歷中寫(xiě)了在項(xiàng)目中使用了 MQ 技術(shù)(如 Kafka、RabbitMQ、RocketMQ),基本都會(huì)拋出一個(gè)問(wèn)題:在使用 MQ 的時(shí)候,怎么確保消息 100% 不丟失? 這個(gè)問(wèn)題在實(shí)際工

    2024年02月09日
    瀏覽(26)
  • RabbitMQ消息丟失、消息重復(fù)消費(fèi)、消息順序性無(wú)法保證、消息積壓、一致性問(wèn)題、系統(tǒng)可用性降低等這些常見(jiàn)問(wèn)題怎么解決

    該文章專注于面試,面試只要回答關(guān)鍵點(diǎn)即可,不需要對(duì)框架有非常深入的回答,如果你想應(yīng)付面試,是足夠了,抓住關(guān)鍵點(diǎn) 1. 消息丟失 問(wèn)題 :在生產(chǎn)者發(fā)送消息到MQ、MQ內(nèi)部處理、消費(fèi)者接收消息的任一環(huán)節(jié)都可能導(dǎo)致消息丟失。 解決方案 : 生產(chǎn)者確認(rèn)機(jī)制 :確保消息

    2024年04月25日
    瀏覽(27)
  • 解決Kafka新消費(fèi)者組導(dǎo)致重復(fù)消費(fèi)的問(wèn)題

    ???????? 問(wèn)題描述 :在使用Kafka時(shí),當(dāng)我們向新的消費(fèi)者組中添加消費(fèi)者時(shí),可能會(huì)遇到重復(fù)消費(fèi)的問(wèn)題。本文將介紹一些解決這個(gè)問(wèn)題的方法,幫助開(kāi)發(fā)者更好地處理Kafka中的消費(fèi)者組和消費(fèi)偏移量。 ????????Kafka是一個(gè)強(qiáng)大的分布式消息隊(duì)列系統(tǒng),但在使用過(guò)程中

    2024年02月07日
    瀏覽(18)
  • 記一次線上kafka重復(fù)消費(fèi)的問(wèn)題解決及思考

    線上ELK日志發(fā)現(xiàn)kafka消費(fèi)者消費(fèi)到重復(fù)消息 由于生產(chǎn)方本身就發(fā)送了重復(fù)的消息,導(dǎo)致消費(fèi)到重復(fù)消息 消費(fèi)方采用的是循環(huán)poll的模式,具體是在多線程分租戶去批量處理的消息

    2024年02月10日
    瀏覽(19)
  • kafka如何避免消息重復(fù)消費(fèi)

    kafka如何避免消息重復(fù)消費(fèi)

    Kafka 避免消息重復(fù)消費(fèi)通常依賴于以下策略和機(jī)制: Kafka使用Consumer Group ID來(lái)跟蹤每個(gè)消費(fèi)者所讀取的消息。確保每個(gè)消費(fèi)者都具有唯一的Group ID。如果多個(gè)消費(fèi)者屬于同一個(gè)Group ID,那么它們將共享消息,但每個(gè)分區(qū)的消息只能由一個(gè)消費(fèi)者處理。 Kafka會(huì)記錄每個(gè)消費(fèi)者組消

    2024年01月15日
    瀏覽(23)
  • Kafka如何解決消息丟失的問(wèn)題

    在 Kafka 的整個(gè)架構(gòu)中可以總結(jié)出消息有三次傳遞的過(guò)程: Producer 端發(fā)送消息給 Broker 端 Broker 將消息進(jìn)行并持久化數(shù)據(jù) Consumer 端從 Broker 將消息拉取并進(jìn)行消費(fèi) 在以上這三步中每一步都可能會(huì)出現(xiàn)丟失數(shù)據(jù)的情況, 那么 Kafka 到底在什么情況下才能保證消息不丟失呢? Produ

    2024年02月12日
    瀏覽(17)
  • kafka如何保證消息不被重復(fù)消費(fèi)

    kafka如何保證消息不被重復(fù)消費(fèi)

    (1)kafka有個(gè)offset的概念,當(dāng)每個(gè)消息被寫(xiě)進(jìn)去后,都有一個(gè)offset,代表他的序號(hào),然后consumer消費(fèi)該數(shù)據(jù)之后,隔一段時(shí)間,會(huì)把自己消費(fèi)過(guò)的消息的offset提交一下,代表我已經(jīng)消費(fèi)過(guò)了。下次我要是重啟,就會(huì)繼續(xù)從上次消費(fèi)到的offset來(lái)繼續(xù)消費(fèi)。但是當(dāng)我們直接kill進(jìn)程

    2024年02月11日
    瀏覽(26)
  • 防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實(shí)踐

    防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實(shí)踐

    上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka 架構(gòu)必備能力——kafka的選型對(duì)比及應(yīng)用場(chǎng)景 Kafka存取原理與實(shí)現(xiàn)分析,打破面試難關(guān) 在上一章內(nèi)容中,我們解析了Kafka在讀寫(xiě)層面上的原理,介紹了很多Kafka在讀出與寫(xiě)入時(shí)的

    2024年02月08日
    瀏覽(26)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包