消息發(fā)送
消息生產(chǎn)流程
整個(gè)流程如下:
- Producer創(chuàng)建時(shí),會(huì)創(chuàng)建一個(gè)Sender線程并設(shè)置為守護(hù)線程。
- 生產(chǎn)消息時(shí),內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建)。
- 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者linger.ms達(dá)到上限,哪個(gè)先達(dá)到就算哪個(gè)。
- 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失敗原因允許重試,那么客戶端內(nèi)部會(huì)對(duì)該消息進(jìn)行重試。
- 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者。
- 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調(diào)返回。
ProducerRecord
在生產(chǎn)發(fā)送消息前,會(huì)將信息封裝成ProducerRecord對(duì)象。主要由以下幾部分組成:
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
其中主要有要發(fā)送的Topic名稱,要發(fā)送至那個(gè)分區(qū),以及要發(fā)送的數(shù)據(jù)和key。
其他的都比較好理解,key的作用是如果key存在的話,就會(huì)對(duì)key進(jìn)行hash,然后根據(jù)不同的結(jié)果發(fā)送至不同的分區(qū),這樣當(dāng)有相同的key時(shí),所有相同的key都會(huì)發(fā)送到同一個(gè)分區(qū),我們之前也提到,所有的新消息都會(huì)被添加到分區(qū)的尾部,進(jìn)而保證了數(shù)據(jù)的順序性。
序列化器
由于Kafka中的數(shù)據(jù)都是字節(jié)數(shù)組,在將消息發(fā)送到Kafka之前需要先將數(shù)據(jù)序列化為字節(jié)數(shù)組。
序列化器的作用就是用于序列化要發(fā)送的消息的。
Kafka使用org.apache.kafka.common.serialization.Serializer
接口用于定義序列化器,將泛型指定類型的數(shù)據(jù)轉(zhuǎn)換為字節(jié)數(shù)組,如下:
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}
byte[] serialize(String var1, T var2);
default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}
default void close() {
}
}
其中kafka提供了許多實(shí)現(xiàn)類。
除了上述提供的,還可以自定義序列化器,只要實(shí)現(xiàn)Serializer接口即可。
假如我們有如下實(shí)體類:
public class User {
private Integer userId;
private String username;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}
序列化類:
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
}
@Override
public byte[] serialize(String topic, User data) {
try {
// 如果數(shù)據(jù)是null,則返回null
if (data == null) return null;
Integer userId = data.getUserId();
String username = data.getUsername();
int length = 0;
byte[] bytes = null;
if (null != username) {
bytes = username.getBytes("utf-8");
length = bytes.length;
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
buffer.putInt(userId);
buffer.putInt(length);
buffer.put(bytes);
return buffer.array();
} catch (UnsupportedEncodingException e) {
throw new SerializationException("序列化數(shù)據(jù)異常");
}
}
@Override
public void close() {
// do nothing
}
}
分區(qū)器
分區(qū)器來計(jì)算消息該發(fā)送到哪個(gè)分區(qū)中。
默認(rèn)(DefaultPartitioner)分區(qū)計(jì)算:
- 如果record提供了分區(qū)號(hào),則使用record提供的分區(qū)號(hào)
- 如果record沒有提供分區(qū)號(hào),則使用key的序列化后的值的hash值對(duì)分區(qū)數(shù)量取模
- 如果record沒有提供分區(qū)號(hào),也沒有提供key,則使用輪詢的方式分配分區(qū)號(hào)。
也可以自定義分區(qū)器,需要
- 首先開發(fā)Partitioner接口的實(shí)現(xiàn)類
- 在KafkaProducer中進(jìn)行設(shè)置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)
攔截器
Producer攔截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于實(shí)現(xiàn)Client端的定制化控制邏輯。
對(duì)于Producer而言,Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。同時(shí),Producer允許用戶指定多個(gè)Interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。
注意攔截器發(fā)生異常拋出的異常會(huì)被忽略;此外,也要注意一種情況:如果某個(gè)攔截器依賴上一個(gè)攔截器的結(jié)果,但是當(dāng)上一個(gè)攔截器異常,則該攔截器可能也不會(huì)正常工作,因?yàn)樗邮艿降氖巧弦粋€(gè)成功返回的結(jié)果,而不是期望的上一個(gè)攔截器結(jié)果。
Intercetpor的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括:
- onSend(ProducerRecord):該方法封裝進(jìn)KafkaProducer.send方法中,即運(yùn)行在用戶主線程中。Producer確保在消息被序列化以計(jì)算分區(qū)前調(diào)用該方法。用戶可以在該方法中對(duì)消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會(huì)影響目標(biāo)分區(qū)的計(jì)算。
- onAcknowledgement(RecordMetadata, Exception):該方法會(huì)在消息被應(yīng)答之前或消息發(fā)送失敗時(shí)調(diào)用,并且通常都是在Producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運(yùn)行在Producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會(huì)拖慢Producer的消息發(fā)送效率。
- close:關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。
如前所述,Interceptor可能被運(yùn)行在多個(gè)線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全。另外倘若指定了多個(gè)Interceptor,則Producer將按照指定順序調(diào)用它們。
自定義攔截器步驟:
- 實(shí)現(xiàn)ProducerInterceptor接口
- 在KafkaProducer的設(shè)置中設(shè)置自定義的攔截器
案例:
消息實(shí)體類:
public class User {
private Integer userId;
private String username;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}
還是用上面的序列化器。
自定義攔截器1:
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorOne implements ProducerInterceptor<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("攔截器1 -- go");
// 消息發(fā)送的時(shí)候,經(jīng)過攔截器,調(diào)用該方法
// 要發(fā)送的消息內(nèi)容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 攔截器攔下來之后根據(jù)原來消息創(chuàng)建的新的消息
// 此處對(duì)原消息沒有做任何改動(dòng)
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 傳遞新的消息
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("攔截器1 -- back");
// 消息確認(rèn)或異常的時(shí)候,調(diào)用該方法,該方法中不應(yīng)實(shí)現(xiàn)較重的任務(wù)
// 會(huì)影響kafka生產(chǎn)者的性能。
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}
自定義攔截器2:
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorTwo implements ProducerInterceptor<Integer, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
System.out.println("攔截器2 -- go");
// 消息發(fā)送的時(shí)候,經(jīng)過攔截器,調(diào)用該方法
// 要發(fā)送的消息內(nèi)容
final String topic = record.topic();
final Integer partition = record.partition();
final Integer key = record.key();
final String value = record.value();
final Long timestamp = record.timestamp();
final Headers headers = record.headers();
// 攔截器攔下來之后根據(jù)原來消息創(chuàng)建的新的消息
// 此處對(duì)原消息沒有做任何改動(dòng)
ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
topic,
partition,
timestamp,
key,
value,
headers
);
// 傳遞新的消息
return newRecord;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("攔截器2 -- back");
// 消息確認(rèn)或異常的時(shí)候,調(diào)用該方法,該方法中不應(yīng)實(shí)現(xiàn)較重的任務(wù)
// 會(huì)影響kafka生產(chǎn)者的性能。
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
final Object classContent = configs.get("classContent");
System.out.println(classContent);
}
}
正常那個(gè)運(yùn)行會(huì)打印:
攔截器1 -- go
攔截器2 -- go
攔截器1 -- back
攔截器2 -- back
即發(fā)送和回調(diào)都是按照鏈的順序正序來的。
生產(chǎn)者原理剖析
kafka整個(gè)消息發(fā)送流程如下圖:
生產(chǎn)消息的所有工作分給了2個(gè)線程協(xié)作完成:一個(gè)是主線程(負(fù)責(zé)消息的預(yù)處理),第二個(gè)是發(fā)送線程(sender線程負(fù)責(zé)將發(fā)送消息以及接受發(fā)送的結(jié)果)。
主線程
主線程:負(fù)責(zé)消息創(chuàng)建,攔截器,序列化器,分區(qū)器等操作,并將消息追加到消息收集器消息累加器(RecoderAccumulator)中。
消息累加器
參考:https://mp.weixin.qq.com/s/n8iCJWA13Xz-haZnB8zYFg
消息累加器的作用是緩存消息以便sender線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗提升性能。
消息累加器的緩存大小可以通過生產(chǎn)者參數(shù) buffer.memory
配置,默認(rèn)為32MB。
若主線程發(fā)送消息的速度超過sender線程發(fā)送消息的速度,會(huì)導(dǎo)致消息累加器被填滿,這時(shí)候再調(diào)用生產(chǎn)者客戶端的send方法會(huì)被阻塞,若阻塞超過60秒(由參數(shù)max.block.ms
控制),則會(huì)拋出異常 BufferExhaustedException 。
消息收集器RecoderAccumulator
為每個(gè)分區(qū)都維護(hù)了一個(gè)Deque<ProducerBatch>
類型的雙端隊(duì)列。
ProducerBatch 可以理解為是 ProducerRecord 的集合,批量發(fā)送有利于提升吞吐量,降低網(wǎng)絡(luò)影響。
主線程發(fā)送的數(shù)據(jù)由這樣的結(jié)構(gòu)保存:首先按照 Topic 進(jìn)行劃分,每個(gè) Topic 會(huì)有一個(gè) Map(key:Topic,value:TopicInfo);之后,按照分區(qū)進(jìn)行劃分,TopicInfo 里也有一個(gè) Map(key:分區(qū)號(hào),value:Deque<ProducerBatch>),每個(gè)雙端隊(duì)列會(huì)保存多個(gè)消息批次。當(dāng)有消息發(fā)送時(shí),會(huì)從對(duì)應(yīng) Topic 、對(duì)應(yīng)分區(qū)的雙端隊(duì)列的尾部取出一個(gè)批次,判斷是否可以將消息追加到后面。這種結(jié)構(gòu)的目的在于:
- 使用字節(jié)將更加緊湊,節(jié)約空間
- 多個(gè)小的消息組成一個(gè)批次一起發(fā)送,減少網(wǎng)絡(luò)請求次數(shù)提升吞吐量。因?yàn)?sender 線程發(fā)送消息的基本單位也是批次,它會(huì)從雙端隊(duì)列的頭部取數(shù)據(jù)發(fā)送。
ProducerBatch 的大小與 batch.size
參數(shù)(默認(rèn)16KB)密切相關(guān)。此外,Kafka 生產(chǎn)者使用 BufferPool
實(shí)現(xiàn) ByteBuffer 從而實(shí)現(xiàn)對(duì)內(nèi)存的復(fù)用。該緩存池只針對(duì)特定大?。?batch.size指定)的 ByteBuffer進(jìn)行管理,對(duì)于消息過大的緩存,不能做到重復(fù)利用。
消息累加器的基本結(jié)構(gòu)如下圖所示,紅色+綠色區(qū)域總大小32MB,一個(gè)池中單位 ByteBuffer 大小16KB。
假設(shè)剛啟動(dòng)新插入一條消息,對(duì)應(yīng)的 Topic 、對(duì)應(yīng)的 Deque<ProducerBatch> 為空,這時(shí)執(zhí)行allocate
方法嘗試開辟空間,方法主要過程如下:
- 如果申請空間的大小大于最大空間(
buffer.memory
默認(rèn)32MB),則直接拋出異常; - 操作緩存池之前嘗試獲取可重入鎖,若獲取的空間(size)正好等于每個(gè)批次預(yù)設(shè)大小(
batch.size
默認(rèn)16KB),則直接從Deque<ByteBuffer>
中取出第一個(gè) ByteBuffer 返回; - 若獲取的空間(size)大于批次預(yù)設(shè)大小,計(jì)算剩余的空閑空間,即池中空閑空間+池外空閑空間(nonPooledAvailableMemory)。如果剩余的空閑空間大于size,則進(jìn)行第4步;如果小于size,則進(jìn)行第5步;
- 直接使用池外空閑空間分配,若不夠再取池內(nèi)空閑空間,最后返回。
- 將當(dāng)前線程加入到等待隊(duì)列(waiters)的尾部,如果等待超時(shí)也沒有足夠的空間,則拋出異常;若中途被喚醒,則進(jìn)行下一步;
- 中途喚醒后有兩種情況,當(dāng)釋放的空間正好等于一個(gè)批次大小且自己沒有累計(jì)獲得空間,則獲取后返回;否則累計(jì)獲取釋放空間,滿足后才會(huì)返回。
發(fā)送線程
sender 線程從消息累加器獲取準(zhǔn)備好可以發(fā)送消息(等待時(shí)候是否超過linger.ms
參數(shù)設(shè)置的時(shí)間、或批次個(gè)數(shù)大于1或第一個(gè)批次已滿)后,就可以準(zhǔn)備消費(fèi)消息了:
- 遍歷每個(gè) Topic 下的分區(qū)批次,根據(jù)分區(qū) leader,將其處理為
<Node, List<ProducerBatch>
的形式, Node 表示集群的broker節(jié)點(diǎn)。 - 進(jìn)一步將
<Node, List<ProducerBatch>
轉(zhuǎn)化為<Node, Request>
形式,此時(shí)才可以向服務(wù)端發(fā)送數(shù)據(jù)。 - 在發(fā)送之前,Sender線程將消息以
Map<NodeId, Deque<Request>>
的形式保存到InFlightRequests
中進(jìn)行緩存,可以通過其獲取 leastLoadedNode ,即當(dāng)前Node中負(fù)載壓力最小的一個(gè),以實(shí)現(xiàn)消息的盡快發(fā)出。保存到InFlightRequests
中的目的是緩存已經(jīng)發(fā)出去但沒收到響應(yīng)的請求,NodeId 對(duì)應(yīng)一個(gè) broker 節(jié)點(diǎn) id ,也就是一個(gè)連接,每個(gè)連接最多堆積的未完成請求為5個(gè)(max.in.flight.requests.per.connection
參數(shù)配置)。
生產(chǎn)者參數(shù)
除了前面說過的必要的5個(gè)參數(shù),如下:
-
bootstrap.servers
:配置生產(chǎn)者如何與broker建立連接。該參數(shù)設(shè)置的是初始化參數(shù)。如果生產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個(gè)部分broker的地址,而不是全部,當(dāng)生產(chǎn)者連接上此處指定的broker之后,在通過該連接發(fā)現(xiàn)集群中的其他節(jié)點(diǎn)。 -
key.serializer
:要發(fā)送信息的key數(shù)據(jù)的序列化類。設(shè)置的時(shí)候可以寫類名,也可以使用該類的Class對(duì)象。 -
value.serializer
:要發(fā)送消息的value數(shù)據(jù)的序列化類。設(shè)置的時(shí)候可以寫類名,也可以使用該類的Class對(duì)象。 -
acks
:默認(rèn)值:all。- acks=0:生產(chǎn)者不等待broker對(duì)消息的確認(rèn),只要將消息放到緩沖區(qū),就認(rèn)為消息已經(jīng)發(fā)送完成。該情形不能保證broker是否真的收到了消息,retries配置也不會(huì)生效。發(fā)送的消息的返回的消息偏移量永遠(yuǎn)是-1。
- acks=1:表示消息只需要寫到主分區(qū)即可,然后就響應(yīng)客戶端,而不等待副本分區(qū)的確認(rèn)。在該情形下,如果主分區(qū)收到消息確認(rèn)之后就宕機(jī)了,而副本分區(qū)還沒來得及同步該消息,則該消息丟失。
- acks=all:leader分區(qū)會(huì)等待所有的ISR副本分區(qū)確認(rèn)記錄。該處理保證了只要有一個(gè)ISR副本分區(qū)存活,消息就不會(huì)丟失。這是Kafka最強(qiáng)的可靠性保證,等效于 acks=-1。
-
retries
:retries重試次數(shù)。當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。跟客戶端收到錯(cuò)誤時(shí)重發(fā)一樣。如果設(shè)置了重試,還想保證消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
,否則在重試此失敗消息的時(shí)候,其他的消息可能發(fā)送成功了。
還有一些比較重要的參數(shù):
-
client.id
:生產(chǎn)者發(fā)送請求的時(shí)候傳遞給broker的id字符串。 用于在broker的請求日志中追蹤什么應(yīng)用發(fā)送了什么消息。 一般該id是跟業(yè)務(wù)有關(guān)的字符串。 -
retry.backoff.ms
:在向一個(gè)指定的主題分區(qū)重發(fā)消息的時(shí)候,重試之間的等待時(shí)間。 比如3次重試,每次重試之后等待該時(shí)間長度,再接著重試。在一些失敗的場 景,避免了密集循環(huán)的重新發(fā)送請求。 long型值,默認(rèn)100。 -
request.timeout.ms
:客戶端等待請求響應(yīng)的最大時(shí)長。如果服務(wù)端響應(yīng)超時(shí),則會(huì)重發(fā)請求,除非達(dá)到重試次數(shù)。該設(shè)置應(yīng)該比 replica.lag.time.max.ms 要大,以免在服務(wù)器延遲時(shí)間內(nèi)重發(fā)消息。int類型值,默認(rèn): 30000。 -
buffer.memory
:生產(chǎn)者可以用來緩存等待發(fā)送到服務(wù)器的記錄的總內(nèi)存字節(jié)。如果記錄的發(fā)送 速度超過了將記錄發(fā)送到服務(wù)器的速度,則生產(chǎn)者將阻塞 max.block.ms 的時(shí) 間,此后它將引發(fā)異常。此設(shè)置應(yīng)大致對(duì)應(yīng)于生產(chǎn)者將使用的總內(nèi)存,但并非 生產(chǎn)者使用的所有內(nèi)存都用于緩沖。一些額外的內(nèi)存將用于壓縮(如果啟用了 壓縮)以及維護(hù)運(yùn)行中的請求。long型數(shù)據(jù)。默認(rèn)值:33554432。 -
batch.size
:當(dāng)多個(gè)消息發(fā)送到同一個(gè)分區(qū)的時(shí)候,生產(chǎn)者嘗試將多個(gè)記錄作為一個(gè)批來處理。批處理提高了客戶端和服務(wù)器的處理效率。 該配置項(xiàng)以字節(jié)為單位控制默認(rèn)批的大小。 所有的批小于等于該值。 發(fā)送給broker的請求將包含多個(gè)批次,每個(gè)分區(qū)一個(gè),并包含可發(fā)送的數(shù) 據(jù)。如果該值設(shè)置的比較小,會(huì)限制吞吐量(設(shè)置為0會(huì)完全禁用批處理)。如果 設(shè)置的很大,又有一點(diǎn)浪費(fèi)內(nèi)存,因?yàn)镵afka會(huì)永遠(yuǎn)分配這么大的內(nèi)存來參與 到消息的批整合中。 -
linger.ms
:該配置設(shè)置了一個(gè)延遲,生產(chǎn)者不會(huì)立即將消息發(fā)送到broker,而是等待這么一段時(shí)間以累積消息,然 后將這段時(shí)間之內(nèi)的消息作為一個(gè)批次發(fā)送。該設(shè)置是批處理的另一個(gè)上限: 一旦批消息達(dá)到了 batch.size 指定的值,消息批會(huì)立即發(fā)送,如果積累的消 息字節(jié)數(shù)達(dá)不到 batch.size 的值,可以設(shè)置該毫秒值,等待這么長時(shí)間之 后,也會(huì)發(fā)送消息批。該屬性默認(rèn)值是0(沒有延遲)。如果設(shè)置 linger.ms=5 ,則在一個(gè)請求發(fā)送之前先等待5ms。long型值,默認(rèn):0。 -
max.request.size
:單個(gè)請求的最大字節(jié)數(shù)。該設(shè)置會(huì)限制單個(gè)請求中消息批的消息個(gè)數(shù),以免單 個(gè)請求發(fā)送太多的數(shù)據(jù)。服務(wù)器有自己的限制批大小的設(shè)置,與該配置可能不 一樣。int類型值,默認(rèn)1048576。 -
interceptor.classes
:指定在生產(chǎn)者接收到該消息,向Kafka集群傳輸之前,由序列化器處理之前,配置的攔截器對(duì)消息進(jìn)行處理。 -
partitioner.class
:實(shí)現(xiàn)了接口 org.apache.kafka.clients.producer.Partitioner 的分區(qū) 器實(shí)現(xiàn)類,默認(rèn)為DefaultPartitioner。 -
send.buffer.bytes
:TCP發(fā)送數(shù)據(jù)的時(shí)候使用的緩沖區(qū)(SO_SNDBUF)大小。如果設(shè)置為0,則使 用操作系統(tǒng)默認(rèn)的。 -
receive.buffer.bytes
:TCP接收緩存(SO_RCVBUF),如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)的值。 int類型值,默認(rèn)32768。 -
security.protocol
:跟broker通信的協(xié)議:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。string類型值,默認(rèn):PLAINTEXT。 -
max.block.ms
:控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的 時(shí)長。當(dāng)緩存滿了或元數(shù)據(jù)不可用的時(shí)候,這些方法阻塞。在用戶提供的序列化器和分區(qū)器的阻塞時(shí)間不計(jì)入。long型值,默認(rèn):60000。 -
connections.max.idle.ms
:當(dāng)連接空閑時(shí)間達(dá)到這個(gè)值,就關(guān)閉連接。long型數(shù)據(jù),默認(rèn):540000 -
max.in.flight.requests.per.connection
:單個(gè)連接上未確認(rèn)請求的最大數(shù)量。達(dá)到這個(gè)數(shù)量,客戶端阻塞。如果該值大 于1,且存在失敗的請求,在重試的時(shí)候消息順序不能保證。 int類型值,默認(rèn)5。 -
reconnect.backoff.max.ms
:對(duì)于每個(gè)連續(xù)的連接失敗,每臺(tái)主機(jī)的退避將成倍增加,直至達(dá)到此最大值。 在計(jì)算退避增量之后,添加20%的隨機(jī)抖動(dòng)以避免連接風(fēng)暴。 long型值,默認(rèn)1000。 -
reconnect.backoff.ms
:嘗試重連指定主機(jī)的基礎(chǔ)等待時(shí)間。避免了到該主機(jī)的密集重連。該退避時(shí)間 應(yīng)用于該客戶端到broker的所有連接。 long型值,默認(rèn)50。文章來源:http://www.zghlxwxcb.cn/news/detail-820370.html -
compression.type
:生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是none,也就是不壓縮。 支持的值:none、gzip、snappy和lz4。 壓縮是對(duì)于整個(gè)批來講的,所以批處理的效率也會(huì)影響到壓縮的比例。文章來源地址http://www.zghlxwxcb.cn/news/detail-820370.html
到了這里,關(guān)于【Kafka】高級(jí)特性:生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!