01 基本概念
Apache Flink 是一個流式處理框架,被廣泛應(yīng)用于大數(shù)據(jù)領(lǐng)域的實時數(shù)據(jù)處理和分析任務(wù)中。在 Flink 中,F(xiàn)ileSource 是一個重要的組件,用于從文件系統(tǒng)中讀取數(shù)據(jù)并將其轉(zhuǎn)換為 Flink 的數(shù)據(jù)流。本文將深入探討 FileSource 的工作原理、用法以及與其他數(shù)據(jù)源的比較。
02 工作原理
FileSource 是 Flink 提供的一種用于從文件系統(tǒng)中讀取數(shù)據(jù)的源。它能夠處理各種類型的文件,包括文本文件、壓縮文件、序列文件等。FileSource 的工作原理可以概括為以下幾個步驟:
1.文件分配(File Assignment)
在 Flink 集群中,每個任務(wù)都會負責讀取文件的一個分片。FileSource 會根據(jù)文件的大小和數(shù)量將文件分配給不同的任務(wù)進行處理。
2.并行讀?。≒arallel Reading)
每個任務(wù)會并行地讀取分配給它的文件分片。這意味著文件中的數(shù)據(jù)會被同時讀取,從而提高了整體的讀取速度和處理效率。
3.數(shù)據(jù)解析(Data Parsing)
讀取的數(shù)據(jù)會經(jīng)過解析器進行解析,將其轉(zhuǎn)換為 Flink 中的數(shù)據(jù)結(jié)構(gòu),如 DataSet 或 DataStream。
4.數(shù)據(jù)分發(fā)(Data Distribution)
解析后的數(shù)據(jù)會被分發(fā)到后續(xù)的算子中進行進一步的處理和分析。
03 數(shù)據(jù)流實現(xiàn)
-
有界流(Bounded Streams)
有界流是指具有明確結(jié)束點的數(shù)據(jù)流,即數(shù)據(jù)流在某個時刻會結(jié)束,數(shù)據(jù)量是有限的。例如,從靜態(tài)文件、數(shù)據(jù)庫或有限數(shù)據(jù)集中讀取的數(shù)據(jù)流就是有界流。有界流的特點包括:
- 數(shù)據(jù)量是有限的,流的結(jié)束點是已知的。
- 可以對整個數(shù)據(jù)流進行批處理式的分析和處理,因為所有數(shù)據(jù)都可用且有限。
- 可以使用批處理算法和優(yōu)化技術(shù),例如排序、分組聚合等。
-
無界流(Unbounded Streams)
無界流是指沒有明確結(jié)束點的數(shù)據(jù)流,即數(shù)據(jù)流會持續(xù)不斷地產(chǎn)生,數(shù)據(jù)量可能是無限的。例如,實時傳感器數(shù)據(jù)、日志流、消息隊列中的數(shù)據(jù)等都是無界流。無界流的特點包括:
- 數(shù)據(jù)源持續(xù)不斷地產(chǎn)生數(shù)據(jù),流沒有明確的結(jié)束點。
- 通常用于實時流式處理,要求系統(tǒng)能夠?qū)崟r處理數(shù)據(jù)并在流中進行持續(xù)的分析和計算。
- 需要采用流式處理的技術(shù)和算法,例如窗口計算、流式聚合、事件時間處理等。
-
不同數(shù)據(jù)流實現(xiàn)
-
創(chuàng)建一個 File Source 時, 默認情況下,Source 為有界/批的模式;
//創(chuàng)建一個FileSource數(shù)據(jù)源,并設(shè)置為批模式,讀取完文件后結(jié)束 final FileSource<String> source = FileSource.forRecordStreamFormat(...) .build();
-
設(shè)置參數(shù)monitorContinuously(Duration.ofMillis(5)) 可以把 Source 設(shè)置為持續(xù)的流模式
//創(chuàng)建一個FileSource數(shù)據(jù)源,并設(shè)置為流模式,每隔5分鐘檢查路徑新文件,并讀取 final FileSource<String> source = FileSource.forRecordStreamFormat(...) .monitorContinuously(Duration.ofMillis(5)) .build();
-
04 項目實戰(zhàn)
1.FileSource支持多種數(shù)據(jù)格式數(shù)據(jù)讀取與解析,本期以Text File文件為例展開。
2.jdk版本11
3.Flink版本1.18.0
4.下面是兩個簡單的示例代碼,演示如何在 Flink 中使用 FileSource 讀取文件數(shù)據(jù)
4.1 項目結(jié)構(gòu)
4.2 maven依賴
<!-- flink讀取Text File文件依賴 start-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.18.0</version>
</dependency>
<!-- flink讀取Text File文件依賴 end-->
<!-- flink基礎(chǔ)依賴 start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.0</version>
</dependency>
<!-- flink基礎(chǔ)依賴 end -->
4.3 StreamFormat讀取文件數(shù)據(jù)
- StreamFormat從文件流中讀取文件內(nèi)容。它是最簡單的格式實現(xiàn), 并且提供了許多拆箱即用的特性(如 Checkpoint 邏輯),但是限制了可應(yīng)用的優(yōu)化(例如對象重用,批處理等等)。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
/**
* 描述:
* flink集成FileSource & forRecordStreamFormat使用 & 流模式
* StreamFormat:從文件流中讀取文件內(nèi)容。它是最簡單的格式實現(xiàn),
* 并且提供了許多拆箱即用的特性(如 Checkpoint 邏輯),
* 但是限制了可應(yīng)用的優(yōu)化(例如對象重用,批處理等等)。
*
* @author 淺夏的貓
* @version 1.0.0
* @date 2024-02-07 15:30:22
*/
public class FileSourceRecordStreamingJob {
public static void main(String[] args) throws Exception {
// 創(chuàng)建 需要讀取的文件路徑Path
Path path = new Path("D:\\flink\\file_source.txt");
// 創(chuàng)建 讀取文件的格式函數(shù)
TextLineInputFormat textLineInputFormat = new TextLineInputFormat();
// 創(chuàng)建 FileSource
FileSource<String> fileSource = FileSource.
forRecordStreamFormat(textLineInputFormat, path)
//放開注釋則使用流模式,每隔5分鐘檢查是否有新文件否則默認使用批模式
// .monitorContinuously(Duration.ofMillis(5))
.build();
// 創(chuàng)建 執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加 FileSource 到數(shù)據(jù)流
env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();
// 執(zhí)行任務(wù)
env.execute("FileSourceRecordStreamingJob");
}
}
4.4 BulkFormat讀取文件數(shù)據(jù)
- BulkFormat從文件中一次讀取一批記錄,雖然是最 “底層” 的格式實現(xiàn),但是提供了優(yōu)化實現(xiàn)的最大靈活性。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
/**
* 描述:flink集成FileSource & forBulkFileFormat使用 & 流模式
* BulkFormat:從文件中一次讀取一批記錄。 它雖然是最 “底層” 的格式實現(xiàn),但是提供了優(yōu)化實現(xiàn)的最大靈活性。
*
* @author 淺夏的貓
* @version 1.0.0
* @date 2024-02-07 15:30:22
*/
public class FileSourceBulkStreamingJob {
public static void main(String[] args) throws Exception {
//創(chuàng)建 批量讀取文件的格式函數(shù),其實底層還是通過對單行文件讀取
BulkFormat<String, FileSourceSplit> bulkFormat = new StreamFormatAdapter<>(new TextLineInputFormat());
// 創(chuàng)建 FileSource
FileSource<String> fileSource = FileSource.
forBulkFileFormat(bulkFormat, new Path("D:\\flink\\file_source.txt"))
//放開注釋則使用流模式,每隔5分鐘檢查是否有新文件,否則默認使用批模式
// .monitorContinuously(Duration.ofMillis(5))
.build();
// 創(chuàng)建 執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加 FileSource 到數(shù)據(jù)流
env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "FileSource").print();
// 執(zhí)行任務(wù)
env.execute("FileSourceBulkStreamingJob");
}
}
4.5 使用小結(jié)
在上面的示例中,我們使用FileSource方法從指定路徑讀取文本文件,并將其轉(zhuǎn)換為一個數(shù)據(jù)流,選擇不同的輸入格式和解析方式,然后我們調(diào)用 print 方法將數(shù)據(jù)流中的數(shù)據(jù)打印出來。
05 數(shù)據(jù)源比較
FileSource 是 Flink 中常用的數(shù)據(jù)源之一,與其他數(shù)據(jù)源相比,它具有一些優(yōu)勢和劣勢,根據(jù)實際情況和需求,可以選擇不同的數(shù)據(jù)源來滿足任務(wù)的要求。
-
優(yōu)勢:
- 支持讀取大規(guī)模的文件數(shù)據(jù),適用于大數(shù)據(jù)處理場景。
- 支持并行讀取和處理,能夠充分利用集群資源,提高處理效率。
- 支持多種文件格式和壓縮方式,靈活性強。
-
劣勢:
- 對于小文件的處理效率較低,可能會導(dǎo)致資源浪費和性能下降。
- 無法實時監(jiān)控文件的變化,需要手動觸發(fā)重新讀取。
06 總結(jié)
FileSource 是 Apache Flink 中用于讀取文件數(shù)據(jù)的重要組件,它能夠高效地處理大規(guī)模的文件數(shù)據(jù),并提供豐富的功能和靈活的用法。通過深入了解 FileSource 的工作原理和用法,可以更好地利用 Flink 來實現(xiàn)大規(guī)模數(shù)據(jù)文件的處理和分析任務(wù)。文章來源:http://www.zghlxwxcb.cn/news/detail-834242.html
通過以上詳細介紹,可以對 Apache Flink 中的 FileSource 有一個全面的了解,從而更好地應(yīng)用于實際的數(shù)據(jù)處理項目中文章來源地址http://www.zghlxwxcb.cn/news/detail-834242.html
到了這里,關(guān)于【天衍系列 01】深入理解Flink的 FileSource 組件:實現(xiàn)大規(guī)模數(shù)據(jù)文件處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!