国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka干貨之「零拷貝」

這篇具有很好參考價值的文章主要介紹了Kafka干貨之「零拷貝」。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、背景

周所周知,Kafka是一個非常成熟的消息產品,開源社區(qū)也已經經歷了多年的不斷迭代,特性列表更是能裝下好幾馬車,比如:冪等消息、事務支持、多副本高可用、ACL、Auto Rebalance、HW、Leader Epoch、Time Index、Producer Snapshot、Stream、Connector、多級存儲、MirrorMaker、消息壓縮、Fetch Session、Metrics、Quota等等,Kafka的特性列表真要往出列的話,可能會占滿半個屏幕

然后我們今天不去探討這些“炫酷”的feature,只將目光聚焦在消息的生產、存儲、消費上,同時這3個功能也是大部分用戶接觸最多、最基礎的功能,值得花時間去探究

再聲名一點,零拷貝只存在于消息消費環(huán)節(jié)中,消息生產因為需要將消息放入堆內存中進行各種校驗,因此不存在零拷貝的場景

二、消息協(xié)議

我們知道Kafka在演變的過程中,經歷了3個消息協(xié)議版本的迭代,其中V0與V1版大同小異:

Kafka干貨之「零拷貝」

同樣這2個版本也是Kafka早期的版本,當然使用廣泛度較低,較為普及的還是V2版本

Kafka干貨之「零拷貝」

當然我們這次的敘述均是以V2版本展開的,也就是大家耳熟能詳?shù)?/span>Record Batch 注:關于V2版本的細節(jié)不是本文的重點,這里不再展開,可自行Google,讀者只需要知道V2版本引入了Record Batch機制,即一個Record Batch可能含有1-N條消息

三、消息生產

消息生產端Producer這里沒有太多需要同步的,一言蔽之就是將消息封裝后發(fā)送給Broker端,不過讀者這里想強調一下 Record Batch 的概念

在默認情況下,單Batch的上限是16K,一個Batch可以存儲1條或者多條消息,這個取決于Producer端的配置,如果Producer設置了黏性分區(qū)策略,linger.ms聚批時間設置足夠長(例如1000ms),那么很容易將Batch填滿;又或者linger.ms配置了默認值(linger.ms=0),那么聚批將不會被觸發(fā),那一個Batch上就只有一條消息。因此無論怎樣,Record Batch是消息的載體,也是消息讀取的最小單位(注意不是消息本身,這里在后文還會提及

Kafka干貨之「零拷貝」

上圖表明了,某個 Record Batch 中可能只有一條消息,也有可能存在多條,甚至將16K全部填充滿;無論哪種case,Producer 都是以 Record Batch 粒度將消息發(fā)送至Broker的

四、消息存儲

為了滿足基本的生產、存儲、消費需求的話,只需要2個文件足矣:

  1. xxxxxxxxx.log 例如:00000000000000000000.log
  2. xxxxxxxxx.index 例如:00000000000000000000.index

其中l(wèi)og文件是用來存儲消息的,而index文件則是用來存儲稀疏索引的

  • log文件:通過append的方式向文件內進行追加,每個Segment對應一個log文件
  • index文件:索引文件,每隔4K存儲一次offset+position,幫助快速定位指定位點的文件position用的

注:這里為什么要隔4K做一次稀疏索引,而不是3K或者5K呢?其實這里主要是與硬件兼容,現(xiàn)在多數(shù)廠商的硬件,單次掃數(shù)據的大小一般都是4K對齊的,很多硬件都提升到了8K甚至16K,稀疏索引設置為4K,能保證即便是當前的 Record Batch 只有 1 個字節(jié),后續(xù)的內容也能緩存在Page Cache中,下次掃描的時候,可以直接從緩存中讀取,而不用掃描磁盤

另外,基于V2的存儲版本,消息的查詢都是以 Record Batch 作為最小粒度查詢的,而 Producer 設置的 Record Batch 的默認值為16K,即如果消息攢批合理的話,稀疏索引可能是每隔16K構建起來的

既然index并不是針對每條消息的offset做存儲的,那單憑這2個文件是如何做到可以查詢任意一條消息呢?

五、消息消費

消費的時候,需要設置2個非常重要的參數(shù)

  1. fetch.min.bytes 單次拉取最小字節(jié)數(shù),默認 1 byte
  2. fetch.max.bytes 單次拉取最大字節(jié)數(shù),默認 50 * 1024 * 1024 byte,即50M

這2個字段有什么作用呢?

  • fetch.min.bytes 表明單次拉取消息的最小字節(jié)數(shù),只要某次拉取的消息數(shù)大于了這個配置項,即便是其還未達到fetch.max.bytes,那么也會直接返回
  • fetch.max.bytes 表明單次拉取消息的最大字節(jié)數(shù),注意,這里是嚴格意義上的字節(jié)數(shù),包括消息體即消息協(xié)議

但是很容易想到一個問題,如果 fetch.max.bytes 配置的大小是1M,而下一條消息有2M,那豈不是永遠都拉不到消息了? 這種場景,其實Kafka的策略是至少會返回一條消息,即便是這條消息的大小超過了 fetch.max.bytes,也會將其返回

看一個非常直接的問題:consumer要拉取消息,且從位點offset 4567 開始查詢,fetch.min.bytes = 10K,fetch.max.bytes = 100K。RocketMQ的索引文件存儲了每條消息的position,可以想象為KV對兒:offset:position,根據offset可以快速定位該條消息對應的CommitLog的文件position,而基于稀疏索引的Kafka,如何快速定位查詢呢?

筆者認為,論設計來看,Kafka的稀疏索引可能更合理,更貼合操作文件的模式

?

既然是稀疏索引,那么勢必沒有將每條消息對應的position做存儲,那么如何定位單條消息的具體位置呢?

這里先直接給出結論:

  1. 先讀取稀疏索引定位大致位置
  2. 然后讀取log文件準確定位

5.1、粗略定位

假定稀疏索引.index文件的內容如下

Offset

Position

100

4000

110

8200

120

13000

130

18000

當我們要尋找 offset 為115位點對應的文件position時,因為115介于「110-120」之間,因此稀疏索引能夠提供的信息就是需要從 8200 的位置開始往后找,這樣也就粗略定位了115的大致position

Kafka干貨之「零拷貝」

這里延深一些,110對應的8200如何尋找呢? 真實環(huán)境是這個數(shù)據存儲在index文件中,而且會有很多KV對;其實這里使用的是二分查找,當知道某個隊列的起始結束offset,快速定位其中的某個offset時,二分查找是個非常不錯的方案,具體實現(xiàn)類在scala/kafka/log/AbstractIndex.scala

Kafka干貨之「零拷貝」

本文不再贅述

現(xiàn)在只是根據稀疏索引定位到了大致位置,具體應該從哪里返回數(shù)據呢?這就涉及第二步精細定位

5.2、精細定位

粗略定位是掃描.index文件,而精細定位則是掃描.log文件。在5.1粗略定位中,LogSegment.scala 有一段代碼

  @threadsafe
  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
    val mapping = offsetIndex.lookup(offset)
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
  }

其中

  • val mapping = offsetIndex.lookup(offset)是用來粗略定位的,主要返回mapping.position,即精細查找的起始position
  • log.searchForOffsetWithSize 精細定位,通過迭代 Record Batch 實現(xiàn)

現(xiàn)在已經知道了開始掃描的文件起始position,那么接下來就要逐一掃描 Record Batch

/**
 * Search forward for the file position of the last offset that is greater than or equal to the target offset
 * and return its physical position and the size of the message (including log overhead) at the returned offset. If
 * no such offsets are found, return null.
 *
 * @param targetOffset The offset to search for.
 * @param startingPosition The starting position in the file to begin searching from.
 */
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
    for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
        long offset = batch.lastOffset();
        if (offset >= targetOffset)
            return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
    }
    return null;
}

因為V2版本已經指定了 Record Batch 的總長度、start offset、last offset等,因此很快可以定位

我們再舉個例子,將5.1與5.2串起來

Kafka干貨之「零拷貝」

在稀疏索引中,offset與文件position(文件的物理position)的對應存儲只有3個:

5:500

13: 1300

24: 2400

當某次consumer要從offset=20的位置拉取消息時

  • 第一步,需要二分查找稀疏索引,這個時候發(fā)現(xiàn),比20小的最大offset是13,因此找到13對應的文件position,即1300,繼而繼續(xù)從log文件中查找
  • 第二步,從1300位置讀取 Record Batch,不過只讀取header部分即可,從而獲取到此Batch的最小offset是13,最大offset是18,length是XX(參照V2消息協(xié)議,其中都有存儲這些數(shù)據);發(fā)現(xiàn)目標offset 20并不在當前Batch中,那么繼而掃描下一個 Record Batch,并最終定位 Record Batch 3 就是目標Batch,然后返回此Batch 的start position (注:不是20對應的position,而是Record Batch 3的position

Kafka這樣設計,我們還是有一些疑問的

  1. 精細定位的時候,需要逐一遍歷.log文件,而 Record Batch 可能也會有多個,這樣是否存在頻繁訪問磁盤的bad case?
    1. 首先確認一點,這個問題是不存在的;
    2. 當我們通過稀疏索引定位到大致的位置后,目標offset離我們其實已經不遠了,可能只有4K的數(shù)據,當我們掃描這4K數(shù)據的時候,借助于操作系統(tǒng)的預讀,在第一次讀取時,會將這4K的數(shù)據都加載到 Page Cache 中,后續(xù)再讀取時,其實是直接從 Page Cache 中讀的,性能很高
    3. 還有一種情況是,Record Batch 比較大,比如有16K,這種情況基本上是一個 Record Batch 就對應了稀疏索引中的一條數(shù)據,因此掃描1條 Record Batch 即定位數(shù)據
  1. 上例中,既然我們能夠找到 offset=20 對應的文件物理position,為什么還要返回此 Record Batch 的position?
    1. 這里主要是跟V2版本的協(xié)議有關,不論是數(shù)據生產還是消費,其所有交互中,Record Batch 是最小單位,即便是單個 Record Batch 中只有1條數(shù)據。因此如果直接返回offset=20的position,會導致client端數(shù)據解析的失敗

5.3、數(shù)據拉取

數(shù)據拉取就相對比較簡單了,這里用到了零拷貝技術,也就是FileChannel.transferTo(long position, long count),將磁盤中的數(shù)據直接拷貝給網卡??截惖钠鹗紁osition就是上文中定位的,而拷貝的長度則是fetch.max.bytes。但fetch.max.bytes是用戶指定的,如何確??截惖臄?shù)據段結尾正好落在 Record Batch 的結尾?

其實這里是無法保證最后一個 Record Batch 是完整的,也就是存在拷貝了半個 Record Batch 的情況;而這種情況的兼容則放在了Consumer端來做,當Consumer發(fā)現(xiàn)拉取的數(shù)據不完整時,便直接截斷了,截取org/apache/kafka/common/record/ByteBufferLogInputStream.java 部分代碼:

public MutableRecordBatch nextBatch() {
    int remaining = buffer.remaining();

    Integer batchSize = nextBatchSize();
    if (batchSize == null || remaining < batchSize)
        return null;

    byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

    ByteBuffer batchSlice = buffer.slice();
    batchSlice.limit(batchSize);
    buffer.position(buffer.position() + batchSize);

    if (magic > RecordBatch.MAGIC_VALUE_V1)
        return new DefaultRecordBatch(batchSlice);
    else
        return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}

其中 (remaining < batchSize) {return null;} 就是判斷半條消息的case

那這種情況,豈不是存在浪費資源的情況,最后一個 Record Batch 可能會被拷貝2次?

確實存在,不過我們要分情況來討論:

  • Consumer熱讀,即Consumer讀取數(shù)據時,消息還在 Page Cache 中有停留,這也是大部分的Kafka的消費場景,因為默認單次拉取的最大數(shù)據為50M,因此一般可以拉去全量未讀數(shù)據,因此也就不存在浪費資源的情況
  • Consumer冷讀,直接從磁盤中讀取數(shù)據,是大概率存在讀取了半條消息的case的,不過由于單次可能會拉取50M的數(shù)據,而浪費的數(shù)據可能只有幾K,因此是可以接受的

六、場景演練

我們站在client端的角度,來看下幾種場景下消息消費的情況

6.1、單Batch單消息

元數(shù)據:

創(chuàng)建topic3,單分區(qū)、單副本

生產消息:

向其發(fā)放5條消息,每條消息1K,然后linger.ms保持默認值0

通過這樣配置,因為linger.ms=0,因此topic3中就存在了5個 Record Batch,然后每個Batch中的消息數(shù)量為 1

然后設置 fetch.max.bytes=3500 后進行查詢,通過debug查看org/apache/kafka/common/record/ByteBufferLogInputStream.java 類中ByteBuffer在執(zhí)行消息反序列化之前的狀態(tài)

因為設置最大的拉取數(shù)據大小是3500字節(jié),而我們單條消息設置了1K,且1個Batch中只有1條消息,因此預期會有拉取半條消息的case

Kafka干貨之「零拷貝」

果然打開debug后發(fā)現(xiàn),Consumer直接從Broker中拉取了3500byte的數(shù)據,也就是拉取了3條完整的Batch+半條消息。處理完這3條消息后,第二次發(fā)起查詢:

Kafka干貨之「零拷貝」

發(fā)現(xiàn)只返回了最后2條消息,且不存在半條消息的case。這種情況下,第4條消息的前半段實際上是重復發(fā)送了2次

6.2、單Batch多消息

元數(shù)據:

創(chuàng)建topic4,單分區(qū)、單副本

生產消息:

連續(xù)向其發(fā)放2條消息,第一條消息1K,第二條2K,linger.ms=1000

因為linger.ms=1000,因此topic4的兩條消息將會發(fā)生聚批,也就是1個Batch中存儲了2條消息

然后設置 fetch.max.bytes=10 后進行查詢,因為單Batch的消息已經有3K,因此預期會返回3K的數(shù)據

Kafka干貨之「零拷貝」

與預期相符,這次我們通過assign的方式,并指定 offset=1 開始消費,并打印消費的消息的條數(shù)

private void consume() {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getCommonProperties());
    kafkaConsumer.commitAsync();
    kafkaConsumer.assign(Collections.singletonList(new TopicPartition(Common.TOPIC_NAME, 0)));
    
    TopicPartition topicPartition = new TopicPartition(Common.TOPIC_NAME, 0);
    kafkaConsumer.seek(topicPartition, 1);

    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        System.out.println("records.count is : " + records.count());
        if (records.count() > 0) {
        } else {
            System.out.println("none msg");
        }
    }
}

運行后發(fā)現(xiàn),確實只收到了一條消息

records.count is : 1
records.count is : 0
none msg
records.count is : 0

然后debug看consumer真實收到的數(shù)據長度

Kafka干貨之「零拷貝」

不出意外,Broker返回了offset=1所在的 Record Batch 的所有數(shù)據,而過濾、兼容則都由Consumer來完成

再考慮到Consumer的各種Rebalance以及Producer聚批等,說Kafka的client是重客戶端,我想大抵就是如此文章來源地址http://www.zghlxwxcb.cn/news/detail-772397.html

到了這里,關于Kafka干貨之「零拷貝」的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 大數(shù)據課程I4——Kafka的零拷貝技術

    大數(shù)據課程I4——Kafka的零拷貝技術

    文章作者郵箱:yugongshiye@sina.cn? ? ? ? ? ? ? 地址:廣東惠州 ??掌握Kafka的零拷貝技術; ? 了解常規(guī)的文件傳輸過程; ?表面上一個很簡單的網絡文件輸出的過程,在OS底層,會發(fā)現(xiàn)數(shù)據會被拷貝4次。 內核態(tài)可以理解為特權態(tài),可以訪問計算機的所有資源。 而用戶態(tài)的訪

    2024年02月13日
    瀏覽(18)
  • 【QA02】云技術交流群一周知識匯總

    一周概覽 云菜雞:helo 大家好我是云菜雞,春風得意馬蹄疾,大佬帶我行不行;山重水復疑無路,跟著大佬不迷路。本周主要話題有:賬單,CI/CD,數(shù)據庫,操作系統(tǒng),自動化運維工具等相關話題,如果您閱讀技術類話題感到焦慮,Q10是為您準備的。 Q1 AWS SMS 相關 云菜雞:大

    2024年02月06日
    瀏覽(20)
  • Kafka的零拷貝技術Zero-Copy

    Kafka的零拷貝技術Zero-Copy

    流程步驟: (1)操作系統(tǒng)將數(shù)據從磁盤文件中讀取到內核空間的頁面緩存; (2)應用程序將數(shù)據從內核空間讀入用戶空間緩沖區(qū); (3)應用程序將讀到數(shù)據寫回內核空間并放入socket緩沖區(qū); (4)操作系統(tǒng)將數(shù)據從socket緩沖區(qū)復制到網卡接口,此時數(shù)據才能通過網絡發(fā)送

    2024年02月08日
    瀏覽(20)
  • 干貨 | 成本低誤差小,攜程基于 Kafka 的 Serverless 延遲隊列的實踐

    干貨 | 成本低誤差小,攜程基于 Kafka 的 Serverless 延遲隊列的實踐

    作者簡介 Pin,關注 RPC、Service Mesh、Serverless 等云原生技術。 一、背景 隨著上云項目的不斷推進,大量的應用需要部署到 aws 上,其中有很多應用都依賴延遲隊列的功能。而在?aws?上,我們選擇以 Kafka 作為消息隊列,但是 Kafka 本身不支持延遲隊列,這就需要思考如何基于

    2024年02月13日
    瀏覽(15)
  • mysql怎么將已有的數(shù)據庫拷貝為另一個數(shù)據庫

    您可以使用MySQL的 CREATE DATABASE 和 CREATE TABLE 語句將已有的數(shù)據庫和其表拷貝到另一個數(shù)據庫中。下面是具體的步驟: 在MySQL客戶端中,使用 CREATE DATABASE 語句創(chuàng)建一個新的空數(shù)據庫。例如,如果您想將現(xiàn)有的數(shù)據庫 source_db 拷貝到新數(shù)據庫 target_db 中,可以執(zhí)行以下命令: 在

    2024年02月08日
    瀏覽(18)
  • 【Web安全】小白怎么快速挖到第一個漏洞,src漏洞挖掘經驗分享,絕對干貨!

    【Web安全】小白怎么快速挖到第一個漏洞,src漏洞挖掘經驗分享,絕對干貨!

    src漏洞挖掘經驗分享 – 掌控安全以恒 一、公益src 公益src是一個白帽子提交隨機發(fā)現(xiàn)的漏洞的品臺,我們可以把我們隨機發(fā)現(xiàn)或者是主動尋找到的漏洞在漏洞盒子進行提交。 在挖掘src的時候不能越紅線,一般情況下遇到SQL注入 只獲取數(shù)據庫名字以證明漏洞的存在即可,最好

    2024年02月13日
    瀏覽(25)
  • 【Unity】給游戲添加一個背景圖

    【Unity】給游戲添加一個背景圖

    一、選擇一張高清背景圖jpg,拖到Unity的Project面板下,設置Texture Type為Sprite。 二、在場景中創(chuàng)建一個空物體并命名為background,點擊add component添加一個sprite renderer組件,設置Sprite為想要的背景圖片,把Layer設置為新圖層Background。 三、設置相機的Clear Flags為Depth only,Culling Mas

    2024年02月11日
    瀏覽(88)
  • Java中將本服務器的文件拷貝到另一個服務器(Windows to Linux)

    在Java中,將文件從Windows服務器復制到Linux服務器,常用的方式是使用SSH進行安全的文件傳輸。Java有一個名為 JSch 的庫,可以用于SSH連接和操作。 首先,你需要將 JSch 添加到你的項目依賴中。如果你使用的是Maven,你可以添加以下依賴: 然后,你可以使用以下代碼將文件從

    2024年02月11日
    瀏覽(24)
  • Java中將本服務器的文件拷貝到另一個服務器(Linux to Linux)

    在Java中,將文件從一個服務器復制到另一個服務器,你可以使用Secure Copy(SCP)進行操作。Java中的 JSch 庫可以進行此操作。 首先,需要添加 JSch 庫依賴到你的項目中。如果你使用的是Maven,可以添加以下依賴: 以下是一個使用 JSch 進行SCP操作的示例: 這段代碼首先創(chuàng)建了一

    2024年02月13日
    瀏覽(29)
  • 記錄--一個純樣式花里胡哨的動態(tài)漸變背景塊

    記錄--一個純樣式花里胡哨的動態(tài)漸變背景塊

    閑來無事寫了個有意思的東西,鼠標放在小方塊上會放大并擠壓周圍方塊,背景顏色會動態(tài)改變。這里沒有用一行 js 代碼,純樣式(Sass)實現(xiàn)。 下面只展示核心代碼,完整代碼請參照上方代碼片段。 先畫一個 6 x 6 的正方形,利用 v-for 循環(huán)出 dom 元素。當然也可以不用 Vue

    2024年02月05日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包