DataStream API 是 Flink 的核心層 API。一個 Flink 程序,其實就是對 DataStream 的各種轉(zhuǎn)換。具體來說,代碼基本上都由以下幾部分構成:
1)執(zhí)行環(huán)境(Execution Environment)
Flink 程序可以在各種上下文環(huán)境中運行:我們可以在本地 JVM 中執(zhí)行程序,也可以提交到遠程集群上運行。
不同的環(huán)境,代碼的提交運行的過程會有所不同。這就要求我們在提交作業(yè)執(zhí)行計算時,首先必須獲取當前 Flink 的運行環(huán)境,從而建立起與 Flink 框架之間的聯(lián)系。
1.1.創(chuàng)建執(zhí)行環(huán)境
我們要獲取的執(zhí)行環(huán)境,是 StreamExecutionEnvironment
類的對象,這是所有 Flink 程序的基礎。在代碼中創(chuàng)建執(zhí)行環(huán)境的方式,就是調(diào)用這個類的靜態(tài)方法,具體有以下三種。
1、getExecutionEnvironment
最簡單的方式,就是直接調(diào)用 getExecutionEnvironment
方法。它會根據(jù)當前運行的上下文直接得到正確的結(jié)果:如果程序是獨立運行的,就返回一個本地執(zhí)行環(huán)境;如果是創(chuàng)建了jar 包,然后從命令行調(diào)用它并提交到集群執(zhí)行,那么就返回集群的執(zhí)行環(huán)境。也就是說,這個方法會根據(jù)當前運行的方式,自行決定該返回什么樣的運行環(huán)境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
這種方式,用起來簡單高效,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。
2、createLocalEnvironment
這個方法返回一個本地執(zhí)行環(huán)境??梢栽谡{(diào)用時傳入一個參數(shù),指定默認的并行度;如果不傳入,則默認并行度就是本地的 CPU 核心數(shù)。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
這個方法返回集群執(zhí)行環(huán)境。需要在調(diào)用時指定 JobManager 的主機名和端口號,并指定要在集群中運行的 Jar 包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主機名
1234, // JobManager 進程端口號
"path/to/jarFile.jar" // 提交給 JobManager 的 JAR 包
);
在獲取到程序執(zhí)行環(huán)境后,我們還可以對執(zhí)行環(huán)境進行靈活的設置。比如可以全局設置程序的并行度、禁用算子鏈,還可以定義程序的時間語義、配置容錯機制。
1.2.執(zhí)行模式(Execution Mode)
從 Flink 1.12 開始,官方推薦的做法是直接使用 DataStream API,在提交任務時通過將執(zhí)行模式設為 BATCH 來進行批處理。不建議使用 DataSet API。
// 流處理環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream API 執(zhí)行模式包括:流執(zhí)行模式、批執(zhí)行模式和自動模式。
-
流執(zhí)行模式(Streaming)
這是 DataStream API 最經(jīng)典的模式,一般用于需要持續(xù)實時處理的無界數(shù)據(jù)流。默認情況下,程序使用的就是 Streaming 執(zhí)行模式。
-
批執(zhí)行模式(Batch)
專門用于批處理的執(zhí)行模式。
-
自動模式(AutoMatic)
在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界,來自動選擇執(zhí)行模式。批執(zhí)行模式的使用。主要有兩種方式:
(1)通過命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作業(yè)時,增加 execution.runtime-mode 參數(shù),指定值為 BATCH。
(2)通過代碼配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代碼中,直接基于執(zhí)行環(huán)境調(diào)用 setRuntimeMode 方法,傳入 BATCH 模式。
實際應用中一般不會在代碼中配置,而是使用命令行,這樣更加靈活。
1.3.觸發(fā)程序執(zhí)行
需要注意的是,寫完輸出(sink)操作并不代表程序已經(jīng)結(jié)束。因為當 main() 方法被調(diào)用時,其實只是定義了作業(yè)的每個執(zhí)行操作,然后添加到數(shù)據(jù)流圖中;這時并沒有真正處理數(shù)據(jù)——因為數(shù)據(jù)可能還沒來。Flink 是由事件驅(qū)動的,只有等到數(shù)據(jù)到來,才會觸發(fā)真正的計算,這也被稱為“延遲執(zhí)行”或“懶執(zhí)行”。
所以我們需要顯式地調(diào)用執(zhí)行環(huán)境的 execute() 方法,來觸發(fā)程序執(zhí)行。execute() 方法將一直等待作業(yè)完成,然后返回一個執(zhí)行結(jié)果(JobExecutionResult)。
env.execute();
2)源算子(Source)
Flink 可以從各種來源獲取數(shù)據(jù),然后構建 DataStream 進行轉(zhuǎn)換處理。一般將數(shù)據(jù)的輸入來源稱為數(shù)據(jù)源(data source),而讀取數(shù)據(jù)的算子就是源算子(source operator)。所以,source 就是我們整個處理程序的輸入端。
在 Flink1.12 以前,舊的添加 source 的方式,是調(diào)用執(zhí)行環(huán)境的 addSource()方法:
DataStream<String> stream = env.addSource(...);
方法傳入的參數(shù)是一個“源函數(shù)”(source function),需要實現(xiàn) SourceFunction 接口。
從 Flink1.12 開始,主要使用流批統(tǒng)一的新 Source 架構:
DataStreamSource<String> stream = env.fromSource(…)
Flink 直接提供了很多預實現(xiàn)的接口,此外還有很多外部連接工具也幫我們實現(xiàn)了對應的 Source,通常情況下足以應對我們的實際需求。
2.1.準備工作
具體代碼如下:
public class WaterSensor {
public String id;
public Long ts;
public Integer vc;
// 一定要提供一個 空參 的構造器
public WaterSensor() {
}
public WaterSensor(String id, Long ts, Integer vc) {
this.id = id;
this.ts = ts;
this.vc = vc;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTs() {
return ts;
}
public void setTs(Long ts) {
this.ts = ts;
}
public Integer getVc() {
return vc;
}
public void setVc(Integer vc) {
this.vc = vc;
}
@Override
public String toString() {
return "WaterSensor{" +
"id='" + id + '\'' +
", ts=" + ts +
", vc=" + vc +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WaterSensor that = (WaterSensor) o;
return Objects.equals(id, that.id) &&
Objects.equals(ts, that.ts) &&
Objects.equals(vc, that.vc);
}
@Override
public int hashCode() {
return Objects.hash(id, ts, vc);
}
}
這里需要注意,我們定義的 WaterSensor,有這樣幾個特點:
- 類是公有(public)的
- 有一個
無參的構造方法
- 所有屬性都是公有(public)的
- 所有屬性的類型都是可以序列化的
Flink 會把這樣的類作為一種特殊的 POJO
(Plain Ordinary Java Object 簡單的 Java 對象,實際就是普通 JavaBeans)數(shù)據(jù)類型來對待,方便數(shù)據(jù)的解析和序列化。另外我們在類中還重寫了 toString 方法,主要是為了測試輸出顯示更清晰。
我們這里自定義的 POJO 類會在后面的代碼中頻繁使用,所以在后面的代碼中碰到,把這里的 POJO 類導入就好了。
2.2.從集合中讀取數(shù)據(jù)
最簡單的讀取數(shù)據(jù)的方式,就是在代碼中直接創(chuàng)建一個 Java 集合,然后調(diào)用執(zhí)行環(huán)境的 fromCollection 方法進行讀取。這相當于將數(shù)據(jù)臨時存儲到內(nèi)存中,形成特殊的數(shù)據(jù)結(jié)構后,作為數(shù)據(jù)源使用,一般用于測試。
public class CollectionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 從集合讀取數(shù)據(jù)
DataStreamSource<Integer> source = env
.fromElements(1,2,33); // 從元素讀
// .fromCollection(Arrays.asList(1, 22, 3)); // 從集合讀
source.print();
env.execute();
}
}
2.3.從文件讀取數(shù)據(jù)
真正的實際應用中,自然不會直接將數(shù)據(jù)寫在代碼中。通常情況下,我們會從存儲介質(zhì)中獲取數(shù)據(jù),一個比較常見的方式就是讀取日志文件。這也是批處理中最常見的讀取方式。
讀取文件,需要添加文件連接器依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
示例如下:
public class FileSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 從文件讀: 新Source架構
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(
new TextLineInputFormat(),
new Path("input/word.txt")
)
.build();
env
.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
.print();
env.execute();
}
}
/**
*
* 新的Source寫法:
* env.fromSource(Source的實現(xiàn)類,Watermark,名字)
*
*/
說明:
- 參數(shù)可以是目錄,也可以是文件;還可以從 HDFS 目錄下讀取,使用路徑 hdfs://…;
- 路徑可以是相對路徑,也可以是絕對路徑;
- 相對路徑是從系統(tǒng)屬性 user.dir 獲取路徑:idea 下是 project 的根目錄,standalone 模式下是集群節(jié)點根目錄;
2.4.從 Socket 讀取數(shù)據(jù)
不論從集合還是文件,我們讀取的其實都是有界數(shù)據(jù)。在流處理的場景中,數(shù)據(jù)往往是無界的。
我們之前用到的讀取 socket 文本流,就是流處理場景。但是這種方式由于吞吐量小、穩(wěn)定性較差,一般也是用于測試。
DataStream<String> stream = env.socketTextStream("localhost", 7777);
2.5.從 Kafka 讀取數(shù)據(jù)
Flink 官方提供了連接工具 flink-connector-kafka ,直接幫我們實現(xiàn)了一個消費者 FlinkKafkaConsumer,它就是用來讀取 Kafka 數(shù)據(jù)的 SourceFunction。
所以想要以 Kafka 作為數(shù)據(jù)源獲取數(shù)據(jù),我們只需要引入 Kafka 連接器的依賴。Flink 官方提供的是一個通用的 Kafka 連接器,它會自動跟蹤最新版本的 Kafka 客戶端。目前最新版本只支持 0.10.0 版本以上的 Kafka。這里我們需要導入的依賴如下。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
代碼如下:
public class KafkaSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 從Kafka讀: 新Source架構
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定kafka節(jié)點的地址和端口
.setGroupId("atguigu") // 指定消費者組的id
.setTopics("topic_1") // 指定消費的 Topic
.setValueOnlyDeserializer(new SimpleStringSchema()) // 指定 反序列化器,這個是反序列化value
.setStartingOffsets(OffsetsInitializer.latest()) // flink消費kafka的策略
.build();
env
// .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
.print();
env.execute();
}
}
/**
* kafka消費者的參數(shù):
* auto.reset.offsets
* earliest: 如果有offset,從offset繼續(xù)消費; 如果沒有offset,從 最早 消費
* latest : 如果有offset,從offset繼續(xù)消費; 如果沒有offset,從 最新 消費
*
* flink的kafkasource,offset消費策略:OffsetsInitializer,默認是 earliest
* earliest: 一定從 最早 消費
* latest : 一定從 最新 消費
*
*
*
*/
2.6.從數(shù)據(jù)生成器讀取數(shù)據(jù)
Flink 從 1.11 開始提供了一個內(nèi)置的 DataGen 連接器,主要是用于生成一些隨機數(shù),用于在沒有數(shù)據(jù)源的時候,進行流任務的測試以及性能測試等。1.17 提供了新的 Source 寫法,需要導入依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
代碼如下:
public class DataGeneratorDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 如果有n個并行度, 最大值設為a
// 將數(shù)值 均分成 n份, a/n ,比如,最大100,并行度2,每個并行度生成50個
// 其中一個是 0-49,另一個50-99
env.setParallelism(2);
/**
* 數(shù)據(jù)生成器Source,四個參數(shù):
* 第一個: GeneratorFunction接口,需要實現(xiàn), 重寫map方法, 輸入類型固定是Long
* 第二個: long類型, 自動生成的數(shù)字序列(從0自增)的最大值(小于),達到這個值就停止了
* 第三個: 限速策略, 比如 每秒生成幾條數(shù)據(jù)
* 第四個: 返回的類型
*/
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
100,
RateLimiterStrategy.perSecond(1),
Types.STRING
);
env
.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator")
.print();
env.execute();
}
}
2.7.Flink 支持的數(shù)據(jù)類型
1、Flink 的類型系統(tǒng)
Flink 使用“類型信息”(TypeInformation)來統(tǒng)一表示數(shù)據(jù)類型。TypeInformation 類是 Flink 中所有類型描述符的基類。它涵蓋了類型的一些基本屬性,并為每個數(shù)據(jù)類型生成特定的序列化器、反序列化器和比較器。
2、Flink 支持的數(shù)據(jù)類型
對于常見的 Java 和 Scala 數(shù)據(jù)類型,F(xiàn)link 都是支持的。Flink 在內(nèi)部,F(xiàn)link 對支持不同的類型進行了劃分,這些類型可以在 Types 工具類中找到:
(1)基本類型
所有 Java 基本類型及其包裝類,再加上 Void、String、Date、BigDecimal 和 BigInteger。
(2)數(shù)組類型
包括基本類型數(shù)組(PRIMITIVE_ARRAY)和對象數(shù)組(OBJECT_ARRAY)。
(3)復合數(shù)據(jù)類型
- Java 元組類型(TUPLE):這是 Flink 內(nèi)置的元組類型,是 Java API 的一部分。最多 25 個字段,也就是從 Tuple0~Tuple25,不支持空字段。
- Scala 樣例類及 Scala 元組:不支持空字段。
- 行類型(ROW):可以認為是具有任意個字段的元組,并支持空字段。
- POJO:Flink 自定義的類似于 Java bean 模式的類。
(4)輔助類型
Option、Either、List、Map 等。
(5)泛型類型(GENERIC)
Flink 支持所有的 Java 類和 Scala 類。不過如果沒有按照上面 POJO 類型的要求來定義,就會被 Flink 當作泛型類來處理。Flink 會把泛型類型當作黑盒,無法獲取它們內(nèi)部的屬性;它們也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。
在這些類型中,元組類型和 POJO 類型最為靈活,因為它們支持創(chuàng)建復雜類型。而相比之下,POJO 還支持在鍵(key)的定義中直接使用字段名,這會讓我們的代碼可讀性大大增加。所以,在項目實踐中,往往會將流處理程序中的元素類型定為 Flink 的 POJO 類型。
Flink 對 POJO 類型的要求如下:
- 類是公有(public)的
- 有一個無參的構造方法
- 所有屬性都是公有(public)的
- 所有屬性的類型都是可以序列化的
3、類型提示(Type Hints)
Flink 還具有一個類型提取系統(tǒng),可以分析函數(shù)的輸入和返回類型,自動獲取類型信息,從而獲得對應的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情況下(比如 Lambda 表達式中),自動提取的信息是不夠精細的——只告訴 Flink 當前的元素由“船頭、船身、船尾”構成,根本無法重建出“大船”的模樣;這時就需要顯式地提供類型信息,才能使應用程序正常工作或提高其性能。
為了解決這類問題,Java API 提供了專門的“類型提示”(type hints)
。
回憶一下之前的 word count 流處理程序,我們在將 String 類型的每個詞轉(zhuǎn)換成(word,count)二元組后,就明確地用 returns 指定了返回的類型。因為對于 map 里傳入的 Lambda 表達式,系統(tǒng)只能推斷出返回的是 Tuple2 類型,而無法得到 Tuple2<String, Long>。只有顯式地告訴系統(tǒng)當前的返回類型,才能正確地解析出完整數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-814530.html
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink 還專門提供了 TypeHint
類,它可以捕獲泛型的類型信息,并且一直記錄下來,為運行時提供足夠的信息。我們同樣可以通過.returns()
方法,明確地指定轉(zhuǎn)換之后的 DataStream 里元素的類型。文章來源地址http://www.zghlxwxcb.cn/news/detail-814530.html
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
到了這里,關于【Flink-1.17-教程】-【四】Flink DataStream API(1)源算子(Source)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!