一、背景
周所周知,Kafka是一個非常成熟的消息產品,開源社區(qū)也已經經歷了多年的不斷迭代,特性列表更是能裝下好幾馬車,比如:冪等消息、事務支持、多副本高可用、ACL、Auto Rebalance、HW、Leader Epoch、Time Index、Producer Snapshot、Stream、Connector、多級存儲、MirrorMaker、消息壓縮、Fetch Session、Metrics、Quota等等,Kafka的特性列表真要往出列的話,可能會占滿半個屏幕
然后我們今天不去探討這些“炫酷”的feature,只將目光聚焦在消息的生產、存儲、消費上,同時這3個功能也是大部分用戶接觸最多、最基礎的功能,值得花時間去探究
再聲名一點,零拷貝只存在于消息消費環(huán)節(jié)中,消息生產因為需要將消息放入堆內存中進行各種校驗,因此不存在零拷貝的場景
二、消息協(xié)議
我們知道Kafka在演變的過程中,經歷了3個消息協(xié)議版本的迭代,其中V0與V1版大同小異:
同樣這2個版本也是Kafka早期的版本,當然使用廣泛度較低,較為普及的還是V2版本
當然我們這次的敘述均是以V2版本展開的,也就是大家耳熟能詳?shù)?/span>Record Batch
注:關于V2版本的細節(jié)不是本文的重點,這里不再展開,可自行Google,讀者只需要知道V2版本引入了Record Batch機制,即一個Record Batch可能含有1-N條消息
三、消息生產
消息生產端Producer這里沒有太多需要同步的,一言蔽之就是將消息封裝后發(fā)送給Broker端,不過讀者這里想強調一下 Record Batch 的概念
在默認情況下,單Batch的上限是16K,一個Batch可以存儲1條或者多條消息,這個取決于Producer端的配置,如果Producer設置了黏性分區(qū)策略,linger.ms聚批時間設置足夠長(例如1000ms),那么很容易將Batch填滿;又或者linger.ms配置了默認值(linger.ms=0),那么聚批將不會被觸發(fā),那一個Batch上就只有一條消息。因此無論怎樣,Record Batch是消息的載體,也是消息讀取的最小單位(注意不是消息本身,這里在后文還會提及)
上圖表明了,某個 Record Batch 中可能只有一條消息,也有可能存在多條,甚至將16K全部填充滿;無論哪種case,Producer 都是以 Record Batch 粒度將消息發(fā)送至Broker的
四、消息存儲
為了滿足基本的生產、存儲、消費需求的話,只需要2個文件足矣:
- xxxxxxxxx.log 例如:00000000000000000000.log
- xxxxxxxxx.index 例如:00000000000000000000.index
其中l(wèi)og文件是用來存儲消息的,而index文件則是用來存儲稀疏索引的
- log文件:通過append的方式向文件內進行追加,每個Segment對應一個log文件
- index文件:索引文件,每隔4K存儲一次offset+position,幫助快速定位指定位點的文件position用的
注:這里為什么要隔4K做一次稀疏索引,而不是3K或者5K呢?其實這里主要是與硬件兼容,現(xiàn)在多數(shù)廠商的硬件,單次掃數(shù)據的大小一般都是4K對齊的,很多硬件都提升到了8K甚至16K,稀疏索引設置為4K,能保證即便是當前的 Record Batch 只有 1 個字節(jié),后續(xù)的內容也能緩存在Page Cache中,下次掃描的時候,可以直接從緩存中讀取,而不用掃描磁盤
另外,基于V2的存儲版本,消息的查詢都是以 Record Batch 作為最小粒度查詢的,而 Producer 設置的 Record Batch 的默認值為16K,即如果消息攢批合理的話,稀疏索引可能是每隔16K構建起來的
既然index并不是針對每條消息的offset做存儲的,那單憑這2個文件是如何做到可以查詢任意一條消息呢?
五、消息消費
消費的時候,需要設置2個非常重要的參數(shù)
- fetch.min.bytes 單次拉取最小字節(jié)數(shù),默認 1 byte
- fetch.max.bytes 單次拉取最大字節(jié)數(shù),默認 50 * 1024 * 1024 byte,即50M
這2個字段有什么作用呢?
- fetch.min.bytes 表明單次拉取消息的最小字節(jié)數(shù),只要某次拉取的消息數(shù)大于了這個配置項,即便是其還未達到fetch.max.bytes,那么也會直接返回
- fetch.max.bytes 表明單次拉取消息的最大字節(jié)數(shù),注意,這里是嚴格意義上的字節(jié)數(shù),包括消息體即消息協(xié)議
但是很容易想到一個問題,如果 fetch.max.bytes 配置的大小是1M,而下一條消息有2M,那豈不是永遠都拉不到消息了? 這種場景,其實Kafka的策略是至少會返回一條消息,即便是這條消息的大小超過了 fetch.max.bytes,也會將其返回
看一個非常直接的問題:consumer要拉取消息,且從位點offset 4567 開始查詢,fetch.min.bytes = 10K,fetch.max.bytes = 100K。RocketMQ的索引文件存儲了每條消息的position,可以想象為KV對兒:offset:position,根據offset可以快速定位該條消息對應的CommitLog的文件position,而基于稀疏索引的Kafka,如何快速定位查詢呢?
筆者認為,論設計來看,Kafka的稀疏索引可能更合理,更貼合操作文件的模式
?
既然是稀疏索引,那么勢必沒有將每條消息對應的position做存儲,那么如何定位單條消息的具體位置呢?
這里先直接給出結論:
- 先讀取稀疏索引定位大致位置
- 然后讀取log文件準確定位
5.1、粗略定位
假定稀疏索引.index
文件的內容如下
Offset |
Position |
100 |
4000 |
110 |
8200 |
120 |
13000 |
130 |
18000 |
當我們要尋找 offset 為115位點對應的文件position時,因為115介于「110-120」之間,因此稀疏索引能夠提供的信息就是需要從 8200 的位置開始往后找,這樣也就粗略定位了115的大致position
這里延深一些,110對應的8200如何尋找呢? 真實環(huán)境是這個數(shù)據存儲在index文件中,而且會有很多KV對;其實這里使用的是二分查找,當知道某個隊列的起始結束offset,快速定位其中的某個offset時,二分查找是個非常不錯的方案,具體實現(xiàn)類在scala/kafka/log/AbstractIndex.scala
本文不再贅述
現(xiàn)在只是根據稀疏索引定位到了大致位置,具體應該從哪里返回數(shù)據呢?這就涉及第二步精細定位
5.2、精細定位
粗略定位是掃描.index
文件,而精細定位則是掃描.log
文件。在5.1粗略定位中,LogSegment.scala 有一段代碼
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
val mapping = offsetIndex.lookup(offset)
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
其中
-
val mapping = offsetIndex.lookup(offset)
是用來粗略定位的,主要返回mapping.position
,即精細查找的起始position -
log.searchForOffsetWithSize
精細定位,通過迭代 Record Batch 實現(xiàn)
現(xiàn)在已經知道了開始掃描的文件起始position,那么接下來就要逐一掃描 Record Batch
/**
* Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position and the size of the message (including log overhead) at the returned offset. If
* no such offsets are found, return null.
*
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
}
return null;
}
因為V2版本已經指定了 Record Batch 的總長度、start offset、last offset等,因此很快可以定位
我們再舉個例子,將5.1與5.2串起來
在稀疏索引中,offset與文件position(文件的物理position)的對應存儲只有3個:
5:500
13: 1300
24: 2400
當某次consumer要從offset=20的位置拉取消息時
- 第一步,需要二分查找稀疏索引,這個時候發(fā)現(xiàn),比20小的最大offset是13,因此找到13對應的文件position,即1300,繼而繼續(xù)從log文件中查找
- 第二步,從1300位置讀取 Record Batch,不過只讀取header部分即可,從而獲取到此Batch的最小offset是13,最大offset是18,length是XX(參照V2消息協(xié)議,其中都有存儲這些數(shù)據);發(fā)現(xiàn)目標offset 20并不在當前Batch中,那么繼而掃描下一個 Record Batch,并最終定位 Record Batch 3 就是目標Batch,然后返回此Batch 的start position (注:不是20對應的position,而是Record Batch 3的position)
Kafka這樣設計,我們還是有一些疑問的
-
精細定位的時候,需要逐一遍歷
.log
文件,而 Record Batch 可能也會有多個,這樣是否存在頻繁訪問磁盤的bad case?
- 首先確認一點,這個問題是不存在的;
- 當我們通過稀疏索引定位到大致的位置后,目標offset離我們其實已經不遠了,可能只有4K的數(shù)據,當我們掃描這4K數(shù)據的時候,借助于操作系統(tǒng)的預讀,在第一次讀取時,會將這4K的數(shù)據都加載到 Page Cache 中,后續(xù)再讀取時,其實是直接從 Page Cache 中讀的,性能很高
- 還有一種情況是,Record Batch 比較大,比如有16K,這種情況基本上是一個 Record Batch 就對應了稀疏索引中的一條數(shù)據,因此掃描1條 Record Batch 即定位數(shù)據
- 上例中,既然我們能夠找到 offset=20 對應的文件物理position,為什么還要返回此 Record Batch 的position?
- 這里主要是跟V2版本的協(xié)議有關,不論是數(shù)據生產還是消費,其所有交互中,Record Batch 是最小單位,即便是單個 Record Batch 中只有1條數(shù)據。因此如果直接返回offset=20的position,會導致client端數(shù)據解析的失敗
5.3、數(shù)據拉取
數(shù)據拉取就相對比較簡單了,這里用到了零拷貝技術,也就是FileChannel.transferTo(long position, long count)
,將磁盤中的數(shù)據直接拷貝給網卡??截惖钠鹗紁osition就是上文中定位的,而拷貝的長度則是fetch.max.bytes
。但fetch.max.bytes
是用戶指定的,如何確??截惖臄?shù)據段結尾正好落在 Record Batch 的結尾?
其實這里是無法保證最后一個 Record Batch 是完整的,也就是存在拷貝了半個 Record Batch 的情況;而這種情況的兼容則放在了Consumer端來做,當Consumer發(fā)現(xiàn)拉取的數(shù)據不完整時,便直接截斷了,截取org/apache/kafka/common/record/ByteBufferLogInputStream.java
部分代碼:
public MutableRecordBatch nextBatch() {
int remaining = buffer.remaining();
Integer batchSize = nextBatchSize();
if (batchSize == null || remaining < batchSize)
return null;
byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
ByteBuffer batchSlice = buffer.slice();
batchSlice.limit(batchSize);
buffer.position(buffer.position() + batchSize);
if (magic > RecordBatch.MAGIC_VALUE_V1)
return new DefaultRecordBatch(batchSlice);
else
return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}
其中 (remaining < batchSize) {return null;}
就是判斷半條消息的case
那這種情況,豈不是存在浪費資源的情況,最后一個 Record Batch 可能會被拷貝2次?
確實存在,不過我們要分情況來討論:
- Consumer熱讀,即Consumer讀取數(shù)據時,消息還在 Page Cache 中有停留,這也是大部分的Kafka的消費場景,因為默認單次拉取的最大數(shù)據為50M,因此一般可以拉去全量未讀數(shù)據,因此也就不存在浪費資源的情況
- Consumer冷讀,直接從磁盤中讀取數(shù)據,是大概率存在讀取了半條消息的case的,不過由于單次可能會拉取50M的數(shù)據,而浪費的數(shù)據可能只有幾K,因此是可以接受的
六、場景演練
我們站在client端的角度,來看下幾種場景下消息消費的情況
6.1、單Batch單消息
元數(shù)據: |
創(chuàng)建topic3,單分區(qū)、單副本 |
生產消息: |
向其發(fā)放5條消息,每條消息1K,然后linger.ms保持默認值0 |
通過這樣配置,因為linger.ms=0,因此topic3中就存在了5個 Record Batch,然后每個Batch中的消息數(shù)量為 1
然后設置 fetch.max.bytes=3500 后進行查詢,通過debug查看org/apache/kafka/common/record/ByteBufferLogInputStream.java
類中ByteBuffer在執(zhí)行消息反序列化之前的狀態(tài)
因為設置最大的拉取數(shù)據大小是3500字節(jié),而我們單條消息設置了1K,且1個Batch中只有1條消息,因此預期會有拉取半條消息的case
果然打開debug后發(fā)現(xiàn),Consumer直接從Broker中拉取了3500byte的數(shù)據,也就是拉取了3條完整的Batch+半條消息。處理完這3條消息后,第二次發(fā)起查詢:
發(fā)現(xiàn)只返回了最后2條消息,且不存在半條消息的case。這種情況下,第4條消息的前半段實際上是重復發(fā)送了2次
6.2、單Batch多消息
元數(shù)據: |
創(chuàng)建topic4,單分區(qū)、單副本 |
生產消息: |
連續(xù)向其發(fā)放2條消息,第一條消息1K,第二條2K,linger.ms=1000 |
因為linger.ms=1000,因此topic4的兩條消息將會發(fā)生聚批,也就是1個Batch中存儲了2條消息
然后設置 fetch.max.bytes=10 后進行查詢,因為單Batch的消息已經有3K,因此預期會返回3K的數(shù)據
與預期相符,這次我們通過assign的方式,并指定 offset=1 開始消費,并打印消費的消息的條數(shù)
private void consume() {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getCommonProperties());
kafkaConsumer.commitAsync();
kafkaConsumer.assign(Collections.singletonList(new TopicPartition(Common.TOPIC_NAME, 0)));
TopicPartition topicPartition = new TopicPartition(Common.TOPIC_NAME, 0);
kafkaConsumer.seek(topicPartition, 1);
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
System.out.println("records.count is : " + records.count());
if (records.count() > 0) {
} else {
System.out.println("none msg");
}
}
}
運行后發(fā)現(xiàn),確實只收到了一條消息
records.count is : 1
records.count is : 0
none msg
records.count is : 0
然后debug看consumer真實收到的數(shù)據長度
不出意外,Broker返回了offset=1所在的 Record Batch 的所有數(shù)據,而過濾、兼容則都由Consumer來完成文章來源:http://www.zghlxwxcb.cn/news/detail-772397.html
再考慮到Consumer的各種Rebalance以及Producer聚批等,說Kafka的client是重客戶端,我想大抵就是如此文章來源地址http://www.zghlxwxcb.cn/news/detail-772397.html
到了這里,關于Kafka干貨之「零拷貝」的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!