国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka源碼簡(jiǎn)要分析

這篇具有很好參考價(jià)值的文章主要介紹了Kafka源碼簡(jiǎn)要分析。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

一、生產(chǎn)者的初始化流程

二、生產(chǎn)者到緩沖隊(duì)列的流程

三、Sender拉取數(shù)據(jù)到Kafka流程

四、消費(fèi)者初始化

五、主題訂閱原理

六、消費(fèi)者抓取數(shù)據(jù)原理

七、消費(fèi)者組初始化

八、消費(fèi)者組消費(fèi)流程

九、提交offset原理


一、生產(chǎn)者的初始化流程

  1. 首先獲取事務(wù)id和客戶端id(用到事物必須要事物id不然報(bào)錯(cuò),每個(gè)生產(chǎn)者都需要唯一標(biāo)識(shí)客戶端id)
  2. 監(jiān)控kafka相關(guān)情況的JmxReporter配置
  3. 然后獲取分區(qū)器,如果用戶有自定義的就讀取配置的,如果沒有配置就用默認(rèn)分區(qū)器
  4. 然后key和value進(jìn)行序列化
  5. 然后就讀取自定義攔截器,可以定義多個(gè)攔截器,組成攔截器鏈
  6. 然后初始化控制單條日志的大小,默認(rèn)是1m;緩沖區(qū)大小,默認(rèn)32m;
  7. 創(chuàng)建內(nèi)存池,緩存隊(duì)列,初始化批次大小默認(rèn)16k,壓縮相關(guān)處理,默認(rèn)是none,重試間隔時(shí)間默認(rèn)100ms
  8. 連接kafka集群,獲取元數(shù)據(jù),才能知道要發(fā)送到哪個(gè)分區(qū)
  9. 創(chuàng)建sender線程,會(huì)有個(gè)創(chuàng)建sender的方法,sender線程負(fù)責(zé)拉取緩沖隊(duì)列消息到Kafka,在方法里面會(huì)定義緩存請(qǐng)求的個(gè)數(shù)默認(rèn)5個(gè),然后請(qǐng)求超時(shí)的時(shí)間,然后創(chuàng)建一個(gè)網(wǎng)絡(luò)請(qǐng)求客戶端對(duì)象,會(huì)傳入剛剛的參數(shù)還有客戶端id,重試時(shí)間,發(fā)送緩沖區(qū)的大小128和接受緩沖區(qū)的大小32,還有acks等配置。sender繼承了Runnbale接口,然后會(huì)new個(gè)sender線程出來用上面這些參數(shù),然后返回。
  10. sender放到后臺(tái),啟動(dòng)sender線程

二、生產(chǎn)者到緩沖隊(duì)列的流程

  1. 在執(zhí)行到攔截器的時(shí)候就要調(diào)用一個(gè)onSend方法,如果有多個(gè)攔截器,每個(gè)攔截器都會(huì)走一次這個(gè)方法,這個(gè)方法就是攔截器對(duì)數(shù)據(jù)加工的
  2. 然后獲取元數(shù)據(jù),要根據(jù)主題的分區(qū)放到對(duì)應(yīng)的緩存隊(duì)列
  3. 序列化相關(guān)操作key和value的序列化和壓縮
  4. 分區(qū)操作,如果指定了分區(qū),直接分配到指定分區(qū);沒有指定就會(huì)根據(jù)分區(qū)器進(jìn)行分配,沒有指定key就會(huì)粘性分區(qū)處理(如果批次大小和活著時(shí)間到了不然就一直是那個(gè),滿足才能創(chuàng)建新隊(duì)列用),如果指定key就根據(jù)key到hashcode進(jìn)分區(qū)數(shù)取模,
  5. 保證(序列化和壓縮后)數(shù)據(jù)大小能夠傳輸,他去讀取配置的消息最大值和緩沖區(qū)大小,如果有超過的拋異常
  6. 向緩存隊(duì)列里面追加數(shù)據(jù),獲取或者創(chuàng)建一個(gè)隊(duì)列按照分區(qū),然后嘗試添加數(shù)據(jù)(一般不成功,因?yàn)檫€沒申請(qǐng)內(nèi)存),然后根據(jù)16k和現(xiàn)在壓縮后的總大小取最大值,申請(qǐng)內(nèi)存就申請(qǐng)這個(gè)大小,內(nèi)存池分配內(nèi)存,然后sender線程拿走就了會(huì)釋放內(nèi)存。
  7. 如果批次大小滿了或者有了新的批次需要?jiǎng)?chuàng)建,就喚醒sender線程把緩沖隊(duì)列的數(shù)據(jù)拉取過去。

三、Sender拉取數(shù)據(jù)到Kafka流程

  1. 事務(wù)相關(guān)操作
  2. 獲取元數(shù)據(jù)信息,為了知道發(fā)到哪個(gè)分區(qū)
  3. 判斷32m緩存是否準(zhǔn)備好,先獲取隊(duì)列的信息,先判斷內(nèi)存隊(duì)列有沒有數(shù)據(jù)
  4. 判斷l(xiāng)eader是不是空如果沒有目標(biāo)那還是會(huì)拋出異常,如果批次大小或時(shí)間滿足一個(gè)條件,就會(huì)發(fā)送。
  5. 把所有請(qǐng)求按照節(jié)點(diǎn)為單位來發(fā)送請(qǐng)求,這樣一臺(tái)機(jī)器只需要建立一次連接
  6. 封裝了個(gè)request然后通過網(wǎng)絡(luò)客戶端把數(shù)據(jù)發(fā)送過去
  7. 然后服務(wù)端還是通過網(wǎng)絡(luò)客戶端獲取結(jié)果

四、消費(fèi)者初始化

  1. 消費(fèi)者組平衡
  2. 獲取消費(fèi)者組id和客戶端id
  3. 設(shè)置請(qǐng)求服務(wù)端等待時(shí)間,默認(rèn)30秒;重試時(shí)間,默認(rèn)100毫秒
  4. 攔截器鏈相關(guān)處理
  5. key和value的反序列化
  6. 判斷offset從什么位置開始消費(fèi)
  7. 獲取消費(fèi)者元數(shù)據(jù)(重試時(shí)間、是否允許訪問系統(tǒng)主題默認(rèn)false,是否允許自動(dòng)創(chuàng)建topic主題默認(rèn)true)
  8. 連接Kafka集群
  9. 創(chuàng)建網(wǎng)絡(luò)客戶端對(duì)象(連接重試時(shí)間默認(rèn)50ms,最大重試時(shí)間1s,發(fā)送緩沖區(qū)128kb和接受緩沖區(qū)64kb大?。?/li>
  10. 指定消費(fèi)者分區(qū)分配策略
  11. 創(chuàng)建coordinator對(duì)象
  12. 設(shè)置自動(dòng)提交offset時(shí)間,默認(rèn)5s,配置抓取數(shù)據(jù)的參數(shù)(最少抓取多少最大一次抓取多少等)

五、主題訂閱原理

  1. 傳入要訂閱的主題,如果為null直接拋出異常
  2. 注冊(cè)負(fù)載均衡監(jiān)聽器,如果消費(fèi)者組中有節(jié)點(diǎn)掛了,要通知其他消費(fèi)者
  3. 按照主題自動(dòng)訂閱進(jìn)行分配

六、消費(fèi)者抓取數(shù)據(jù)原理

  1. 他首先先初始化消費(fèi)者組和隊(duì)列
  2. 然后回調(diào)消息會(huì)到緩沖隊(duì)列,然后去隊(duì)列抓取數(shù)據(jù),最多一次500條
  3. 然后抓取后攔截器開始處理數(shù)據(jù)

七、消費(fèi)者組初始化

  1. 先判斷coordinator不為null那就說明為消費(fèi)者組
  2. 如果沒有指定分區(qū)分配策略會(huì)拋出異常
  3. 判斷coordinator是否準(zhǔn)備好,他會(huì)循環(huán)創(chuàng)建查找coordinator的請(qǐng)求并發(fā)送,并獲取服務(wù)器返回到結(jié)果

他這整個(gè)消費(fèi)者組初始化就是判斷coordinator有沒有準(zhǔn)備好文章來源地址http://www.zghlxwxcb.cn/news/detail-724658.html

八、消費(fèi)者組消費(fèi)流程

  1. 他會(huì)用判斷coordinator是不是空,是的話就等待
  2. 他上來先去隊(duì)列拉取數(shù)據(jù),一般是拉取不到的
  3. 他先構(gòu)造請(qǐng)求的入?yún)ⅲㄗ钌僖淮巫ザ嗌伲疃嘧ザ嗌?,超時(shí)時(shí)間等待)然后調(diào)用send
  4. 他送后返回future,通過回調(diào)獲取數(shù)據(jù)的
  5. 他會(huì)循環(huán)遍歷數(shù)據(jù)獲取分區(qū),獲取分區(qū)的數(shù)據(jù),如果有數(shù)據(jù)就放到消息隊(duì)列里面
  6. 然后就調(diào)用從隊(duì)列拉取數(shù)據(jù)的方法拉取,然后他有大小限制最大500,他會(huì)循環(huán)一波一波拉取過去
  7. 然后放到攔截器走加工操作

九、提交offset原理

  • 同步提交:找到coordinator然后調(diào)用commitOffset進(jìn)行發(fā)送,然后不停dowhile循環(huán),調(diào)用發(fā)送提交請(qǐng)求,然后等待回調(diào)獲取結(jié)果,一直循環(huán)到成功為止。
  • 異步提交:他還是用coordinator去提交但是他不等待結(jié)果,他new了個(gè)監(jiān)聽等待結(jié)果。

到了這里,關(guān)于Kafka源碼簡(jiǎn)要分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 消息中間件之Kafka(一)

    消息中間件之Kafka(一)

    高性能的消息中間件,在大數(shù)據(jù)的業(yè)務(wù)場(chǎng)景下性能比較好,kafka本身不維護(hù)消息位點(diǎn),而是交由Consumer來維護(hù),消息可以重復(fù)消費(fèi),并且內(nèi)部使用了零拷貝技術(shù),性能比較好 Broker持久化消息時(shí)采用了MMAP的技術(shù),Consumer拉取消息時(shí)使用的sendfile技術(shù) Kafka是最初由Linkedin公司開發(fā),

    2024年01月20日
    瀏覽(53)
  • 消息中間件之Kafka(二)

    消息中間件之Kafka(二)

    1.1 為什么要對(duì)topic下數(shù)據(jù)進(jìn)行分區(qū)存儲(chǔ)? 1.commit log文件會(huì)受到所在機(jī)器的文件系統(tǒng)大小的限制,分區(qū)之后可以將不同的分區(qū)放在不同的機(jī)器上, 相當(dāng)于對(duì)數(shù)據(jù)做了分布式存儲(chǔ),理論上一個(gè)topic可以處理任意數(shù)量的數(shù)據(jù) 2.提高并行度 1.2 如何在多個(gè)partition中保證順序消費(fèi)? 方案一

    2024年01月21日
    瀏覽(29)
  • 【Java面試丨消息中間件】Kafka

    【Java面試丨消息中間件】Kafka

    1. 介紹 使用kafka在消息的收發(fā)過程都有可能會(huì)出現(xiàn)消息丟失 (1)生產(chǎn)者發(fā)送消息到broker丟失 (2)消息在broker中存儲(chǔ)丟失 (3)消費(fèi)者從broker接收消息丟失 2. 生產(chǎn)者發(fā)送消息到broker丟失 設(shè)置異步發(fā)送:同步發(fā)送會(huì)發(fā)生阻塞,一般使用異步發(fā)送方式發(fā)送消息 消息重試:由于網(wǎng)

    2024年02月11日
    瀏覽(30)
  • 消息中間件,RabbitMQ,kafka常見面試題

    RabbitMQ和Kafka都是消息隊(duì)列系統(tǒng),可以用于流處理。流處理是指對(duì)高速、連續(xù)、增量的數(shù)據(jù)進(jìn)行實(shí)時(shí)處理。 RabbitMQ 和 Kafka 的相同點(diǎn)有以下幾個(gè): 都是消息隊(duì)列系統(tǒng),可以用于流處理、異步通信、解耦等場(chǎng)景 都是開源的,有活躍的社區(qū)和豐富的文檔 都支持分布式部署,具有高

    2024年02月04日
    瀏覽(38)
  • 【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置

    【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置

    ? ? ? ? 圣誕節(jié)的到來,程序員不會(huì)收到圣誕老人的??,但可以自己滿足一下自己,所以,趁著有時(shí)間,就記錄一下這會(huì)兒擼了些什么代碼吧?。?! ????????因?yàn)闃I(yè)務(wù)原因,需要在系統(tǒng)內(nèi)新增其他的kakfa配置使用,所以今天研究的是怎么在系統(tǒng)內(nèi)整合多套kafka配置使用。

    2024年02月01日
    瀏覽(18)
  • 架構(gòu)師系列- 消息中間件(13)-kafka深入應(yīng)用

    架構(gòu)師系列- 消息中間件(13)-kafka深入應(yīng)用

    1)配置文件 ?2)啟動(dòng)信息 4.2.1 發(fā)送類型 KafkaTemplate調(diào)用send時(shí)默認(rèn)采用異步發(fā)送,如果需要同步獲取發(fā)送結(jié)果,調(diào)用get方法 詳細(xì)代碼參考:AsyncProducer.java 消費(fèi)者使用:KafkaConsumer.java 1)同步發(fā)送 通過swagger發(fā)送,控制臺(tái)可以正常打印send result swagger訪問地址:http://localhost:808

    2024年04月29日
    瀏覽(25)
  • 【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    作者簡(jiǎn)介 前言 博主之前寫過一個(gè)完整的MQ系列,包含RabbitMQ、RocketMQ、Kafka,從安裝使用到底層機(jī)制、原理。專欄地址: https://blog.csdn.net/joker_zjn/category_12142400.html?spm=1001.2014.3001.5482 本文是該系列的清單綜述,會(huì)拉通來聊一下三大MQ的特點(diǎn)和各種適合的場(chǎng)景。 目錄 1.概述 1.1.M

    2024年02月09日
    瀏覽(53)
  • ActiveMQ、RabbitMQ、Kafka、RocketMQ消息中間件技術(shù)選型

    消息中間件是分布式系統(tǒng)中重要的組件之一,用于實(shí)現(xiàn)異步通信、解耦系統(tǒng)、提高系統(tǒng)可靠性和擴(kuò)展性。在做消息中間件技術(shù)選型時(shí),需要考慮多個(gè)因素,包括可靠性、性能、可擴(kuò)展性、功能豐富性、社區(qū)支持和成本等。本文將五種流行的消息中間件技術(shù):ActiveMQ、RabbitMQ、

    2024年02月11日
    瀏覽(23)
  • SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的發(fā)送方:生產(chǎn)者 消息的接收方:消費(fèi)者 同步消息:發(fā)送方發(fā)送消息到接收方,接收方有所回應(yīng)后才能夠進(jìn)行下一次的消息發(fā)送 異步消息:不需要接收方回應(yīng)就可以進(jìn)行下一步的發(fā)送 什么是消息隊(duì)列? 當(dāng)此時(shí)有很多個(gè)用戶同時(shí)訪問服務(wù)器,需要服務(wù)器進(jìn)行操作,但此

    2024年04月27日
    瀏覽(53)
  • 消息中間件(MQ)對(duì)比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ

    前言 在構(gòu)建分布式系統(tǒng)時(shí),選擇適合的消息中間件是至關(guān)重要的決策。RabbitMQ、Kafka、ActiveMQ 和 RocketMQ 是當(dāng)前流行的消息中間件之一,它們各自具有獨(dú)特的特點(diǎn)和適用場(chǎng)景。本文將對(duì)這四種消息中間件進(jìn)行綜合比較,幫助您在項(xiàng)目中作出明智的選擇。 1. RabbitMQ 特點(diǎn): 消息模

    2024年02月20日
    瀏覽(35)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包