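Problem description
In my project, I took the consumer-group-id that a standalone consumer was already using and also used it for a consumer group that consumes partition data. As a result, the coordinator could not find this consumer-group-id.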
2022-12-14 16:33:31.908 ERROR 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Offset commit failed on partition REBALANCE-ONE-TOPIC-1 at offset 13: The coordinator is not aware of this member.
2022-12-14 16:33:31.908 INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 16:33:31.908 INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.908 INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.909 ERROR 16020 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
... 4 common frames omitted
How to reproduce the error
Register two test topics
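// Topic with two partitions, one for each partition-pinned listener.
@Bean
public NewTopic testone() {
    return TopicBuilder.name("test-topic-group").partitions(2).replicas(1).build();
}

// Single-partition topic for the subscribing listener.
@Bean
public NewTopic testtwo() {
    return TopicBuilder.name("test-topic-standalone").partitions(1).replicas(1).build();
}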
Write a consumer group
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TestErrorConsumerListener {

    /**
     * "Consumer group" listener: pinned to partition 0 of test-topic-group
     * through explicit topicPartitions (manual assignment).
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
            @TopicPartition(topic = "test-topic-group", partitions = "0", partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void one(String value) {
        log.info("one:接收kafka消息:[{}]", value);
    }

    /**
     * "Consumer group" listener: pinned to partition 1 of test-topic-group
     * through explicit topicPartitions (manual assignment).
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
            @TopicPartition(topic = "test-topic-group", partitions = "1", partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void two(String value) {
        log.info("two:接收kafka消息:[{}]", value);
    }

    /**
     * "Standalone consumer" listener: subscribes to test-topic-standalone
     * but reuses the same groupId as the two listeners above.
     */
    @KafkaListener(topics = "test-topic-standalone", groupId = "test-group")
    public void three(String value) {
        log.info("three:接收kafka消息:[{}]", value);
    }
}
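In the class above, `one` and `two` pin their partitions through `topicPartitions`, while `three` subscribes through `topics`, and all three share the group ID `test-group`. A small startup check could flag that kind of sharing before it shows up as a commit failure. The sketch below is only an illustration in plain Java: `GroupIdModeCheck`, `Listener`, `Mode`, and `sharedGroupIds` are hypothetical names of my own and are not part of Spring Kafka or the Kafka client.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical startup check; none of these names come from Spring Kafka. */
final class GroupIdModeCheck {
    /** How a listener obtains its partitions. */
    enum Mode { ASSIGN, SUBSCRIBE }

    /** Minimal description of one listener (illustrative only). */
    static final class Listener {
        final String name;
        final String groupId;
        final Mode mode;

        Listener(String name, String groupId, Mode mode) {
            this.name = name;
            this.groupId = groupId;
            this.mode = mode;
        }
    }

    /** Returns the group IDs used by both pinned and subscribing listeners. */
    static Set<String> sharedGroupIds(List<Listener> listeners) {
        Set<String> assigned = new HashSet<>();
        Set<String> subscribed = new HashSet<>();
        for (Listener l : listeners) {
            (l.mode == Mode.ASSIGN ? assigned : subscribed).add(l.groupId);
        }
        // Intersection of the two sets, sorted for stable output.
        Set<String> shared = new TreeSet<>(assigned);
        shared.retainAll(subscribed);
        return shared;
    }

    public static void main(String[] args) {
        // Mirrors the three listeners of TestErrorConsumerListener.
        List<Listener> listeners = Arrays.asList(
                new Listener("one", "test-group", Mode.ASSIGN),
                new Listener("two", "test-group", Mode.ASSIGN),
                new Listener("three", "test-group", Mode.SUBSCRIBE));
        System.out.println(sharedGroupIds(listeners));
    }
}
```

For the three listeners here the check reports `test-group`. Giving the partition-pinned listeners a group ID of their own, separate from the subscribing listener, would make the result empty.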
Start the project and send messages
this.template.send("test-topic-group", message);
this.template.send("test-topic-standalone", message);
Error log
2022-12-14 19:04:18.835 INFO 13416 --- [tainer#13-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener : one:接收kafka消息:[111111111]
2022-12-14 19:04:18.841 INFO 13416 --- [tainer#14-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener : three:接收kafka消息:[111111111]
2022-12-14 19:04:19.336 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Discovered group coordinator 192.168.136.136:9092 (id: 2147482646 rack: null)
2022-12-14 19:04:19.338 ERROR 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Offset commit failed on partition test-topic-group-0 at offset 1: The coordinator is not aware of this member.
2022-12-14 19:04:19.338 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 19:04:19.338 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.338 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.342 ERROR 13416 --- [tainer#13-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
... 4 common frames omitted
Check the consumer status on the Kafka server
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test-group --describe
GROUP       TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                  HOST            CLIENT-ID
test-group  test-topic-standalone  0          1               1               0    consumer-test-group-16-d1a0c068-a68d-456c-a958-97b44219ef1b  /192.168.136.1  consumer-test-group-16
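The output lists only `consumer-test-group-16` as a member of `test-group`, while the failing commit in the log came from `consumer-test-group-15` with `generationId=-1` and an empty `memberId`. To make that interaction easier to follow, here is a toy model of the commit check. It is not Kafka's broker code. It assumes, based on my understanding and on what the logs above show, that a commit carrying no member ID is accepted only while the group has no active members. `ToyCoordinator`, `join`, and `commit` are made-up names.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a group coordinator's offset-commit check; NOT Kafka's broker code. */
final class ToyCoordinator {
    enum Result { OK, UNKNOWN_MEMBER_ID }

    // Member IDs that joined through the group protocol (subscribing consumers).
    private final Set<String> members = new HashSet<>();

    /** A subscribing consumer joins the group and receives a member ID. */
    String join(String clientId) {
        String memberId = clientId + "-member";
        members.add(memberId);
        return memberId;
    }

    /**
     * Assumed rule: a commit with an empty member ID (no group membership)
     * is accepted only while the group has no active members.
     */
    Result commit(String memberId) {
        if (memberId.isEmpty()) {
            return members.isEmpty() ? Result.OK : Result.UNKNOWN_MEMBER_ID;
        }
        return members.contains(memberId) ? Result.OK : Result.UNKNOWN_MEMBER_ID;
    }

    public static void main(String[] args) {
        ToyCoordinator testGroup = new ToyCoordinator();
        // Commit without membership while the group is still empty.
        System.out.println(testGroup.commit(""));
        // The subscribing listener joins, then commits with its member ID.
        String id = testGroup.join("consumer-test-group-16");
        System.out.println(testGroup.commit(id));
        // Commit without membership while the group has an active member.
        System.out.println(testGroup.commit(""));
    }
}
```

Under this model the third commit is the one that gets rejected, which matches the `UNKNOWN_MEMBER_ID` response in the error log: once the subscribing listener has joined `test-group`, commits from the partition-pinned listeners under the same group ID are refused.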