??博主介紹: 博主從事應(yīng)用安全和大數(shù)據(jù)領(lǐng)域,有8年研發(fā)經(jīng)驗(yàn),5年面試官經(jīng)驗(yàn),Java技術(shù)專家,WEB架構(gòu)師,阿里云專家博主,華為云云享專家,51CTO TOP紅人
Java知識圖譜點(diǎn)擊鏈接:體系化學(xué)習(xí)Java(Java面試專題)
???? 感興趣的同學(xué)可以收藏關(guān)注下 ,不然下次找不到喲????
?? 感覺對你有幫助的朋友,可以給博主一個(gè)三連,非常感謝 ??????
1、生產(chǎn)者寫入分區(qū)的策略有哪些?
生產(chǎn)者寫入分區(qū)的策略主要有以下幾種:
-
輪詢分區(qū)策略:生產(chǎn)者可以使用輪詢策略將消息依次寫入每個(gè)分區(qū),實(shí)現(xiàn)負(fù)載均衡。在每次發(fā)送消息時(shí),生產(chǎn)者會按照輪詢的方式選擇下一個(gè)可用的分區(qū),并將消息寫入該分區(qū)。這樣可以確保消息均勻地分布在各個(gè)分區(qū)中。
-
隨機(jī)分區(qū)策略:Kafka生產(chǎn)者隨機(jī)的將消息寫入分區(qū),有可能會造成消息的分布不均,所以這個(gè)策略基本上也很少用。
-
按 key 分區(qū)策略:Kafka生產(chǎn)者基于消息的鍵(key)進(jìn)行哈希計(jì)算,然后將消息寫入對應(yīng)的分區(qū)。這種策略可以保證具有相同鍵的消息被寫入到相同的分區(qū),從而保證消息的順序性。
-
自定義分區(qū)策略:Kafka生產(chǎn)者可以使用自定義分區(qū)策略來決定將消息寫入哪個(gè)分區(qū)。
2、輪詢分區(qū)策略
輪詢分區(qū)的代碼如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class RoundRobinPartitioner implements Partitioner {
private int currentPartition;
@Override
public void configure(Map<String, ?> configs) {
// 初始化當(dāng)前分區(qū)索引
currentPartition = 0;
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 輪詢選擇下一個(gè)分區(qū)
int selectedPartition = currentPartition;
currentPartition = (currentPartition + 1) % numPartitions;
return selectedPartition;
}
@Override
public void close() {
// 可選:清理資源
}
}
partition 方法會使用一個(gè)變量 currentPartition 來記錄當(dāng)前選擇的分區(qū)索引。每次調(diào)用 partition 方法時(shí),會將 currentPartition 增加 1,并通過取模運(yùn)算來確保選擇的分區(qū)索引始終在分區(qū)數(shù)范圍內(nèi)。
要使用輪詢分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RoundRobinPartitioner");
3、隨機(jī)分區(qū)策略
隨機(jī)分區(qū)的代碼如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class RandomPartitioner implements Partitioner {
private final Random random = new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return random.nextInt(numPartitions);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
partition 方法會隨機(jī)選擇一個(gè)分區(qū)返回。 random.nextInt(numPartitions) 方法會生成一個(gè)小于分區(qū)數(shù)的隨機(jī)數(shù),作為分區(qū)的索引。
要使用隨機(jī)分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.RandomPartitioner");
4、按 key 分區(qū)策略
按 key 分區(qū)的代碼如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class KeyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 如果 key 為 null,則使用輪詢分區(qū)策略
return Math.abs(key.hashCode()) % numPartitions;
} else {
// 使用 key 的哈希碼來確定分區(qū)
return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
// 可選:清理資源
}
@Override
public void configure(Map<String, ?> configs) {
// 可選:配置方法
}
}
partition 方法會檢查 key 是否為 null。如果 key 為 null,就會使用輪詢分區(qū)策略,通過計(jì)算 key 的哈希碼并對分區(qū)數(shù)取模來確定分區(qū)。如果 key 不為 null,則使用 key 的字節(jié)數(shù)組的哈希碼來確定分區(qū)。
要使用基于 key 的分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.KeyPartitioner");
5、自定義分區(qū)策略
自定義分區(qū)的代碼如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 自定義分區(qū)邏輯
// 根據(jù)消息的 key 或 value 來選擇分區(qū)
// 這里以 key 的哈希值作為分區(qū)選擇依據(jù)
int partition = Math.abs(key.hashCode()) % numPartitions;
return partition;
}
@Override
public void close() {
// 可選:清理資源
}
@Override
public void configure(Map<String, ?> configs) {
// 可選:配置分區(qū)器
}
}
partition 方法根據(jù)消息的 key 或 value 來選擇分區(qū)。這里使用 key 的哈希值進(jìn)行取模運(yùn)算,以確保選擇的分區(qū)索引在分區(qū)數(shù)范圍內(nèi)。
要使用自定義分區(qū)策略,您需要在生產(chǎn)者配置中設(shè)置 partitioner.class 屬性為您自定義分區(qū)器的類名:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.example.CustomPartitioner");
寫在最后
通過y以上這些實(shí)現(xiàn),生產(chǎn)者將根據(jù)自定義的分區(qū)策略來選擇分區(qū)來發(fā)送消息。您可以根據(jù)自己的需求,實(shí)現(xiàn)不同的分區(qū)邏輯。文章來源:http://www.zghlxwxcb.cn/news/detail-579080.html
???? 本文由激流原創(chuàng),原創(chuàng)不易,希望大家關(guān)注、點(diǎn)贊、收藏,給博主一點(diǎn)鼓勵(lì),感謝?。?!
??????????????????????????????????????????????????????????????文章來源地址http://www.zghlxwxcb.cn/news/detail-579080.html
到了這里,關(guān)于【項(xiàng)目實(shí)戰(zhàn)】Kafka 生產(chǎn)者寫入分區(qū)的策略的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!