I. Download the Windows version of Kafka
Official site: Apache Kafka
II. Start Kafka
Open a cmd window in the Kafka installation directory:
1: Start ZooKeeper from cmd
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
2: Start the Kafka server from cmd
.\bin\windows\kafka-server-start.bat .\config\server.properties
3: Start a console producer in a cmd window:
.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic Topic1
4: Start a console consumer in another cmd window:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic Topic1
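The console producer and consumer above assume the topic exists. Depending on the broker's `auto.create.topics.enable` setting, `Topic1` may be created automatically on first use; if not, it can be created explicitly (on Kafka versions whose topic tooling accepts `--bootstrap-server`):

```
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --topic Topic1 --partitions 1 --replication-factor 1
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
```

The second command lists existing topics, which is a quick way to confirm the broker is reachable.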
III. Add the Kafka dependency
<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
IV. Configuration file
server:
  port: 8080
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: localhost:9092
    producer: # producer settings
      retries: 0 # number of retries
      acks: 1 # ack level: how many partition replicas must have the record before an ack is sent to the producer (0, 1, or all/-1)
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # producer-side buffer size in bytes
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # consumer settings
      group-id: javagroup # default consumer group ID
      enable-auto-commit: true # whether to auto-commit offsets
      auto-commit-interval: 100 # offset commit delay (how long after a message is received before its offset is committed)
      # earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
      # latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced records in that partition
      # none: if every partition has a committed offset, consume from those offsets; if any partition lacks one, throw an exception
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
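The commented-out `com.itheima.demo.config.MySerializer` line hints at a custom value serializer, but its implementation is not shown in the post. As a rough, dependency-free sketch of the idea, the logic boils down to turning the payload into UTF-8 JSON bytes. A real implementation would implement Kafka's `org.apache.kafka.common.serialization.Serializer` interface and typically use a JSON library; the hand-rolled JSON and the `{"message":"..."}` payload shape below are assumptions for illustration only.

```java
import java.nio.charset.StandardCharsets;

public class MySerializerSketch {

    // Build the assumed {"message":"..."} JSON by hand and encode as UTF-8.
    static byte[] serialize(String message) {
        String json = "{\"message\":\"" + message + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("hello");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```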
V. Write a producer that sends messages
1: Asynchronous send
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import io.swagger.annotations.Api;
import com.alibaba.fastjson.JSON; // JSON here is assumed to be fastjson

@RestController
@Api(tags = "Async producer")
@RequestMapping("/kafka")
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Fire-and-forget: send() returns immediately without waiting for the broker's ack
    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("Topic3", JSON.toJSONString(message));
    }
}
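The `Message` class referenced above is never shown in the post. A minimal sketch of what it presumably looks like; the single `message` field is an assumption inferred from the `setMessage(msg)` call:

```java
// Hypothetical payload class; field name inferred from the producer code.
public class Message {
    private String message;

    public String getMessage() { return message; }

    public void setMessage(String message) { this.message = message; }

    public static void main(String[] args) {
        Message m = new Message();
        m.setMessage("hello kafka");
        System.out.println(m.getMessage());
    }
}
```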
2: Synchronous send
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;
import io.swagger.annotations.Api;
import com.alibaba.fastjson.JSON; // JSON here is assumed to be fastjson

// Test synchronous send and the listener
@RestController
@Api(tags = "Sync producer")
@RequestMapping("/kafka")
public class AsyncProducer {

    private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Synchronous send: block on the returned future until the broker responds
    @GetMapping("/kafka/sync/{msg}")
    public void sync(@PathVariable("msg") String msg) throws Exception {
        Message message = new Message();
        message.setMessage(msg);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("Topic3", JSON.toJSONString(message));
        // Note: a wait timeout can be set; once exceeded, we stop waiting for the result
        SendResult<String, Object> result = future.get(3, TimeUnit.SECONDS);
        logger.info("send result:{}", result.getProducerRecord().value());
    }
}
VI. Write the consumer
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // No group specified here, so the group-id from the yml config is used
    @KafkaListener(topics = {"Topic3"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}
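Symmetrically to the producer side, the commented-out `com.itheima.demo.config.MyDeserializer` in the config would turn the JSON bytes back into a payload. A dependency-free sketch of that logic follows; the hand-rolled parsing of the assumed `{"message":"..."}` shape is purely illustrative, and a real implementation would implement Kafka's `org.apache.kafka.common.serialization.Deserializer` interface and use a JSON library:

```java
import java.nio.charset.StandardCharsets;

public class MyDeserializerSketch {

    // Decode UTF-8 bytes and pull out the value of the assumed "message" field.
    static String deserialize(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        // Naive extraction: text between the first ':"' and the last '"'.
        int start = json.indexOf(":\"") + 2;
        int end = json.lastIndexOf('"');
        return json.substring(start, end);
    }

    public static void main(String[] args) {
        byte[] bytes = "{\"message\":\"hi\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(bytes));
    }
}
```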
Use Swagger to call the producer endpoints and watch the console output.
That's it: a simple Spring Boot + Kafka integration is complete.
More Kafka-related content will follow, so stay tuned!