1 生產(chǎn)者
生產(chǎn)邏輯
- 配置生產(chǎn)者客戶端參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實例。
- 構(gòu)建待發(fā)送的消息。
- 發(fā)送消息
- 關(guān)閉實列
參數(shù)說明
- bootstrap.servers :用來指定生產(chǎn)者客戶端鏈接Kafka集群搜需要的broker地址清單,具體格式 host1:port1,host2:port2,可以設(shè)置一個或多個地址中間,號分割,參數(shù)默認(rèn) 空串。這里要注意并不需要配置所有的broker地址,應(yīng)為生產(chǎn)者會在broker中找到其他的broker地址,但是建議配置兩個以上,當(dāng)其中一個broker宕機時還可以通過另外一個工作。
- key.serializer和value.serializer:broker端接受的消息必須以字節(jié)數(shù)組的形式存在。
- client.id : 默認(rèn) “” 用來設(shè)置KafkaProducer對應(yīng)的客戶端id
- max.block.ms:默認(rèn)值 60000 用來控制KafkaProducer 中send()方法和partitionsFor()方法的阻塞時間
- partitioner.class:用來指定分區(qū)器
- enable.idempotence:默認(rèn)值 false 是否開啟冪等性
- interceptor.classes 用來設(shè)置生產(chǎn)者攔截器
- max.in.flight.requests.per.connection:5 限制每個連接最多緩存的請求數(shù)
- metadata.max.age.ms: 300000 5分鐘 如果在這個時間內(nèi)元數(shù)據(jù)沒有更新的話就強制更新。
- transactional.id:null 設(shè)置事務(wù)id 必須唯一
- batch.size 16384(16KB): 生產(chǎn)者客戶端中用于緩存消息的緩沖區(qū)大小。
序列化器(Serializer)
生產(chǎn)者發(fā)送消息到kafka是需要將對象序列化城流才能訪問到kafka,消費者需要把流反序列化 才能進行 消費。
分區(qū)器
消息在通過send()方法發(fā)送到broker的過程中,有可能需要經(jīng)過攔截器、序列化器和分區(qū)器(partitioner)的一系列作用之后才能被真正的發(fā)往broker。攔截器一般不是必須的,而序列化器時必須的必須的。消息經(jīng)過序列化之后就需要確定它發(fā)送的分區(qū),如果消息ProducerRecord中指定了partition字段,那么就不需要分區(qū)器的作用,因為partition代表的就是所要發(fā)往的分區(qū)。
分區(qū)器時通過kay來計算partition的值,分區(qū)器的作用就是為消息分配分區(qū)。
kafka的默認(rèn)分區(qū)器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
生產(chǎn)者攔截器(Interceptor)
生產(chǎn)者攔截器主要用來在消息發(fā)送前做一些準(zhǔn)備工作,如按照規(guī)則過濾不符合條件的消息,修改消息等,也可以用來做一些定制化的需求,kafkaProducer在將消息序列化和計算分區(qū)之前會調(diào)用攔截器的onSend()方法來對消息進行相應(yīng)的定制化文章來源:http://www.zghlxwxcb.cn/news/detail-400804.html
原理分析
文章來源地址http://www.zghlxwxcb.cn/news/detail-400804.html
- 主線程中由KafkaPartition創(chuàng)建消息
- 通過攔截器
- 通過序列化器
- 通過分區(qū)器
- 到達消息累加器(RecordAccumulator)主要是用來收集消息方便 Sender可以批量發(fā)送
到了這里,關(guān)于深入Kafka核心設(shè)計與實踐原理讀書筆記第二章的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!