Java輕松使用Kafka生產(chǎn)者,消費(fèi)者
一、環(huán)境說明
- 項(xiàng)目中需要下面的依賴:(版本自定義)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.2</version>
</dependency>
2.yml配置文件設(shè)置
kafka:
bootstrap-servers: ip:端口
jaas:
enabled: false
listener:
type: single
concurrency: 3
consumer:
# key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
group-id: group-id數(shù)據(jù)
auto-offset-reset: latest
enable-auto-commit: false
max-poll-records: 100
# kafka topic
task:
execute:
topic: topic名稱
二、生產(chǎn)者
1.簡單生產(chǎn)者的書寫:
@Component
public class SendKafkaUtil {
@Autowired
KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${spring.task.execute.topic}")
private String topic;
public void sendMessageKafka() {
kafkaTemplate.send(topic, "{json數(shù)據(jù)}");
}
}
三、消費(fèi)者
Consumer 消費(fèi)數(shù)據(jù)時(shí)的可靠性是很容易保證的,因?yàn)閿?shù)據(jù)在 Kafka 中是持久化的,故不用擔(dān)心數(shù)據(jù)丟失問題。由于 consumer 在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
1.簡單消費(fèi)者的書寫:
@KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}")
public void processMessage(List<ConsumerRecord<String, String>> records){
logger.info("kafka消費(fèi)消息數(shù)量:" + records.size());
}
2.消費(fèi)者批量消費(fèi)的書寫(批量控制:max-poll-records: 100)
?文章來源:http://www.zghlxwxcb.cn/news/detail-612885.html
@Bean
public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
factory.setBatchListener(true); // 開啟批量監(jiān)聽
return factory;
}
}
/**
* 消費(fèi)者1
* 批處理統(tǒng)一方法
*
* @param records
*/
@KafkaListener(topics = {"${spring.task.execute.topic}"},containerFactory = "batchFactory",topicPartitions = {
@TopicPartition(partitions = {"0"}, topic = "${spring.task.execute.topic}") })
public void consumer1(List<ConsumerRecord<String, Object>> records) throws IOException {
log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id1 records size " + records.size());
// todo數(shù)據(jù)邏輯處理
}
/**
* 消費(fèi)者2
* 批處理統(tǒng)一方法
*
* @param records
*/
@KafkaListener(topics = {"${spring.task.execute.topic}"}, containerFactory = "batchFactory",topicPartitions = {
@TopicPartition(partitions = {"1"}, topic = "${spring.task.execute.topic}") })
public void consumer2(List<ConsumerRecord<String, Object>> records) throws IOException {
log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id2 records size " + records.size());
// todo數(shù)據(jù)邏輯處理
}
注:多消費(fèi)者時(shí),需要對應(yīng)kafka中配置的分區(qū);多少的Partition就有多少個(gè)消費(fèi)者,以免資源浪費(fèi)文章來源地址http://www.zghlxwxcb.cn/news/detail-612885.html
到了這里,關(guān)于Java輕松使用Kafka生產(chǎn)者,消費(fèi)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!