1.首先kafka每創(chuàng)建一個消費者就是一個消費者組,必須指定groupip
2.兩個消費者組之間不相互影響,消費同一個主題的同一個分區(qū),兩個消費者組不相互影響,各自記錄自己的offset
3.在開發(fā)中如果沒有指定每個消費者去消費特定的分區(qū),那么kafka默認(rèn)是按照roundRobin輪詢的方式分配消費者消費分區(qū)的,如果指定了消費者消費特定的分區(qū),那么,就會按照指定的分區(qū)消費,那么具體如何指定特定分區(qū)消費呢?看代碼
// 0 配置
Properties properties = new Properties();
// 連接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 組id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 1 創(chuàng)建一個消費者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 訂閱主題對應(yīng)的分區(qū)
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first",0));
kafkaConsumer.assign(topicPartitions);
// 3 消費數(shù)據(jù)
while (true){
//Duration.ofSeconds(1) 就是等待一秒鐘的意思,如果等待1秒后仍然沒有消息,則返回空
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
在topicPartitions 中加入多個對象,每個對象指定該消費者消費的主題和對應(yīng)的分區(qū),再通過assign方法加入消費者配置中
那自定義消費哪個分區(qū)有什么用呢?
那作用大了,假如我們未來有一個消費者組,只有一臺的能力比較強,處理快,那么就可以指定它來消費kafka多的那個分區(qū),而且這樣我們可以自由的指定每個消費者消費哪個分區(qū),有更強的拓展性
消費者可能出現(xiàn)重復(fù)消費和漏消費的情況,如何解決?
這個根據(jù)實際情況來定,這個問題主要出現(xiàn)在kafka的某個消費者節(jié)點宕機后,可能就會出現(xiàn)這樣的問題,那么如何完全解決呢?就是使用事務(wù)消費?但是如果項目中可以接受部分消息丟失,就沒必要使用了,因為會造成挺大的性能損耗的,上代碼
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 禁用自動提交
props.put("isolation.level", "read_committed"); // 設(shè)置隔離級別為讀已提交
props.put("transactional.id", "my-transactional-id"); // 設(shè)置事務(wù)ID
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 設(shè)置拉取的超時時間
seataTransactionManager.begin(); // 開啟 Seata 分布式事務(wù)
try {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
// 在這里處理消息的邏輯
// ...
}
seataTransactionManager.commit(); // 提交事務(wù)
consumer.commitSync(); // 手動提交偏移量
} catch (Exception e) {
seataTransactionManager.rollback(); // 回滾事務(wù)
consumer.seekToBeginning(records.partitions()); // 將消費者偏移量重置到消息的起始位置,以重新消費
throw new RuntimeException("Failed to consume messages", e);
}
}
} finally {
consumer.close();
}
那么在生產(chǎn)中消費著具體如何編寫呢?
在生產(chǎn)環(huán)境中使用 Kafka 的 poll 模式來消費數(shù)據(jù),可以按照以下步驟進行配置和實現(xiàn):
1. 添加 Kafka 依賴:在項目的 `pom.xml` 文件中添加 Kafka 相關(guān)的依賴,例如:
<dependency>
? ? <groupId>org.springframework.kafka</groupId>
? ? <artifactId>spring-kafka</artifactId>
</dependency>
2. 配置 Kafka 連接信息:在 `application.properties` 或 `application.yml` 中配置 Kafka 的連接信息,包括 bootstrap servers(Kafka 服務(wù)器地址)、group id(消費者組ID)等配置項。
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
3. 創(chuàng)建 Kafka 消費者配置類:創(chuàng)建一個 Kafka 消費者配置類,用于配置 KafkaConsumer 的屬性??梢愿鶕?jù)實際需求進行自定義配置。
?
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig {
? ? @Bean
? ? public Map<String, Object> consumerConfigs() {
? ? ? ? Map<String, Object> props = new HashMap<>();
? ? ? ? props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
? ? ? ? props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
? ? ? ? props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? ? props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? ? // 其他可選配置項
? ? ? ? return props;
? ? }
}
4. 創(chuàng)建消息處理器:創(chuàng)建一個消息處理器,用于處理從 Kafka 主題中消費的消息。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;
@Component
public class MyMessageHandler {
? ? public void handleMessage(ConsumerRecord<String, String> record) {
? ? ? ? // 處理接收到的消息
? ? ? ? String key = record.key();
? ? ? ? String value = record.value();
? ? ? ? // ... 進行業(yè)務(wù)處理
? ? }
}
5. 創(chuàng)建 Kafka 消費者:創(chuàng)建一個 Kafka 消費者,并使用 KafkaConsumerConfig 中定義的配置。
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
@Component
public class MyKafkaConsumer {
? ? private final KafkaConsumer<String, String> kafkaConsumer;
? ? private final MyMessageHandler messageHandler;
? ? @Autowired
? ? public MyKafkaConsumer(Map<String, Object> consumerConfigs, MyMessageHandler messageHandler) {
? ? ? ? this.kafkaConsumer = new KafkaConsumer<>(consumerConfigs);
? ? ? ? this.messageHandler = messageHandler;
? ? }
? ? @PostConstruct
? ? public void startConsuming() {
? ? ? ? kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
? ? ? ? new Thread(() -> {
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
? ? ? ? ? ? ? ? records.forEach(record -> messageHandler.handleMessage(record));
? ? ? ? ? ? }
? ? ? ? }).start();
? ? }
? ? // 可以根據(jù)需要添加關(guān)閉消費者的方法
}
在上述示例中,使用 `KafkaConsumerConfig` 類定義
了 KafkaConsumer 的配置,并通過構(gòu)造函數(shù)注入到 `MyKafkaConsumer` 類中。在 `startConsuming()` 方法中,創(chuàng)建了一個新線程來進行消費,不斷地使用 `poll()` 方法輪詢獲取消息,并通過 `MyMessageHandler` 處理消息。
請注意,在實際生產(chǎn)環(huán)境中,你可能還需要添加更多的配置和處理,例如異常處理、消費者的關(guān)閉和資源釋放、消息提交偏移量的控制等。以上示例僅提供了一個基本的框架,你可以根據(jù)實際需求進行擴展和調(diào)整。文章來源:http://www.zghlxwxcb.cn/news/detail-498576.html
好,消費者就介紹到這里,后邊我們介紹在生產(chǎn)中如何選擇硬件以及kafka每個組件具體的優(yōu)化方案,以及如何配置文章來源地址http://www.zghlxwxcb.cn/news/detail-498576.html
到了這里,關(guān)于kafka消費者詳解,根據(jù)實際生產(chǎn)解決問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!