Reference video: 10_尚硅谷_Kafka_生產(chǎn)者_原理 (尚硅谷 Kafka series, Bilibili)
Producer send flow:

    1. Producer initialization: load the default configuration plus user-supplied parameters, and start the network (sender) thread.
    2. Interceptors process the record.
    3. Serializers serialize the record's key and value.
    4. A partition is chosen for the record.
    5. Cluster metadata is fetched from the Kafka broker cluster.
    6. The record is buffered in the RecordAccumulator, appended to the Deque<ProducerBatch> for its target partition.
    7. When a batch reaches batch.size, or linger.ms elapses, the sender thread is woken up; it uses the NetworkClient to turn ProducerBatch objects into produce requests carrying the message payload.
    8. A network connection is established to the broker leading the partition, and the request is sent to that broker.
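The flow above can be reproduced with a few lines of client code. Below is a minimal configuration sketch tying the steps to their settings; the broker address, topic name, and the linger.ms value are placeholder assumptions, not values from the original text:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFlowDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");                      // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());   // step 3: key serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // step 3: value serializer
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // step 7: batch.size (16 KB, the default)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // step 7: linger.ms (assumed value)
        // Steps 1-2 and 4-8 (init, interceptors, partitioning, metadata, accumulator, sender)
        // all happen inside the client when send() is called.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder topic
        }
    }
}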
1. The send() method takes a ProducerRecord object.

    On partitioning:

      a. If a partition is specified, the record is sent to that partition.
      b. If no partition is specified and no key is supplied, the sticky partitioner is used:
         a partition is chosen and reused until the current batch completes, then a new one is picked.
         (The older default behavior was round-robin: a random integer is generated on the first call
         and incremented on each subsequent call, and that value modulo the number of available
         partitions for the topic gives the partition.)
      c. If no partition is specified but a key is supplied, the key is hashed (murmur2) and the hash
         is taken modulo the topic's partition count.
         E.g., key hash = 5, topic has 2 partitions -> partition 1;
               key hash = 6, topic has 2 partitions -> partition 0.
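These three cases map directly onto the ProducerRecord constructors. A short sketch (the topic name and values are placeholders):

// a. Explicit partition: the record goes to partition 0 regardless of the key
producer.send(new ProducerRecord<>("demo-topic", 0, "key", "v1"));

// b. No partition, no key: the sticky partitioner picks a partition and reuses it
//    until the current batch completes
producer.send(new ProducerRecord<>("demo-topic", "v2"));

// c. No partition, key given: partition = toPositive(murmur2(keyBytes)) % numPartitions,
//    e.g. hash 5 % 2 partitions -> partition 1, hash 6 % 2 -> partition 0
producer.send(new ProducerRecord<>("demo-topic", "key", "v3"));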
2. KafkaProducer asynchronous and synchronous send APIs:

    Asynchronous send:
        producer.send(producerRecord);
    Synchronous send: append .get() to the send() call, which blocks on the returned Future until the broker responds.
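A sketch of both call styles (topic and record values are placeholders); the callback form is the usual way to observe the asynchronous result:

// Asynchronous: send() returns immediately; the callback runs on the sender I/O thread
producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
    }
});

// Synchronous: Future.get() blocks until the broker acknowledges.
// (get() throws InterruptedException/ExecutionException, so wrap it in try/catch in real code.)
RecordMetadata metadata = producer.send(new ProducerRecord<>("demo-topic", "key", "value")).get();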
Core logic of KafkaProducer.send() (from kafka-clients, decompiled):
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback)null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Interceptor chain: each configured interceptor's onSend() is invoked in turn
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        this.throwIfProducerClosed();
        long nowMs = this.time.milliseconds();
        // Fetch cluster metadata, blocking up to max.block.ms
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
        } catch (KafkaException e) {
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", e);
            }
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // Serialize the key
        byte[] serializedKey;
        try {
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException e) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", e);
        }
        // Serialize the value
        byte[] serializedValue;
        try {
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException e) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", e);
        }
        // Choose the partition
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        }
        Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
        // RecordAccumulator.append(): buffer the record into a ProducerBatch
        RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        if (result.abortForNewBatch) {
            // Sticky-partitioner hook: when a new batch is needed, possibly switch partition and retry the append
            int prevPartition = partition;
            this.partitioner.onNewBatch(record.topic(), cluster, partition);
            partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
            }
            interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }
        if (this.transactionManager != null) {
            this.transactionManager.maybeAddPartition(tp);
        }
        // If the batch is full or a new batch was created, wake the sender thread (Sender implements Runnable)
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        this.log.debug("Exception occurred during message send:", e);
        if (tp == null) {
            tp = ProducerInterceptors.extractTopicPartition(record);
        }
        Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
        interceptCallback.onCompletion((RecordMetadata)null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
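The interceptor chain invoked at the top of send() is user-extensible. Below is a minimal sketch of a custom ProducerInterceptor; the class name and the header it adds are made up for illustration, and it would be registered through the interceptor.classes config:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimestampHeaderInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called by interceptors.onSend() before serialization and partitioning;
        // may return a modified record
        record.headers().add("sent-at", Long.toString(System.currentTimeMillis()).getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record, or when the send fails
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Registration (hypothetical):
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TimestampHeaderInterceptor.class.getName());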
Sender.run() method (Sender implements Runnable and runs as the producer's I/O thread):
public void run() {
    this.log.debug("Starting Kafka producer I/O thread.");
    // Main loop: runs until close() is called
    while (this.running) {
        try {
            this.runOnce();
        } catch (Exception e) {
            this.log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    // Graceful shutdown: keep running until buffered and in-flight records are flushed
    while (!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
        try {
            this.runOnce();
        } catch (Exception e) {
            this.log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // Abort any transaction still open at shutdown
    while (!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
        if (!this.transactionManager.isCompleting()) {
            this.log.info("Aborting incomplete transaction due to shutdown");
            this.transactionManager.beginAbort();
        }
        try {
            this.runOnce();
        } catch (Exception e) {
            this.log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (this.forceClose) {
        if (this.transactionManager != null) {
            this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
            this.transactionManager.close();
        }
        this.log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        this.log.error("Failed to close network client", e);
    }
    this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
void runOnce() {
    if (this.transactionManager != null) {
        try {
            this.transactionManager.maybeResolveSequences();
            if (this.transactionManager.hasFatalError()) {
                RuntimeException lastError = this.transactionManager.lastError();
                if (lastError != null) {
                    this.maybeAbortBatches(lastError);
                }
                this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                return;
            }
            this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
            if (this.maybeSendAndPollTransactionalRequest()) {
                return;
            }
        } catch (AuthenticationException e) {
            this.log.trace("Authentication exception while processing transactional request", e);
            this.transactionManager.authenticationFailed(e);
        }
    }
    long currentTimeMs = this.time.milliseconds();
    // Drain the accumulator and send produce requests
    long pollTimeout = this.sendProducerData(currentTimeMs);
    this.client.poll(pollTimeout, currentTimeMs);
}
sendProducerData():
    Batches are ultimately converted into a ClientRequest and handed to the network client:

        ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
        this.client.send(clientRequest, now);
private long sendProducerData(long now) {
    Cluster cluster = this.metadata.fetch();
    // Find the set of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // If any partition's leader is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics) {
            this.metadata.add(topic, now);
        }
        this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }
    // Remove nodes we cannot send to yet and compute how long to wait for them
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }
    // Drain batches from the accumulator, grouped by destination node id
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    this.addToInflightBatches(batches);
    if (this.guaranteeMessageOrder) {
        // Mute every drained partition so only one batch per partition is in flight (preserves ordering)
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList) {
                this.accumulator.mutePartition(batch.topicPartition);
            }
        }
    }
    this.accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);
    if (!expiredBatches.isEmpty()) {
        this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
    }
    // Fail batches that have waited too long for delivery
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        this.failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (this.transactionManager != null && expiredBatch.inRetry()) {
            this.transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    this.sensors.updateProduceRequestMetrics(batches);
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0L);
    if (!result.readyNodes.isEmpty()) {
        this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // Poll without blocking when there is data ready to send
        pollTimeout = 0L;
    }
    this.sendProduceRequests(batches, now);
    return pollTimeout;
}
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    // One produce request per destination broker
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet()) {
        this.sendProduceRequest(now, entry.getKey(), this.acks, this.requestTimeoutMs, entry.getValue());
    }
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty()) {
        return;
    }
    Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    // Use the lowest record-format ("magic") version among the batches so the whole request is readable
    byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic) {
            minUsedMagic = batch.magic();
        }
    }
    ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        // Down-convert records written with a newer magic than the request can carry
        if (!records.hasMatchingMagic(minUsedMagic)) {
            records = (MemoryRecords) batch.records().downConvert(minUsedMagic, 0L, this.time).records();
        }
        ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
        if (tpData == null) {
            tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
            tpd.add(tpData);
        }
        tpData.partitionData().add(new ProduceRequestData.PartitionProduceData().setIndex(tp.partition()).setRecords(records));
        recordsByPartition.put(tp, batch);
    }
    String transactionalId = null;
    if (this.transactionManager != null && this.transactionManager.isTransactional()) {
        transactionalId = this.transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, new ProduceRequestData().setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
    RequestCompletionHandler callback = (response) -> this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
    String nodeId = Integer.toString(destination);
    // A response is expected only when acks != 0
    ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
    // this.client is the KafkaClient interface; the implementation is NetworkClient
    this.client.send(clientRequest, now);
    this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
NetworkClient.send() method:
public void send(ClientRequest request, long now) {
    this.doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    this.ensureActive();
    String nodeId = clientRequest.destination();
    if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        // Negotiate the API version to use with this broker
        NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
        short version;
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
            }
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
        }
        this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException e) {
        this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), e});
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, e, (AuthenticationException)null, (AbstractResponse)null);
        if (!isInternalRequest) {
            this.abortedSends.add(clientResponse);
        } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
            this.metadataUpdater.handleFailedRequest(now, Optional.of(e));
        }
    }
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (this.log.isDebugEnabled()) {
        this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
    }
    Send send = request.toSend(header);
    // Wrap the ClientRequest in an InFlightRequest and track it until a response arrives
    InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
    this.inFlightRequests.add(inFlightRequest);
    // Hand the bytes off to the NIO selector (this.selector is the Selectable interface;
    // the Kafka Selector implementation writes through the node's KafkaChannel)
    this.selector.send(new NetworkSend(clientRequest.destination(), send));
}
Summary: reading the source in kafka-clients.jar directly makes the producer send logic easy to follow. The core chain is: KafkaProducer.send() → RecordAccumulator → Sender → NetworkClient → Selector.