DataStream API (基礎(chǔ)篇)
注: 本文只涉及DataStream
- 原因:隨著大數(shù)據(jù)和流式計算需求的增長,處理實時數(shù)據(jù)流變得越來越重要。因此,DataStream由于其處理實時數(shù)據(jù)流的特性和能力,逐漸替代了DataSet成為了主流的數(shù)據(jù)處理方式。
目錄
DataStream API (基礎(chǔ)篇)
前摘:
一、執(zhí)行環(huán)境
1. 創(chuàng)建執(zhí)行環(huán)境
2. 執(zhí)行模式
3. 觸發(fā)程序執(zhí)行
二、源算子(source)
三、轉(zhuǎn)換算子(Transformation)
四、輸出算子(sink)
前摘:
一個 Flink 程序,其實就是對 DataStream 的各種轉(zhuǎn)換。具體來說,代碼基本上都由以下幾 部分構(gòu)成,如圖所示:
- 獲取執(zhí)行環(huán)境(Execution Environment)
- 讀取數(shù)據(jù)源(Source)
- 定義基于數(shù)據(jù)的轉(zhuǎn)換操作(Transformations)
- 定義計算結(jié)果的輸出位置(Sink)
- 觸發(fā)程序執(zhí)行(Execute)
其中,獲取環(huán)境和觸發(fā)執(zhí)行,都可以認(rèn)為是針對執(zhí)行環(huán)境的操作。所以本章我們就從執(zhí)行 環(huán)境、數(shù)據(jù)源(source)、轉(zhuǎn)換操作(Transformation)、輸出(Sink)四大部分,對常用的 DataStream API 做基本介紹。
一、執(zhí)行環(huán)境
1. 創(chuàng)建執(zhí)行環(huán)境
- 編寫Flink程序的第一步就是創(chuàng)建執(zhí)行環(huán)境。
- 我 們 要 獲 取 的 執(zhí) 行 環(huán) 境 , 是 StreamExecutionEnvironment 類的對象,這是所有 Flink 程序的基礎(chǔ)
- 在代碼中創(chuàng)建執(zhí)行環(huán)境的 方式,就是調(diào)用這個類的靜態(tài)方法,具體有以下三種。
- getExecutionEnvironment
最簡單的方式,就是直接調(diào)用 getExecutionEnvironment 方法。它會根據(jù)當(dāng)前運(yùn)行的上下文 直接得到正確的結(jié)果;//此處的 env 是 StreamExecutionEnvironment 對象 val env = StreamExecutionEnvironment.getExecutionEnvironment
- createLocalEnvironment
這個方法返回一個本地執(zhí)行環(huán)境。可以在調(diào)用時傳入一個參數(shù),指定默認(rèn)的并行度;如果 不傳入,則默認(rèn)并行度就是本地的 CPU 核心數(shù)。//此處的 localEnvironment 是 StreamExecutionEnvironment 對象 val localEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
-
createRemoteEnvironment
這個方法返回集群執(zhí)行環(huán)境。需要在調(diào)用時指定 JobManager 的主機(jī)名和端口號,并指定 要在集群中運(yùn)行的 Jar 包。//此處的 remoteEnv 是 StreamExecutionEnvironment 對象 val remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // JobManager 主機(jī)名 1234, // JobManager 進(jìn)程端口號 "path/to/jarFile.jar" // 提交給 JobManager 的 JAR 包 )
2. 執(zhí)行模式
而從 1.12.0 版本起,F(xiàn)link 實現(xiàn)了 API 上的流批統(tǒng)一。DataStream API 新增了一個重要特 性:可以支持不同的“執(zhí)行模式”(execution mode),通過簡單的設(shè)置就可以讓一段 Flink 程序 在流處理和批處理之間切換。這樣一來,DataSet API 也就沒有存在的必要了。
- 流執(zhí)行模式(STREAMING) 這是 DataStream API 最經(jīng)典的模式,一般用于需要持續(xù)實時處理的無界數(shù)據(jù)流。默認(rèn)情 況下,程序使用的就是 STREAMING 執(zhí)行模式。
- 批執(zhí)行模式(BATCH) 專門用于批處理的執(zhí)行模式, 這種模式下,F(xiàn)link 處理作業(yè)的方式類似于 MapReduce 框架。 對于不會持續(xù)計算的有界數(shù)據(jù),我們用這種模式處理會更方便。
- 自動模式(AUTOMATIC) 在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界,來自動選擇執(zhí)行模式
由于 Flink 程序默認(rèn)是 STREAMING 模式,我們這里重點介紹一下 BATCH 模式的配置。 主要有兩種方式:
(1)通過命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作業(yè)時,增加 execution.runtime-mode 參數(shù),指定值為 BATCH。
(2)通過代碼配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
3. 觸發(fā)程序執(zhí)行
我們需要顯式地調(diào)用執(zhí)行環(huán)境的 execute()方法,來觸發(fā)程序執(zhí)行。execute()方法將一直等 待作業(yè)完成,然后返回一個執(zhí)行結(jié)果(JobExecutionResult)。
env.execute()
二、源算子(source)
Source源算子(基礎(chǔ)篇二)
三、轉(zhuǎn)換算子(Transformation)
Transformation轉(zhuǎn)換算子(基礎(chǔ)篇三)文章來源:http://www.zghlxwxcb.cn/news/detail-817784.html
四、輸出算子(sink)
持續(xù)更新中文章來源地址http://www.zghlxwxcb.cn/news/detail-817784.html
到了這里,關(guān)于大數(shù)據(jù)學(xué)習(xí)之Flink算子、了解DataStream API(基礎(chǔ)篇一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!