Problem Description
Kafka is a widely used message broker. In a Spring Boot project, KafkaTemplate is typically used as the producer to send messages. To avoid holding up the main business flow, messages are often sent asynchronously, as shown below.
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Intended to be fire-and-forget: the callbacks run asynchronously
    // once the send completes or fails.
    public void sendAsync(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .addCallback(
                        sendResult -> log.info("Send success"),
                        e -> log.error("Send failed", e));
    }
}
本以為采用異步發(fā)送,必然不會影響到主業(yè)務流程。但實際使用時發(fā)現(xiàn),在第一次發(fā)送消息時,如果Kafka Broker連接失敗,調(diào)用sendAsync()
方法的主線程會長時間阻塞。這點是出乎意料的。
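The behaviour is easy to reproduce with a plain KafkaProducer. Below is a minimal sketch, not taken from the project code; the bootstrap address and topic name are placeholders, and it assumes no broker is actually listening on localhost:9092. The "asynchronous" send() call itself only returns after roughly max.block.ms.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AsyncSendBlockingDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder address; assumes no broker is reachable here, so the
        // metadata fetch cannot succeed.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            long start = System.currentTimeMillis();
            // The callback is asynchronous, but this call itself waits inside
            // waitOnMetadata() until max.block.ms (default 60000 ms) expires.
            producer.send(new ProducerRecord<>("demo-topic", "hello"),
                    (metadata, e) -> System.out.println("callback: " + e));
            System.out.println("send() returned after "
                    + (System.currentTimeMillis() - start) + " ms");
        }
    }
}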
Cause Analysis
Tracing the source code shows that on the first send, the Kafka producer tries to fetch the cluster metadata from the broker (see the waitOnMetadata() method of KafkaProducer). If the broker connection fails, it blocks there, repeatedly requesting a metadata update until it times out (the timeout is defined by max.block.ms).
/**
 * Wait for cluster metadata including partitions for the given topic to be available.
 * @param topic The topic we want metadata for
 * @param partition A specific partition expected to exist in metadata, or null if there's no preference
 * @param nowMs The current time in ms
 * @param maxWaitMs The maximum time in ms for waiting on the metadata
 * @return The cluster containing topic metadata and the amount of time we waited in ms
 * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
 * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
 */
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    Cluster cluster = metadata.fetch();

    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);

    metadata.add(topic, nowMs);

    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long remainingWaitMs = maxWaitMs;
    long elapsed = 0;

    // Issue metadata requests until we have metadata for the topic and the requested partition,
    // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
    // is stale and the number of partitions for this topic has increased in the meantime.
    do {
        if (partition != null) {
            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
        } else {
            log.trace("Requesting metadata update for topic {}.", topic);
        }
        metadata.add(topic, nowMs + elapsed);
        int version = metadata.requestUpdateForTopic(topic);
        sender.wakeup();
        try {
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException(
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs));
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - nowMs;
        if (elapsed >= maxWaitMs) {
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        metadata.maybeThrowExceptionForTopic(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);
}
In other words, the Kafka producer must obtain the metadata before it can send a message. With asynchronous sending, the transmission of the message itself is non-blocking, but the metadata fetch is blocking. If the metadata can never be obtained, for example because the broker connection fails or the topic has not been created, the main thread will block for a long time.
Solution
The fix is straightforward. If sending to Kafka is not business-critical, you can create a dedicated thread pool to execute the sends, which guarantees that sendAsync() never blocks the main business flow; see the sketch below. Note that the pool size and the work-queue length must be bounded sensibly, so that an accumulation of blocked tasks cannot cause an OOM; DiscardPolicy can be chosen as the rejection policy where dropping messages is acceptable.
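A minimal sketch of that approach follows; the class name, pool sizes, and queue capacity are illustrative assumptions, not values from the original code. The potentially blocking kafkaTemplate.send() call runs on a pool thread, so the caller returns immediately even when the broker is unreachable.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaAsyncSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Bounded pool and bounded queue so blocked send tasks cannot pile up and
    // exhaust memory; the sizes here are placeholders to tune for your load.
    private final ThreadPoolExecutor sendExecutor = new ThreadPoolExecutor(
            1, 2, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.DiscardPolicy());

    public void sendAsync(String topic, String message) {
        // The metadata fetch (or a full buffer) can only block the pool thread,
        // never the caller.
        sendExecutor.execute(() ->
                kafkaTemplate.send(topic, message)
                        .addCallback(
                                result -> log.info("Send success"),
                                e -> log.error("Send failed", e)));
    }
}

With DiscardPolicy, a task submitted while the queue is full is silently dropped; that trade-off is only acceptable here because the messages are assumed not to be business-critical.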
In addition, you can set max.block.ms to cap the maximum time the producer blocks while fetching metadata (the default is 60000 ms):
spring:
  kafka:
    producer:
      properties:
        max.block.ms: 1000
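If the producer is configured in Java instead of application.yml, the same property can be set when building the ProducerFactory. The following is a sketch, assuming Spring Kafka's DefaultKafkaProducerFactory and a placeholder bootstrap address:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Cap the metadata / buffer-full wait at 1 second instead of the 60 s default.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }
}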
In fact, an asynchronous send can block not only while waiting for metadata but also when the producer's record buffer pool is full (see: "Kafka Producer 異步發(fā)送消息居然也會阻塞?"). Both of these waits are bounded by max.block.ms.
Summary
The Kafka producer's asynchronous send methods (such as kafkaTemplate.send() in Spring Boot) look asynchronous but can in fact block. Because the metadata must be fetched before a message can be sent, a persistent failure to fetch it (for example, a failed broker connection or a topic that has not been created) leads to a long block on the calling thread. This contradicts the usual intuition about "asynchronous" sending and deserves special attention.