国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka實戰(zhàn)-消費者offset重置問題

這篇具有很好參考價值的文章主要介紹了kafka實戰(zhàn)-消費者offset重置問題。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

背景

背景:當app啟動時,會調(diào)用 “啟動上報接口” 上報啟動數(shù)據(jù),該數(shù)據(jù)包含且不限于手機型號、應用版本、app類型、啟動時間等,一站式接入平臺系統(tǒng)會記錄該數(shù)據(jù)。

生產(chǎn)者:“啟動上報接口”會根據(jù)啟動數(shù)據(jù)發(fā)送一條kafka消息,topic“xxx”

消費者:“啟動處理模塊”會監(jiān)控topic “xxx”,當發(fā)現(xiàn)消息時進行消費,將啟動數(shù)據(jù)存放至相應的數(shù)據(jù)庫中。

問題現(xiàn)象

當生產(chǎn)者和消費者的項目都啟動后,我們發(fā)現(xiàn),生產(chǎn)者在不斷的生產(chǎn)消息,消費者在不斷的進行消費,查詢數(shù)據(jù)庫中的啟動數(shù)據(jù)也確實增加了,但是當前 consumer 的 offset 卻一直是0。

監(jiān)控了十分鐘,發(fā)現(xiàn)如下現(xiàn)象:數(shù)據(jù)庫數(shù)據(jù)確實增加,但是相同的數(shù)據(jù)卻重復出現(xiàn)了N次,N隨時間增加。

查看日志發(fā)現(xiàn)如下現(xiàn)象:

2023-02-01 23:13:55,323 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[273] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Adding newly assigned partitions: CONSUMER-7
2023-02-01 23:13:55,434 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[1306] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Found no committed offset for partition CONSUMER-7
2023-02-01 23:13:55,716 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState[397] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Resetting offset for partition CONSUMER-7 to offset 0.
2023-02-01 23:13:55,717 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer[292] - app-loginfo-message-consumer: partitions assigned: [CONSUMER-7]
...						 
2023-02-01 23:19:23,425 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StartServiceImpl[200] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- enterStartupLog success:業(yè)務日志
2023-02-01 23:19:23,425 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...listener.ApiKafkaListener[58] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- tag:UPMAPI_APP_START_REPORT, message:業(yè)務日志
2023-02-01 23:19:23,425 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StatisticsServiceImpl[142] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- 業(yè)務日志
2023-02-01 23:19:23,811 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StartServiceImpl[200] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- enterStartupLog success:業(yè)務日志
2023-02-01 23:19:23,811 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...listener.ApiKafkaListener[58] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- tag:UPMAPI_APP_START_REPORT, message:業(yè)務日志
2023-02-01 23:19:23,812 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StatisticsServiceImpl[142] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- 業(yè)務日志
2023-02-01 23:19:24,108 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StartServiceImpl[200] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- enterStartupLog success:業(yè)務日志
2023-02-01 23:19:24,108 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org...apache.kafka.clients.consumer.internals.ConsumerCoordinator[1107] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator[1013] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] LeaveGroup request failed with error: The coordinator is not aware of this member.
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer[149] - Consumer exception java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1361) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1063) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2311) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2306) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2292) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2106) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke$original$cUFzUMfk(KafkaMessageListenerContainer.java:1097) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke$original$cUFzUMfk$accessor$KCXsM7NI(KafkaMessageListenerContainer.java) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$auxiliary$YPllDASy.call(Unknown Source) at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031) ... 3 common frames omitted
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[676] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[311] - [Consumer clientId=consumer-app-loginfo-message-consumer-35, groupId=app-loginfo-message-consumer] Lost previously assigned partitions CONSUMER-7
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer[292] - app-loginfo-message-consumer: partitions lost: [CONSUMER-7]
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer[292] - app-loginfo-message-consumer: partitions revoked: [CONSUMER-7]
2023-02-01 23:19:24,110 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator[552] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] (Re-)joining group

23:13:55發(fā)現(xiàn)消息并將offset置為0,然后開始poll,進行消費。

處理到23:19:24時全部數(shù)據(jù)處理完畢,開始提交offset。此時報錯提交失敗,消費者已離開消費組,當前消費者已分配的分區(qū)丟失。

然后當前消費者重新加入消費者,offset重置為0,從頭開始消費。

分析原因

從以上日志可以看出,消費者正確消費,不存在報錯的現(xiàn)象,kafka offset手動提交代碼無誤。因此可以斷定報錯原因不是業(yè)務異常,而是由于kafka自身機制導致的提交失敗。

查看發(fā)現(xiàn)當前消費者制配置如下:

allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [ip0:9092, ip1:9092, ip2:9092]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = app-loginfo-message-consumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

通過查看配置發(fā)現(xiàn),由于啟動數(shù)據(jù)量大,消費者每次拉取的消息數(shù)量滿載,達到max.poll.records = 500,即每次拉取500條消息進行消費,全部消費完畢后進行offset提交。

然而通過上述日志可以發(fā)現(xiàn),一條消息消費的時間接近一秒,500條數(shù)據(jù)消費的總時間≈500*1s=500s,該時間遠大于max.poll.interval.ms=300000設(shè)置的最大拉取時間(300s)。

因此,當達到最大時間300s時,kafka client發(fā)現(xiàn)當前消費者仍未進行過拉取,因此認為當前消費者處于非健康狀態(tài),將其從消費者組中剔除。

所以,當消費者消費500條消息完畢后進行提交時,發(fā)現(xiàn)自己不在消費者組中,故而提交offset失敗。
提交失敗后,當前消費者重新加入消費者組,offset又從起點開始拉取。

上述過程反復進行,導致offset不斷被重置。

問題解決

通過問題分析我們得到,當前現(xiàn)象產(chǎn)生的關(guān)鍵點是:“指定時間不足以消費指定數(shù)量消息” 的問題。所以可得兩種解決方式進行解決:一種是減小max.poll.records,使得指定時間內(nèi)能夠完成全部消費;另一種是增大max.poll.interval.ms,使得當前數(shù)量的消息全部消費完畢后,還未達到時間閾值。

由于以下兩點原因,我們決定通過第一種方式,即減小max.poll.records的方式來解決該問題:

1.由于在生產(chǎn)環(huán)境,我們的消費者服務是docker部署,支持動態(tài)擴容的,且配置了concurrency=#{partitionCount}進行多線程消費,所以處理速度完全滿足業(yè)務需求,不會造成消息堆積。

2.max.poll.interval.ms相當于消費者的探活時間,默認=300000即3分鐘,如果設(shè)置過大則失去原有意義,所以一般情況下,在業(yè)務能力可以得到滿足時,不建議修改該配置。

結(jié)果:設(shè)置max.poll.records=100,問題解決。

附-常見的消費者配置描述和調(diào)優(yōu)方案

1. max.poll.records

作用:意味消費者一次poll()操作,能夠獲取的最大數(shù)據(jù)量。

注意:調(diào)整這個值能提升吞吐量,于此同時也需要同步提升max.poll.interval.ms的參數(shù)大小。設(shè)置之前,應該考慮并計算所有數(shù)據(jù)處理完畢的時間,務必要小于max.poll.interval.ms的參數(shù)大小。

2. fetch.max.bytes

作用:意味server端可返回給consumer的最大數(shù)據(jù)大小

注意:增加可以提升吞吐量,但是在客戶端和服務端網(wǎng)絡延遲比較大的環(huán)境下,建議可以減小該值,防止業(yè)務處理數(shù)據(jù)超時。

3. heartbeat.interval.ms

作用:消費超時時間,consumer與kafka之間的超時時間

注意:該參數(shù)不能超過session.timeout.ms,通常設(shè)置為session.timeout.ms的三分之一,默認值:3000。

4. max.partition.fetch.bytes

作用:限制每個consumer發(fā)起fetch請求時候,讀到數(shù)據(jù)(record)的限制

注意:設(shè)置過大,consumer本地緩存的數(shù)據(jù)就會越多,可能影響內(nèi)存的使用,默認值:1048576。

5. fetch.max.bytes

作用:server端可返回給consumer的最大數(shù)據(jù)大小

注意:數(shù)值可大于max.partition.fetch.bytes,一般設(shè)置為默認值即可,默認值:52428800

6. session.timeout.ms

作用:使用consumer組管理offset時,consumer與broker之間的心跳超時時間

注意:如果consumer消費數(shù)據(jù)的頻率非常低,建議增大這個參數(shù)值,默認值:10000。

7. auto.offset.reset

作用:消費過程中無法找到數(shù)據(jù)消費到的offset位置,所選擇的消費策略

注意:earliest:從頭開始消費,可能會消費到重復數(shù)據(jù),latest:從數(shù)據(jù)末尾開始消費,可能會丟失數(shù)據(jù)。默認值:earlist。

8. max.poll.interval.ms

作用:消費者在每一輪poll() (拉取數(shù)據(jù)之間的最大時間延遲),默認值:300s

注意:如果此超時時間期滿之前poll()沒有被再次調(diào)用,則消費者被視為失敗,并且分組將觸發(fā)rebalance,以便將分區(qū)重新分配給別的成員

如果,再兩次poll之間需要添加過多復雜的,耗時的邏輯,需要延長這個時間;當然,如果當前消費的消息消費結(jié)果可以忽略,即失敗也無所謂,比如某些統(tǒng)計次數(shù)的場景等,數(shù)量有少數(shù)差異不影響業(yè)務使用,那么可以設(shè)置為異步處理,此時就不會消耗大量時間。

9. max.poll.records

作用:消費者一次poll()操作,能夠獲取的最大數(shù)據(jù)量

注意:增加這個參數(shù)值,會增加一次性拉取數(shù)據(jù)的數(shù)據(jù)量,確保拉取數(shù)據(jù)的時間,至少在max.poll.interval.ms規(guī)定的范圍之內(nèi),默認值:500

更多優(yōu)化內(nèi)容可查看 5種kafka消費端性能優(yōu)化方法文章來源地址http://www.zghlxwxcb.cn/news/detail-409983.html

到了這里,關(guān)于kafka實戰(zhàn)-消費者offset重置問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • Kafka3.0.0版本——消費者(自動提交 offset)

    Kafka3.0.0版本——消費者(自動提交 offset)

    官網(wǎng)文檔 參數(shù)解釋 參數(shù) 描述 enable.auto.commi 默認值為 true,消費者會自動周期性地向服務器提交偏移量。 auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,默認 5s。 圖解分析 消費者自動提交 offset代碼 消費者自動提交

    2024年02月09日
    瀏覽(27)
  • Kafka - 主題Topic與消費者消息Offset日志記錄機制

    Kafka - 主題Topic與消費者消息Offset日志記錄機制

    可以根據(jù)業(yè)務類型,分發(fā)到不同的Topic中,對于每一個Topic,下面可以有多個分區(qū)(Partition)日志文件: kafka 下的Topic的多個分區(qū),每一個分區(qū)實質(zhì)上就是一個隊列,將接收到的消息暫時存儲到隊列中,根據(jù)配置以及消息消費情況來對隊列消息刪除。 可以這么來理解Topic,Partitio

    2024年02月03日
    瀏覽(22)
  • Kafka-Java四:Spring配置Kafka消費者提交Offset的策略

    Kafka消費者提交Offset的策略有 自動提交Offset: 消費者將消息拉取下來以后未被消費者消費前,直接自動提交offset。 自動提交可能丟失數(shù)據(jù),比如消息在被消費者消費前已經(jīng)提交了offset,有可能消息拉取下來以后,消費者掛了 手動提交Offset 消費者在消費消息時/后,再提交o

    2024年02月08日
    瀏覽(22)
  • Kafka篇——Kafka消費者端常見配置,涵蓋自動手動提交offset、poll消息細節(jié)、健康狀態(tài)檢查、新消費組消費offset規(guī)則以及指定分區(qū)等技術(shù)點配置,全面無死角,一篇文章拿下!

    Kafka篇——Kafka消費者端常見配置,涵蓋自動手動提交offset、poll消息細節(jié)、健康狀態(tài)檢查、新消費組消費offset規(guī)則以及指定分區(qū)等技術(shù)點配置,全面無死角,一篇文章拿下!

    一、自動提交offset 1、概念 Kafka中默認是自動提交offset。消費者在poll到消息后默認情況下,會自動向Broker的_consumer_offsets主題提交當前 主題-分區(qū)消費的偏移量 2、自動提交offset和手動提交offset流程圖 3、在Java中實現(xiàn)配置 4、自動提交offset問題 自動提交會丟消息。因為如果消費

    2024年01月22日
    瀏覽(22)
  • golang kafka Shopify/sarama 消費者重置新增分區(qū)偏移量并進行重新消費

    golang kafka Shopify/sarama 消費者重置新增分區(qū)偏移量并進行重新消費

    當我們使用kafka的時候存在這樣一個場景: 有一個消費組正在正常消費中并且消息偏移量策略為lastoffset(最新偏移量),這個時候在kafka服務器中為當前主題下新增了一個分區(qū),各個生產(chǎn)者紛紛將消息投遞到了這個新增分區(qū)中。當然我們知道針對于這種場景消費者方可以觸發(fā)

    2024年02月09日
    瀏覽(18)
  • Kafka及Kafka消費者的消費問題及線程問題

    Topic:是 Kafka 消息發(fā)布和訂閱的基本單元,同時也是消息的容器。Topic 中的消息被分割成多個分區(qū)進行存儲和處理。 Partition:是 Topic 分區(qū),將 Topic 細分成多個分區(qū),每個分區(qū)可以獨立地存儲在不同的 Broker 中,從而增加了消息的并發(fā)性、可擴展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    瀏覽(29)
  • 【項目實戰(zhàn)】Java 開發(fā) Kafka 消費者

    【項目實戰(zhàn)】Java 開發(fā) Kafka 消費者

    ?? 博主介紹 : 博主從事應用安全和大數(shù)據(jù)領(lǐng)域,有8年研發(fā)經(jīng)驗,5年面試官經(jīng)驗,Java技術(shù)專家,WEB架構(gòu)師,阿里云專家博主,華為云云享專家,51CTO TOP紅人 Java知識圖譜點擊鏈接: 體系化學習Java(Java面試專題) ???? 感興趣的同學可以收藏關(guān)注下 , 不然下次找不到喲

    2024年02月16日
    瀏覽(31)
  • kafka消費者報錯Offset commit ......it is likely that the consumer was kicked out of the group的解決

    2022年10月份接到一個小功能,對接kafka將數(shù)據(jù)寫到數(shù)據(jù)庫,開始的需求就是無腦批量insert,隨著時間的推移,業(yè)務需求有變更,kafka的生產(chǎn)消息頻次越來越高,到今年7月份為止就每秒會有幾十條甚至上百條,然后消費消息的代碼就報錯: Caused by: org.apache.kafka.clients.consumer.Com

    2024年02月07日
    瀏覽(22)
  • 解決Kafka新消費者組導致重復消費的問題

    ???????? 問題描述 :在使用Kafka時,當我們向新的消費者組中添加消費者時,可能會遇到重復消費的問題。本文將介紹一些解決這個問題的方法,幫助開發(fā)者更好地處理Kafka中的消費者組和消費偏移量。 ????????Kafka是一個強大的分布式消息隊列系統(tǒng),但在使用過程中

    2024年02月07日
    瀏覽(18)
  • kafka消費者詳解,根據(jù)實際生產(chǎn)解決問題

    1.首先kafka每創(chuàng)建一個消費者就是一個消費者組,必須指定groupip 2.兩個消費者組之間不相互影響,消費同一個主題的同一個分區(qū),兩個消費者組不相互影響,各自記錄自己的offset 3.在開發(fā)中如果沒有指定每個消費者去消費特定的分區(qū),那么kafka默認是按照roundRobin輪詢的方式分

    2024年02月10日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包