官方網(wǎng)址
源碼:https://kafka.apache.org/downloads
快速開始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合
發(fā)送消息流程
主線程:主線程只負(fù)責(zé)組織消息,如果是同步發(fā)送會(huì)阻塞,如果是異步發(fā)送需要傳入一個(gè)回調(diào)函數(shù)。
Map集合:存儲(chǔ)了主線程的消息。
Sender線程:真正的發(fā)送其實(shí)是sender去發(fā)送到broker中。
源碼閱讀
1 首先打開Producer.send()可以看到里面的內(nèi)容
// 返回值是一個(gè) Future 參數(shù)為ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定義了這些信息
// 主題
private final String topic;
// 分區(qū)
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 時(shí)間戳
private final Long timestamp;
2 發(fā)送之前的前置處理
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
// 這里給開發(fā)者提供了前置處理的勾子
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
// 我們最終發(fā)送的是經(jīng)過處理后的消息 并且如果是異步發(fā)送會(huì)有callback 這個(gè)是用戶定義的
return doSend(interceptedRecord, callback);
}
3 進(jìn)入真正的發(fā)送邏輯Future doSend()
- 由于是網(wǎng)絡(luò)通信,所以我們要序列化,在這個(gè)函數(shù)里面就做了序列化的內(nèi)容。
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
- 然后我們獲取分區(qū)
// 然后這里又是一個(gè)策略者模式 也是由用戶可以配置的 DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了這樣三個(gè)分區(qū)器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
4 到了我們的RecordAccumulator,也就是先由主線程發(fā)送到了RecordAccumulator
// 也就是對(duì)圖中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
我們發(fā)現(xiàn)里面是用一個(gè)MAP存儲(chǔ)的一個(gè)分區(qū)和ProducerBatch 是講這個(gè)消息寫到內(nèi)存里面MemoryRecordsBuilder 通過這個(gè)進(jìn)行寫入
// 可以看到是一個(gè)鏈表實(shí)現(xiàn)的雙向隊(duì)列,也就是消息會(huì)按append的順序?qū)懙?內(nèi)存記錄中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
5 接著我們看,我們append了以后,會(huì)有一個(gè)判斷去喚醒sender線程,見下面的注釋
// 如果說哦我們當(dāng)前的 這個(gè)batch滿了或者 我們創(chuàng)建了一個(gè)新的batch 這個(gè)時(shí)候喚醒 sender線程去發(fā)送數(shù)據(jù)
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// 喚醒sender 去發(fā)送數(shù)據(jù)
this.sender.wakeup();
}
// 實(shí)現(xiàn)了Runnable 所以我們?nèi)タ匆幌翿UN方法的邏輯
public class Sender implements Runnable
好上來就是一個(gè)循環(huán)
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
接著進(jìn)入runOnece方法,直接看核心邏輯
// 從RecordAccumulator 拿數(shù)據(jù) 然后發(fā)送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
// 中間省去了非核心邏輯
sendProduceRequests(batches, now);
如果繼續(xù)跟蹤的話最終是走到了selector.send()里面:
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
6 接著我們就要看返回邏輯了,可以看到在sendRequest里面sendProduceRequest方法是通過傳入了一個(gè)回調(diào)函數(shù)處理返回的。
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
// 如果有返回
if (response.hasResponse()) {
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
}
追蹤到ProducerBatch
if (this.finalState.compareAndSet(null, tryFinalState)) {
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
return true;
}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
produceFuture.set(baseOffset, logAppendTime, exception);
// execute callbacks
for (Thunk thunk : thunks) {
try {
if (exception == null) {
RecordMetadata metadata = thunk.future.value();
if (thunk.callback != null)
thunk.callback.onCompletion(metadata, null);
} else {
if (thunk.callback != null)
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
}
}
produceFuture.done();
}
Thunk 這個(gè)其實(shí)就是我們?cè)贏ppend的時(shí)候的回調(diào):
至此整個(gè)流程就完成了,從發(fā)送消息,到響應(yīng)后回調(diào)我們的函數(shù)。
消息可靠性
// 所有消費(fèi)者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";
acks = 0:異步形式,單向發(fā)送,不會(huì)等待 broker 的響應(yīng)
acks = 1:主分區(qū)保存成功,然后就響應(yīng)了客戶端,并不保證所有的副本分區(qū)保存成功
acks = all 或 -1:等待 broker 的響應(yīng),然后 broker 等待副本分區(qū)的響應(yīng),總之?dāng)?shù)據(jù)落地到所有的分區(qū)后,才能給到producer 一個(gè)響應(yīng)
在可靠性的保證下,假設(shè)一些故障:
文章來源:http://www.zghlxwxcb.cn/news/detail-838011.html
- Broker 收到消息后,同步 ISR 異常:只要在 -1 的情況下,其實(shí)不會(huì)造成消息的丟失,因?yàn)橛兄卦嚈C(jī)制
- Broker 收到消息,并同步 ISR 成功,但是響應(yīng)超時(shí):只要在 -1 的情況下,其實(shí)不會(huì)造成消息的丟失,因?yàn)橛兄卦嚈C(jī)制
可靠性能保證哪些,不能保障哪些?
文章來源地址http://www.zghlxwxcb.cn/news/detail-838011.html
- 保證了消息不會(huì)丟失
- 不保證消息一定不會(huì)重復(fù)(消息有重復(fù)的概率,需要消費(fèi)者有冪等性控制機(jī)制)
到了這里,關(guān)于消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!