生產(chǎn)者客戶端代碼
public?class?SzzTestSend?{
????public?static?final?String?bootStrap?=?"xxxxxx:9090";
????public?static?final?String?topic?=?"t_3_1";
????public?static?void?main(String[]?args)?{
????????Properties?properties?=?new?Properties();
????????properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);
????????//?序列化協(xié)議??下面兩種寫法都可以
????????properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,?StringSerializer.class.getName());
????????properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
????????//過濾器?可配置多個(gè)用逗號(hào)隔開
????????properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");
????????//構(gòu)造?KafkaProducer
????????KafkaProducer?producer?=?new?KafkaProducer(properties);
????????//??發(fā)送消息,?并設(shè)置?回調(diào)(回調(diào)函數(shù)也可以不要)
????????ProducerRecord<String,String>?record?=?new?ProducerRecord(topic,"Hello?World!");
????????try?{
????????????producer.send(record,new?SzzTestCallBack(record.topic(),?record.key(),?record.value()));
????????}catch?(Exception?e){
????????????e.printStackTrace();
????????}
????}
????/**
?????*?發(fā)送成功回調(diào)類
?????*/
????public?static?class?SzzTestCallBack?implements?Callback{
????????private?static?final?Logger?log?=?LoggerFactory.getLogger(SzzTestCallBack.class);
????????private?String?topic;
????????private?String?key;
????????private?String?value;
????????public?SzzTestCallBack(String?topic,?String?key,?String?value)?{
????????????this.topic?=?topic;
????????????this.key?=?key;
????????????this.value?=?value;
????????}
????????public?void?onCompletion(RecordMetadata?metadata,?Exception?e)?{
????????????if?(e?!=?null)?{
????????????????log.error("Error?when?sending?message?to?topic?{}?with?key:?{},?value:?{}?with?error:",
????????????????????????topic,?key,value,?e);
????????????}else?{
????????????????log.info("send?message?to?topic?{}?with?key:?{}?value:{}?success,?partiton:{}?offset:{}",
????????????????????????topic,?key,value,metadata.partition(),metadata.offset());
????????????}
????????}
????}
}
1 構(gòu)造 KafkaProducer
KafkaProducer 通過解析producer.propeties
文件里面的屬性來(lái)構(gòu)造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、RecordAccumulator消息累加器 、元信息更新器、啟動(dòng)發(fā)送請(qǐng)求的后臺(tái)線程
????????//構(gòu)造?KafkaProducer
????????KafkaProducer?producer?=?new?KafkaProducer(properties);
生產(chǎn)者元信息更新器
我們之前有講過. 客戶端都會(huì)保存集群的元信息,例如生產(chǎn)者的元信息是 ProducerMetadata. 消費(fèi)組的是 ConsumerMetadata 。
?
相關(guān)的 Producer 配置有:
雖然 Producer 元信息會(huì)自動(dòng)更新, 但是有可能在生產(chǎn)者發(fā)送消息的時(shí)候,發(fā)現(xiàn)某個(gè) TopicPartition 不存在,這個(gè)時(shí)候可能就需要立刻發(fā)起一個(gè)元信息更新了。
集群資源變更監(jiān)聽器
org.apache.kafka.common.ClusterResourceListener
在構(gòu)造 KafkaConsumer 的時(shí)候, 還會(huì)構(gòu)造一個(gè) 集群資源變更監(jiān)聽器 ClusterResourceListener
當(dāng)用戶希望收到有關(guān)集群元數(shù)據(jù)更改的通知時(shí),可以實(shí)現(xiàn)回調(diào)接口。
需要在攔截器、指標(biāo)采樣器、序列化器和反序列化器 中訪問集群元數(shù)據(jù)的用戶可以實(shí)現(xiàn)此接口。
public?interface?ClusterResourceListener?{
????/**
?????*?用戶可以實(shí)現(xiàn)以獲取?ClusterResource?更新的回調(diào)方法。
?????*?@param?clusterResource?cluster?metadata
?????*/
????void?onUpdate(ClusterResource?clusterResource);
}
下面描述了每種類型的方法調(diào)用順序。
Clients
在每個(gè)元數(shù)據(jù)響應(yīng)之后都會(huì)調(diào)用一次 onUpdate(ClusterResource)
當(dāng)在org.apache.kafka.clients.producer.ProducerInterceptor
實(shí)現(xiàn)的 ClusterResourceListener 的時(shí)候
調(diào)用順序?yàn)?/strong>: ProducerInterceptor.onSend() -> onUpdate(ClusterResource) -> ProducerInterceptor.onAcknowledgement()
當(dāng)在org.apache.kafka.clients.consumer.ConsumerInterceptor
實(shí)現(xiàn)的 ClusterResourceListener 的時(shí)候文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-407479.html
調(diào)用順序?yàn)?/strong>:onUpdate() - > ConsumerInterce文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-407479.html
到了這里,關(guān)于多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!