一,新建Spring Boot
最近忙著搞低代碼開發(fā),好久沒新建spring項目了,結(jié)果今天心血來潮準備建個springboot項目
注意Type選Maven,java選8,其他默認
1,Maven配置
點下一步后完成就新建了一個spring boot項目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉庫,不然可能依賴下載不下來
2,無法識別為SpringBoot項目
在maven配置沒問題的前提下,IDEA無法識別這是一個Spring Boot項目,倒騰半天,終于發(fā)現(xiàn)問題原因所在=======>是Maven版本太高的原因
把.mvn/wrapper目錄下的maven-wrapper.properties文件第一行的版本號降低,比如說降為3.5.4,然后重新點下Maven的同步按鈕
3,無效的源發(fā)行版
接下來運行項目報錯:java: 無效的源發(fā)行版: 14
修改pom.xml中java.version值為8,原來是17
<properties>
<java.version>17</java.version>
</properties>
4,無法訪問SpringApplication
繼續(xù)運行,繼續(xù)報錯
降低spring-boot-starter-parent版本,原來是3.1.3,改為2.7.2
5,運行直接Finish
繼續(xù)運行,沒報錯,服務(wù)直接Finished
需要添加web依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
6,服務(wù)運行成功
終于,一個空的spring boot項目成功跑起來了,喜極而泣
二,安裝啟動Kafka
1,下載
官網(wǎng)=>https://kafka.apache.org/downloads,下載最新版的kafka,目前是3.5.1
2,配置
解壓到D盤Config目錄下即完成安裝,目錄為D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties
broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka
(2) zookeeper.properties
dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper
3,啟動
先啟動zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
再啟動kafka
bin\windows\kafka-server-start.bat config\server.properties
停止的時候,先停止kafka,再停止zookeeper,直接ctrl+c停止
4,其他命令
1,查看topic列表
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
2,查看topic具體信息
bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test
3,創(chuàng)建topic
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
三,生產(chǎn)消費消息
1,加入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2,yam配置文件
application.yaml
spring:
profiles:
active: dev
application-dev.yaml
server:
port: 8082
servlet:
context-path: /test-kafka
spring:
cache:
type: ehcache
config: classpath:ehcache.xml
jpa:
database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: kafka-demo-kafka-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10
3,報錯enabled mechanisms are []
Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
這個錯誤我本地測試下來是因為沒把賬號密碼配置這塊注釋掉
4,生產(chǎn)者生產(chǎn)消息
@Slf4j
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public String sendMessage(String content) {
String topic = "test_topic";
kafkaTemplate.send(topic, content).addCallback(success -> {
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
log.info("發(fā)送成功:主題:{},分區(qū):{},偏移量:{}",topic,partition,offset);
}, failure -> {
log.info("發(fā)送失敗:{}",failure.getMessage());
});
return "發(fā)送成功";
}
}
5,訂閱和消費消息
一,訂閱主題
1,獲取消費者
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.Properties;
/**
* kafka消費者配置
* @author liuxunming
*/
@Configuration
@Component
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
public KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
}
2,訂閱topic
KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();
consumer.subscribe(Collections.singleton("traffic"));
3,拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}
4,消費位移,釋放資源
// 提交消費位移
consumer.commitSync();
// 關(guān)閉消費者以釋放資源
consumer.close();
二,點對點模式文章來源:http://www.zghlxwxcb.cn/news/detail-682415.html
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test_topic"})
public void handlerMsg(String content) {
log.info("接收到消息:消息值:{} ",content);
}
}
6,接口
@Slf4j
@RestController
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@PostMapping("/sendMessage")
public String sendMessage(@RequestParam String content) {
kafkaProducer.sendMessage(content);
return "ok";
}
}
7,測試結(jié)果
接收到消息文章來源地址http://www.zghlxwxcb.cn/news/detail-682415.html
四,參考博文
- 解決IDEA無法識別SpringBoot項目
- SpringBoot從入門到精通(十二)SpringBoot集成Kafka
- Kafka的下載安裝以及使用
- Kafka消息消費流程詳解
- Kafka之Consumer使用與基本原理
到了這里,關(guān)于使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!