一.基本概念
官網(wǎng)介紹
Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink 被設(shè)計為在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存中的速度和任何規(guī)模執(zhí)行計算。
1.無限流有一個開始,但沒有定義的結(jié)束。它們不會在生成數(shù)據(jù)時終止并提供數(shù)據(jù)。必須連續(xù)處理無限流,即事件必須在攝取后立即處理。不可能等待所有輸入數(shù)據(jù)到達(dá),因為輸入是無限的,并且在任何時間點都不會完成。處理無界數(shù)據(jù)通常需要按特定順序(例如事件發(fā)生的順序)引入事件,以便能夠推斷結(jié)果完整性。(即實時數(shù)據(jù))
2.有界流具有定義的開始和結(jié)束??梢酝ㄟ^在執(zhí)行任何計算之前引入所有數(shù)據(jù)來處理有界流。處理有界流不需要有序引入,因為始終可以對有界數(shù)據(jù)集進(jìn)行排序。有界流的處理也稱為批處理。(即存儲的數(shù)據(jù))
有狀態(tài)流處-flink處理流程
較為合適的應(yīng)用場景
傳統(tǒng)事務(wù)處理
二.Flink和Spark
-
概念區(qū)別
- Spark強(qiáng)勁的分布式大數(shù)據(jù)處理框架.它使用內(nèi)存中緩存和優(yōu)化的查詢執(zhí)行方式,可針對任何規(guī)模的數(shù)據(jù)進(jìn)行快速分析查詢,支持跨多個工作負(fù)載重用代碼—批處理、交互式查詢、實時分析、機(jī)器學(xué)習(xí)和圖形處理等。Spark底層基于批處理.(流是批處理不可切分的特殊情況)
- Flink基于流(批處理是一種有界流)
-
數(shù)據(jù)模型
- spark采用RDD模型,spark streaming 的 DStream 實際上也就是一組組小批數(shù)據(jù)RDD的集合
- flink基本數(shù)據(jù)模型是數(shù)據(jù)流,以及事件(Event)序列
-
運(yùn)行時架構(gòu)
- spark是批計算,將DAG劃分為不同的 stage,一個完成后才可以計算下一個
- flink是標(biāo)準(zhǔn)的流執(zhí)行模式,一個事件在一個節(jié)點處理完后可以直接發(fā)往下一個節(jié)點進(jìn)行處理
三. Flink配置文件
jobmanager.sh 資源調(diào)度,工作分配腳本
taskmanager.sh 工作任務(wù)執(zhí)行腳本
flink 啟動集群后,命令執(zhí)行器
四. yarn部署flink
4.1 session-cluster模式
# 啟動hadhoop集群
# -n(--container) taskManager的數(shù)量 不建議指定.動態(tài)分配
# -s(--slot) 每個taskManager的slot數(shù)量,默認(rèn)一個slot一個core.默認(rèn)每個taskmanager的slot個數(shù)為1
# -jm: jobManager的內(nèi)存 mb.
# -tm: 每個taskManager的內(nèi)存 mb
# -nm: yarn的appName
./yarn-session.sh -s 2 -jm 1024 -tm 1024 -nm test -d
# 提交job
./flink run -c com.vector.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost -port 7777
./flink list -a
# 取消yarn-session
yarn application --kill application_12451231_0001
4.2 pre-job-cluster模式
1)啟動hadoop集群(略)
2)不啟動yarn-session ,直接執(zhí)行job
./flink run -m yarn-cluster -c com.vector.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost -port 7777
??五.Flink運(yùn)行時架構(gòu)
flink運(yùn)行時組件: jobManager,TaskManager,ResourceManager,Dispacher
JobManager控制一個應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說,每個應(yīng)用程序都會被一個不同的JobManager所控制執(zhí)行。
- JobManager 會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括: 作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。
- JobManager 會把JobGraph轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖,這個圖被叫做"“執(zhí)行圖”(ExecutionGraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。
- JobManager 會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器( TaskManager)上的插槽((slot)。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運(yùn)行它們的TaskManager上。而在運(yùn)行過程中,JobManager會負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)
TaskManager
- Flink中的工作進(jìn)程。通常在Flink中會有多個TaskManager運(yùn)行,每一個TaskManager都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量。
- 啟動之后,TaskManager會向資源管理器注冊它的插槽;收到資源管理器的指令后,TaskManager就會將一個或者多個插槽提供給JobManager調(diào)用。JobManager就可以向插槽分配任務(wù)(tasks)來執(zhí)行了。
- 在執(zhí)行過程中,一個TaskManager可以跟其它運(yùn)行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)。
ResourceManager
- 主要負(fù)責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot) ,TaskManger插槽是Flink中定義的處理資源單元。
- Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARN.Mesos、K8s,以及standalone部署。
- 當(dāng)JobManager申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配給JobManager。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供平臺發(fā)起會話,以提供啟動TaskManager進(jìn)程的容器。
Dispacher
- 可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了REST接口。
- 當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給一個JobManager。
- Dispatcher也會啟動一個Web Ul,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息。
- Dispatcher在架構(gòu)中可能并不是必需的,這取決于應(yīng)用提交運(yùn)行的方式。
5.1 任務(wù)提交流程
5.2 如何實現(xiàn)并行計算
并行度 可以在代碼中指定,提交job指定,也可以在集群配置給默認(rèn)的并行度.
優(yōu)先級:代碼>提交job>集群配置的并行度
- 一個特定算子的子任務(wù) (subtask)的個數(shù)被稱之為其并行度(parallelism) 。一般情況下,一個stream 的并行度,可以認(rèn)為就是其所有算子中最大的并行度。
slots
推薦按照cpu核心數(shù)設(shè)置slot
- Flink 中每一個TaskManager都是一個JVM進(jìn)程,它可能會在獨立的線程上執(zhí)行一個或多個子任務(wù)
- 為了控制一個TaskManager能接收多少個task,taskManager通過task slot來進(jìn)行控制(一個TaskManager至少有一個slot)
- 默認(rèn)情況下,F(xiàn)link允許子任務(wù)共享slot,即使它們是不同任務(wù)的子任務(wù)。這樣的結(jié)果是,一個slot可以保存作業(yè)的整個管道。
- Task Slot是靜態(tài)的概念,是指TaskManager具有的并發(fā)執(zhí)行能力
至少需要的slot數(shù) = SUM(MAX(同一個共享組的任務(wù)數(shù),同一個共享組的任務(wù)數(shù)的最大并行度))
情況1
情況2
.setParallelism(4).slotSharingGroup("01"); 設(shè)置并行度和共享組 顯示設(shè)置共享組可以指定不同的slot并行執(zhí)行.如果有地方?jīng)]配,則和前一個處于同一個共享組.如果為首部.則為defalut共享組
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設(shè)置并行度
env.setParallelism(8);
// 從文件中讀取數(shù)據(jù) 有界流
// String inputPath = System.getProperty("user.dir") + "/src/main/resources/text.txt";
// FileSource<String> source = FileSource
// .forRecordStreamFormat(
// new TextLineInputFormat("UTF-8"),
// new Path(inputPath))
// .build();
// DataStream<String> inputDataStream =
// env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "text");
// ParameterTool parameterTool = ParameterTool.fromArgs(args);
// String host = parameterTool.get("host");
// int port = parameterTool.getInt("port");
// 從socket文本流讀取數(shù)據(jù) nc -lk 7777 無界流
DataStreamSource<String> inputDataStream =
env.socketTextStream("localhost", 7777);
// 基于數(shù)據(jù)流進(jìn)行轉(zhuǎn)換計算
SingleOutputStreamOperator<Tuple2<String, Integer>> resultSet =
inputDataStream.flatMap(new WordCount.MyFlatMapper())
.slotSharingGroup("02")
.keyBy(KeySelector -> KeySelector.f0)
.sum(1).setParallelism(4).slotSharingGroup("01");
resultSet.print();
// 執(zhí)行任務(wù)
env.execute();
}
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
// 按句號分詞
String[] words = s.split("");
// 遍歷所有word,包成二元組輸出
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
5.3 執(zhí)行圖
Flink 中的執(zhí)行圖可以分成四層: StreamGraph-> JobGraph -> ExecutionGraph->物理執(zhí)行圖
StreamGraph:是根據(jù)用戶通過Stream API編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。
JobGraph: StreamGraph經(jīng)過優(yōu)化后生成了JobGraph,提交給JobManager的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點chain 在一起作為一個節(jié)點
ExecutionGraph: JobManager根據(jù)JobGraph生成ExecutionGraph。ExecutionGraph: 是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
物理執(zhí)行圖: JobManager根據(jù)ExecutionGraph對Job進(jìn)行調(diào)度后,在各個TaskManager上部署Task后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)。
5.4 數(shù)據(jù)的傳輸形式
算子之間傳輸數(shù)據(jù)的形式可以是one-to-one (forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決于算子的種類
One-to-one: stream維護(hù)著分區(qū)以及元素的順序(比如source和map之間)。這意味著map算子的子任務(wù)看到的元素的個數(shù)以及順序跟source算子的子任務(wù)生產(chǎn)的元素的個數(shù)、順序相同。map、fliter、flatMap等算子都是one-to-one的對應(yīng)關(guān)系。
Redistributing: stream的分區(qū)會發(fā)生改變。每一個算子的子任務(wù)依據(jù)所選擇的transformation發(fā)送數(shù)據(jù)到不同的目標(biāo)任務(wù)。例如,keyBy基于hashCode重分區(qū)、而broadcast和rebalance 會隨機(jī)重新分區(qū),這些算子都會引起redistribute過程,而redistribute過程就類似于Spark 中的shuffle 過程。
5.5 任務(wù)鏈
Flink 采用了一種稱為任務(wù)鏈的優(yōu)化技術(shù),可以在特定條件下減少本地通信的開銷。為了滿足任務(wù)鏈的要求,必須將兩個或多個算子設(shè)為相同的并行度,并通過本地轉(zhuǎn)發(fā)(local forward)的方式進(jìn)行連接
·相同并行度的one-to-one操作,F(xiàn)link這樣相連的算子鏈接在一起形成一個task,原來的算子成為里面的subtask
·并行度相同、并且是one-to-one操作
·位于同一個共享組 三個條件缺一不可
六. 流處理API
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
flink系列 1.17.0版本
創(chuàng)建一個執(zhí)行環(huán)境,表示當(dāng)前執(zhí)行程序的上下文。如果程序是獨立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶端調(diào)用程序以提交到集群,則此方法返回此集群的執(zhí)行環(huán)境,也就是說,getExecutionEnvironment會根據(jù)查詢運(yùn)行的方式?jīng)Q定返回什么樣的運(yùn)行環(huán)境,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。
如果沒有設(shè)置并行度默認(rèn)flink-conf.yaml配置文件的1
// 封裝了對本地執(zhí)行環(huán)境和遠(yuǎn)程執(zhí)行環(huán)境的判斷 流處理
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 批處理
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
source.txt
sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,32.8
sensor_1,1547714191,26.8
sensor_3,1547718202,6.7
SensorReadingEntity .class
// 傳感器溫度讀數(shù)數(shù)據(jù)類型
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SensorReadingEntity {
private String id;
private Long timestamp;
private Double temperature;
}
6.1文件處理
public class SourceTest2_File {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 從文件中讀取數(shù)據(jù)
String inputPath = System.getProperty("user.dir") + "/src/main/resources/source.txt";
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path(inputPath))
.build();
DataStream<String> dataStream = env
.fromSource(build, WatermarkStrategy.noWatermarks(),"source.txt");
// 打印輸出
dataStream.print();
env.execute();
}
}
??6.2kafka處理
基本配置
# 修改kafka主機(jī)ip 配置localhost或127.0.0.1在wsl上可能會有問題
conf/server.properties
listeners = PLAINTEXT://非回環(huán)ip:9092
advertised.listeners=PLAINTEXT://非回環(huán)ip:9092
#開啟kafka zookeeper服務(wù)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 開啟kafka服務(wù)
bin/kafka-server-start.sh config/server.properties
實例化交換機(jī)消息隊列
# 交換機(jī)ip端口 172.27.188.96:9092 主題交換機(jī)名稱aaaaa
bin/kafka-console-producer.sh --broker-list 172.27.188.96:9092 --topic aaaaa
java連接代碼
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String brokers = "172.27.188.96:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("aaaaa")
// .setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest()) // 從最早的數(shù)據(jù)開始讀取
.setValueOnlyDeserializer(new SimpleStringSchema()) // 只需要value
.build();
DataStream<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 打印
kafkaSource.print();
// 執(zhí)行
env.execute();
}
??6.3 自定義數(shù)據(jù)源
/**
* @author YuanJie
* @projectName flink
* @package com.vector.apitest
* @className com.vector.apitest.SourceTest4_UDF
* @copyright Copyright 2020 vector, Inc All rights reserved.
* @date 2023/8/21 15:15
*/
public class SourceTest4_UDF {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// 從自定義的數(shù)據(jù)源讀取數(shù)據(jù)
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
// 打印輸出
dataStream.print();
// 執(zhí)行
env.execute();
}
// 實現(xiàn)自定義的SourceFunction<OUT>接口,自定義的數(shù)據(jù)源
private static class MySensorSource implements SourceFunction<SensorReading> {
// 定義一個標(biāo)志位,用來表示數(shù)據(jù)源是否正常運(yùn)行發(fā)出數(shù)據(jù)
private boolean running = true;
@Override
public void run(SourceContext<SensorReading> sourceContext) throws Exception {
// 定義一個隨機(jī)數(shù)發(fā)生器
Random random = new Random();
// 設(shè)置10個傳感器的初始溫度 0~120℃正態(tài)分布
HashMap<String, Double> sensorTempMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
}
while (running) {
// 在while循環(huán)中,隨機(jī)生成SensorReading數(shù)據(jù)
for (String sensorId : sensorTempMap.keySet()) {
// 在當(dāng)前溫度基礎(chǔ)上隨機(jī)波動
Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId, newTemp);
sourceContext.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
}
// 每隔1秒鐘發(fā)送一次傳感器數(shù)據(jù)
TimeUnit.MILLISECONDS.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
??6.4 Transform-轉(zhuǎn)換算子
6.4.1 map
6.4.2 flatMap
6.4.3 filter
demo
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 從文件讀數(shù)據(jù)
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
// map 轉(zhuǎn)換成長度輸出
DataStream<Integer> map = env.fromSource(build, WatermarkStrategy.noWatermarks(), "source.txt")
.map(String::length);
// flatMap 按逗號分隔
DataStream<String> flatMap = env.fromSource(build, WatermarkStrategy.noWatermarks(), "source.txt")
.flatMap((String s,Collector<String> collector) -> {
String[] split = s.split(",");
for (String s1 : split) {
collector.collect(s1);
}
})
.returns(Types.STRING);
// filter 篩選 sensor_1 開頭的id
DataStream<String> filter = env.fromSource(build, WatermarkStrategy.noWatermarks(), "source.txt")
.filter(s -> s.startsWith("sensor_1"));
map.print("map");
flatMap.print("flatMap");
filter.print("filter");
env.execute();
}
6.5 ??分組聚合
6.5.1 keyBy
Flink中分組后才能聚合
根據(jù)某個字段將數(shù)據(jù)分到不同分區(qū).每個分區(qū)包含相同的key. 內(nèi)部以hash形式實現(xiàn).(存在hash沖突,因此能保證需要的均在某分區(qū),但無法保證該分區(qū)key唯一)
6.5.2 滾動聚合算子(Rolling Aggregation)
這些算子可以針對KeyedStream的每一個支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
demo - max
求每組中首個設(shè)備溫度變化
sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,39.8
sensor_1,1547714191,40.8
sensor_3,1547718202,6.7
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
DataStream<String> streamSource = env.fromSource(build,
WatermarkStrategy.noWatermarks(),
"source.txt");
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
// 分組 滾動聚合
DataStream<SensorReadingEntity> temperature = dataStream.keyBy(SensorReadingEntity::getId)
// 滾動聚合
.max("temperature");
temperature.print("temperature");
env.execute();
}
demo-maxBy
求實時數(shù)據(jù)中每組當(dāng)前最大溫度數(shù)據(jù)
6.5.3 reduce
求最大溫度值,以及當(dāng)前最新的時間戳
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
DataStream<String> streamSource = env.fromSource(build,
WatermarkStrategy.noWatermarks(),
"source.txt");
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
// reduce 聚合 求最大溫度值,以及當(dāng)前最新的時間戳
DataStream<SensorReadingEntity> reduce = dataStream.keyBy(SensorReadingEntity::getId)
.reduce((curState, newData) -> {
return new SensorReadingEntity(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
});
reduce.print("reduce");
env.execute();
}
??6.6 多流轉(zhuǎn)換算子 分流
以下api的demo集合
sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,32.8
sensor_1,1547714191,26.8
sensor_3,1547718202,6.7
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
DataStream<String> streamSource = env.fromSource(build,
WatermarkStrategy.noWatermarks(),
"source.txt");
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
// 分流 側(cè)輸出流 定義低溫流標(biāo)識
final OutputTag<SensorReadingEntity> outputTag = new OutputTag<SensorReadingEntity>("low") {};
SingleOutputStreamOperator<SensorReadingEntity> process = dataStream.process(new ProcessFunction<SensorReadingEntity, SensorReadingEntity>() {
@Override
public void processElement(SensorReadingEntity sensorReadingEntity, ProcessFunction<SensorReadingEntity, SensorReadingEntity>.Context context, Collector<SensorReadingEntity> collector) throws Exception {
if (sensorReadingEntity.getTemperature() > 30) {
collector.collect(sensorReadingEntity);
} else {
context.output(outputTag, sensorReadingEntity);
}
}
});
// 低溫流 側(cè)輸出流
process.getSideOutput(outputTag).print("low");
// 高溫流 主流
process.print("high");
// 連接流(數(shù)據(jù)類型不同) + 合流(完全相同) 轉(zhuǎn)換為二元組
DataStream<Object> map = process.map(item -> {
SensorReadingEntity sensorReadingEntity = (SensorReadingEntity) item;
return new Tuple2<>(sensorReadingEntity.getId(), sensorReadingEntity.getTemperature());
})
.returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
.connect(process.getSideOutput(outputTag))
.map(new CoMapFunction<Tuple2<String, Double>, SensorReadingEntity, Object>() {
@Override
public Object map1(Tuple2<String, Double> tuple2) throws Exception {
return new Tuple3<>(tuple2.f0, tuple2.f1, "高溫報警");
}
@Override
public Object map2(SensorReadingEntity sensorReadingEntity) throws Exception {
return new Tuple2<>(sensorReadingEntity.getId(), "正常");
}
});
map.print("connect");
// union 合流
DataStream<SensorReadingEntity> union = process.union(process.getSideOutput(outputTag));
union.print("union");
env.execute();
}
6.6.1 getSideOutput 分流輸出
用于將實時數(shù)據(jù),根據(jù)條件分流輸出.
根據(jù)30℃標(biāo)準(zhǔn)值分為高溫主流和低溫 側(cè)輸出流
6.6.2 connect 和CoMap 合流
connect只能連接兩條流
6.6.3 union 合流
可以合并多條流,但是流的類型必須一致
DataStream →DataStream:對兩個或者兩個以上的DataStream進(jìn)行union操作,產(chǎn)生一個包含所有DataStream元素的新DataStream.
七. 函數(shù)類
7.1 ??UDF函數(shù)類
Flink暴露了所有udf函數(shù)的接口(實現(xiàn)方式為接口或者抽象類)。例如MapFunction,FilterFunction,ProcessFunction等.
7.2 ??RichFunction
“富函數(shù)”是DataStream API提供的一個函數(shù)類的接口,所有Flink函數(shù)類都有其Rich 版本。它與常規(guī)函數(shù)的不同在于,可以獲取運(yùn)行環(huán)境的上下文,并擁有一些生命周期方法,所以可以實現(xiàn)更復(fù)雜的功能。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
DataStream<String> streamSource = env.fromSource(build,
WatermarkStrategy.noWatermarks(),
"source.txt");
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapFunction());
resultStream.print("MyMapFunction");
DataStream<Tuple2<String, Integer>> resultRichStream = dataStream.map(new MyRichMapFunction());
resultStream.print("MyRichMapFunction");
env.execute();
}
public static class MyMapFunction implements MapFunction<SensorReadingEntity, Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> map(SensorReadingEntity value) throws Exception {
return new Tuple2<>(value.getId(), value.getId().length());
}
}
public static class MyRichMapFunction extends RichMapFunction<SensorReadingEntity,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReadingEntity value) throws Exception {
return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
// 初始化工作,一般是定義狀態(tài),或者建立數(shù)據(jù)庫連接
// 每個并行實例都會調(diào)用一次
System.out.println("open");
}
@Override
public void close() throws Exception {
// 一般是關(guān)閉連接和清空狀態(tài)的收尾操作
// 每個并行實例都會調(diào)用一次
System.out.println("close");;
}
}
7.3 重分區(qū)
上述介紹KeyBy是Hash重分區(qū)
broadcast 下游廣播
shuffle 隨機(jī)把當(dāng)前任務(wù)分配到下游子分區(qū)
forward 直通分區(qū)
rebalance 輪詢分配到下游子分區(qū)
rescale 分組輪詢到下游子分區(qū)
global 只傳輸?shù)较掠蔚谝粋€子分區(qū)
partitionCustom 自定義傳輸分區(qū)
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
DataStream<String> streamSource = env.fromSource(build,
WatermarkStrategy.noWatermarks(),
"source.txt");
streamSource.print("input");
// 1. shuffle
DataStream<String> shuffle = streamSource.shuffle();
shuffle.print("shuffle");
// 2. rebalance
DataStream<String> rebalance = streamSource.rebalance();
rebalance.print("rebalance");
// 3. rescale
DataStream<String> rescale = streamSource.rescale();
rescale.print("rescale");
// 4. global
DataStream<String> global = streamSource.global();
global.print("global");
// 5. broadcast
DataStream<String> broadcast = streamSource.broadcast();
broadcast.print("broadcast");
// 6. forward
DataStream<String> forward = streamSource.forward();
forward.print("forward");
// 7. keyBy
streamSource.keyBy(item -> {
String[] split = item.split(",");
return split[0];
}).print("keyBy");
env.execute();
}
7.4 sink 寫入庫
Flink沒有類似于spark 中 foreach方法,讓用戶進(jìn)行迭代的操作。雖有對外的輸出操作都要利用sink完成。最后通過類似如下方式完成整個任務(wù)最終輸出操作。
stream. addsink( new Mysink ( xxxx))
flink1.17.0提供的連接器
??7.4.1 讀kafka-寫kafka
Kafka基本配置
# 修改kafka主機(jī)ip 配置localhost或127.0.0.1在wsl上可能會有問題
conf/server.properties
listeners = PLAINTEXT://非回環(huán)ip:9092
advertised.listeners=PLAINTEXT://非回環(huán)ip:9092
#開啟kafka zookeeper服務(wù)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 開啟kafka服務(wù)
bin/kafka-server-start.sh config/server.properties
實例化交換機(jī)消息隊列
# 交換機(jī)ip端口 172.27.181.61:9092 主題交換機(jī)名稱topic-sink 生產(chǎn)者
bin/kafka-console-producer.sh --broker-list 172.27.181.61:9092 --topic topic-producer
# 消費者
bin/kafka-console-consumer.sh --bootstrap-server 172.27.181.61:9092 --topic topic-sink
public class SinkTest1_Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String brokers = "172.27.181.61:9092";
// 從kafka讀取數(shù)據(jù)
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("topic-producer")
.setStartingOffsets(OffsetsInitializer.latest()) // 從最新的數(shù)據(jù)開始讀取
.setValueOnlyDeserializer(new SimpleStringSchema()) // 只需要value
.build();
DataStream<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 轉(zhuǎn)換成SensorReading 類型
DataStream<String> dataStream = kafkaSource.map(item -> {
String[] split = item.split(",");
try {
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2])).toString();
} catch (Exception e) {
return item;
}
});
// 輸出到kafka
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-sink")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 至少一次
.build();
dataStream.sinkTo(sink);
env.execute();
}
}
??7.4.2 讀kafka-寫非關(guān)系數(shù)據(jù)庫redis-redisson-自定義flink連接器
1.flink官方為我們提供了多種連接器,我們可以直接使用
官方連接地址 這里就不講這種方法了
2.自定義連接器,以整合redisson為例
額外引入pom
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.0</version>
</dependency>
</dependencies>
配置redisson數(shù)據(jù)源,springboot可以修改該配置
//@Configuration
@Slf4j
//@ConfigurationProperties(prefix = "redisson")
//@Data
public class RedissonConfig {
// private String host;
//private String password;
//private Integer database;
// @Bean(destroyMethod = "shutdown")
public static RedissonClient redisson() {
Config config = new Config();
//config.useClusterServers().addNodeAddress("127.0.0.1:6379").setPassword("123456");
config.useSingleServer()
.setAddress("redis://"+ "localhost:6379") // 單機(jī)模式
.setPassword("123456") // 密碼
.setSubscriptionConnectionMinimumIdleSize(1) // 對于支持多個Redis連接的RedissonClient對象,
.setSubscriptionConnectionPoolSize(50) // 對于支持綁定多個Redisson連接的RedissonClient對象,
.setConnectionMinimumIdleSize(32) // 最小空閑連接數(shù)
.setConnectionPoolSize(64) // 只能用于單機(jī)模式
.setDnsMonitoringInterval(5000) // DNS監(jiān)控間隔時間,單位:毫秒
.setIdleConnectionTimeout(10000) // 空閑連接超時時間,單位:毫秒
.setConnectTimeout(10000) // 連接超時時間,單位:毫秒
.setPingConnectionInterval(10000) // 集群狀態(tài)掃描間隔時間,單位:毫秒
.setTimeout(5000) // 命令等待超時時間,單位:毫秒
.setRetryAttempts(3) // 命令重試次數(shù)
.setRetryInterval(1500) // 命令重試發(fā)送時間間隔,單位:毫秒
.setDatabase(0) // 數(shù)據(jù)庫編號
.setSubscriptionsPerConnection(5); // 每個連接的最大訂閱數(shù)量
config.setCodec(new JsonJacksonCodec()); // 設(shè)置編碼方式
return Redisson.create(config);
}
}
source.txt
sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,32.8
sensor_1,1547714191,26.8
sensor_3,1547718202,6.7
自定義RichSinkFunction
open在創(chuàng)建sink時候只調(diào)用一次,所以這里可以用于初始化一些資源配置
invok(Object value, Context context)方法在每次有數(shù)據(jù)流入時都會調(diào)用,所以從源中每過來一個數(shù)據(jù)都會執(zhí)行
close()方法用于關(guān)閉sink時調(diào)用,一般用于釋放資源
如果使用springboot整合那直接注入,就不需要在open中初始化
public class SinkTest2_Redis_UDF {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
FileSource<String> build = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
.build();
DataStream<String> streamSource = env.fromSource(build,
WatermarkStrategy.noWatermarks(),
"source.txt");
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
dataStream.addSink(new RedisSink_UDF());
env.execute();
}
public static class RedisSink_UDF extends RichSinkFunction<SensorReadingEntity> {
RedissonClient redisClient;
@Override
public void open(Configuration parameters) throws Exception {
// springboot不需要這樣獲取,直接注入redisson配置類
super.open(parameters);
redisClient = RedissonConfig.redisson();
}
@Override
public void close() throws Exception {
super.close();
redisClient.shutdown();
}
@Override
public void invoke(SensorReadingEntity sensorReadingEntity, Context context) throws Exception {
if(redisClient == null){
redisClient = RedissonConfig.redisson();
}
System.out.println(sensorReadingEntity);
RMap<String, SensorReadingEntity> map = redisClient.getMap("real-time-key");
map.expire(10, TimeUnit.MINUTES);
map.put(sensorReadingEntity.getId(),sensorReadingEntity);
}
}
}
八. Window API
一般真實的流都是無界的,怎樣處理無界的數(shù)據(jù)? 即需要統(tǒng)計數(shù)據(jù)到未來某個時間段.
- 可以把無限的數(shù)據(jù)流進(jìn)行切分,得到有限的數(shù)據(jù)集進(jìn)行處理——也就是得到有界流
- 窗口(window)就是將無限流切割為有限流的一種方式,它會將流數(shù)據(jù)分發(fā)到有限大小的桶(bucket)中進(jìn)行分析. flink的窗口概念更類似令牌桶(條件桶)的概念,因為無限數(shù)據(jù)是無法確定數(shù)據(jù)量的,需要將符合條件的數(shù)據(jù)放入對應(yīng)的窗口.亂序數(shù)據(jù)也因此可以變得有序.
window類型
時間窗口 (按時間截取)
- 滾動時間窗口
- 滑動時間窗口
- 會話窗口
計數(shù)窗口 (按數(shù)據(jù)個數(shù)截取)
- 滾動計數(shù)窗口
- 滑動計數(shù)窗口
會話窗口 session window
由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新數(shù)據(jù)就會生成新的窗口
特點:時間無對齊
8.1 窗口分配器 && 窗口函數(shù)
窗口分配器———window()方法
我們可以用.window()來定義一個窗口,然后基于這個window去做一些聚合或者其它處理操作。注意window()方法必須在keyBy之后才能用。Flink 提供了更加簡單的.timeWindow和.countWindow方法,用于定義時間窗口和計數(shù)窗口。
sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,58.8
sensor_1,1547714191,40.8
sensor_3,1547718202,6.7
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// socketText nc -lk 7777
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 7777);
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
// 窗口測試
// dataStream.windowAll(); //global
DataStream<Integer> resultStream = dataStream
.keyBy(SensorReadingEntity::getId)
// 滾動窗口 參數(shù)1為一個窗口1分鐘s接收時間,參數(shù)2時間偏移15s開一個窗口
.window(TumblingProcessingTimeWindows.of((Time.minutes(1)), Time.seconds(15)))
.aggregate(new AggregateFunction<SensorReadingEntity, AtomicInteger, Integer>() {
@Override
public AtomicInteger createAccumulator() {
return new AtomicInteger(0);
}
@Override
public AtomicInteger add(SensorReadingEntity sensorReadingEntity, AtomicInteger atomicInteger) {
atomicInteger.incrementAndGet();
return atomicInteger;
}
@Override
public Integer getResult(AtomicInteger atomicInteger) {
return atomicInteger.get();
}
@Override
public AtomicInteger merge(AtomicInteger atomicInteger, AtomicInteger acc1) {
int i = atomicInteger.get() + acc1.get();
atomicInteger.set(i);
return atomicInteger;
}
});
resultStream.print();
// dataStream.keyBy(SensorReadingEntity::getId)
// // 滑動窗口 1個窗口30s接收時間,窗口間隔15s 開一個窗口
// .window(SlidingProcessingTimeWindows.of(Time.seconds(30),Time.seconds(15)));
// dataStream.keyBy(SensorReadingEntity::getId)
// // session窗口
// .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)));
//
// dataStream.keyBy(SensorReadingEntity::getId)
// // 滾動統(tǒng)計窗口
// .countWindow(5);
// dataStream.keyBy(SensorReadingEntity::getId)
// // 滑動統(tǒng)計窗口
// .countWindow(5,3);
env.execute();
}
先前的窗口分配器將分組的數(shù)據(jù)做了分桶. 窗口函數(shù)是對桶數(shù)據(jù)做聚合運(yùn)算.
window function定義了要對窗口中收集的數(shù)據(jù)做的計算操作可以分為兩類
8.1.1 增量聚合函數(shù)(incremental aggregation functions)
- 每條數(shù)據(jù)到來就進(jìn)行計算,保持一個簡單的狀態(tài).
- ReduceFunction, AggregateFunction
AggregateFunction 參數(shù)1 入?yún)㈩愋?參數(shù)2 中間累加狀態(tài) 參數(shù)3 輸出類型
8.1.2 全窗口函數(shù)(full window functions)
- 先把窗口所有數(shù)據(jù)收集起來,等到計算的時候會遍歷所有數(shù)據(jù).
- ProcessWindowFunction,WindowFunction
8.1.3 窗口統(tǒng)計函數(shù)
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// socketText nc -lk 7777
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 7777);
// 轉(zhuǎn)換成SensorReading 類型
DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
String[] split = item.split(",");
return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
// 開窗計數(shù)窗口
DataStream<Double> aggregate = dataStream.keyBy(SensorReadingEntity::getId)
.countWindow(10, 2)
.aggregate(new AvgTemp());
aggregate.print();
env.execute();
}
public static class AvgTemp implements AggregateFunction<SensorReadingEntity, Tuple2<Double,Integer>,Double> {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0,0);
}
@Override
public Tuple2<Double, Integer> add(SensorReadingEntity sensorReadingEntity, Tuple2<Double, Integer> objects) {
objects.f0 += sensorReadingEntity.getTemperature();
objects.f1 += 1;
return objects;
}
@Override
public Double getResult(Tuple2<Double, Integer> objects) {
return objects.f0/objects.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> objects, Tuple2<Double, Integer> acc1) {
return new Tuple2<>(objects.f0+acc1.f0,objects.f1+acc1.f1);
}
}
8.1.4 其他api函數(shù)
.trigger()——觸發(fā)器定義 window 什么時候關(guān)閉,觸發(fā)計算并輸出結(jié)果
.evictor()——移除器定義移除某些數(shù)據(jù)的邏輯
.allowedLateness()——允許處理遲到的數(shù)據(jù)
.sideOutputLateData()——將遲到的數(shù)據(jù)放入側(cè)輸出流
.getSideOutput()——獲取側(cè)輸出流
8.1.5 windows api總結(jié)
九.時間定義
- Event Time 事件創(chuàng)建時間
- Ingestion Time: 數(shù)據(jù)進(jìn)入Flink的時間
- Processing Time: 執(zhí)行本地算子的本地系統(tǒng)時間,和操作系統(tǒng)相關(guān)
一般的更關(guān)心事件時間.而不是系統(tǒng)處理時間. 以用戶產(chǎn)生時間為準(zhǔn),所見即所得.
flink默認(rèn)時間語義在1.12已改為是事件時間
在Flink 1.12中,默認(rèn)的流時間特性已更改為 TimeCharacteristic.EventTime,因此您不再需要調(diào)用此方法來啟用事件時間支持。顯式地使用處理時間窗口和計時器在事件時間模式下工作。如果您需要禁用水印,請使用 ExecutionConfig.setAutoWatermarkInterval(長). 如果你正在使用 TimeCharacteristic。IngestionTime,請手動設(shè)置合適的 WatermarkStrategy. 如果您正在使用通用的“時間窗口”操作(例如 org.apache.flink.streaming.api.datastream.KeyedStream.timeWindow (org.apache.flink.streaming.api.windowing.time.Time) 根據(jù)時間特征改變行為,請使用顯式指定處理時間或事件時間的等效操作。
當(dāng)然還要將事件時間關(guān)聯(lián)數(shù)據(jù)的時間
,當(dāng)Flink 以Event Time模式處理數(shù)據(jù)流時,它會根據(jù)數(shù)據(jù)里的時間戳來處理基于時間的算子. 由于網(wǎng)絡(luò)、分布式等原因,會導(dǎo)致亂序數(shù)據(jù)的產(chǎn)生
9.1 WaterMark定義
怎樣避免亂序數(shù)據(jù)帶來計算不正確?
遇到一個時間戳達(dá)到了窗口關(guān)閉時間,不應(yīng)該立刻觸發(fā)窗口計算,而是等待一段時間,等遲到的數(shù)據(jù)來了再關(guān)閉窗口
Watermark是一種衡量Event Time進(jìn)展的機(jī)制,可以設(shè)定延遲觸發(fā)
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來實現(xiàn);
數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark 的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
watermark用來讓程序自己平衡延遲和結(jié)果正確性
watermark是一條特殊的數(shù)據(jù)記錄
watermark必須單調(diào)遞增,以確保任務(wù)的事件時間時鐘在向前推進(jìn),而不是在后退
watermark 與數(shù)據(jù)的時間戳相關(guān)
基于上述條件. 當(dāng)數(shù)據(jù)亂序來臨時 —>watermark = Max(當(dāng)前窗口最大事件時間,新來的事件時間) - 設(shè)置的延遲時間
即為watermark 標(biāo)準(zhǔn)時間.當(dāng)watermark 標(biāo)準(zhǔn)時間到達(dá)關(guān)閉要求,則直接關(guān)閉舊窗口文章來源:http://www.zghlxwxcb.cn/news/detail-674860.html
9.2 WaterMark上下游算子傳遞
WaterMark是慢鐘標(biāo)準(zhǔn)時間, 因此廣播到下游算子.
由于flink多是并行計算,那并行的上游任務(wù)WaterMark可能不相同.那下游獲得的WaterMark以哪個上游任務(wù)的WaterMark為準(zhǔn)呢?下游WaterMark = Min(上游并行算子)
文章來源地址http://www.zghlxwxcb.cn/news/detail-674860.html
到了這里,關(guān)于Flink1.17.0數(shù)據(jù)流的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!