目錄
一、生產(chǎn)者的初始化流程
二、生產(chǎn)者到緩沖隊(duì)列的流程
三、Sender拉取數(shù)據(jù)到Kafka流程
四、消費(fèi)者初始化
五、主題訂閱原理
六、消費(fèi)者抓取數(shù)據(jù)原理
七、消費(fèi)者組初始化
八、消費(fèi)者組消費(fèi)流程
九、提交offset原理文章來源:http://www.zghlxwxcb.cn/news/detail-724658.html
一、生產(chǎn)者的初始化流程
- 首先獲取事務(wù)id和客戶端id(用到事物必須要事物id不然報(bào)錯(cuò),每個(gè)生產(chǎn)者都需要唯一標(biāo)識(shí)客戶端id)
- 監(jiān)控kafka相關(guān)情況的JmxReporter配置
- 然后獲取分區(qū)器,如果用戶有自定義的就讀取配置的,如果沒有配置就用默認(rèn)分區(qū)器
- 然后key和value進(jìn)行序列化
- 然后就讀取自定義攔截器,可以定義多個(gè)攔截器,組成攔截器鏈
- 然后初始化控制單條日志的大小,默認(rèn)是1m;緩沖區(qū)大小,默認(rèn)32m;
- 創(chuàng)建內(nèi)存池,緩存隊(duì)列,初始化批次大小默認(rèn)16k,壓縮相關(guān)處理,默認(rèn)是none,重試間隔時(shí)間默認(rèn)100ms
- 連接kafka集群,獲取元數(shù)據(jù),才能知道要發(fā)送到哪個(gè)分區(qū)
- 創(chuàng)建sender線程,會(huì)有個(gè)創(chuàng)建sender的方法,sender線程負(fù)責(zé)拉取緩沖隊(duì)列消息到Kafka,在方法里面會(huì)定義緩存請(qǐng)求的個(gè)數(shù)默認(rèn)5個(gè),然后請(qǐng)求超時(shí)的時(shí)間,然后創(chuàng)建一個(gè)網(wǎng)絡(luò)請(qǐng)求客戶端對(duì)象,會(huì)傳入剛剛的參數(shù)還有客戶端id,重試時(shí)間,發(fā)送緩沖區(qū)的大小128和接受緩沖區(qū)的大小32,還有acks等配置。sender繼承了Runnbale接口,然后會(huì)new個(gè)sender線程出來用上面這些參數(shù),然后返回。
- sender放到后臺(tái),啟動(dòng)sender線程
二、生產(chǎn)者到緩沖隊(duì)列的流程
- 在執(zhí)行到攔截器的時(shí)候就要調(diào)用一個(gè)onSend方法,如果有多個(gè)攔截器,每個(gè)攔截器都會(huì)走一次這個(gè)方法,這個(gè)方法就是攔截器對(duì)數(shù)據(jù)加工的
- 然后獲取元數(shù)據(jù),要根據(jù)主題的分區(qū)放到對(duì)應(yīng)的緩存隊(duì)列
- 序列化相關(guān)操作key和value的序列化和壓縮
- 分區(qū)操作,如果指定了分區(qū),直接分配到指定分區(qū);沒有指定就會(huì)根據(jù)分區(qū)器進(jìn)行分配,沒有指定key就會(huì)粘性分區(qū)處理(如果批次大小和活著時(shí)間到了不然就一直是那個(gè),滿足才能創(chuàng)建新隊(duì)列用),如果指定key就根據(jù)key到hashcode進(jìn)分區(qū)數(shù)取模,
- 保證(序列化和壓縮后)數(shù)據(jù)大小能夠傳輸,他去讀取配置的消息最大值和緩沖區(qū)大小,如果有超過的拋異常
- 向緩存隊(duì)列里面追加數(shù)據(jù),獲取或者創(chuàng)建一個(gè)隊(duì)列按照分區(qū),然后嘗試添加數(shù)據(jù)(一般不成功,因?yàn)檫€沒申請(qǐng)內(nèi)存),然后根據(jù)16k和現(xiàn)在壓縮后的總大小取最大值,申請(qǐng)內(nèi)存就申請(qǐng)這個(gè)大小,內(nèi)存池分配內(nèi)存,然后sender線程拿走就了會(huì)釋放內(nèi)存。
- 如果批次大小滿了或者有了新的批次需要?jiǎng)?chuàng)建,就喚醒sender線程把緩沖隊(duì)列的數(shù)據(jù)拉取過去。
三、Sender拉取數(shù)據(jù)到Kafka流程
- 事務(wù)相關(guān)操作
- 獲取元數(shù)據(jù)信息,為了知道發(fā)到哪個(gè)分區(qū)
- 判斷32m緩存是否準(zhǔn)備好,先獲取隊(duì)列的信息,先判斷內(nèi)存隊(duì)列有沒有數(shù)據(jù)
- 判斷l(xiāng)eader是不是空如果沒有目標(biāo)那還是會(huì)拋出異常,如果批次大小或時(shí)間滿足一個(gè)條件,就會(huì)發(fā)送。
- 把所有請(qǐng)求按照節(jié)點(diǎn)為單位來發(fā)送請(qǐng)求,這樣一臺(tái)機(jī)器只需要建立一次連接
- 封裝了個(gè)request然后通過網(wǎng)絡(luò)客戶端把數(shù)據(jù)發(fā)送過去
- 然后服務(wù)端還是通過網(wǎng)絡(luò)客戶端獲取結(jié)果
四、消費(fèi)者初始化
- 消費(fèi)者組平衡
- 獲取消費(fèi)者組id和客戶端id
- 設(shè)置請(qǐng)求服務(wù)端等待時(shí)間,默認(rèn)30秒;重試時(shí)間,默認(rèn)100毫秒
- 攔截器鏈相關(guān)處理
- key和value的反序列化
- 判斷offset從什么位置開始消費(fèi)
- 獲取消費(fèi)者元數(shù)據(jù)(重試時(shí)間、是否允許訪問系統(tǒng)主題默認(rèn)false,是否允許自動(dòng)創(chuàng)建topic主題默認(rèn)true)
- 連接Kafka集群
- 創(chuàng)建網(wǎng)絡(luò)客戶端對(duì)象(連接重試時(shí)間默認(rèn)50ms,最大重試時(shí)間1s,發(fā)送緩沖區(qū)128kb和接受緩沖區(qū)64kb大?。?/li>
- 指定消費(fèi)者分區(qū)分配策略
- 創(chuàng)建coordinator對(duì)象
- 設(shè)置自動(dòng)提交offset時(shí)間,默認(rèn)5s,配置抓取數(shù)據(jù)的參數(shù)(最少抓取多少最大一次抓取多少等)
五、主題訂閱原理
- 傳入要訂閱的主題,如果為null直接拋出異常
- 注冊(cè)負(fù)載均衡監(jiān)聽器,如果消費(fèi)者組中有節(jié)點(diǎn)掛了,要通知其他消費(fèi)者
- 按照主題自動(dòng)訂閱進(jìn)行分配
六、消費(fèi)者抓取數(shù)據(jù)原理
- 他首先先初始化消費(fèi)者組和隊(duì)列
- 然后回調(diào)消息會(huì)到緩沖隊(duì)列,然后去隊(duì)列抓取數(shù)據(jù),最多一次500條
- 然后抓取后攔截器開始處理數(shù)據(jù)
七、消費(fèi)者組初始化
- 先判斷coordinator不為null那就說明為消費(fèi)者組
- 如果沒有指定分區(qū)分配策略會(huì)拋出異常
- 判斷coordinator是否準(zhǔn)備好,他會(huì)循環(huán)創(chuàng)建查找coordinator的請(qǐng)求并發(fā)送,并獲取服務(wù)器返回到結(jié)果
他這整個(gè)消費(fèi)者組初始化就是判斷coordinator有沒有準(zhǔn)備好文章來源地址http://www.zghlxwxcb.cn/news/detail-724658.html
八、消費(fèi)者組消費(fèi)流程
- 他會(huì)用判斷coordinator是不是空,是的話就等待
- 他上來先去隊(duì)列拉取數(shù)據(jù),一般是拉取不到的
- 他先構(gòu)造請(qǐng)求的入?yún)ⅲㄗ钌僖淮巫ザ嗌伲疃嘧ザ嗌?,超時(shí)時(shí)間等待)然后調(diào)用send
- 他送后返回future,通過回調(diào)獲取數(shù)據(jù)的
- 他會(huì)循環(huán)遍歷數(shù)據(jù)獲取分區(qū),獲取分區(qū)的數(shù)據(jù),如果有數(shù)據(jù)就放到消息隊(duì)列里面
- 然后就調(diào)用從隊(duì)列拉取數(shù)據(jù)的方法拉取,然后他有大小限制最大500,他會(huì)循環(huán)一波一波拉取過去
- 然后放到攔截器走加工操作
九、提交offset原理
- 同步提交:找到coordinator然后調(diào)用commitOffset進(jìn)行發(fā)送,然后不停dowhile循環(huán),調(diào)用發(fā)送提交請(qǐng)求,然后等待回調(diào)獲取結(jié)果,一直循環(huán)到成功為止。
- 異步提交:他還是用coordinator去提交但是他不等待結(jié)果,他new了個(gè)監(jiān)聽等待結(jié)果。
到了這里,關(guān)于Kafka源碼簡(jiǎn)要分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!