-
生產(chǎn)者通過
producerRecord
對象封裝消息主題、消息的value(內(nèi)容)、timestamp(時間戳)等 -
生產(chǎn)者通過
send()
方法發(fā)送消息,send()方法會經(jīng)過如下幾步
1. 首先將消息交給攔截器(Interceptor)
處理, 攔截器對生產(chǎn)者而言,對所有消息都是生效的,攔截器也支持鏈?zhǔn)骄幊蹋ㄘ?zé)任器鏈)的效果,攔截器一般將一些通用的功能加進(jìn)來,通常在消息發(fā)送前,producer回調(diào)邏輯前對消息做一些定制化需求,消息頭部添加消息的屬性等
2. 接下來交給序列化器(Serializer)
,Key的序列化器和value的序列化器,對消息的key和value進(jìn)行序列化,序列化為字節(jié)數(shù)組,
3. 然后將序列化的結(jié)果交給分區(qū)器(Partitioner)
,分區(qū)器有3種策略
來計算消息應(yīng)該屬于哪個分區(qū),-
在producerRecord中直接
指定分區(qū)
,分區(qū)器會直接將消息放到指定分區(qū) -
如果沒有指定分區(qū)器,但是
消息有key
,分區(qū)器會根據(jù)消息的key計算hash值,根據(jù)主題分區(qū)數(shù)量取模,來決定將消息放到哪個分區(qū) -
如果沒有指定分區(qū)、也沒有指定key,分區(qū)器會以
輪詢(Round Robin)
的方式給消息分配分區(qū)在這里插入圖片描述
-
-
消息經(jīng)過以上攔截器->序列化器->分區(qū)器 進(jìn)行加工后,會將消息放到
RecordAccumulator緩沖區(qū)
,對每個分區(qū)都會有一個單獨的緩沖區(qū),經(jīng)過分區(qū)器計算出分區(qū)號之后,不同的消息就會分配給不同的緩沖區(qū),緩沖區(qū)里面消息也是有序
的,我們可以指定對緩沖區(qū)里的消息進(jìn)行分批次
,也可以指定緩沖區(qū)大小
-
-
當(dāng)緩沖區(qū)中消息達(dá)到條件會按批次發(fā)送到broker對應(yīng)分區(qū)上
-
broker將接收到的消息進(jìn)行刷盤持久化
-
一個消息發(fā)出去之后,服務(wù)器(broker)會返回給producer響應(yīng),producer再來判斷消息是否發(fā)送成功,
-
broker返回元數(shù)據(jù)信息 - > 落盤成功 ->生產(chǎn)者繼續(xù)發(fā)送后面消息
-
broker返回元數(shù)據(jù)信息 - >落盤失敗 - 生產(chǎn)者設(shè)置了重試次數(shù) -> producer 會將消息重新放入緩沖區(qū)進(jìn)行排隊,等待再次發(fā)送,當(dāng)一個消息發(fā)送失敗重試需要重發(fā),消息是放到緩沖區(qū)隊尾,
-
生產(chǎn)者去緩沖區(qū)重試發(fā)送
生產(chǎn)者在重試消息時,消息的順序就錯了,那怎么保證消息的有序性呢?
針對這種情況,可以做一個配置,
參數(shù):max.in.flight.requests.per.connection
表示producer 在收到broker響應(yīng)之前可以發(fā)送多少批消息,默認(rèn)5,
設(shè)置此值是1,表示broker在響應(yīng)之前producer不能再向同一個broker發(fā)送請求,就是我確認(rèn)一批你再發(fā)下一批,這樣可以保證消息有序性,對消息順序要求不高情況可以不考慮
補充:
-
Producer 創(chuàng)建時,會創(chuàng)建一個Sender線程(IO線程)設(shè)置為守護(hù)線程
-
Producer 創(chuàng)建時,會創(chuàng)建緩沖區(qū)
-
Producer 生產(chǎn)消息,內(nèi)部是一個
異步
流程,Sender
線程不斷輪詢
RecordAccumulator,滿足條件后進(jìn)行真正的網(wǎng)絡(luò)IO發(fā)送消息 -
-
RecordAccumulator(緩沖區(qū)) 對每一個分區(qū)都有一個緩沖區(qū)文章來源:http://www.zghlxwxcb.cn/news/detail-595877.html
- 每個分區(qū)的緩沖區(qū)中消息也是有序的
- 可以指定緩沖區(qū)中的消息按
批次
發(fā)送- 緩沖區(qū)大小達(dá)到
batch.size
,默認(rèn)16KB - 在緩沖區(qū)等待時間
lingger.ms
達(dá)到上限 - 以上兩個條件滿足一個即發(fā)送一批
- 緩沖區(qū)大小達(dá)到
- 可以指定整個緩沖區(qū)的大小
批次的概念很好理解,緩沖區(qū)就像一輛公交車,有兩種發(fā)車方式,一是人滿了就發(fā)車,一是等5分鐘就發(fā)車,不管是人滿了還是到5分鐘了,發(fā)車,go~
分批發(fā)送可以減少網(wǎng)絡(luò)IO,節(jié)省帶寬使用,減少網(wǎng)絡(luò)傳輸?shù)膲毫?,提升吞吐?span toymoban-style="hidden">文章來源地址http://www.zghlxwxcb.cn/news/detail-595877.html
- 一個批次消息發(fā)送后,通過網(wǎng)絡(luò),發(fā)往Kafka指定分區(qū),然后刷盤到broker
- 如果Producer設(shè)置了
retries
參數(shù)值>0,那么允許消息發(fā)送失敗進(jìn)行重試,重試機制由客戶端Producer內(nèi)部實現(xiàn) - Broker端消息落盤成功,會返回元數(shù)據(jù)給生產(chǎn)者
- 通過阻塞直接返回 (同步發(fā)送)
- 通過回調(diào)函數(shù)返回(異步發(fā)送)
到了這里,關(guān)于Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!