kafka是一個(gè)常用的分布式消息中間件,與RabbitMQ對(duì)比,特點(diǎn)是可以無(wú)限橫向擴(kuò)容,并保持高可靠性、高吞吐量和低延遲,因此比RabbitMQ有更高的市場(chǎng)占有率(網(wǎng)上搜了一下,kafka大約41%,RabbitMQ大約29%)。
一、kafka常見(jiàn)概念
一般正常的開(kāi)發(fā),了解到前6個(gè)概念就好,其余的概念更多用于kafka運(yùn)維配置,或問(wèn)題排查。
1、producer生產(chǎn)者
指生產(chǎn)消息,并把消息投遞到kafka的外部應(yīng)用程序 ,它不是kafka的組成部分
2、consumer消費(fèi)者
指連接到kafka,接收/訂閱消息,并進(jìn)行后續(xù)邏輯處理的外部應(yīng)用程序,它也不是kafka的組成部分。
一個(gè)消費(fèi)者可以同時(shí)消費(fèi)kafka的多個(gè)隊(duì)列(主題)
3、Consumer Group消費(fèi)者組
連接到kafka的消費(fèi)者,必須指定消費(fèi)者組,多個(gè)消費(fèi)者可以指定相同的消費(fèi)者組,這樣可以避免同一個(gè)消息被重復(fù)消費(fèi)。
如果2個(gè)消費(fèi)者綁定了同一個(gè)隊(duì)列(主題),指定了不同的消費(fèi)者組,則每條消息,都會(huì)同時(shí)投遞給這2個(gè)消費(fèi)者。
4、topic 主題
kafka里,收發(fā)消息的邏輯集合,每個(gè)主題,都可以認(rèn)為是一個(gè)隊(duì)列;
生產(chǎn)者和消費(fèi)者,都是通過(guò)連接主題 來(lái)處理消息。
5、partition分區(qū)
kafka里存儲(chǔ)消息的物理集合,一個(gè)主題可以劃分為1個(gè)或多個(gè)分區(qū),可以理解為子隊(duì)列;
每個(gè)分區(qū)只屬于一個(gè)主題,且只能被一個(gè)消費(fèi)者消費(fèi)(同一個(gè)組)。
該主題收到的所有消息,會(huì)根據(jù)消息的key選擇對(duì)應(yīng)的分區(qū)進(jìn)行投遞;
如果消息未指定key,且沒(méi)有定義分區(qū)規(guī)則時(shí),則kafka會(huì)隨機(jī)平均投遞到主題的多個(gè)分區(qū)里。
注意:每個(gè)分區(qū)里的消息,一定是按隊(duì)列的規(guī)則,保證先進(jìn)先出;但是不同的分區(qū)的消息,則無(wú)法保證。
因此,如果要確保消費(fèi)者能按消息的投遞順序進(jìn)行消費(fèi):
- 每個(gè)主題只建一個(gè)分區(qū)(這個(gè)不推薦)
- 同一批需要保證順序的消息,指定相同的key,比如使用用戶ID作為消息的key,相同key的消息會(huì)投遞到同一個(gè)分區(qū)
6、offset偏移量
指每個(gè)分區(qū)里的消息的唯一編號(hào),并且是從0開(kāi)始遞增的。主題+分區(qū)+偏移量,可以唯一定位一條消息。
注:每個(gè)分區(qū)里的每條消息,offset一定是不同的;不同分區(qū)的offset是會(huì)重復(fù)的。
消費(fèi)者會(huì)也記錄每次消費(fèi)的offset值,來(lái)標(biāo)識(shí)自己當(dāng)前處理到哪一條消息了,以便斷開(kāi)重連時(shí),繼續(xù)消費(fèi),消費(fèi)者的offset也是存儲(chǔ)在kafka中。
7、broker集群節(jié)點(diǎn)
kafka集群里的某個(gè)節(jié)點(diǎn),通常是一臺(tái)服務(wù)器上的kafka實(shí)例。
8、replica副本
主題的每個(gè)分區(qū),可以指定多個(gè)副本,每個(gè)副本都存儲(chǔ)一份完全相同的消息數(shù)據(jù)。
一般建議同一個(gè)分區(qū)的不同副本,要保存在不同的broker上,避免broker故障導(dǎo)致該分區(qū)數(shù)據(jù)丟失。
9、leader/follower
主題的每個(gè)分區(qū),如果有多個(gè)副本,那么其中一個(gè)副本會(huì)作為leader對(duì)外提供讀寫(xiě)服務(wù),其余的作為follower只同步數(shù)據(jù)。
如果leader出現(xiàn)故障時(shí),會(huì)從follower中選舉一個(gè)副本作為leader重新提供服務(wù)。
10、ISR(in-sync replicas)
分區(qū)的同步副本集合,每個(gè)分區(qū)都會(huì)維護(hù)一個(gè)ISR列表,內(nèi)容是那些與leader保持同步的follower清單。
如果某個(gè)follower跟不上同步的進(jìn)度,或無(wú)法保持同步時(shí),會(huì)從ISR列表中移除。
只有在ISR列表里的follower,才有機(jī)會(huì)提升為leader.
注:已經(jīng)成為leader的副本,也在ISR中
11、LEO日志末端偏移量
LEO指Log End Offset,即當(dāng)前分區(qū)中下一條待寫(xiě)入消息的偏移量,該條消息是未指向具體的消息。
分區(qū)的每個(gè)副本,都有自己的LEO。
12、HW高水位線
HW指High Watermark Offset,即當(dāng)前分區(qū)中已經(jīng)被提交并復(fù)制到所有副本的最高消息偏移量(offset),
leader接收到消息,但是還未同步完時(shí),不會(huì)更新HW值。
leader會(huì)比對(duì)自己和所有follower的LEO,用其中的較小值,來(lái)更新HW值。
13、LAG滯后消息數(shù)
一個(gè)消費(fèi)者組,在消費(fèi) 主題的每個(gè)分區(qū)時(shí),每個(gè)分區(qū)都會(huì)計(jì)算一個(gè)LAG值,指該分區(qū)的消息總數(shù)與消費(fèi)者已消費(fèi)的消息數(shù)的差值。
通常是 該分區(qū)的HW 減 消費(fèi)者組的offset。
實(shí)踐中,運(yùn)維人員應(yīng)當(dāng)對(duì)LAG進(jìn)行監(jiān)控,比如超出10000時(shí)進(jìn)行告警和處理。
理解了這些概念,網(wǎng)上找了一張kafka工作原理圖:
二、kafka與RabbitMQ對(duì)比
相對(duì)于RabbitMQ,Kafka有如下特點(diǎn):
- kafka的消息消費(fèi)完并不會(huì)立即刪除,而是保存一定時(shí)間后才刪除,默認(rèn)是7天。而RabbitMQ是消費(fèi)完就刪除。
kafka不刪消息這點(diǎn)我很喜歡,尤其是排查問(wèn)題需要數(shù)據(jù)恢復(fù)時(shí)。 - kafka消費(fèi)者只支持pull模式,不支持push模式,即消費(fèi)者只能主動(dòng)輪詢kafka獲取消息,默認(rèn)是每500ms拉一次,每次最多拉500條數(shù)據(jù),輪詢的優(yōu)點(diǎn)就是靈活,缺點(diǎn)就是沒(méi)消息時(shí)空耗性能。
RabbitMQ默認(rèn)只支持push模式,主動(dòng)推送消息給消費(fèi)者,實(shí)時(shí)性更好。 - kafka通過(guò)topic主題連接生產(chǎn)者和消費(fèi)者,多個(gè)消費(fèi)者連接到同一個(gè)topic,即可消費(fèi)該主題的所有消息,如果有不需要的消息,也只能由消費(fèi)者自行判斷和拋棄處理。
RabbitMQ則是通過(guò)Exchange接收消息,再通過(guò)指定的規(guī)則轉(zhuǎn)發(fā)到具體的Queue,由消費(fèi)者消費(fèi),可以參考我以前寫(xiě)的文章:https://youbl.blog.csdn.net/article/details/80401945
RabbitMQ可以通過(guò)配置很多路由,避免消息投遞給不必要的消費(fèi)者,
不過(guò)RabbitMQ也支持直接通過(guò)Queue接收和投遞消息。 - 性能上,RabbitMQ是單線程模型,大數(shù)據(jù)上會(huì)有瓶頸;而kafka可以幾乎無(wú)限擴(kuò)展。
- 有序性,kafka對(duì)于主題的每個(gè)分區(qū),因?yàn)橛星抑荒苡幸粋€(gè)消費(fèi)者,所以能保證消息的有序性,不同分區(qū)則無(wú)法保證;
而RabbitMQ在多消費(fèi)者時(shí),會(huì)平均分配消息,無(wú)法保證有序,并且在消息消費(fèi)失敗重新投遞時(shí),也會(huì)破壞消息順序。
三、kafka最佳實(shí)踐
1、生產(chǎn)者配置
-
生產(chǎn)者發(fā)送時(shí),有個(gè)acks配置,說(shuō)明如下:
- 為0,生產(chǎn)者發(fā)消息后,不等borker響應(yīng)就返回成功,性能最高,丟數(shù)據(jù)概率也最高;
- 為1,生產(chǎn)者發(fā)消息后,leader節(jié)點(diǎn)返回成功,就算成功;但是leader未同步給其它副本前就掛了,也會(huì)丟數(shù)據(jù);
- 為all 或 -1,則必須等所有副本都同步成功,才返回成功,保證數(shù)據(jù)不丟失,但性能最低。
生產(chǎn)環(huán)境,建議配置為-1,另外2個(gè)配置都有丟數(shù)據(jù)的可能性。
-
min.insync.replicas 最小副本數(shù)要求,默認(rèn)值1,建議為2(當(dāng)然要求每個(gè)topic副本數(shù)都在3以上)
因?yàn)槿绻渲脼?,當(dāng)leader收到數(shù)據(jù),還未同步就故障了,會(huì)丟失數(shù)據(jù)。 -
retries: 重試次數(shù),設(shè)置為較大值,默認(rèn)值為
Integer.MAX_VALUE
,確保發(fā)送成功。
注:雖然重試次數(shù)默認(rèn)很大,但是重試還受到另一個(gè)時(shí)間配置的影響:delivery.timeout.ms
(默認(rèn)2分鐘),retries還沒(méi)用完,這個(gè)超時(shí)時(shí)間就到了,也會(huì)中斷發(fā)送。
另外,如果設(shè)置了較大的retries,請(qǐng)使用異步發(fā)消息的方式,避免同步操作導(dǎo)致線程堵塞,影響用戶體驗(yàn),或其它業(yè)務(wù)問(wèn)題。 -
配置參考:
spring:
kafka:
producer:
bootstrap-servers: 10.1.1.1:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 10000
properties:
delivery.timeout.ms: 2000 # 發(fā)送消息上報(bào)成功或失敗的最大時(shí)間,默認(rèn)120000,兩分鐘
linger.ms: 0 # 生產(chǎn)者把數(shù)據(jù)組合到一個(gè)批處理進(jìn)行請(qǐng)求的最大延遲時(shí)間,默認(rèn)0
# 參考 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
request.timeout.ms: 1000 # 批處理就緒后到響應(yīng)的等待時(shí)長(zhǎng),含網(wǎng)絡(luò)+服務(wù)器復(fù)制時(shí)間
batch.size: 1000
2、消費(fèi)者配置
- 為了避免消息丟失,消費(fèi)者需要開(kāi)啟手動(dòng)ack,消息業(yè)務(wù)邏輯處理完成再提交偏移量
- 參考后面的死循環(huán)問(wèn)題,建議使用String反序列化器
- 根據(jù)業(yè)務(wù)情況,配置合適的批量拉取數(shù)量
max-poll-records
,默認(rèn)值500 - 根據(jù)業(yè)務(wù)情況,配置合適的
auto-offset-reset
值,默認(rèn)值latest- latest:消費(fèi)者在消費(fèi)主題的某個(gè)分區(qū)時(shí),如果沒(méi)有之前的消費(fèi)記錄(以前提交的偏移量),則只拉取最新消息,忽略歷史消息。
- earliest:與latest相反,沒(méi)有之前的消費(fèi)記錄時(shí),從最早的消息開(kāi)始處理。
- none:沒(méi)有之前的消費(fèi)記錄時(shí),拋出異常。
- 配置參考:
spring:
kafka:
consumer:
bootstrap-servers: 10.1.1.1:9092
max-poll-records: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
listener:
type: batch
ack-mode: manual_immediate
3、其它
- 配置多個(gè) bootstrap server url,避免單節(jié)點(diǎn)故障,導(dǎo)致連接失敗
- 如果是異步發(fā)消息,不要在kafkaTemplate的成功回調(diào)和失敗回調(diào)方法里有太多業(yè)務(wù)邏輯,回調(diào)方法是單線程處理,里面的業(yè)務(wù)邏輯會(huì)占用
delivery.timeout.ms
的超時(shí)時(shí)間配置,可能導(dǎo)致后續(xù)消息發(fā)送超時(shí)。 - 同理,消費(fèi)者也是單線程處理,消費(fèi)邏輯太重的話,可能導(dǎo)致
session.timeout.ms
超時(shí),從而被認(rèn)為消費(fèi)者離線,導(dǎo)致問(wèn)題
四、kafka工具介紹
1、圖形化工具
推薦 OffsetExplorer
,下載地址:https://www.kafkatool.com/download.html
2、命令行工具
kafka安裝包內(nèi)置了很多腳本工具,可以方便的查詢kafka的狀態(tài),這些工具只需要下載就可以使用,無(wú)需安裝。
- 下載地址: https://kafka.apache.org/downloads
下載后解壓,在bin目錄下有很多sh文件,這些是在linux上使用的;
如果在Windows下使用,要用 bin\windows\ 下的那些bat文件。
下面用windows的bat文件命令舉例(linux改用對(duì)應(yīng)的sh文件執(zhí)行即可) - 使用說(shuō)明,請(qǐng)參考官方文檔:https://kafka.apache.org/documentation/
查詢某個(gè)消費(fèi)者組下有哪些消費(fèi)者,以及這些消費(fèi)者對(duì)主題的消費(fèi)狀態(tài):
d:\kafka_2.13-3.4.0\bin\windows\kafka-consumer-groups.bat --describe --group=cb_consumers --bootstrap-server=10.0.0.1:9092
字段說(shuō)明:
- GROUP 消費(fèi)者分組
- TOPIC 消費(fèi)的主題
- PARTITION 消費(fèi)的分區(qū)
- CURRENT-OFFSET 當(dāng)前消費(fèi)到的消息偏移量
- LOG-END-OFFSET 當(dāng)前分區(qū)的最大消息偏移量
- LAG 滯后消息條數(shù)
- CONSUMER-ID 消費(fèi)者的ID
- HOST 消費(fèi)者所在的主機(jī)
- CLIENT-ID 客戶端ID
注:LAG可以簡(jiǎn)單理解為LOG-END-OFFSET 減 CURRENT-OFFSET
,但是實(shí)際上LAG=HW 減 CURRENT-OFFSET
四、springboot項(xiàng)目使用
1、生產(chǎn)者Demo代碼:
1.1、添加pom依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1.2、添加application.yml配置:
spring:
kafka:
producer:
bootstrap-servers: 10.1.1.1:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 2 # 失敗重發(fā)次數(shù)
1.3、Java發(fā)送代碼:
private final KafkaTemplate kafkaTemplate; // 注入的Bean
// 同步發(fā)送消息
String topic = "beinetTest111";
Object result = kafkaTemplate.send(topic, "我是key", objData).get();
2、消費(fèi)者Demo代碼:
2.1、添加pom依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.2、添加application.yml配置:
spring:
kafka:
consumer:
bootstrap-servers: 10.1.1.1:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
type: batch
ack-mode: manual_immediate
2.3、Java發(fā)送代碼:
@KafkaListener(topics = "${kafka-topic.reports}")
public void consumerCreateTask(List<ConsumerRecord<String, Object>> consumerRecordList, Acknowledgment ack) {
if (consumerRecordList == null || consumerRecordList.size() <= 0)
return;
long start = System.nanoTime();
ConsumerRecord lastRecord = consumerRecordList.get(0);
try {
// 轉(zhuǎn)換dto,并進(jìn)行業(yè)務(wù)邏輯處理
long elapsedTime = System.nanoTime() - start;
log.debug("Topic:{} 分區(qū):{} 偏移:{} 條數(shù):{} 耗時(shí):{}ns",
lastRecord.topic(),
lastRecord.partition(),
lastRecord.offset(),
dtos.size(),
elapsedTime);
} catch (Exception exp) {
long elapsedTime = System.nanoTime() - start;
log.error("Topic:{} 分區(qū):{} 偏移:{} 耗時(shí):{}ns 出錯(cuò):",
lastRecord.topic(),
lastRecord.partition(),
lastRecord.offset(),
elapsedTime,
exp);
} finally {
// 不論成敗,都提交,避免出錯(cuò)導(dǎo)致死循環(huán),避免丟消息的邏輯,可以在catch里備份
ack.acknowledge();
}
}
五、kafka常見(jiàn)問(wèn)題
1、有多個(gè)消費(fèi)者,但是總會(huì)有一個(gè)消費(fèi)者拿不到消息數(shù)據(jù)
對(duì)一個(gè)消費(fèi)者組而言,一個(gè)主題有幾個(gè)分區(qū),就最多接受幾個(gè)消費(fèi)者;
比如主題有2個(gè)分區(qū),那么每個(gè)分區(qū)只能分配給組里的一個(gè)消費(fèi)者,最多只有2個(gè)消費(fèi)者連接上來(lái),如果組里有3個(gè)消費(fèi)者,那么肯定會(huì)有一個(gè)消費(fèi)者處于空閑狀態(tài),沒(méi)活干。
如果主題有2個(gè)分區(qū),但是組里只有一個(gè)消費(fèi)者,那么2個(gè)分區(qū)的消息,都會(huì)投遞給這一個(gè)消費(fèi)者。
2、主題的分區(qū)分配策略是怎么樣的?
當(dāng)主題存在多個(gè)分區(qū)和多個(gè)消費(fèi)者時(shí),kafka的源碼實(shí)現(xiàn)里有如下幾種分區(qū)分配策略:
- Range策略(默認(rèn)策略):
把當(dāng)前消費(fèi)者組消費(fèi)的每個(gè)主題的所有分區(qū),逐個(gè)分配給消費(fèi)者,注意是每個(gè)主題單獨(dú)處理,所以會(huì)出現(xiàn)不均衡的情況。
例如:a主題有3個(gè)分區(qū)a0/a1/a2,b主題3個(gè)分區(qū)b0/b1/b2,有2個(gè)消費(fèi)者C0/C1,分配過(guò)程大致是:
第1步分配a主題: a0->C0, a1->C1, a2->C0
第2步分配b主題: b0->C0, b1->C1, b2->C0
這樣可以看出,【消費(fèi)者C0要維護(hù)4個(gè)分區(qū)的數(shù)據(jù),而C1只要維護(hù)2個(gè)分區(qū)的數(shù)據(jù)】,出現(xiàn)了明顯的不均衡問(wèn)題。 - Round-Robin策略:
把所有的分區(qū),排序后,輪詢方式逐一分配給所有的消費(fèi)者,
例如:a主題有3個(gè)分區(qū)a0/a1/a2,b主題3個(gè)分區(qū)b0/b1/b2,有2個(gè)消費(fèi)者C0/C1,分配過(guò)程大致是:
第1步分配a主題: a0->C0, a1->C1, a2->C0
第2步分配b主題: b0->C1, b1->C0, b2->C1
注意:第2步不是從頭開(kāi)始,而是接著第1步,繼續(xù)分配,所以排除了Range方案的不均衡問(wèn)題,
最終的分配結(jié)果是【2個(gè)消費(fèi)者,各自負(fù)責(zé)3個(gè)分區(qū)】。
但是,如果2個(gè)消費(fèi)者消費(fèi)的主題,只有部分交集,并不完全相同時(shí),還是會(huì)出現(xiàn)不均衡的情況。
如果希望改用這種策略,目前暫時(shí)沒(méi)有配置方法,要在代碼里修改partition.assignment.strategy
屬性,參考代碼:
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {
private final KafkaProperties kafkaProperties;
private final ConcurrentKafkaListenerContainerFactory<String, Object> kafkaFactory;
@Bean("myKafkaFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
kafkaFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
return kafkaFactory;
}
}
然后在消費(fèi)者代碼上指定使用這個(gè)工廠Bean:
@KafkaListener(id = "beinetHandler1", groupId = "beinetGroup", topicPattern = "beinetTest.*",
containerFactory = "myKafkaFactory")
public void msgHandler(List<ConsumerRecord> message, Acknowledgment ack) {
- 在kafka的源碼里還有2種實(shí)現(xiàn),本文暫不深入介紹:
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
org.apache.kafka.clients.consumer.StickyAssignor
3、消費(fèi)者加入或退出時(shí),消息還能正常消費(fèi)嗎?
結(jié)論:只要有存活的消費(fèi)者存在,那么所有的消息都能正常消費(fèi)。
當(dāng)有新的消費(fèi)者加入組,或組中有消費(fèi)者下線/退出,都會(huì)觸發(fā)消費(fèi)者重新平衡的動(dòng)作,就是重新為所有的消費(fèi)者分配分區(qū)。
重平衡發(fā)生時(shí),默認(rèn)停止所有消費(fèi)者工作,直到分配結(jié)束。
4、有兄弟說(shuō)已經(jīng)往kafka寫(xiě)入消息了,但是消費(fèi)者那邊沒(méi)有數(shù)據(jù)入庫(kù)
- 首先,確認(rèn)kafka有消息,用上面的圖形化工具offset explorer,去對(duì)應(yīng)的topic主題查找數(shù)據(jù),發(fā)現(xiàn)確實(shí)有數(shù)據(jù)
- 再在工具里,查看Consumers下的對(duì)應(yīng)Group,發(fā)現(xiàn)Lag為0,說(shuō)明消息已經(jīng)被正常消費(fèi)了
- 查看消費(fèi)者的應(yīng)用日志,沒(méi)有消費(fèi)日志產(chǎn)生
- 再繼續(xù)排查消費(fèi)者的應(yīng)用日志,發(fā)現(xiàn)有如下日志:
cb_consumers: partitions assigned: []
這表示,該消費(fèi)者沒(méi)有分配到分區(qū),不在工作中。
初步判斷,是不是有人在其它地方啟動(dòng)了消費(fèi)者,把數(shù)據(jù)給消費(fèi)掉了。 - offset explorer這個(gè)工具,不能展示消費(fèi)者IP信息,只能使用上面的命令
kafka-consumer-groups.bat
查看消費(fèi)者IP,
再找運(yùn)維看看這個(gè)IP是誰(shuí)的。 - 最后定位到是測(cè)試環(huán)境配置錯(cuò)誤,把開(kāi)發(fā)環(huán)境的數(shù)據(jù)消費(fèi)掉了。
5、反序列化失敗,導(dǎo)致消費(fèi)者死循環(huán)問(wèn)題
某天發(fā)布到測(cè)試環(huán)境后,發(fā)現(xiàn)程序啟動(dòng)后,一直拋如下異常,且持續(xù)幾十分鐘也不會(huì)中斷:
<#6d8d6458> j.l.IllegalStateException: No type information in headers and no default type provided
at o.s.util.Assert.state(Assert.java:76)
at o.s.k.s.s.JsonDeserializer.deserialize(JsonDeserializer.java:535)
at o.a.k.c.c.i.Fetcher.parseRecord(Fetcher.java:1387)
at o.a.k.c.c.i.Fetcher.access$3400(Fetcher.java:133)
at o.a.k.c.c.i.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618)
at o.a.k.c.c.i.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454)
at o.a.k.c.c.i.Fetcher.fetchRecords(Fetcher.java:687)
at o.a.k.c.c.i.Fetcher.fetchedRecords(Fetcher.java:638)
at o.a.k.c.c.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1233)
at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1206)
at j.i.r.GeneratedMethodAccessor109.invoke(Unknown Source)
at j.i.r.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at j.l.reflect.Method.invoke(Unknown Source)
at o.s.a.s.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at o.s.a.f.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at c.s.proxy.$Proxy186.poll(Unknown Source)
at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1413)
at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1250)
at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1162)
at j.u.c.Executors$RunnableAdapter.call(Unknown Source)
at j.u.c.FutureTask.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
放google搜索了一下,說(shuō)是反序列化找不到類型信息導(dǎo)致的,并建議不要使用JSON反序列化。
查了一下配置變化記錄,確實(shí)加了一個(gè)kafka的反序列化配置變更:
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
把spring.consumer.value-deserializer 改成:org.apache.kafka.common.serialization.StringDeserializer
就恢復(fù)了。
了解了一下,同事希望在消費(fèi)者的方法參數(shù)上,直接使用對(duì)象,而不是使用String,所以做了這個(gè)修改。
而正好出錯(cuò)的這個(gè)消費(fèi)者,消費(fèi)的是其它項(xiàng)目生產(chǎn)的消息,里面并不包含類型信息。
而這個(gè)異常,是在spring的底層拋出的,業(yè)務(wù)代碼上無(wú)法進(jìn)行try 捕捉,代碼同時(shí)又設(shè)置了手工提交ack,導(dǎo)致代碼進(jìn)入了死循環(huán)。
為了避免這種問(wèn)題,還是建議使用StringDeserializer反序列化,自己在代碼里反序列化比較好。
6、broker單個(gè)故障,導(dǎo)致消費(fèi)者無(wú)法提交偏移量的問(wèn)題
生產(chǎn)環(huán)境,為了性能和故障轉(zhuǎn)移,部署了6個(gè)broker,某天有一個(gè)broker故障下線了,按理應(yīng)該會(huì)自動(dòng)切換,實(shí)際上,所有的消費(fèi)者都開(kāi)始拋異常:“error when storing group assignment during syncgroup”
直到人工恢復(fù)broker并上線,故障才恢復(fù)。
最終排查結(jié)果:
- 運(yùn)維把kafka的一個(gè)內(nèi)部topic:
__consumer_offsets
副本數(shù)配置為2, - 同時(shí)配置
min.insync.replicas=2
,該配置的含義是ISR列表最小同步副本數(shù)不得少于2個(gè) - 故障下線的broker,正好包含該topic:
__consumer_offsets
的一個(gè)副本,導(dǎo)致該主題的副本數(shù)只剩下一個(gè),不符合min.insync.replicas=2
配置要求,從而停止工作 - topic:
__consumer_offsets
的作用,是接收并存儲(chǔ)所有消費(fèi)者組的消費(fèi)偏移量,該主題不工作,就會(huì)導(dǎo)致消費(fèi)者無(wú)法提交偏移量,從而導(dǎo)致所有消費(fèi)不正常,會(huì)重復(fù)消費(fèi)數(shù)據(jù)。
知道問(wèn)題了,調(diào)整就是把 topic: __consumer_offsets
的副本數(shù)調(diào)整為3(默認(rèn)值就是3,運(yùn)維改錯(cuò)了)
7、所有消費(fèi)者都不消費(fèi)任何消息
如果消費(fèi)者先啟動(dòng),然后才創(chuàng)建topic,會(huì)導(dǎo)致消費(fèi)者消費(fèi)不到數(shù)據(jù),可以重啟消費(fèi)者試試文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-495742.html
8、spring中的kafka,是否存在線程安全問(wèn)題
生產(chǎn)者使用的KafkaTemplate是線程安全的,經(jīng)測(cè)試,都是使用同一個(gè)線程進(jìn)行消息發(fā)送。
同樣,消費(fèi)者也是線程安全的,每個(gè)消費(fèi)者也是單線程處理所有接收到的消息。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-495742.html
9、kafka消息堵塞怎么處理
- 如果消息不重要,可以直接刪除主題重建,該主題下的所有消息自然就沒(méi)了,注意需要重建topic,然后重啟消費(fèi)者;
- 如果所有消息都需要消費(fèi),基于kafka的分區(qū)特性,每個(gè)分區(qū)只能一個(gè)消費(fèi)者,因此無(wú)法簡(jiǎn)單通過(guò)增加消費(fèi)者來(lái)解決問(wèn)題
- 確認(rèn)消費(fèi)者有沒(méi)有出現(xiàn)異常,需要注意的是,有些開(kāi)發(fā)人員會(huì)吞掉異常,導(dǎo)致你認(rèn)為消費(fèi)者正常的,可以通過(guò)業(yè)務(wù)數(shù)據(jù)是否持續(xù)增長(zhǎng)來(lái)判別,如果是消費(fèi)者異常了,修復(fù)bug即可。
- 如果消費(fèi)正常,再確認(rèn)是否突發(fā)消息數(shù)據(jù)增長(zhǎng),簡(jiǎn)單判斷就是主題的LAG是否在持續(xù)按正常速率降低,觀察幾分鐘,如果持續(xù)降低,基于判斷是突發(fā)消息,可以耐心等待。
- 如果消息不會(huì)正常下降,基本判斷是消費(fèi)速度慢,
- 先用工具確認(rèn)該主題下每個(gè)分區(qū)的LAG,如果只是某個(gè)分區(qū)的LAG特別高,其它分區(qū)正常(未堵塞),那么應(yīng)該是消息分配不均衡,這個(gè)分區(qū)的消息特別多,考慮調(diào)整生產(chǎn)者的消息key,確保所有分區(qū)的消息數(shù)量均衡;
- 如果對(duì)消息時(shí)序無(wú)特別要求,可以代碼里通過(guò)線程池異步處理消息,注意要控制線程池?cái)?shù)量,不要導(dǎo)致應(yīng)用oom了;
- 考慮增加幾個(gè)分區(qū),再增加幾個(gè)消費(fèi)者,這樣新生產(chǎn)的消息會(huì)重新分散到不同分區(qū),降低舊消費(fèi)者的壓力;
- 堵塞的消息,考慮加個(gè)新消費(fèi)者,消費(fèi)到另外的臨時(shí)主題,臨時(shí)主題多加個(gè)幾倍的分區(qū)和消費(fèi)者,進(jìn)行快速消費(fèi),注意不要對(duì)下游或數(shù)據(jù)庫(kù)造成沖擊,導(dǎo)致其它問(wèn)題。
到了這里,關(guān)于kafka使用詳解、最佳實(shí)踐和問(wèn)題排查的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!