1. Kafka 發(fā)送消息的主要步驟
- 創(chuàng)建一個 ProducerRecord 對象,需要包含目標(biāo)主題和要發(fā)送的內(nèi)容,還可以指定鍵、分區(qū)、時間戳或標(biāo)頭。
- 在發(fā)送 ProducerRecord 對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。
- 如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基于 ProducerRecord 對象的鍵選擇一個分區(qū)。選好分區(qū)以后,生產(chǎn)者就知道該往哪個主題和分區(qū)發(fā)送這條消息了。
- 緊接著,該消息會被添加到一個消息批次里,這個批次里的所有消息都將被發(fā)送給同一個主題和分區(qū)。有一個獨(dú)立的線程負(fù)責(zé)把這些消息批次發(fā)送給目標(biāo) broker。
- broker 在收到這些消息時會返回一個響應(yīng)。如果消息寫入成功,就返回一個 RecordMetaData 對象,其中包含了主題和分區(qū)信息,以及消息在分區(qū)中的偏移量。如果消息寫入失敗,則會返回一個錯誤。生產(chǎn)者在收到錯誤之后會嘗試重新發(fā)送消息,重試幾次之后如果還是失敗,則會放棄重試,并返回錯誤信息。
2.創(chuàng)建 Kafka 生產(chǎn)者
- 要向 Kafka 寫入消息,首先需要創(chuàng)建一個生產(chǎn)者對象,并設(shè)置一些屬性。Kafka 生產(chǎn)者有 3
個必須設(shè)置的屬性。- bootstrap.servers:broker 的地址。由多個 host:port 組成,地址之間以逗號分隔。生產(chǎn)者用它們來建立初始的 Kafka 集群連接。它不需要包含所有的 broker 地址,因?yàn)樯a(chǎn)者在建立初始連接之后可以從給定的 broker 那里找到其他 broker 的信息。不過還是建議至少提供兩個 broker 地址,因?yàn)橐坏┢渲幸粋€停機(jī),則生產(chǎn)者仍然可以連接到集群。
- key.serializer:一個類名,用來序列化消息的鍵。broker 希望接收到的消息的鍵和值都是字節(jié)數(shù)組。生產(chǎn)者可以把任意 Java 對象作為鍵和值發(fā)送給 broker,但它需要知道如何把這些 Java 對象轉(zhuǎn)換成字節(jié)數(shù)組。key.serializer 必須被設(shè)置為一個實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口的類,生產(chǎn)者會用這個類把鍵序列化成字節(jié)數(shù)組。Kafka客戶端默認(rèn)提供了 ByteArraySerializer、StringSerializer 和 IntegerSerializer 等,如果你只使用常見的幾種 Java 對象類型,就沒有必要實(shí)現(xiàn)自己的序列化器。需要注意的是,必須設(shè)置 key.serializer 這個屬性,盡管你可能只需要將值發(fā)送給 Kafka。如果只需要發(fā)送值,則可以將 Void 作為鍵的類型,然后將這個屬性設(shè)置為 VoidSerializer。
- value.serializer:一個類名,用來序列化消息的值。與設(shè)置 key.serializer 屬性一樣,需要設(shè)置成可以序列化消息值對象的類。
/* 創(chuàng)建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 創(chuàng)建一個生產(chǎn)者對象,設(shè)置鍵和值得類型,傳入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
3.發(fā)送消息到 Kafka
- 發(fā)送消息主要有以下 3 種方式:發(fā)送并忘記、同步發(fā)送、異步發(fā)送。
(1)發(fā)送并忘記
- 把消息發(fā)送給服務(wù)器,但并不關(guān)心它是否成功送達(dá)。大多數(shù)情況下,消息可以成功送達(dá),因?yàn)?Kafka 是高可用的,而且生產(chǎn)者有自動嘗試重發(fā)的機(jī)制。但是,如果發(fā)生了不可重試的錯誤或超時,那么消息將會丟失,應(yīng)用程序?qū)⒉粫盏饺魏涡畔⒒虍惓?/strong>。
- 在發(fā)送消息之前,生產(chǎn)者仍有可能拋出其他的異常。這些異??赡苁?SerializationException(序列化消息失?。ufferExhaustedException 或 TimeoutException(緩沖區(qū)已滿),或者InterruptException(發(fā)送線程被中斷)
/* 創(chuàng)建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 創(chuàng)建一個生產(chǎn)者對象,設(shè)置鍵和值得類型,傳入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
/* 指定目標(biāo)主題、消息的鍵和值 */
ProducerRecord<String, String> record = new ProducerRecord<>(
"CustomerCountry", "Precision Products", "France"
);
/*
* 發(fā)送 ProducerRecord 對象
* 消息會先被放進(jìn)緩沖區(qū),然后通過單獨(dú)的線程發(fā)送給服務(wù)器端。send() 方法會返回一個包含 RecordMetadata 的 Future 對象。因?yàn)槲覀冞x擇忽略返回值,所以不知道消息是否發(fā)送成功。
* 在發(fā)送消息之前,生產(chǎn)者仍有可能拋出其他的異常。這些異??赡苁?SerializationException(序列化消息失敗)、BufferExhaustedException 或 TimeoutException(緩沖區(qū)已滿),或者InterruptException(發(fā)送線程被中斷)。
*/
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
(2)同步發(fā)送
- 一般來說,生產(chǎn)者是異步的——我們調(diào)用 send() 方法發(fā)送消息,它會返回一個 Future對象。可以調(diào)用 get() 方法等待 Future 完成,這樣就可以在發(fā)送下一條消息之前知道當(dāng)前消息是否發(fā)送成功。如果采用同步發(fā)送方式,那么發(fā)送線程在這段時間內(nèi)就只能等待,什么也不做,甚至都不發(fā)送其他消息,這將導(dǎo)致糟糕的性能。
/* 創(chuàng)建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 創(chuàng)建一個生產(chǎn)者對象,設(shè)置鍵和值得類型,傳入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
/* 指定目標(biāo)主題、消息的鍵和值 */
ProducerRecord<String, String> record = new ProducerRecord<>(
"CustomerCountry", "Precision Products", "France"
);
/*
* 如果消息沒有發(fā)送成功,那么這個方法將拋出一個異常。如果沒有發(fā)生錯誤,那么我們將得到一個 RecordMetadata 對象,并能從中獲取消息的偏移量和其他元數(shù)據(jù)。
*/
try {
RecordMetadata recordMetadata = producer.send(record).get();
log.info("================topic: " + recordMetadata.topic() + ", offset: " + recordMetadata.offset() + ", partition: " + recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
}
(3)異步發(fā)送
- 大多數(shù)時候,并不需要等待響應(yīng)——盡管 Kafka 會把消息的目標(biāo)主題、分區(qū)信息和偏移量返回給客戶端,但對客戶端應(yīng)用程序來說可能不是必需的。不過,當(dāng)消息發(fā)送失敗,需要拋出異常、記錄錯誤日志或者把消息寫入“錯誤消息”文件以便日后分析診斷時,就需要用到這些信息了。為了能夠在異步發(fā)送消息時處理異常情況,生產(chǎn)者提供了回調(diào)機(jī)制。
- 為了使用回調(diào),需要一個實(shí)現(xiàn)了 org.apache.kafka.clients.producer.Callback 接口的類,這個接口只有一個 onCompletion 方法。
- 如果 Kafka 返回錯誤,那么 onCompletion 方法會收到一個非空(nonnull)異常。這里只是簡單地把它打印了出來,但在生產(chǎn)環(huán)境中應(yīng)該使用更好的處理方式。
/* 創(chuàng)建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 創(chuàng)建一個生產(chǎn)者對象,設(shè)置鍵和值得類型,傳入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
/* 指定目標(biāo)主題、消息的鍵和值 */
ProducerRecord<String, String> record = new ProducerRecord<>(
"CustomerCountry", "Precision Products", "France"
);
try {
producer.send(record, new Callback(){
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
log.info("================topic: " + recordMetadata.topic() +
", offset: " + recordMetadata.offset() +
", partition: " + recordMetadata.partition()
);
}
});
} catch (Exception e) {
e.printStackTrace();
}
4.生產(chǎn)者配置
/* 創(chuàng)建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("client.id","granola123");
kafkaProps.put("ack","all");
kafkaProps.put("max.block.ms","60000"); // 默認(rèn) 60000,即 60s
kafkaProps.put("batch.size","16384"); // 默認(rèn) 16384,即 16KB
kafkaProps.put("linge.ms","100"); // 改,默認(rèn) 0ms
kafkaProps.put("request.timeout.ms","30000"); // 默認(rèn) 30000,即 30s
kafkaProps.put("retries","3"); // 改,默認(rèn) 0次
kafkaProps.put("retry.backoff.ms","100"); // 默認(rèn) 100ms
kafkaProps.put("max.request.size","10485760"); // 默認(rèn) 1048576,即 1M
kafkaProps.put("compression.type","snappy "); // 默認(rèn)不壓縮
kafkaProps.put("buffer.memory","33554432"); // 默認(rèn) 33554432,即 32M
kafkaProps.put("receive.buffer.bytes","32768"); // 默認(rèn) 32768,即 32KB
kafkaProps.put("send.buffer.bytes","131072"); // 默認(rèn) 33554432,即 128KB
kafkaProps.put("max.in.flight.requests.per.connection","2"); // 默認(rèn) 5
kafkaProps.put("enable.idempotence",true); // 默認(rèn) false
(1)client.id
- 客戶端標(biāo)識符。值可以是任意字符串,broker 用它來識別從客戶端發(fā)送過來的消息。被用在日志、指標(biāo)和配額中。
(2)ack
- 指定了生產(chǎn)者在多少個分區(qū)副本收到消息的情況下才會認(rèn)為消息寫入成功。默認(rèn)情況下,Kafka 會在首領(lǐng)副本收到消息后向客戶端回應(yīng)消息寫入成功。acks 設(shè)置的值越小,生產(chǎn)者發(fā)送消息的速度就越快。也就是說,我們通過犧牲可靠性來換取較低的生產(chǎn)者延遲。不過,端到端延遲是指從消息生成到可供消費(fèi)者讀取的時間,這對 3 種配置來說都是一樣的。這是因?yàn)闉榱吮3忠恢滦裕?strong>在消息被寫入所有同步副本之前,Kafka 不允許消費(fèi)者讀取它們。因此,如果你關(guān)心的是端到端延遲,而不是生產(chǎn)者延遲,那么就不需要在可靠性和低延遲之間做權(quán)衡了:你可以選擇最可靠的配置,但仍然可以獲得相同的端到端延遲。建議根據(jù)實(shí)際情況設(shè)置,如果要嚴(yán)格保證消息不丟失,請?jiān)O(shè)置為all;如果允許存在丟失,建議設(shè)置為1;一般不建議設(shè)為0,除非無所謂消息丟不丟失。
- acks=0: 生產(chǎn)者不會等待任何來自 broker 的響應(yīng)。如果 broker 因?yàn)槟承﹩栴}沒有收到消息,那么生產(chǎn)者便無從得知,消息也就丟失了。生產(chǎn)者不需要等待 broker 返回響應(yīng),所以它們能夠以網(wǎng)絡(luò)可支持的最大速度發(fā)送消息,從而達(dá)到很高的吞吐量。
- acks=1: 只要集群的首領(lǐng)副本收到消息,生產(chǎn)者就會收到消息成功寫入的響應(yīng)。如果消息無法到達(dá)首領(lǐng)副本(比如首領(lǐng)副本發(fā)生崩潰,新首領(lǐng)還未選舉出來),那么生產(chǎn)者會收到一個錯誤響應(yīng)。為了避免數(shù)據(jù)丟失,生產(chǎn)者會嘗試重發(fā)消息。不過,在首領(lǐng)副本發(fā)生崩潰的情況下,如果消息還沒有被復(fù)制到新的首領(lǐng)副本,則消息還是有可能丟失。
- acks=all:只有當(dāng)所有副本全部收到消息時,生產(chǎn)者才會收到消息成功寫入的響應(yīng)。它的延遲比 acks=1 高,因?yàn)樯a(chǎn)者需要等待不止一個 broker 確認(rèn)收到消息。
(3)調(diào)用 send 方法后消息的傳遞時間(Kafka 返回成功響應(yīng)或放棄重試并承認(rèn)發(fā)送失敗的時間)
- 從 Kafka 2.1 開始,我們將 ProduceRecord 的發(fā)送時間分成如下兩個時間間隔,它們是被分開處理的。
- 異步調(diào)用 send() 所花費(fèi)的時間。在此期間,調(diào)用 send() 的線程將被阻塞。在發(fā)送消息之前,生產(chǎn)者仍有可能拋出其他的異常。這些異??赡苁?SerializationException(序列化消息失?。?、BufferExhaustedException 或 TimeoutException(緩沖區(qū)已滿),或者InterruptException(發(fā)送線程被中斷)。
- 從異步調(diào)用 send() 返回到觸發(fā)回調(diào)(不管是成功還是失?。┑臅r間,也就是從 ProduceRecord 被放到批次中直到Kafka成功響應(yīng)、出現(xiàn)不可恢復(fù)異?;虬l(fā)送超時的時間。
文章來源:http://www.zghlxwxcb.cn/news/detail-485806.html
- max.block.ms:用于控制在調(diào)用 send() 或通過 partitionsFor() 顯式地請求元數(shù)據(jù)時生產(chǎn)者可以發(fā)生阻塞的時間。
- 當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)被填滿或元數(shù)據(jù)不可用時,這些方法就可能發(fā)生阻塞。當(dāng)達(dá)到 max.block.ms 配置的時間時,就會拋出一個超時異常。
- 默認(rèn)值 60s。
- batch.size:當(dāng)有多條消息被發(fā)送給同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。這個參數(shù)指定了一個批次可以使用的內(nèi)存大小。需要注意的是,該參數(shù)是按照字節(jié)數(shù)而不是消息條數(shù)來計算的。
- 當(dāng)批次被填滿時,批次里所有的消息都將被發(fā)送出去。但是生產(chǎn)者并不一定都會等到批次被填滿時才將其發(fā)送出去。那些未填滿的批次,甚至只包含一條消息的批次也有可能被發(fā)送出去。
- 就算把批次大小設(shè)置得很大,也不會導(dǎo)致延遲,只是會占用更多的內(nèi)存而已。但如果把批次大小設(shè)置得太小,則會增加一些額外的開銷,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息。
- 默認(rèn)值為 16384,即 16KB。合理調(diào)大該參數(shù)值,能夠顯著提升生產(chǎn)端吞吐量,比如可以調(diào)整到 32KB,調(diào)大也意味著消息會有相對較大的延時。
- linger.ms:指定了生產(chǎn)者在發(fā)送消息批次之前等待更多消息加入批次的時間。生產(chǎn)者會在批次被填滿或等待時間達(dá)到 linger.ms 時把消息批次發(fā)送出去。
- 在默認(rèn)情況下,只要有可用的發(fā)送者線程,生產(chǎn)者都會直接把批次發(fā)送出去,就算批次中只有一條消息。
- 默認(rèn)值為 0,表示消息需要被立即發(fā)送,把 linger.ms 設(shè)置成比 0 大的數(shù),比如設(shè)置為 100ms,可以讓生產(chǎn)者在將批次發(fā)送給服務(wù)器之前等待一會兒,以使更多的消息加入批次中。雖然這樣會增加一點(diǎn)兒延遲,但也極大地提升了吞吐量。這是因?yàn)橐淮涡园l(fā)送的消息越多,每條消息的開銷就越小,如果啟用了壓縮,則計算量也更少了。
- request.timeout.ms:用于控制生產(chǎn)者在發(fā)送消息時等待服務(wù)器響應(yīng)的時間。
- 需要注意的是,這是指生產(chǎn)者在放棄之前等待每個請求的時間。如果設(shè)置的值已觸及,但服務(wù)器沒有響應(yīng),那么生產(chǎn)者將重試發(fā)送,或者執(zhí)行回調(diào),并傳給它一個 TimeoutException。
- 默認(rèn)值為 30000,即 30s,如果生產(chǎn)端負(fù)載很大,可以適當(dāng)調(diào)大以避免超時,比如可以調(diào)到 60000。
- retries 和 retry.backoff.ms:當(dāng)生產(chǎn)者收到來自服務(wù)器的錯誤消息時,這個錯誤有可能是暫時的(例如,一個分區(qū)沒有首領(lǐng))。在這種情況下,retries 參數(shù)可用于控制生產(chǎn)者在放棄發(fā)送并向客戶端宣告失敗之前可以重試多少次,默認(rèn)值為0,表示不進(jìn)行重試,這個參數(shù)一般是為了解決因瞬時故障導(dǎo)致的消息發(fā)送失敗,比如網(wǎng)絡(luò)抖動、leader換主,其中瞬時的leader重選舉是比較常見的,建議設(shè)置為一個大于 0 的值,比如 3 或者更大值。
- 在默認(rèn)情況下,重試時間間隔是 100 毫秒,但可以通過 retry.backoff.ms 參數(shù)來控制重試時間間隔,通常可以不調(diào)整。
- 可以測試一下 broker 在發(fā)生崩潰之后需要多長時間恢復(fù)(也就是直到所有分區(qū)都有了首領(lǐng)副本),讓重試時間大于 Kafka 集群從崩潰中恢復(fù)的時間,以免生產(chǎn)者過早放棄重試。
- 生產(chǎn)者并不會重試所有的錯誤。有些錯誤不是暫時的,生產(chǎn)者就不會進(jìn)行重試(例如,“消息太大”錯誤)。對于可重試的錯誤,生產(chǎn)者會自動進(jìn)行重試,所以不需要在應(yīng)用程序中處理重試邏輯。你要做的是集中精力處理不可重試的錯誤或者當(dāng)重試次數(shù)達(dá)到上限時的情況。
- 如果想完全禁用重試,那么唯一可行的方法是將 retries 設(shè)置為 0。
- delivery.timeout.ms:用于控制從消息準(zhǔn)備好發(fā)送(send() 方法成功返回并將消息放入批次中)到
broker 響應(yīng)或客戶端放棄發(fā)送(包括重試)所花費(fèi)的時間。- 這個時間應(yīng)該大于 linger.ms 和request.timeout.ms。如果配置的時間不滿足這一點(diǎn),則會拋出異常。
- 如果生產(chǎn)者在重試時超出了 delivery.timeout.ms,那么將執(zhí)行回調(diào),并會將 broker 之前返回的錯誤傳給它。如果消息批次還沒有發(fā)送完畢就超出了 delivery.timeout.ms,那么也將執(zhí)行回調(diào),并會將超時異常傳給它。
- 可以將這個參數(shù)配置成你愿意等待的最長時間,通常是幾分鐘,并使用默認(rèn)的重試次數(shù)(幾乎無限制)?;谶@樣的配置,只要生產(chǎn)者還有時間(或者在發(fā)送成功之前),它都會持續(xù)重試。這是一種合理的重試方式。
(4)max.request.size
- 限制了可發(fā)送的單條最大消息的大小和單個請求的消息總量的大小。假設(shè)這個參數(shù)的值為 1 MB,那么可發(fā)送的單條最大消息就是 1 MB,或者生產(chǎn)者最多可以在單個請求里發(fā)送一條包含 1024 個大小為 1 KB 的消息。另外,broker 對可接收的最大消息也有限制(message.max.bytes),其兩邊的配置最好是匹配的,以免生產(chǎn)者發(fā)送的消息被 broker 拒絕。
- 默認(rèn)值為1048576,即1M。
(5)compression.type
- 默認(rèn)情況下,生產(chǎn)者發(fā)送的消息是未經(jīng)壓縮的。這個參數(shù)可以被設(shè)置為 snappy、gzip、lz4 或 zstd,這指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法。
- snappy 壓縮算法由谷歌發(fā)明,雖然占用較少的 CPU 時間,但能提供較好的性能和相當(dāng)可觀的壓縮比。如果同時有性能和網(wǎng)絡(luò)帶寬方面的考慮,那么可以使用這種算法。
- gzip 壓縮算法通常會占用較多的 CPU 時間,但提供了更高的壓縮比。如果網(wǎng)絡(luò)帶寬比較有限,則可以使用這種算法。使用壓縮可以降低網(wǎng)絡(luò)傳輸和存儲開銷,而這些往往是向 Kafka 發(fā)送消息的瓶頸所在。
(6)buffer.memory
- 設(shè)置生產(chǎn)者要發(fā)送給服務(wù)器的消息的內(nèi)存緩沖區(qū)大小。如果應(yīng)用程序調(diào)用send() 方法的速度超過生產(chǎn)者將消息發(fā)送給服務(wù)器的速度,那么生產(chǎn)者的緩沖空間可能會被耗盡,后續(xù)的 send() 方法調(diào)用會等待內(nèi)存空間被釋放,如果在 max.block.ms 之后還沒有可用空間,就拋出異常。
- 需要注意的是,這個異常與其他異常不一樣,它是 send() 方法而不是 Future 對象拋出來的。
- 默認(rèn)值為33554432,即 32M。
(7)receive.buffer.bytes 和 send.buffer.bytes
- TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。被設(shè)為 –1,就使用操作系統(tǒng)默認(rèn)值。
- receive.buffer.bytes 默認(rèn)是 32768,32KB,send.buffer.bytes 默認(rèn)是 131072,128KB。
(8)max.in.flight.requests.per.connection
- 指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個消息批次。它的值越大,占用的內(nèi)存就越多,不過吞吐量也會得到提升。在單數(shù)據(jù)中心環(huán)境中,該參數(shù)被設(shè)置為 2 時可以獲得最佳的吞吐量,但使用默認(rèn)值 5 也可以獲得差不多的性能。
☆☆☆ Kafka 中順序保證
- Kafka 可以保證同一個分區(qū)中的消息是有序的。也就是說,如果生產(chǎn)者按照一定的順序發(fā)送消息,那么 broker 會按照這個順序把它們寫入分區(qū),消費(fèi)者也會按照同樣的順序讀取它們。在某些情況下,順序是非常重要的。例如,向一個賬戶中存入 100 元再取出來與先從賬戶中取錢再存回去是截然不同的!不過,有些場景對順序不是很敏感。
- 假設(shè)我們把 retries 設(shè)置為非零的整數(shù),并把 max.in.flight.requests.per.connection 設(shè)置為比 1 大的數(shù)。如果第一個批次寫入失敗,第二個批次寫入成功,那么 broker 會重試寫入第一個批次,等到第一個批次也寫入成功,兩個批次的順序就反過來了。
- 我們希望至少有 2 個正在處理中的請求(出于性能方面的考慮),并且可以進(jìn)行多次重試(出于可靠性方面的考慮),這個時候,最好的解決方案是將enable.idempotence 設(shè)置為 true。這樣就可以在最多有 5 個正在處理中的請求的情況下保證順序,并且可以保證重試不會引入重復(fù)消息。第 8 章將深入探討冪等生產(chǎn)者。
(9)enable.idempotence
- 從 0.11 版本開始,Kafka 支持精確一次性(exactly once)語義。冪等生產(chǎn)者是它的一個簡單且重要的組成部分。
- 假設(shè)為了最大限度地提升可靠性,你將生產(chǎn)者的 acks 設(shè)置為 all,并將 delivery.timeout.ms 設(shè)置為一個比較大的數(shù),允許進(jìn)行盡可能多的重試。這些配置可以確保每條消息被寫入 Kafka 至少一次。但在某些情況下,消息有可能被寫入 Kafka 不止一次。假設(shè)一個 broker 收到了生產(chǎn)者發(fā)送的消息,然后消息被寫入本地磁盤并成功復(fù)制給了其他 broker。此時,這個 broker 還沒有向生產(chǎn)者發(fā)送響應(yīng)就發(fā)生了崩潰。而生產(chǎn)者將一直等待,直到達(dá)到request.timeout.ms,然后進(jìn)行重試。重試發(fā)送的消息將被發(fā)送給新的首領(lǐng),而這個首領(lǐng)已經(jīng)有這條消息的副本,因?yàn)橹皩懭氲南⒁呀?jīng)被成功復(fù)制給它了?,F(xiàn)在,你就有了一條重復(fù)的消息。
- 為了避免這種情況,可以將 enable.idempotence 設(shè)置為 true。當(dāng)冪等生產(chǎn)者被啟用時,生產(chǎn)者將給發(fā)送的每一條消息都加上一個序列號。如果 broker 收到具有相同序列號的消息,那么它就會拒絕第二個副本,而生產(chǎn)者則會收到 DuplicateSequenceException,這個異常對生產(chǎn)者來說是無害的。
- 如果要啟用冪等性,那么 max.in.flight.requests.per.connection 應(yīng)小于或等于 5、retries 應(yīng)大于 0,并且 acks 被設(shè)置為 all。如果設(shè)置了不恰當(dāng)?shù)闹?,則會拋出 ConfigException 異常。
- 默認(rèn)為 false。
5.分區(qū)
(1)鍵
- ProducerRecord 對象包含了主題名稱、記錄的鍵和值。Kafka 消息就是一個個的鍵 – 值對,ProducerRecord 對象可以只包含主題名稱和值,鍵默認(rèn)情況下是 null。如果要創(chuàng)建鍵為 null 的消息,那么不指定鍵就可以了。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "USA");
- 大多數(shù)應(yīng)用程序還是會用鍵來發(fā)送消息。鍵有兩種用途:一是作為消息的附加信息與消息保存在一起,二是用來確定消息應(yīng)該被寫入主題的哪個分區(qū)。具有相同鍵的消息將被寫入同一個分區(qū)。如果一個進(jìn)程只從主題的某些分區(qū)讀取數(shù)據(jù),那么具有相同鍵的所有記錄都會被這個進(jìn)程讀取。要創(chuàng)建一個包含鍵和值的記錄,只需像下面這樣創(chuàng)建一個 ProducerRecord 即可。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
(2)默認(rèn)的分區(qū)器
鍵為 null
- 如果鍵為 null,并且使用了默認(rèn)的分區(qū)器,那么記錄將被隨機(jī)發(fā)送給主題的分區(qū)。分區(qū)器使用輪詢調(diào)度(round-robin)算法將消息均衡地分布到各個分區(qū)中。
- 從 Kafka 2.4 開始,在處理鍵為 null 的記錄時,默認(rèn)分區(qū)器使用的輪詢調(diào)度算法具備了黏性。
- kafka producer 發(fā)送數(shù)據(jù)并不是一個一個消息發(fā)送,而是取決于兩個 producer 端參數(shù)。一個是
linger.ms
,默認(rèn)是 0ms,當(dāng)達(dá)到這個時間后,kafka producer 就會立刻向 broker 發(fā)送數(shù)據(jù)。另一個參數(shù)是batch.size
,默認(rèn)是 16kb,當(dāng)產(chǎn)生的消息數(shù)達(dá)到這個大小后,就會立即向 broker 發(fā)送數(shù)據(jù)。 - 按照這個設(shè)計,從直觀上思考,肯定是希望每次都盡可能填滿一個 batch 再發(fā)送到一個分區(qū)。
- 但實(shí)際決定 batch 如何形成的一個因素是分區(qū)策略(partitionerstrategy)。
- 在 Kafka 2.4 版本之前,在 producer 發(fā)送數(shù)據(jù)默認(rèn)的分區(qū)策略是輪詢策略(沒指定 key 的情況。如果多條消息不是被發(fā)送到相同的分區(qū),它們就不能被放入到一個batch中。
- 所以如果使用默認(rèn)的輪詢 partition 策略,可能會造成一個大的 batch 被輪詢成多個小的 batch 的情況。
- 鑒于此,社區(qū)于 2.4 版本引入了黏性分區(qū)策略(Sticky Partitioning Strategy),該策略是一種全新的策略,能夠顯著地降低給消息指定分區(qū)過程中的延時,并減少 broker 的負(fù)載。它會隨機(jī)地選擇一個分區(qū)并會盡可能地堅(jiān)持使用該分區(qū)——即所謂的粘住這個分區(qū)。
- kafka producer 發(fā)送數(shù)據(jù)并不是一個一個消息發(fā)送,而是取決于兩個 producer 端參數(shù)。一個是
鍵不為 null
- 如果鍵不為空且使用了默認(rèn)的分區(qū)器,那么 Kafka 會對鍵進(jìn)行哈希(使用 Kafka 自己的哈希算法,即使升級 Java 版本,哈希值也不會發(fā)生變化),然后根據(jù)哈希值把消息映射到特定的分區(qū)。這里的關(guān)鍵在于同一個鍵總是被映射到同一個分區(qū),所以在進(jìn)行映射時,會用到主題所有的分區(qū),而不只是可用的分區(qū)。這也意味著,如果在寫入數(shù)據(jù)時目標(biāo)分區(qū)不可用,那么就會出錯。
(3)其它分區(qū)器
- 除了默認(rèn)的分區(qū)器,Kafka 客戶端還提供了 RoundRobinPartitioner 和 UniformStickyPartitioner。在消息包含鍵的情況下,可以用它們來實(shí)現(xiàn)隨機(jī)分區(qū)分配和黏性隨機(jī)分區(qū)分配。對某些應(yīng)用程序(例如,ETL 應(yīng)用程序會將數(shù)據(jù)從 Kafka 加載到關(guān)系數(shù)據(jù)庫中,并使用 Kafka 記錄的鍵作為數(shù)據(jù)庫的主鍵)來說,鍵很重要,但如果負(fù)載出現(xiàn)了傾斜,那么其中某些鍵就會對應(yīng)較大的負(fù)載。這個時候,可以用 UniformStickyPartitioner 將負(fù)載均衡地分布到所有分區(qū)。
- 默認(rèn)的分區(qū)器,如果有key的話,那么它是按照 key 來決定分區(qū)的,這個時候并不會使用粘性分區(qū)。UniformStickyPartitioner 不管你有沒有 key,統(tǒng)一都用粘性分區(qū)來分配。
Properties kafkaProps = new Properties();
kafkaProps .put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");
(4)自定義分區(qū)策略
- 假設(shè)你是 B2B 供應(yīng)商,你有一個大客戶,它是手持設(shè)備 Banana 的制造商。你的日常交易中有 10% 以上的交易與這個客戶有關(guān)。如果使用默認(rèn)的哈希分區(qū)算法,那么與 Banana 相關(guān)的記錄就會和其他客戶的記錄一起被分配給相同的分區(qū),導(dǎo)致這個分區(qū)比其他分區(qū)大很多。服務(wù)器可能會出現(xiàn)存儲空間不足、請求處理緩慢等問題。因此,需要給 Banana 分配單獨(dú)的分區(qū),然后使用哈希分區(qū)算法將其他記錄分配給其他分區(qū)。
package com.chb.partitioner;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages "+ "to have customer name as key");
if (((String) key).equals("Banana"))
return numPartitions - 1; // Banana的記錄總是被分配到最后一個分區(qū)
// 其他記錄被哈希到其他分區(qū)
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
}
public void close() {}
}
- 在 Kafka 配置參數(shù)時設(shè)置分區(qū)器的類。
/* 創(chuàng)建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("partitioner.class", "com.chb.partitioner.BananaPartitioner");
6.標(biāo)頭
- 除了鍵和值,記錄還可以包含標(biāo)頭??梢栽诓桓淖冇涗涙I – 值對的情況下向標(biāo)頭中添加一些有關(guān)記錄的元數(shù)據(jù)。
- 標(biāo)頭指明了記錄數(shù)據(jù)的來源,可以在不解析消息體的情況下根據(jù)標(biāo)頭信息來路由或跟蹤消息(消息有可能被加密,而路由器沒有訪問加密數(shù)據(jù)的權(quán)限)。
- 標(biāo)頭由一系列有序的鍵值對組成。鍵是字符串,值可以是任意被序列化的對象,就像消息里的值一樣。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
record.headers().add("privacy-level","YOLO".getBytes(StandardCharsets.UTF_8));
7.攔截器
- 有時候,你希望在不修改代碼的情況下改變 Kafka 客戶端的行為。這或許是因?yàn)槟阆虢o公司所有的應(yīng)用程序都加上同樣的行為,或許是因?yàn)闊o法訪問應(yīng)用程序的原始代碼。常見的生產(chǎn)者攔截器應(yīng)用場景包括:捕獲監(jiān)控和跟蹤信息、為消息添加標(biāo)頭,以及敏感信息脫敏。
- 下面是一個非常簡單的生產(chǎn)者攔截器示例,它只是簡單地統(tǒng)計在特定時間窗口內(nèi)發(fā)送和接
收的消息數(shù)量。- 可以覆蓋 configure 方法,并在調(diào)用其他方法之前設(shè)置好需要的東西。這個方法的參數(shù)包含了生產(chǎn)者所有的配置屬性,你可以隨意訪問它們。示例中,添加了一個自己的配置屬性。
- close 方法會在生產(chǎn)者被關(guān)閉時調(diào)用,我們可以借助這個機(jī)會清理攔截器的狀態(tài)。示例中,關(guān)閉了之前創(chuàng)建的線程。如果你打開了文件句柄、與遠(yuǎn)程數(shù)據(jù)庫建立了接,或者做了其他類似的操作,那么可以在這里關(guān)閉所有的資源,以免發(fā)生資源泄漏。
public class CountingProducerInterceptor implements ProducerInterceptor {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
static AtomicLong numSent = new AtomicLong(0);
static AtomicLong numAcked = new AtomicLong(0);
public void configure(Map<String, ?> map) {
Long windowSize = Long.valueOf((String) map.get("counting.interceptor.window.size.ms"));
executorService.scheduleAtFixedRate(CountingProducerInterceptor::run,windowSize, windowSize, TimeUnit.MILLISECONDS);
}
public ProducerRecord onSend(ProducerRecord producerRecord) {
numSent.incrementAndGet();
return producerRecord;
}
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
numAcked.incrementAndGet();
}
public void close() {
executorService.shutdownNow();
}
public static void run() {
System.out.println(numSent.getAndSet(0));
System.out.println(numAcked.getAndSet(0));
}
}
- Kafka 的 ProducerInterceptor 攔截器包含兩個關(guān)鍵方法。
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) 這個方法會在記錄被發(fā)送給 Kafka 之前,甚至是在記錄被序列化之前調(diào)用。如果覆蓋了這個方法,那么你就可以捕獲到有關(guān)記錄的信息,甚至可以修改它。只需確保這個方法返回一個有效的 ProducerRecord 對象。這個方法返回的記錄將被序列化并發(fā)送給 Kafka。
- void onAcknowledgement(RecordMetadata metadata, Exception exception) 這個方法會在收到 Kafka 的確認(rèn)響應(yīng)時調(diào)用。如果覆蓋了這個方法,則不可以修改 Kafka 返回的響應(yīng),但可以捕獲到有關(guān)響應(yīng)的信息。
8.配額和節(jié)流
- Kafka 可以限制生產(chǎn)消息和消費(fèi)消息的速率,這是通過配額機(jī)制來實(shí)現(xiàn)的。Kafka 提供了 3 種配額類型:生產(chǎn)、消費(fèi)和請求。生產(chǎn)配額和消費(fèi)配額限制了客戶端發(fā)送和接收數(shù)據(jù)的速率(以字節(jié) / 秒為單位)。請求配額限制了 broker 用于處理客戶端請求的時間百分比。
- 可以為所有客戶端(使用默認(rèn)配額)、特定客戶端、特定用戶,或特定客戶端及特定用戶設(shè)置配額。特定用戶的配額只在集群配置了安全特性并對客戶端進(jìn)行了身份驗(yàn)證后才有效。
- 默認(rèn)的生產(chǎn)配額和消費(fèi)配額是 broker 配置文件的一部分。如果要限制每個生產(chǎn)者平均發(fā)送的消息不超過 2 MBps,那么可以在 broker 配置文件中加入 quota.producer.default=2M。
- 也可以覆蓋 broker 配置文件中的默認(rèn)配額來為某些客戶端配置特定的配額,盡管不建議這么做。如果允許 clientA 的配額達(dá)到 4 MBps、clientB 的配額達(dá)到 10 MBps,則可以這樣配置:quota.producer.override=“clientA:4M,clientB:10M”。
- 在配置文件中指定的配額都是靜態(tài)的,如果要修改它們,則需要重啟所有的 broker。因?yàn)殡S時都可能有新客戶端加入,所以這種配置方式不是很方便。因此,特定客戶端的配額通常采用動態(tài)配置??梢杂?kafka-config.sh 或 AdminClient API 來動態(tài)設(shè)置配額。
- 下面來看一些例子。
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024' --entity-name clientC --entity-type clients ?
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024,consumer_byte_rate=2048' --entity-name user1 --entity-type users ?
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'consumer_
byte_rate=2048' --entity-type users ?
? 限制 clientC(通過客戶端 ID 來識別)每秒平均發(fā)送不超過 1024 字節(jié)。
? 限制 user1(通過已認(rèn)證的賬號來識別)每秒平均發(fā)送不超過 1024 字節(jié)以及每秒平均消
費(fèi)不超過 2048 字節(jié)。
? 限制所有用戶每秒平均消費(fèi)不超過 2048 字節(jié),有覆蓋配置的特定用戶除外。這也是動態(tài)修改默認(rèn)配置的一種方式。文章來源地址http://www.zghlxwxcb.cn/news/detail-485806.html
- 當(dāng)客戶端觸及配額時,broker 會開始限制客戶端請求,以防止超出配額。這意味著 broker將延遲對客戶端請求做出響應(yīng)。對大多數(shù)客戶端來說,這樣會自動降低請求速率(因?yàn)閳?zhí)行中的請求數(shù)量也是有限制的),并將客戶端流量降到配額允許的范圍內(nèi)。但是,被節(jié)流的客戶端還是有可能向服務(wù)器端發(fā)送額外的請求,為了不受影響,broker 將在一段時間內(nèi)暫停與客戶端之間的通信通道,以滿足配額要求。
- 節(jié)流行為通過 produce-throttle-time-avg、produce-throttle-time-max、fetch-throttle-time-avg 和fetch-throttle-time-max 暴露給客戶端,這幾個參數(shù)是生產(chǎn)請求和消費(fèi)請求因節(jié)流而被延遲的平均時間和最長時間。需要注意的是,這些時間對應(yīng)的是生產(chǎn)消息和消費(fèi)消息的吞吐量配額、請求時間配額,或兩者兼而有之。其他類型的客戶端請求只會因觸及請求時間配額而被節(jié)流,這些節(jié)流行為也會通過其他類似的指標(biāo)暴露出來。
- 如果你異步調(diào)用 Producer.send(),并且發(fā)送速率超過了 broker 能夠接受的速率(無論是由于配額的限制還是由于處理能力不足),那么消息將會被放入客戶端的內(nèi)存隊(duì)列。如果發(fā)送速率一直快于接收速率,那么客戶端最終將耗盡內(nèi)存緩沖區(qū),并阻塞后續(xù)的 Producer.send() 調(diào)用。如果超時延遲不足以讓 broker 趕上生產(chǎn)者,使其清理掉一些緩沖區(qū)空間,那么 Producer.send() 最終將拋出 TimeoutException 異常?;蛘?,批次里的記錄因?yàn)榈却龝r間超過了 delivery.timeout.ms 而過期,導(dǎo)致執(zhí)行 send() 的回調(diào),并拋出TimeoutException 異常。因此,要做好計劃和監(jiān)控,確保 broker 的處理能力總是與生產(chǎn)者發(fā)送數(shù)據(jù)的速率相匹配。
到了這里,關(guān)于(三)Kafka 生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!