国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

實現(xiàn) Kafka 分區(qū)內(nèi)消費者多線程順序消費

這篇具有很好參考價值的文章主要介紹了實現(xiàn) Kafka 分區(qū)內(nèi)消費者多線程順序消費。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

在1個topic中,有3個partition,那么如何保證數(shù)據(jù)的順序消費?

生產(chǎn)者在寫的時候,可以指定一個 key,被分發(fā)到同一個 partition 中去,而且這個 partition 中的數(shù)據(jù)一定是有順序的。

消費者從 partition 中取出來數(shù)據(jù)的時候,也一定是有順序的。到這里,順序還是沒有錯亂的。

但是消費者里可能會有多個線程來并發(fā)處理消息,而多個線程并發(fā)處理的話,順序可能就亂掉了。

解決方案

寫?n 個 queue,將具有相同key的數(shù)據(jù)都存儲在同一個 queue,然后對于 n 個線程,每個線程分別消費一個 queue 即可,并手動提交位點。由于 kafka consumer 實例不支持多線程同時提交位點,這里采取全局記數(shù)器的方式,在每一批次記錄的消費過程中,每消費完一條記錄則全局記數(shù)器加 1,全局記數(shù)器等于這一批記錄的總條數(shù)時提交位點。

在Java中,可以使用多線程和隊列來實現(xiàn)對具有相同 key 的數(shù)據(jù)進(jìn)行消費,并通過手動提交位點來保證數(shù)據(jù)的消費。以下是一個帶有手動位點提交的解決方案的示例代碼:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DataConsumer {
    private Map<String, BlockingQueue<String>> queues;
    private Map<String, Integer> offsets;

    public DataConsumer(int numThreads) {
        queues = new HashMap<>();
        offsets = new HashMap<>();

        // 創(chuàng)建N個隊列和位點
        for (int i = 0; i < numThreads; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            String key = Integer.toString(i);
            queues.put(key, queue);
            offsets.put(key, 0);

            // 創(chuàng)建并啟動消費線程
            Thread consumerThread = new Thread(new Consumer(queue, key));
            consumerThread.start();
        }
    }

    public void consumeData(String key, String data) {
        BlockingQueue<String> queue = queues.get(key);
        if (queue != null) {
            try {
                // 將數(shù)據(jù)放入對應(yīng)的隊列
                queue.put(data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void commitOffset(String key, int offset) {
        offsets.put(key, offset);
        System.out.println("Committed offset for key " + key + ": " + offset);
    }

    private static class Consumer implements Runnable {
        private final BlockingQueue<String> queue;
        private final String key;
        private int offset;

        public Consumer(BlockingQueue<String> queue, String key) {
            this.queue = queue;
            this.key = key;
            this.offset = 0;
        }

        @Override
        public void run() {
            // 消費隊列中的數(shù)據(jù)
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String data = queue.take();
                    // 進(jìn)行消費邏輯
                    System.out.println("Consumed data: " + data);
                    offset++;

                    // 模擬提交位點
                    if (offset % 10 == 0) {
                        DataConsumer.getInstance().commitOffset(key, offset);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static DataConsumer instance;

    public static synchronized DataConsumer getInstance() {
        if (instance == null) {
            instance = new DataConsumer(3);
        }
        return instance;
    }

    public static void main(String[] args) {
        DataConsumer dataConsumer = DataConsumer.getInstance();

        // 模擬產(chǎn)生數(shù)據(jù)
        for (int i = 0; i < 30; i++) {
            dataConsumer.consumeData(Integer.toString(i % 3), "Data " + (i + 1));
        }
    }
}

在以上代碼中,DataConsumer 類維護(hù)了一個 Map 來存儲隊列和位點的關(guān)系。每個消費者線程都有一個對應(yīng)的位點來記錄消費的進(jìn)度。

在 commitOffset 方法中,根據(jù) key 提交位點的偏移值。

消費線程在每次成功消費一條數(shù)據(jù)后,更新位點,并判斷是否滿足提交位點的條件。這里模擬每消費10條數(shù)據(jù)提交一次位點。

在 main 方法中,通過 consumeData 方法模擬了產(chǎn)生了30條數(shù)據(jù),并將它們放入不同的隊列中進(jìn)行消費。文章來源地址http://www.zghlxwxcb.cn/news/detail-729626.html

到了這里,關(guān)于實現(xiàn) Kafka 分區(qū)內(nèi)消費者多線程順序消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • kafka消費者api和分區(qū)分配和offset消費

    kafka消費者api和分區(qū)分配和offset消費

    消費者的消費方式為主動從broker拉取消息,由于消費者的消費速度不同,由broker決定消息發(fā)送速度難以適應(yīng)所有消費者的能力 拉取數(shù)據(jù)的問題在于,消費者可能會獲得空數(shù)據(jù) Consumer Group(CG):消費者組 由多個consumer組成。形成一個消費者組的條件,是所有消費者的groupid相同

    2024年02月16日
    瀏覽(25)
  • kafka復(fù)習(xí):(22)一個分區(qū)只能被消費者組中的一個消費者消費嗎?

    kafka復(fù)習(xí):(22)一個分區(qū)只能被消費者組中的一個消費者消費嗎?

    默認(rèn)情況下,一個分區(qū)只能被消費者組中的一個消費者消費。但可以自定義PartitionAssignor來打破這個限制。 一、自定義PartitionAssignor. 二、定義兩個消費者,給其配置上述PartitionAssignor. 在kafka創(chuàng)建只有一個分區(qū)的topic : study2023 創(chuàng)建一個生產(chǎn)者往study2023這個 topic發(fā)送消息: 分別

    2024年02月10日
    瀏覽(25)
  • Kafka有幾種消費者分區(qū)分配策略?

    Kafka有幾種消費者分區(qū)分配策略?

    Range范圍分配策略是Kafka默認(rèn)的分配策略,它可以確保每個消費者消費的分區(qū)數(shù)量是均衡的。 注意:Rangle范圍分配策略是針對每個Topic的。 配置 配置消費者的partition.assignment.strategy為org.apache.kafka.clients.consumer.RangeAssignor。 算法公式 n = 分區(qū)數(shù)量 / 消費者數(shù)量 m = 分區(qū)數(shù)量 % 消費

    2024年02月08日
    瀏覽(18)
  • kafka消費者組的分區(qū)分配策略

    一個consumer group有多個consumer,一個topic有多個partition,所以就會設(shè)計到分區(qū)分配的問題,需要確定哪些分區(qū)由哪些消費者消費。 當(dāng)消費者組中的消費者發(fā)生變化,減少或者增加的時候,就會執(zhí)行分區(qū)分配策略,需要重新洗牌。 分區(qū)分配策略主要有兩種,第一種是Range范圍分區(qū)

    2024年02月16日
    瀏覽(24)
  • Kafka3.0.0版本——消費者(獨立消費者消費某一個主題中某個分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    Kafka3.0.0版本——消費者(獨立消費者消費某一個主題中某個分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    1.1、案例需求 創(chuàng)建一個獨立消費者,消費firstTopic主題 0 號分區(qū)的數(shù)據(jù),所下圖所示: 1.2、案例代碼 生產(chǎn)者往firstTopic主題 0 號分區(qū)發(fā)送數(shù)據(jù)代碼 消費者消費firstTopic主題 0 分區(qū)數(shù)據(jù)代碼 1.3、測試 在 IDEA 中執(zhí)行消費者程序,如下圖: 在 IDEA 中執(zhí)行生產(chǎn)者程序 ,在控制臺觀察

    2024年02月09日
    瀏覽(34)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負(fù)載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準(zhǔn)備 創(chuàng)建主題 test 有5個分區(qū),準(zhǔn)備 3 個消費者并進(jìn)行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    01. Kafka 消費者分區(qū)再均衡是什么? 消費者群組里的消費者共享主題分區(qū)的所有權(quán)。當(dāng)一個新消費者加入群組時,它將開始讀取一部分原本由其他消費者讀取的消息。當(dāng)一個消費者被關(guān)閉或發(fā)生崩潰時,它將離開群組,原本由它讀取的分區(qū)將由群組里的其他消費者讀取。 分區(qū)

    2024年02月12日
    瀏覽(31)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號從0開始計算 Partition:分區(qū)數(shù),該主題有3個分區(qū) Replica:副本數(shù),該主題有3個副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會報錯 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka及Kafka消費者的消費問題及線程問題

    Topic:是 Kafka 消息發(fā)布和訂閱的基本單元,同時也是消息的容器。Topic 中的消息被分割成多個分區(qū)進(jìn)行存儲和處理。 Partition:是 Topic 分區(qū),將 Topic 細(xì)分成多個分區(qū),每個分區(qū)可以獨立地存儲在不同的 Broker 中,從而增加了消息的并發(fā)性、可擴展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    瀏覽(29)
  • Kafka系列之:記錄一次Kafka Topic分區(qū)擴容,但是下游flink消費者沒有自動消費新的分區(qū)的解決方法

    Kafka系列之:記錄一次Kafka Topic分區(qū)擴容,但是下游flink消費者沒有自動消費新的分區(qū)的解決方法

    生產(chǎn)環(huán)境Kafka集群壓力大,Topic讀寫壓力大,消費的lag比較大,因此通過擴容Topic的分區(qū),增大Topic的讀寫性能 理論上下游消費者應(yīng)該能夠自動消費到新的分區(qū),例如flume消費到了新的分區(qū),但是實際情況是存在flink消費者沒有消費到新的分區(qū) 出現(xiàn)無法消費topic新的分區(qū)這種情況

    2024年02月14日
    瀏覽(63)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包