一、主題
1.1、配置主題
- 在應(yīng)用程序上下文定義一個 KafkaAdmin Bean, 它可以自動將主題添加到代理。通過這個Bean可以將
每一個新建的主題 Topic 添加到應(yīng)用程序上下文中。下面是一個簡單的示例:
也可以創(chuàng)建 TopicBuilder 類,使用它創(chuàng)建 Bean 更加簡單。
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
使用 Spring Boot 時,KafkaAdminbean 會自動注冊
默認(rèn)情況下,代理不可用時會記錄一條消息,然后上下文會繼續(xù)加載。可以以編程方式調(diào)用Admin的initialize()方法以稍后重試。
也可將 admin 的fatalIfBrokerNotAvailable屬性設(shè)置為true。然后上下文無法初始化。
1.2、在運(yùn)行時檢查和創(chuàng)建主題
目前有兩種方法來進(jìn)行操作:
- createOrModifyTopics
- describeTopics
或者使用 AdminClient 來直接使用:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
二、消息發(fā)送
2.1、使用 KafkaTemplate
2.1.1、KafkaTemplate 介紹
KafkaTemplate 包裝了生產(chǎn)者并提供了將數(shù)據(jù)發(fā)送到 Kafka 主題的便捷方法。
2.1.2、配置 KafkaTemplate
要使用模板,需要配置生產(chǎn)者工廠并在模板的構(gòu)造函數(shù)中提供。
- 單個生產(chǎn)者配置
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
-
多個生產(chǎn)者配置
使用來自同一工廠的不同生產(chǎn)者配置創(chuàng)建模板,需要覆蓋工廠的ProducerConfig屬性。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
然后調(diào)用 KafkaTemplate 的方法來使用它。
- 異步消息發(fā)布示例
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new KafkaSendCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(KafkaProducerException ex) {
handleFailure(data, record, ex);
}
});
}
- 阻塞發(fā)布示例
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
ExecutionException 在于 KafkaProducerException 屬性 failedProducerRecord 中
2.1.3、發(fā)布結(jié)果查看
- 異步
發(fā)布成功還是失敗可以向偵聽器注冊回調(diào)以異步接收發(fā)送結(jié)果:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
}
});
或者使用 lambda:文章來源:http://www.zghlxwxcb.cn/news/detail-701683.html
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
- 同步
阻塞發(fā)送線程等待結(jié)果需要調(diào)用 future 的 get()方法,可以使用帶超時的 get() 方法。文章來源地址http://www.zghlxwxcb.cn/news/detail-701683.html
到了這里,關(guān)于主題配置和 KafkaTemplate 的使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!