概述
本文主要是分享Kafka初始化生產(chǎn)者的大體過程
初始化過程中會新建很多對象,本文暫先分享部分對象
1.分區(qū)器---Partitioner partitioner
2.重試時間---long retryBackoffMs
3.序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer
4.攔截器---List<ProducerInterceptor<K, V>> interceptorList
5.累加器---RecordAccumulator accumulator
6.元數(shù)據(jù)---ProducerMetadata metadata
7.創(chuàng)建sender線程---Sender sender
生產(chǎn)者初始化代碼示例:
后續(xù)分享這些配置會被用到什么地方,生成上述哪些對象
// 這是構(gòu)建kafka生產(chǎn)者的[示例代碼]
// 設(shè)置屬性
Properties properties = new Properties();
// 指定連接的kafka服務(wù)器的地址,配置多臺的服務(wù) 用,分割, 其中一個宕機,生產(chǎn)者 依然可以連上(集群)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[kafka server ip]:[kafka server port]");
// 1.分區(qū)器---Partitioner partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
// 2.重試時間---long retryBackoffMs
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10L);
// 3.key和value的序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 4.攔截器---List<ProducerInterceptor<K, V>> interceptorList
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyInterceptor.class);
// 構(gòu)建kafka生產(chǎn)者對象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
一、分區(qū)器
對應(yīng)初始化時設(shè)置的參數(shù):
ProducerConfig.PARTITIONER_CLASS_CONFIG
示意:
分區(qū)器是在發(fā)送消息時用來計算消息將要發(fā)送到哪個分區(qū)的,支持自定義分區(qū)器
// 這是kafka client 初始化生產(chǎn)者的[源碼]
// 如果沒有設(shè)置自定義分區(qū)器,則partitioner為null,會影響到后續(xù)初始化邏輯以及發(fā)送消息時的邏輯
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
自定義分區(qū)器示例代碼
如需使用自定義分區(qū)器,需要考慮好分區(qū)負載問題,切勿為了解決需求盲目使用自定義分區(qū);
分區(qū)不合理可能影響broker性能,也是對低負載分區(qū)資源的浪費,嚴重情況下某一分區(qū)的消費者負載過大,或某一分區(qū)broker負載過大,可能導(dǎo)致雪崩;
// 這是自定義分區(qū)器的[示例代碼]
public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int num = partitionInfos.size();
// 與[org.apache.kafka.clients.producer.internals.DefaultPartitioner]
// 計算分區(qū)時一樣的算法
int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;
return parId;
}
public void close() {//do nothing}
public void configure(Map<String, ?> configs) {//do nothing}
}
分區(qū)器使用
1.用戶可以通過實現(xiàn)該接口自定義分區(qū)器,在生產(chǎn)者調(diào)用send方法發(fā)送消息時,會使用用戶自定義的分區(qū)器計算消息要發(fā)送到哪個分區(qū),自定義分區(qū)器簡單實現(xiàn)見上面代碼塊
org.apache.kafka.clients.producer.KafkaProducer#partition
2.在生產(chǎn)者調(diào)用send方法發(fā)送消息時,如果使用用戶自定義的分區(qū)器,允許在第一次將消息放入本地緩存失敗時,進行一次嘗試:重現(xiàn)分配分區(qū)和本地緩存
// 這是kafka生產(chǎn)者調(diào)用send方法發(fā)送消息時的部分[源碼]
// check if we have an in-progress batch
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
// After taking the lock, validate that the partition hasn't changed and retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
// 這是第一次就將消息添加到緩存后返回
// If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
// 這是第一次嘗試將消息放入本地緩存失敗后,判斷如果使用戶自定義的分區(qū)器,則返回一個對象
// 該對象將在append方法的調(diào)用處進行重新計算分區(qū)并重試一次
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
}
該對象還有別的使用場景暫不介紹
二、重試時間
對應(yīng)初始化時設(shè)置的參數(shù)
ProducerConfig.RETRY_BACKOFF_MS_CONFIG
示意:
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
嘗試重試對給定主題分區(qū)的失敗請求之前等待的時間。這避免了在某些失敗場景下以緊密循環(huán)的方式重復(fù)發(fā)送請求。
重試時間的使用
1.初始化累加器時作為入?yún)?,保存到累加器字段[long retryBackoffMs]中,用于sender線程發(fā)送消息時檢測重試超時
2.初始化元信息對象[org.apache.kafka.clients.Metadata]時,放入字段[long refreshBackoffMs]中,用于更新元信息前判斷等待時間
總結(jié)該字段主要用于向服務(wù)器循環(huán)發(fā)送請求時停頓等待
三、序列化器
對應(yīng)初始化時設(shè)置的參數(shù)
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
示意:
Serializer class for key that implements the org.apache.kafka.common.serialization.Serializerinterface.
實現(xiàn)org.apache.kafka.common.serialization.Serializer接口的key序列化程序類。
Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.實現(xiàn)org.apache.kafka.commun.serialization.Serializer接口的值的序列化程序類。
自定義序列化器
// 序列化對象
public class UserSerializer implements Serializer<User> {
public void configure(Map<String, ?> configs, boolean isKey) {
//do nothing
}
public byte[] serialize(String topic, User data) {
try {
byte[] name;
int nameSize;
if (data == null) {
return null;
}
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
//字符串的長度
nameSize = data.getName().length();
} else {
name = new byte[0];
nameSize = 0;
}
/*id的長度4個字節(jié),字符串的長度描述4個字節(jié),
字符串本身的長度nameSize個字節(jié)*/
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize);
buffer.putInt(data.getId());//4
buffer.putInt(nameSize);//4
buffer.put(name);//nameSize
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error serialize User:" + e);
}
}
public void close() {
//do nothing
}
}
// 反序列化
public class UserDeserializer implements Deserializer<User> {
public void configure(Map<String, ?> configs, boolean isKey) {
//do nothing
}
public User deserialize(String topic, byte[] data) {
try {
if (data == null) {
return null;
}
if (data.length < 8) {
throw new SerializationException("Error data size.");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int id;
String name;
int nameSize;
id = buffer.getInt();
nameSize = buffer.getInt();
byte[] nameByte = new byte[nameSize];
buffer.get(nameByte);
name = new String(nameByte, "UTF-8");
return new User(id, name);
} catch (Exception e) {
throw new SerializationException("Error Deserializer DemoUser." + e);
}
}
public void close() {
//do nothing
}
}
// 消息中的實體類
public class User {
private int id;
private String name;
public User(int id) {
this.id = id;
}
public User(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
使用自定義序列化器時需要在消費者消費消息時使用反序列化器將消息反序列化,一般常用的就是字符串序列化器
org.apache.kafka.common.serialization.StringSerializer
四、攔截器
對應(yīng)初始化時設(shè)置的參數(shù)
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
示意:
Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors
通過實現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,您可以在生產(chǎn)者接收到的記錄發(fā)布到kafka集群之前攔截這些記錄。默認情況下,沒有攔截器
自定義序攔截器
// 這是自定義序列化器[示例代碼]
public class MyInterceptor implements ProducerInterceptor<String, String> {
private long successCount = 0L;
private long errorCount = 0L;
//該方法:Producer在將消息序列化和分配分區(qū)之前會調(diào)用攔截器的這個方法來對消息進行相應(yīng)的操作
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
//要把發(fā)送的value都帶上時間戳
return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + record.value(), record.headers());
}
//發(fā)送消息情況統(tǒng)計
//該方法:會在消息從RecordAccumulator成功發(fā)送到Kafka Broker之后,或者在發(fā)送過程中失敗時調(diào)用。并且通常都是在producer回調(diào)邏輯觸發(fā)之前
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCount++;
} else {
errorCount++;
}
}
//該方法:可以關(guān)閉攔截器,主要用于執(zhí)行一些資源清理工作
@Override
public void close() {
//producer發(fā)送數(shù)據(jù)結(jié)束并close后,會自動調(diào)用攔截器的close方法來輸出統(tǒng)計的成功和失敗次數(shù)
System.out.println("成功次數(shù)=" + successCount);
System.out.println("失敗次數(shù)=" + errorCount);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
五、累加器
這是Kafka中非常重要的內(nèi)部組件,主要用于緩存消息以便批量發(fā)送,從而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗并提升性能。
簡單概述:
對于生產(chǎn)者的作用是使用累加器,可以讓生產(chǎn)者不必每次發(fā)送消息就即刻推送到broker,可以將一個topic的同一分區(qū)消息寫入同一份緩存,等待sender線程批量獲取這批消息一次性發(fā)送到broker。減少生產(chǎn)者發(fā)起網(wǎng)絡(luò)調(diào)用的次數(shù)。
對broker而言的作用是,broker也可以一次性將接收到的這一批多個消息以順序IO的方式追加到文件中,提高了儲存效率。
示意:
This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
this behavior is explicitly disabled.此類充當(dāng)一個隊列,將記錄累積到要發(fā)送到服務(wù)器的MemoryRecords實例中。
累加器使用一定量的內(nèi)存,當(dāng)內(nèi)存耗盡時,追加調(diào)用將被阻塞,除非此行為被明確禁用。
累加器核心作用
1.消息緩存:RecordAccumulator實際上是在客戶端開辟出的一塊內(nèi)存區(qū)域,主要用來緩存消息。這種緩存機制允許Sender線程后續(xù)批量發(fā)送消息,而不是每調(diào)用一次send方法就直接將消息發(fā)送給broker。
2.批量發(fā)送:當(dāng)觸發(fā)發(fā)送條件如MemoryRecords緩存或者Deque隊列滿了或者一個隊列等待時間達到配置時間,sender線程將一次性將這批消息發(fā)送給broker。這種方式可以減少網(wǎng)絡(luò)請求的數(shù)量,提高系統(tǒng)的吞吐量。這些觸發(fā)條件可參考下面參數(shù)配置的buffer.memory,batch.size,linger.ms,max.block.ms
3.資源管理:如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,那么累加器中空間不足的話,就會導(dǎo)致生產(chǎn)者無法繼續(xù)發(fā)送消息。在這種情況下,生產(chǎn)者可以通過設(shè)置max.block.ms參數(shù)來控制是否阻塞或者拋出異常。如果max.block.ms參數(shù)的默認值為60000(即60秒),那么超過這個時間限制后,如果累加器仍然沒有足夠的空間,生產(chǎn)者將無法繼續(xù)發(fā)送消息。
關(guān)于累加器的實現(xiàn),涉及到內(nèi)部緩存管理,broker服務(wù)器元數(shù)據(jù)統(tǒng)計,sender線程交互等邏輯,后續(xù)再進行分享吧
如下這些參數(shù)都是有關(guān)累加器的重要配置,直接影響kafka生產(chǎn)者發(fā)送消息的性能
配置 | 解釋 | 默認值 |
buffer.memory | Producer 用來緩沖等待被發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)。如果記錄發(fā)送的速度比發(fā)送到服務(wù)器的速度快, Producer 就會阻塞,如果阻塞的時間超過?max.block.ms ?配置的時長,則會拋出一個異常。 這個配置與 Producer 的可用總內(nèi)存有一定的對應(yīng)關(guān)系,但并不是完全等價的關(guān)系,因為 Producer 的可用內(nèi)存并不是全部都用來緩存。一些額外的內(nèi)存可能會用于壓縮(如果啟用了壓縮),以及維護正在運行的請求。 |
33554432 |
batch.size | 當(dāng)將多個記錄被發(fā)送到同一個分區(qū)時, Producer 將嘗試將記錄組合到更少的請求中。這有助于提升客戶端和服務(wù)器端的性能。這個配置控制一個批次的默認大?。ㄒ宰止?jié)為單位)。 當(dāng)記錄的大小超過了配置的字節(jié)數(shù), Producer 將不再嘗試往批次增加記錄。 發(fā)送到 broker 的請求會包含多個批次的數(shù)據(jù),每個批次對應(yīng)一個 partition 的可用數(shù)據(jù) 小的 batch.size 將減少批處理,并且可能會降低吞吐量(如果 batch.size = 0的話將完全禁用批處理)。 很大的 batch.size 可能造成內(nèi)存浪費,因為我們一般會在 batch.size 的基礎(chǔ)上分配一部分緩存以應(yīng)付額外的記錄。 |
16384 |
linger.ms | producer 會將兩個請求發(fā)送時間間隔內(nèi)到達的記錄合并到一個單獨的批處理請求中。通常只有當(dāng)記錄到達的速度超過了發(fā)送的速度時才會出現(xiàn)這種情況。然而,在某些場景下,即使處于可接受的負載下,客戶端也希望能減少請求的數(shù)量。這個設(shè)置是通過添加少量的人為延遲來實現(xiàn)的;即,與其立即發(fā)送記錄, producer 將等待給定的延遲時間,以便將在等待過程中到達的其他記錄能合并到本批次的處理中。這可以認為是與 TCP 中的 Nagle 算法類似。這個設(shè)置為批處理的延遲提供了上限:一旦我們接受到記錄超過了分區(qū)的 batch.size ,Producer 會忽略這個參數(shù),立刻發(fā)送數(shù)據(jù)。但是如果累積的字節(jié)數(shù)少于 batch.size ,那么我們將在指定的時間內(nèi)“逗留”(linger),以等待更多的記錄出現(xiàn)。這個設(shè)置默認為0(即沒有延遲)。例如:如果設(shè)置linger.ms=5 ?,則發(fā)送的請求會減少并降低部分負載,但同時會增加5毫秒的延遲。 |
0 |
max.block.ms | 該配置控制?KafkaProducer.send() 和KafkaProducer.partitionsFor() ?允許被阻塞的時長。這些方法可能因為緩沖區(qū)滿了或者元數(shù)據(jù)不可用而被阻塞。用戶提供的序列化程序或分區(qū)程序的阻塞將不會被計算到這個超時。 |
六、初始化元數(shù)據(jù)
對應(yīng)初始化時設(shè)置的參數(shù)
ProducerConfig.METADATA_MAX_AGE_CONFIG
示意:
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
以毫秒為單位的時間段,在此之后,即使我們沒有看到任何主分區(qū)的變化,我們也會強制刷新元數(shù)據(jù),以主動發(fā)現(xiàn)任何新的代理或分區(qū)
---元數(shù)據(jù)刷新時間
ProducerConfig.METADATA_MAX_IDLE_CONFIG
示意:
Controls how long the producer will cache metadata for a topic that's idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic's metadata is forgotten and the next access to it will force a metadata fetch request.
控制生產(chǎn)者為空閑主題緩存元數(shù)據(jù)的時間。如果自上次生成主題以來經(jīng)過的時間超過了元數(shù)據(jù)空閑持續(xù)時間,則該主題的元數(shù)據(jù)將被遺忘,下一次對其的訪問將強制執(zhí)行元數(shù)據(jù)獲取請求。
---生產(chǎn)者客戶端為[某一主題]緩存元數(shù)據(jù)的時間,超過該時間后獲取該主題元數(shù)據(jù)將強制從broker獲取
初始化元數(shù)據(jù)信息分為兩部分,第一部分初始化ProducerMetadata對象,設(shè)置元數(shù)據(jù),topic信息緩存空閑時間如下源代碼:
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
第二部分為加載broker節(jié)點信息Node,源代碼如下:
Node:org.apache.kafka.common.Node
this.metadata.bootstrap(addresses);
...
public synchronized void bootstrap(List<InetSocketAddress> addresses) {
this.needFullUpdate = true;
this.updateVersion += 1;
this.cache = MetadataCache.bootstrap(addresses);
}
...
static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
Map<Integer, Node> nodes = new HashMap<>();
int nodeId = -1;
for (InetSocketAddress address : addresses) {
nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort()));
nodeId--;
}
return new MetadataCache(null, nodes, Collections.emptyList(),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
null, Collections.emptyMap(), Cluster.bootstrap(addresses));
}
上面只是初始化元數(shù)據(jù)最外層的代碼,bootstrap是kafka生產(chǎn)者客戶端初始化broker信息緩存的入口,執(zhí)行這個方法后這個客戶端將緩存kakfa broker的節(jié)點信息,每個節(jié)點的topic信息,分片信息等緩存。在客戶端發(fā)送消息時,將通過這些緩存信息給broker發(fā)起請求,所以這塊緩存是生產(chǎn)者客戶端非常重要的部分,詳情請看:三、Kafka生產(chǎn)者4---核心組件[元數(shù)據(jù)]-Metadata-CSDN博客
七、創(chuàng)建sender線程
是一個無限循環(huán)運行在后臺的線程,會一直等待累加器中緩存的消息達到發(fā)送條件,把消息發(fā)送給Broker
發(fā)送的核心流程是:
1.從累加器中批量獲取消息并創(chuàng)建?ClientRequest對象
2.將ClientRequest對象交給NetworkClient客戶端發(fā)送
3.NetworkClient客戶端將請求放入KafkaChannel的緩存
4.NetworkClient執(zhí)行網(wǎng)絡(luò) I/O,完成請求的發(fā)送
5.NetworkClient收到響應(yīng),調(diào)用 ClientRequest 的回調(diào)函數(shù),觸發(fā)每個消息上注冊的回調(diào)函數(shù)
總結(jié):
本文大致介紹初始化kafka生產(chǎn)者的核心邏輯,創(chuàng)建的各類后續(xù)用于發(fā)送消息的對象,線程,配置信息等;
這些配置將直接影響生產(chǎn)者發(fā)送消息的性能和可靠性,如果需要在復(fù)雜應(yīng)用場景下追求兩者的平衡,需要對這些參數(shù)有深刻認識并調(diào)試驗證后上線!
kafka生產(chǎn)者配置參數(shù)中文網(wǎng)站:3. 配置 - 【布客】kafka 中文翻譯文章來源:http://www.zghlxwxcb.cn/news/detail-839800.html
kafka生產(chǎn)者配置參數(shù)英文網(wǎng)站:https://kafka.apache.org/documentation/#producerconfigs文章來源地址http://www.zghlxwxcb.cn/news/detail-839800.html
到了這里,關(guān)于三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!