国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【項(xiàng)目實(shí)戰(zhàn)】Kafka 生產(chǎn)者寫入分區(qū)的策略

這篇具有很好參考價(jià)值的文章主要介紹了【項(xiàng)目實(shí)戰(zhàn)】Kafka 生產(chǎn)者寫入分區(qū)的策略。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

??博主介紹: 博主從事應(yīng)用安全和大數(shù)據(jù)領(lǐng)域,有8年研發(fā)經(jīng)驗(yàn),5年面試官經(jīng)驗(yàn),Java技術(shù)專家,WEB架構(gòu)師,阿里云專家博主,華為云云享專家,51CTO TOP紅人

Java知識圖譜點(diǎn)擊鏈接:體系化學(xué)習(xí)Java(Java面試專題)

???? 感興趣的同學(xué)可以收藏關(guān)注下 ,不然下次找不到喲????

?? 感覺對你有幫助的朋友,可以給博主一個(gè)三連,非常感謝 ??????

【項(xiàng)目實(shí)戰(zhàn)】Kafka 生產(chǎn)者寫入分區(qū)的策略,中間件,kafka,linq,分布式,原力計(jì)劃

1、生產(chǎn)者寫入分區(qū)的策略有哪些?

生產(chǎn)者寫入分區(qū)的策略主要有以下幾種:

  1. 輪詢分區(qū)策略:生產(chǎn)者可以使用輪詢策略將消息依次寫入每個(gè)分區(qū),實(shí)現(xiàn)負(fù)載均衡。在每次發(fā)送消息時(shí),生產(chǎn)者會按照輪詢的方式選擇下一個(gè)可用的分區(qū),并將消息寫入該分區(qū)。這樣可以確保消息均勻地分布在各個(gè)分區(qū)中。

  2. 隨機(jī)分區(qū)策略:Kafka生產(chǎn)者隨機(jī)的將消息寫入分區(qū),有可能會造成消息的分布不均,所以這個(gè)策略基本上也很少用。

  3. 按 key 分區(qū)策略:Kafka生產(chǎn)者基于消息的鍵(key)進(jìn)行哈希計(jì)算,然后將消息寫入對應(yīng)的分區(qū)。這種策略可以保證具有相同鍵的消息被寫入到相同的分區(qū),從而保證消息的順序性。

  4. 自定義分區(qū)策略:Kafka生產(chǎn)者可以使用自定義分區(qū)策略來決定將消息寫入哪個(gè)分區(qū)。

2、輪詢分區(qū)策略

輪詢分區(qū)的代碼如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class RoundRobinPartitioner implements Partitioner {
   
    private int currentPartition;
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化當(dāng)前分區(qū)索引
        currentPartition = 0;
    }
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         // 輪詢選擇下一個(gè)分區(qū)
        int selectedPartition = currentPartition;
        currentPartition = (currentPartition + 1) % numPartitions;
         return selectedPartition;
    }
    
    @Override
    public void close() {
        // 可選:清理資源
    }
}

partition 方法會使用一個(gè)變量 currentPartition 來記錄當(dāng)前選擇的分區(qū)索引。每次調(diào)用 partition 方法時(shí),會將 currentPartition 增加 1,并通過取模運(yùn)算來確保選擇的分區(qū)索引始終在分區(qū)數(shù)范圍內(nèi)。

要使用輪詢分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RoundRobinPartitioner");

3、隨機(jī)分區(qū)策略

隨機(jī)分區(qū)的代碼如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
import java.util.Map;
import java.util.Random;

public class RandomPartitioner implements Partitioner {
    
    private final Random random = new Random();
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }
    
    @Override
    public void close() {
   
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
     
    }
}

partition 方法會隨機(jī)選擇一個(gè)分區(qū)返回。 random.nextInt(numPartitions) 方法會生成一個(gè)小于分區(qū)數(shù)的隨機(jī)數(shù),作為分區(qū)的索引。

要使用隨機(jī)分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RandomPartitioner");

4、按 key 分區(qū)策略

按 key 分區(qū)的代碼如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
import java.util.Map;

public class KeyPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         if (keyBytes == null) {
            // 如果 key 為 null,則使用輪詢分區(qū)策略
            return Math.abs(key.hashCode()) % numPartitions;
        } else {
            // 使用 key 的哈希碼來確定分區(qū)
            return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    
    @Override
    public void close() {
        // 可選:清理資源
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 可選:配置方法
    }
}

partition 方法會檢查 key 是否為 null。如果 key 為 null,就會使用輪詢分區(qū)策略,通過計(jì)算 key 的哈希碼并對分區(qū)數(shù)取模來確定分區(qū)。如果 key 不為 null,則使用 key 的字節(jié)數(shù)組的哈希碼來確定分區(qū)。

要使用基于 key 的分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.KeyPartitioner");

5、自定義分區(qū)策略

自定義分區(qū)的代碼如下:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
         // 自定義分區(qū)邏輯
        // 根據(jù)消息的 key 或 value 來選擇分區(qū)
        // 這里以 key 的哈希值作為分區(qū)選擇依據(jù)
        int partition = Math.abs(key.hashCode()) % numPartitions;
         return partition;
    }
    
    @Override
    public void close() {
        // 可選:清理資源
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
        // 可選:配置分區(qū)器
    }
}

partition 方法根據(jù)消息的 key 或 value 來選擇分區(qū)。這里使用 key 的哈希值進(jìn)行取模運(yùn)算,以確保選擇的分區(qū)索引在分區(qū)數(shù)范圍內(nèi)。

要使用自定義分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");

寫在最后

通過y以上這些實(shí)現(xiàn),生產(chǎn)者將根據(jù)自定義的分區(qū)策略來選擇分區(qū)來發(fā)送消息。您可以根據(jù)自己的需求,實(shí)現(xiàn)不同的分區(qū)邏輯。

???? 本文由激流原創(chuàng),原創(chuàng)不易,希望大家關(guān)注、點(diǎn)贊、收藏,給博主一點(diǎn)鼓勵(lì),感謝?。?!
??????????????????????????????????????????????????????????????
【項(xiàng)目實(shí)戰(zhàn)】Kafka 生產(chǎn)者寫入分區(qū)的策略,中間件,kafka,linq,分布式,原力計(jì)劃文章來源地址http://www.zghlxwxcb.cn/news/detail-579080.html

到了這里,關(guān)于【項(xiàng)目實(shí)戰(zhàn)】Kafka 生產(chǎn)者寫入分區(qū)的策略的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 第3、4章 Kafka 生產(chǎn)者 和 消費(fèi)者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)

    第3、4章 Kafka 生產(chǎn)者 和 消費(fèi)者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)

    重要的特性: 消息通過 隊(duì)列來進(jìn)行交換 每條消息僅會傳遞給一個(gè)消費(fèi)者 消息傳遞有先后順序,消息被消費(fèi)后從隊(duì)列刪除(除非使用了消息優(yōu)先級) 生產(chǎn)者或者消費(fèi)者可以動(dòng)態(tài)加入 傳送模型: 異步即發(fā)即棄:生產(chǎn)者發(fā)送一條消息,不會等待收到一個(gè)響應(yīng) 異步請求、應(yīng)答:

    2024年02月20日
    瀏覽(21)
  • Kafka 之生產(chǎn)者與消費(fèi)者基礎(chǔ)知識:基本配置、攔截器、序列化、分區(qū)器

    Kafka 之生產(chǎn)者與消費(fèi)者基礎(chǔ)知識:基本配置、攔截器、序列化、分區(qū)器

    kafaf集群地址列表:理論上寫一個(gè)節(jié)點(diǎn)地址,就相當(dāng)于綁定了整個(gè)kafka集群了,但是建議多寫幾個(gè),如果只寫一個(gè),萬一宕機(jī)就麻煩了 kafka消息的key和value要指定序列化方法 kafka對應(yīng)的生產(chǎn)者id 使用java代碼表示則為以下代碼: ?可使用?retries 參數(shù) 進(jìn)行設(shè)置,同時(shí)要注意記住兩

    2024年02月05日
    瀏覽(30)
  • Springboot最簡單的實(shí)戰(zhàn)介紹 整合kafka-生產(chǎn)者與消費(fèi)者(消息推送與訂閱獲取)

    Springboot最簡單的實(shí)戰(zhàn)介紹 整合kafka-生產(chǎn)者與消費(fèi)者(消息推送與訂閱獲取)

    #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生產(chǎn)者 ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.ap

    2024年04月09日
    瀏覽(26)
  • 三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生產(chǎn)者的 大體過程 初始化過程中會新建很多對象,本文暫先分享部分對象 1.分區(qū)器---Partitioner partitioner 2.重試時(shí)間---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.攔截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    瀏覽(37)
  • C# 快速寫入日志 不卡線程 生產(chǎn)者 消費(fèi)者模式

    C# 快速寫入日志 不卡線程 生產(chǎn)者 消費(fèi)者模式

    有這樣一種場景需求,就是某個(gè)方法,對耗時(shí)要求很高,但是又要記錄日志到數(shù)據(jù)庫便于分析,由于訪問數(shù)據(jù)庫基本都要幾十毫秒,可在方法里寫入BlockingCollection,由另外的線程寫入數(shù)據(jù)庫。 可以看到,在我的機(jī)子上面,1ms寫入了43條日志。

    2024年02月15日
    瀏覽(23)
  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_(dá)原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認(rèn)配置,以及配置的參數(shù),開啟網(wǎng)絡(luò)線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進(jìn)行消息key, value序列化 ???? 4. 進(jìn)行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • Kafka-生產(chǎn)者

    Kafka-生產(chǎn)者

    Kafka在實(shí)際應(yīng)用中,經(jīng)常被用作高性能、可擴(kuò)展的消息中間件。 Kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。 在實(shí)踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復(fù)勞動(dòng),提高開發(fā)效率,也

    2024年01月20日
    瀏覽(24)
  • (三)Kafka 生產(chǎn)者

    (三)Kafka 生產(chǎn)者

    創(chuàng)建一個(gè) ProducerRecord 對象,需要包含目標(biāo)主題和要發(fā)送的內(nèi)容,還可以指定鍵、分區(qū)、時(shí)間戳或標(biāo)頭。 在發(fā)送 ProducerRecord 對象時(shí),生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基

    2024年02月09日
    瀏覽(21)
  • 三、Kafka生產(chǎn)者

    三、Kafka生產(chǎn)者

    1 發(fā)送原理 在消息發(fā)送的過程中,涉及到了兩個(gè)線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker 【RecordAccumulator緩沖的結(jié)構(gòu): 每一個(gè)分區(qū)對應(yīng)一

    2024年02月12日
    瀏覽(21)
  • Kafka(生產(chǎn)者)

    Kafka(生產(chǎn)者)

    目 前 企 業(yè) 中 比 較 常 見 的 消 息 隊(duì) 列 產(chǎn) 品 主 要 有 Kafka(在大數(shù)據(jù)場景主要采用 Kafka 作為消息隊(duì)列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場景 傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場景包括: 緩存/消峰 、 解耦 和 異步通信 。 緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過

    2024年02月11日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包