国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

這篇具有很好參考價(jià)值的文章主要介紹了Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、前言

數(shù)據(jù)重復(fù)這個(gè)問(wèn)題其實(shí)也是挺正常,全鏈路都有可能會(huì)導(dǎo)致數(shù)據(jù)重復(fù)。

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

通常,消息消費(fèi)時(shí)候都會(huì)設(shè)置一定重試次數(shù)來(lái)避免網(wǎng)絡(luò)波動(dòng)造成的影響,同時(shí)帶來(lái)副作用是可能出現(xiàn)消息重復(fù)。

整理下消息重復(fù)的幾個(gè)場(chǎng)景:

  1. 生產(chǎn)端: 遇到異常,基本解決措施都是 重試
    • 場(chǎng)景一:leader分區(qū)不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區(qū)。
    • 場(chǎng)景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。
    • 場(chǎng)景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋 NetworkException 異常,等待網(wǎng)絡(luò)恢復(fù)。
  2. 消費(fèi)端: poll 一批數(shù)據(jù),處理完畢還沒(méi)提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì) poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

怎么解決?

先來(lái)了解下消息的三種投遞語(yǔ)義:

  • 最多一次(at most once): 消息只發(fā)一次,消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。例如: mqttQoS = 0。
  • 至少一次(at least once): 消息至少發(fā)一次,消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。例如: mqttQoS = 1
  • 精確一次(exactly once): 消息精確發(fā)一次,消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。例如: mqttQoS = 2。

了解了這三種語(yǔ)義,再來(lái)看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:

  1. Kafka 冪等性 Producer 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會(huì)話(重啟后就算新會(huì)話)
  2. Kafka 事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等 Producer 的局限性。
  3. **消費(fèi)端冪等: ** 保證消費(fèi)端接收消息冪等。蔸底方案。

1)Kafka 冪等性 Producer

**冪等性指:**無(wú)論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

冪等性使用示例:在生產(chǎn)端添加對(duì)應(yīng)配置即可

Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 設(shè)置冪等
props.put("acks", "all");               // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
復(fù)制代碼
  1. 設(shè)置冪等,啟動(dòng)冪等。
  2. 配置 acks,注意:一定要設(shè)置 acks=all,否則會(huì)拋異常。
  3. 配置 max.in.flight.requests.per.connection 需要 <= 5,否則會(huì)拋異常 OutOfOrderSequenceException。
    • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
    • Kafka >= 1.1, max.in.flight.request.per.connection <= 5

為了更好理解,需要了解下 Kafka 冪等機(jī)制:

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

  1. Producer 每次啟動(dòng)后,會(huì)向 Broker 申請(qǐng)一個(gè)全局唯一的 pid。(重啟后 pid 會(huì)變化,這也是弊端之一)
  2. Sequence Numbe:針對(duì)每個(gè) <Topic, Partition> 都對(duì)應(yīng)一個(gè)從0開(kāi)始單調(diào)遞增的 Sequence,同時(shí) Broker端會(huì)緩存這個(gè) seq num
  3. 判斷是否重復(fù):<pid, seq num>Broker 里對(duì)應(yīng)的隊(duì)列 ProducerStateEntry.Queue(默認(rèn)隊(duì)列長(zhǎng)度為 5)查詢(xún)是否存在
    • 如果 nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。
    • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。
    • 反之,要么重復(fù),要么丟消息,均拒絕。

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

這種設(shè)計(jì)針對(duì)解決了兩個(gè)問(wèn)題:

  1. 消息重復(fù): 場(chǎng)景 Broker 保存消息后還沒(méi)發(fā)送 ack 就宕機(jī)了,這時(shí)候 Producer 就會(huì)重試,這就造成消息重復(fù)。
  2. 消息亂序: 避免場(chǎng)景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。

那什么時(shí)候該使用冪等:

  1. 如果已經(jīng)使用 acks=all,使用冪等也可以。
  2. 如果已經(jīng)使用 acks=0 或者 acks=1,說(shuō)明你的系統(tǒng)追求高性能,對(duì)數(shù)據(jù)一致性要求不高。不要使用冪等。

2)Kafka 事務(wù)

使用 Kafka 事務(wù)解決冪等的弊端:?jiǎn)螘?huì)話且單分區(qū)冪等。

Tips 這塊篇幅較長(zhǎng),這先稍微提及下使用,之后另起一篇。

事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端

Properties props = new Properties();
props.put("enable.idempotence", ture);  // 1. 設(shè)置冪等
props.put("acks", "all");               // 2. 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數(shù)
props.put("transactional.id", "my-transactional-id");  // 4. 設(shè)定事務(wù) id

Producer<String, String> producer = new KafkaProducer<String, String>(props);

// 初始化事務(wù)
producer.initTransactions();

try{
    // 開(kāi)始事務(wù)
    producer.beginTransaction();

    // 發(fā)送數(shù)據(jù)
    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
 
    // 數(shù)據(jù)發(fā)送及 Offset 發(fā)送均成功的情況下,提交事務(wù)
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 數(shù)據(jù)發(fā)送或者 Offset 發(fā)送出現(xiàn)異常時(shí),終止事務(wù)
    producer.abortTransaction();
} finally {
    // 關(guān)閉 Producer 和 Consumer
    producer.close();
    consumer.close();
}
復(fù)制代碼

這里消費(fèi)端 Consumer 需要設(shè)置下配置:isolation.level 參數(shù)

  • read_uncommitted 這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫(xiě)入的任何消息,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫(xiě)入的消息都可以讀取。如果你用了事務(wù)型 Producer,那么對(duì)應(yīng)的 Consumer 就不要使用這個(gè)值。

  • read_committed 表明 Consumer 只會(huì)讀取事務(wù)型 Producer 成功提交事務(wù)寫(xiě)入的消息。當(dāng)然了,它也能看到非事務(wù)型 Producer 寫(xiě)入的所有消息。

3)消費(fèi)端冪等

“如何解決消息重復(fù)?” 這個(gè)問(wèn)題,其實(shí)換一種說(shuō)法:就是如何解決消費(fèi)端冪等性問(wèn)題。

只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問(wèn)題也就解決了。

典型的方案是使用:消息表,來(lái)去重:

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

  • 上述栗子中,消費(fèi)端拉取到一條消息后,開(kāi)啟事務(wù),將消息Id 新增到本地消息表中,同時(shí)更新訂單信息。
  • 如果消息重復(fù),則新增操作 insert 會(huì)異常,同時(shí)觸發(fā)事務(wù)回滾。


?

二、案例:Kafka 冪等性 Producer 使用

環(huán)境搭建可參考:鏈接

準(zhǔn)備工作如下:

  1. Zookeeper:本地使用 Docker 啟動(dòng)

    $ docker run -d --name zookeeper -p 2181:2181 zookeeper
    a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
    復(fù)制代碼
  2. Kafka:版本 2.7.1,源碼編譯啟動(dòng)(看上文源碼搭建啟動(dòng))

  3. 啟動(dòng)生產(chǎn)者:Kafka 源碼中 exmaple

  4. 啟動(dòng)消息者:可以用 Kafka 提供的腳本

    # 舉個(gè)栗子:topic 需要自己去修改
    $ cd ./kafka-2.7.1-src/bin
    $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
    復(fù)制代碼

創(chuàng)建 topic 1副本,2 分區(qū)

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2

# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
復(fù)制代碼

生產(chǎn)者代碼:

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord 
            = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
        );
    }

    public void shutdown() {
        producer.close();
    }

    public static void main(String[] args) {

        final Properties props = new Properties();

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = "myTopic";
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream().filter(l -> !l.trim().isEmpty())
                    .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}
復(fù)制代碼

啟動(dòng)生產(chǎn)者后,控制臺(tái)輸出如下:

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

啟動(dòng)消費(fèi)者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
復(fù)制代碼

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

修改配置 acks

啟用冪等的情況下,調(diào)整 acks 配置,生產(chǎn)者啟動(dòng)后結(jié)果是怎樣的:

  • 修改配置 acks = 1
  • 修改配置 acks = 0

會(huì)直接報(bào)錯(cuò):

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. 
Otherwise we cannot guarantee idempotence.
復(fù)制代碼

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

修改配置 max.in.flight.requests.per.connection

啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:

  • max.in.flight.requests.per.connection > 5 會(huì)怎樣?

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)

當(dāng)然會(huì)報(bào)錯(cuò):

Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
復(fù)制代碼

Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-417250.html

到了這里,關(guān)于Kafka 數(shù)據(jù)重復(fù)怎么辦?(案例)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 領(lǐng)導(dǎo)臨時(shí)要數(shù)據(jù)庫(kù)文檔怎么辦?

    領(lǐng)導(dǎo)臨時(shí)要數(shù)據(jù)庫(kù)文檔怎么辦?

    很多時(shí)候,我們?yōu)榱酥泵炮s項(xiàng)目進(jìn)度,很容易忽略整理文檔這件事 某一天,領(lǐng)導(dǎo)心血來(lái)潮,要搞一次突擊檢查, 想看看我們的數(shù)據(jù)庫(kù)設(shè)計(jì)的是否規(guī)范, 但他又不想親自去數(shù)據(jù)庫(kù)查驗(yàn)(畢竟這么大領(lǐng)導(dǎo)) 那么,我們?cè)撛趺崔k? 第一種方法:離職,世界那么大,我想去看

    2024年02月08日
    瀏覽(22)
  • 電腦提示數(shù)據(jù)錯(cuò)誤循環(huán)冗余檢查怎么辦?

    電腦提示數(shù)據(jù)錯(cuò)誤循環(huán)冗余檢查怎么辦?

    有些時(shí)候,我們嘗試在磁盤(pán)上創(chuàng)建分區(qū)或清理硬盤(pán)時(shí),還可能會(huì)遇到這個(gè)問(wèn)題:數(shù)據(jù)錯(cuò)誤循環(huán)冗余檢查。這是如何導(dǎo)致的呢?我們又該如何解決這個(gè)問(wèn)題呢?下面我們就來(lái)了解一下。 數(shù)據(jù)錯(cuò)誤循環(huán)冗余檢查怎么解決呢?在回答這個(gè)問(wèn)題之前我們需要先了解一下導(dǎo)致問(wèn)題的原因

    2024年02月12日
    瀏覽(30)
  • 數(shù)據(jù)庫(kù)同步 Elasticsearch 后數(shù)據(jù)不一致,怎么辦?

    數(shù)據(jù)庫(kù)同步 Elasticsearch 后數(shù)據(jù)不一致,怎么辦?

    Q1:Logstash 同步 postgreSQL 到 Elasticsearch 數(shù)據(jù)不一致。 在使用 Logstash 從 pg 庫(kù)中將一張表導(dǎo)入到 ES 中時(shí),發(fā)現(xiàn) ES 中的數(shù)據(jù)量和 PG 庫(kù)中的這張表的數(shù)據(jù)量存在較大差距。如何快速比對(duì)哪些數(shù)據(jù)沒(méi)有插入?導(dǎo)入過(guò)程中,Logstash 日志沒(méi)有異常。PG 中這張表有 7600W。 Q2:mq 異步雙寫(xiě)數(shù)

    2024年02月15日
    瀏覽(23)
  • 數(shù)據(jù)分析過(guò)程中,發(fā)現(xiàn)數(shù)值缺失,怎么辦?

    按照數(shù)據(jù)缺失機(jī)制,數(shù)據(jù)分析過(guò)程中,我們可以將其分為以下幾類(lèi): (1)完全隨機(jī)缺失(MCAR):所缺失的數(shù)據(jù)發(fā)生的概率既與已觀察到的數(shù)據(jù)無(wú)關(guān),也與未觀察到的數(shù)據(jù)無(wú)關(guān)。 (2)隨機(jī)缺失(MAR):假設(shè)缺失數(shù)據(jù)發(fā)生的概率與所觀察到的變量是有關(guān)的,而與未觀察到的數(shù)

    2024年02月05日
    瀏覽(23)
  • app滲透測(cè)試抓不到數(shù)據(jù)包怎么辦?

    app滲透測(cè)試抓不到數(shù)據(jù)包怎么辦?

    1、app滲透測(cè)試常見(jiàn)的模擬器有夜神,閃電模擬器等。 2、遇到問(wèn)題,數(shù)據(jù)包抓不到? (1)最常見(jiàn)的解決方法調(diào)低模擬器安卓版本,使用MuMu模擬器,大部分app都適用,但是也有些app無(wú)法抓取。 (2)安裝證書(shū)到系統(tǒng)根證書(shū),可以解決上面的問(wèn)題(一般情況下手機(jī)在安裝了BurpSuite的偽證

    2024年02月13日
    瀏覽(22)
  • 服務(wù)器數(shù)據(jù)被盜了該怎么辦

    隨著互聯(lián)網(wǎng)的發(fā)展,如今人們習(xí)慣了在互聯(lián)網(wǎng)上分享生活、購(gòu)物等等。便捷了人們的生活,也讓互聯(lián)網(wǎng)企業(yè)蓬勃生機(jī),但同時(shí)也暗藏著危機(jī)。其中服務(wù)器被入侵是常見(jiàn)的黑客攻擊方式,不僅會(huì)給企業(yè)帶來(lái)經(jīng)濟(jì)損失,同時(shí)也讓企業(yè)在公眾面前失去了信譽(yù)。下面我們來(lái)看看一些案

    2024年02月04日
    瀏覽(27)
  • mysql數(shù)據(jù)庫(kù)忘記密碼了怎么辦

    mysql數(shù)據(jù)庫(kù)忘記密碼了怎么辦

    本人用的mysql8版本 看到網(wǎng)上很多教程,什么修改配置文件my.ini。在8版本根本沒(méi)用。以下是8版本解決辦法。親測(cè)可用。 1、用管理員身份打開(kāi)命令行工具。(強(qiáng)調(diào):管理員身份) 2、停止mysql服務(wù): 3、輸入以下命令無(wú)密碼啟動(dòng)mysql 4、 另開(kāi)一個(gè)命令行窗口,輸入mysql -u root無(wú)密

    2024年02月11日
    瀏覽(22)
  • MySQL數(shù)據(jù)庫(kù)忘記密碼怎么辦?教你一招

    MySQL數(shù)據(jù)庫(kù)忘記密碼怎么辦?教你一招

    文章目錄 1.以管理員身份打開(kāi)cmd,關(guān)閉Mysql服務(wù) 2. 跳過(guò)密碼授權(quán)登錄 ?3.再繼續(xù)以管理員身份打開(kāi)一個(gè)cmd窗口,進(jìn)行重置密碼 ?4.使用新密碼重新登錄mysql驗(yàn)證 ?5.使用Navicat可視化工具連接Mysql Mysql數(shù)據(jù)庫(kù)之前安裝好了,但是突然忘記當(dāng)初自己設(shè)置的登錄密碼了,導(dǎo)致使用Navi

    2024年02月04日
    瀏覽(430)
  • SQL Server 數(shù)據(jù)庫(kù)變成單個(gè)用戶(hù)怎么辦

    SQL Server 數(shù)據(jù)庫(kù)變成單個(gè)用戶(hù)怎么辦

    參考技術(shù)A 1、首先我們打開(kāi)SQL? SERVER的管理控制臺(tái),找到一個(gè)要設(shè)置角色的用戶(hù)。 2、下面我們將為這個(gè)用戶(hù)賦予創(chuàng)建數(shù)據(jù)庫(kù)的角色,我們先用這個(gè)用戶(hù)登錄管理工具看一下是否具有創(chuàng)建用戶(hù)的權(quán)限。 3、進(jìn)行數(shù)據(jù)庫(kù)創(chuàng)建的時(shí)候,提示如下的錯(cuò)誤,證明這個(gè)用戶(hù)不具備這個(gè)角色

    2024年02月03日
    瀏覽(89)
  • CAD數(shù)據(jù)導(dǎo)入到ArcGIS中出現(xiàn)亂碼怎么辦?

    CAD數(shù)據(jù)導(dǎo)入到ArcGIS中出現(xiàn)亂碼怎么辦?

    在項(xiàng)目中,我們通常會(huì)涉及到將甲方提供的CAD數(shù)據(jù)提取成標(biāo)準(zhǔn).SHP數(shù)據(jù),但是CAD它導(dǎo)入到ArcGIS Desktop中竟然亂碼了,標(biāo)注看不清了,好頭大。其實(shí)是字符集的問(wèn)題,將注冊(cè)表中的字符集改為簡(jiǎn)體中文就可以解決,下面來(lái)看解決方法。 在windows搜索框中輸入“regedit”。 點(diǎn)擊“注冊(cè)

    2024年02月05日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包