一、概述
Spring for Apache Kafka 的高級概述以及底層概念和可運行的示例代碼。
二、準備工作
注意:進行工作開始之前至少要有一個 Apache Kafka 環(huán)境
2.1、依賴
- 使用 Spring Boot
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
使用 Spring Boot 時,省略版本,Boot 將自動引入與您的 Boot 版本兼容的正確版本
- 使用 Spring
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.14</version>
</dependency>
使用Spring 時必須要申明使用的版本。
2.2、版本兼容性
-
Apache Kafka 客戶端 2.7.0 或 2.8.0
-
Spring 框架 5.3.x 或 Spring Boot 2.7.x
-
最低 Java 版本:8
在 Spring Boot 應(yīng)用程序中使用 Apache Kafka 時, Apache Kafka 依賴項版本由 Spring Boot 的依賴項管理確定。
想要使用不同于Spring Boot版本的 Apache Kafka時需要覆蓋所有關(guān)聯(lián)的依賴項。
尤其在使用嵌入式 Kafka 代理時特別要注意。
2.3、依賴覆蓋
在 Spring Boot 應(yīng)用程序中使用 Apache Kafka 時, Apache Kafka 依賴項版本由 Spring Boot 的依賴項管理確定。
如果要使用kafka-clients或的不同版本kafka-streams(例如 2.x), 則需要覆蓋所有關(guān)聯(lián)的依賴項。
尤其是在 spring-kafka-test 中使用嵌入式 Kafka 代理時。
并非所有的 Spring Boot都會向下兼容
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.14</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.7.14</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>{kafka-version}</version>
</dependency>
<!-- 可選 僅在使用 kafka 流時需要 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>{kafka-version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>{kafka-version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>{kafka-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>{kafka-version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
三、Spring Boot消費者
3.1、應(yīng)用程序
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
3.2、配置項
spring.kafka.consumer.auto-offset-reset=earliest //從提交的offset開始消費;無提交的offset時,從頭開始消費
四、Srping Boot生產(chǎn)者
4.1、應(yīng)用程序
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
五、不使用 Spring Boot
在不使用 Spring Boot 時必須定義幾個基礎(chǔ)的Bean。
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<Integer, String>(producerFactory);
}
}
在 Spring 上下文之外創(chuàng)建偵聽器容器,則必須滿足容器實現(xiàn)的所有接口,否則有些功能會出現(xiàn)異常工作。文章來源:http://www.zghlxwxcb.cn/news/detail-701680.html
不包括 Spring Boot的完整示例如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-701680.html
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<Integer, String>(producerFactory);
}
}
到了這里,關(guān)于Spring for Apache Kafka概述和簡單入門的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!