項(xiàng)目場(chǎng)景:
互聯(lián)網(wǎng)項(xiàng)目中經(jīng)常用到MQ,由于本地項(xiàng)目開發(fā)連接測(cè)試環(huán)境kafka很不方便,所有在本機(jī)搭建一個(gè)kafka,方便開發(fā)測(cè)試。
前置準(zhǔn)備
提示:Kafka的運(yùn)行依賴于Zookeeper,所以在運(yùn)行Kafka之前我們需要安裝并運(yùn)行Zookeeper
下載Zookeeper地址:https://zookeeper.apache.org/releases.html
下載kafka地址:http://kafka.apache.org/downloads.html
配置Zookeeper
1.將下載好的文件解壓到本地,如圖:
復(fù)制zoo_sample.cfg文件,并將新復(fù)制的文件命名為zoo.cfg,修改文件zoo.cfg內(nèi)容如下:
dataDir=F:\mq\apache-zookeeper-3.6.3\dataDir
dataLogDir=F:\mq\apache-zookeeper-3.6.3\dataLogDir
2.配置Window環(huán)境變量
3.啟動(dòng)Zookeeper
進(jìn)入Zookeeper安裝目錄,cmd 輸入命令zkserver,如圖
?啟動(dòng)成功??!
配置kafka
1.解壓下載文件到本地
進(jìn)入F:\mq\kafka_2.13-2.8.0\config文件內(nèi),修改文件server.properties
log.dirs=F:\mq\kafka_2.13-2.8.0\logs
2.啟動(dòng)kafka服務(wù)
在安裝目錄cmd輸入命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
?無報(bào)錯(cuò)則正常啟動(dòng),本地啟動(dòng)窗口不要關(guān)閉。
3.創(chuàng)建topic名稱為syn_user的命令:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syn_user
4.查看創(chuàng)建的topic
.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
本地kafka環(huán)境測(cè)試:
啟動(dòng)生產(chǎn)者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic syn_user
啟動(dòng)消防者監(jiān)聽消息文章來源:http://www.zghlxwxcb.cn/news/detail-428772.html
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic syn_user
文章來源地址http://www.zghlxwxcb.cn/news/detail-428772.html
springboot 集成:
1.引入pom依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
acks: -1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
enable-auto-commit: false
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer
group-id: test-consumer-group
listener:
ack-mode: MANUAL
3.創(chuàng)建消息生產(chǎn)者
@RestController
@Api(value = "mq消息", tags = "Fh-mq消息")
@RequestMapping("/wkafka")
public class ProducerController {
private static final String KAFKA_TOPIC_NAME = "wlhydemo";
@Autowired
KafkaTemplate<String, String> kafka;
@PostMapping("/send")
public String register(@RequestBody User user) {
try {
String message = JSONUtil.toJsonStr(user);
System.out.println("注冊(cè)用戶信息:" + message);
kafka.send(KAFKA_TOPIC_NAME, message);
return "OK";
} catch (Exception e) {
e.printStackTrace();
}
return "消息同步失敗";
}
}
4.監(jiān)聽topic消息類
@Slf4j
@Component
public class KaUserConsumer {
@KafkaListener(topics = "wlhydemo")
public void listenFlowStart(@Payload String businessStr,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offset )
{
try{
// 模擬業(yè)務(wù)處理...
log.info("當(dāng)前消費(fèi)分區(qū):{}", partition);
log.info("當(dāng)前消費(fèi)位置:{}", offset);
log.info("接收到的消息:{}", businessStr);
User user= JSONUtil.toBean(businessStr, User.class);
user.getNickName();
} catch (Exception e) {
e.printStackTrace();
}
}
}
到了這里,關(guān)于Window下搭建kafka運(yùn)行環(huán)境的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!