前言
本章節(jié)主要講述Kafka3.1X版本在Windows11主機(jī)下部署以及JAVA對Kafka應(yīng)用:
一、Kafka3.1X版本在Windows11主機(jī)部署
1.安裝JDK配置環(huán)境變量
2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目錄位置:D:\setup\apache-zookeeper-3.7.1
3.安裝Kafka3.1X
3.1 下載包(kafka_2.12-3.1.2.tgz)
Kafka
3.2、 解壓并進(jìn)入Kafka目錄:
根目錄:D:\setup\kafka3.1.2
3、 編輯config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 為根目錄下的\logs
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs
4.運(yùn)行Zookeeper
Zookeeper安裝目錄D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右鍵,選擇“打開命令窗口”選項(xiàng),打開命令行
.\zkServer.cmd;
5.運(yùn)行Kafka
Kafka安裝目錄D:\setup\kafka3.1.2,按下Shift+右鍵,選擇“打開命令窗口”選項(xiàng),打開命令行
.\bin\windows\kafka-server-start.bat .\config\server.properties
二、Kafk生產(chǎn)Topic主題數(shù)據(jù)
1.kafka生產(chǎn)數(shù)據(jù)
創(chuàng)建Topic主題heima
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.
查看Topic主題heima
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic heima
Topic主題heima生產(chǎn)數(shù)據(jù)
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima
在 > 符號(hào)后輸入數(shù)據(jù):
{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}
2.JAVA kafka客戶端消費(fèi)數(shù)據(jù)
2.1 pom.xml文件配置kafka客戶端-kafka-clients-2.0.1版本
<!-- kafka客戶端 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
</dependency>
2.2 JAVA數(shù)據(jù)讀取文件
package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka服務(wù)器操作與數(shù)據(jù)讀取
*/
public class KafkaUtilDemo {
public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);
public static final Properties props = new Properties();
// protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);
public static void init(String kafakservers) {
// 配置Kafka消費(fèi)者屬性
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
/**
* 持續(xù)監(jiān)聽并處理kafa消息,當(dāng)手機(jī)號(hào)mobilePhone非空時(shí)進(jìn)入數(shù)據(jù)同步操作
* @param kafaktopic
* @return
*/
public static String poll(String kafaktopic) {
String msg = "";
try {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(kafaktopic));
log.info("Kafka消費(fèi)者訂閱指定主題,持續(xù)監(jiān)聽并處理消息");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));
for (ConsumerRecord<String, String> record : records) {
log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());
msg = record.value();
if (!StringUtils.isBlank(record.value())) {
JSONObject jsonObject = JSONObject.parseObject(record.value());
String mobilePhone = jsonObject.getString("mobilePhone");
if (StringUtils.isBlank(mobilePhone)) {
log.error("Kafka消費(fèi)者手機(jī)號(hào)mobilePhone為空");
} else {
KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();
kafkaUtil.syncSystemInfoTask(jsonObject);
}
}
}
}
} catch (Exception e) {
log.error("Kafka消費(fèi)者訂閱指定主題,持續(xù)監(jiān)聽并處理消息 error msg=" + e.getMessage());
}
return msg;
}
public boolean syncSystemInfoTask(JSONObject jsonObject) {
boolean repsBln = true;
try {
String mobilePhone = jsonObject.getString("mobilePhone");
String roleType = jsonObject.getString("roleType");
String roleCode = jsonObject.getString("roleCode");
log.info("業(yè)務(wù)數(shù)據(jù)同步操作................");
} catch (Exception e) {
repsBln = false;
log.error("Kafka消費(fèi)者同步入庫異常,error msg=" + e.getMessage());
}
return repsBln;
}
public static void main(String[] args) {
try {
String kafakservers = "localhost:9092";
String kafaktopic = "heima";
init(kafakservers);
poll(kafaktopic);
} catch (Exception e) {
log.error("error msg=" + e.getMessage());
}
}
}
3 執(zhí)行KafkaUtilDemo 文件,查看消費(fèi)數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-701679.html
總結(jié)
pom.xml文件在引入spring-kafka 會(huì)由于版本問題出現(xiàn)
org.apache.kafka
kafka-clients
2.0.1
文章來源地址http://www.zghlxwxcb.cn/news/detail-701679.html
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
到了這里,關(guān)于Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!