目錄
0、版本說(shuō)明
1、什么是水位線?
2、水位線使用場(chǎng)景?
3、設(shè)計(jì)水位線主要為了解決什么問(wèn)題?
4、怎樣在flink中生成水位線?
4.1、自定義標(biāo)記 Watermark 生成器
4.2、自定義周期性 Watermark 生成器
4.3、內(nèi)置Watermark生成器 - 有序流水位線生成器
4.4、內(nèi)置Watermark生成器 - 亂序流水位線生成器
4.5、在 讀取數(shù)據(jù)源時(shí) 添加水位線
5、水位線和窗口的關(guān)系?
6、水位線在各個(gè)算子間的傳遞
6.1、測(cè)試用例 - 不設(shè)置?withIdleness 超時(shí)時(shí)間
6.2、測(cè)試用例 - 設(shè)置?withIdleness 超時(shí)時(shí)間
0、版本說(shuō)明
? ? ? ? 開發(fā)語(yǔ)言:java1.8
? ? ? ? Flink版本:1.17
? ? ? ? 官網(wǎng)鏈接:官網(wǎng)鏈接
1、什么是水位線?
? ? ? ? Flink中水位線是一條特殊的數(shù)據(jù)(long timestamp)
????????它會(huì)以時(shí)間戳的形式作為一條標(biāo)識(shí)數(shù)據(jù)插入到數(shù)據(jù)流中
2、水位線使用場(chǎng)景?
????????使用事件時(shí)間(EventTime)做流式計(jì)算任務(wù)時(shí),需要根據(jù)事件時(shí)間生成水位線(Watermark)
????????通過(guò)水位線來(lái)觸發(fā)窗口計(jì)算,水位線作為衡量事件時(shí)間(EventTime)進(jìn)展的標(biāo)識(shí)
3、設(shè)計(jì)水位線主要為了解決什么問(wèn)題?
????????設(shè)計(jì)水位線主要是為了解決實(shí)時(shí)流中數(shù)據(jù)亂序和遲到的問(wèn)題
????????思考:什么原因造成了數(shù)據(jù)流的亂序呢?
? ? ? ? ? ? ? ? 如今數(shù)據(jù)采集、數(shù)據(jù)傳輸大多都在分布式系統(tǒng)中完成
????????????????各個(gè)機(jī)器節(jié)點(diǎn)因?yàn)榫W(wǎng)絡(luò)和自身性能的原因 導(dǎo)致了數(shù)據(jù)的亂序和遲到
4、怎樣在flink中生成水位線?
????????Flink中支持在 數(shù)據(jù)源和普通DataStream上添加水位線生成策略(WatermarkStrategy)
4.1、自定義標(biāo)記 Watermark 生成器
標(biāo)記 Watermark 生成器特點(diǎn):
????????每條數(shù)據(jù)到來(lái)后,都會(huì)為其生成一條?Watermark
適用場(chǎng)景:
????????數(shù)據(jù)量小且數(shù)據(jù)有序
代碼示例:????????
Step1:自定義 標(biāo)記水位線生成器 實(shí)現(xiàn)類
// 自定義 標(biāo)記水位線生成器 實(shí)現(xiàn)類
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {
// 每進(jìn)入一條數(shù)據(jù),都會(huì)調(diào)用一次 onEvent 方法
@Override
/*
* 參數(shù)說(shuō)明:
* @event : 進(jìn)入到該方法的事件數(shù)據(jù)
* @eventTimestamp : 時(shí)間戳提取器提取的時(shí)間戳
* */
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
//發(fā)射水位線
output.emitWatermark(new Watermark(eventTimestamp));
}
// 不需要實(shí)現(xiàn)
@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
}
Step2:自定義 標(biāo)記性水位線生成策略 實(shí)現(xiàn)類
// TODO 自定義 標(biāo)記性水位線生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
// TODO 實(shí)例化一個(gè) 事件時(shí)間提取器
@Override
public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
};
return timestampAssigner;
}
// TODO 實(shí)例化一個(gè) watermark 生成器
@Override
public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new PeriodWatermarkGenerator<>();
}
}
Step3:使用?標(biāo)記性水位線生成策略
// TODO 使用 自定義標(biāo)記 Watermark 生成器
public class UserPeriodWatermarkStrategy {
public static void main(String[] args) throws Exception {
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
}
}
);
// 3.為 DataStream 添加水位線生成策略 (使用 自定義WatermarkStrategy 實(shí)現(xiàn)類)
SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());
// 4.通過(guò) processFunction實(shí)例 查看生成的水位線
SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
process.print();
// 5.觸發(fā)程序執(zhí)行
env.execute();
}
}
查看運(yùn)行結(jié)果:
4.2、自定義周期性 Watermark 生成器
標(biāo)記 Watermark 生成器特點(diǎn):
? ? ? ? 基于處理時(shí)間,周期性生成?Watermark
適用場(chǎng)景:
????????數(shù)據(jù)量大且可能存在一定程度數(shù)據(jù)延遲(亂序)
代碼示例:????????
Step1:自定義 周期性水位線生成器 實(shí)現(xiàn)類
// 自定義 周期性水位線生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
// 設(shè)置變量,用來(lái)保存 當(dāng)前最大的事件時(shí)間
private long currentMaxTimestamp;
// 設(shè)置變量,指定最大的亂序時(shí)間(等待時(shí)間)
private final long maxOutOfOrderness = 0000; // 3 秒
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// 只更新當(dāng)前最大時(shí)間戳,不再發(fā)生水位線
if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;
}
// 周期性 生成水位線
// 每個(gè) setAutoWatermarkInterval 時(shí)間,調(diào)用一次該方法
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 發(fā)出的 watermark = 當(dāng)前最大時(shí)間戳 - 最大亂序時(shí)間
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
}
}
Setp2:自定義 周期性水位線生成策略 實(shí)現(xiàn)類
// 自定義 周期性水位線生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {
// TODO 實(shí)例化一個(gè) 事件時(shí)間提取器
@Override
public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
};
return timestampAssigner;
}
// TODO 實(shí)例化一個(gè) watermark 生成器
@Override
public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new PunctuatedWatermarkGenerator<>();
}
}
Step3:周期性水位線生成策略
// TODO 使用 自定義周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {
public static void main(String[] args) throws Exception {
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 設(shè)置周期性生成水位線的時(shí)間間隔(默認(rèn)為200毫秒)
env.getConfig().setAutoWatermarkInterval(3 * 1000L);
// 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
}
}
);
// TODO 獲取 WatermarkStrategy實(shí)例 (方式1:通過(guò) WatermarkStrategy實(shí)現(xiàn)類獲取)
PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();
// TODO 獲取 WatermarkStrategy實(shí)例 (方式2:通過(guò) WatermarkStrategy工具類獲取) 推薦
WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
.withTimestampAssigner((event, timestamp) -> event.f1);
// 3.使用 自定義水位線策略實(shí)例 來(lái)提取時(shí)間戳&生成水位線
SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);
// 4.通過(guò) processFunction實(shí)例 查看生成的水位線
SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
process.print();
// 3.觸發(fā)程序執(zhí)行
env.execute();
}
}
查看運(yùn)行結(jié)果:
4.3、內(nèi)置Watermark生成器 - 有序流水位線生成器
有序流水位線生成器特點(diǎn):
? ? ? ? 基于處理時(shí)間,周期性生成?Watermark,最大亂序時(shí)間為0
適用場(chǎng)景:
? ? ? ? 大數(shù)量有序流
代碼示例:
// TODO 內(nèi)置Watermark生成器 - 有序流水位線生成器
public class UserForMonotonousTimestamps {
public static void main(String[] args) throws Exception {
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 設(shè)置周期性生成水位線的時(shí)間間隔(默認(rèn)為200毫秒)
env.getConfig().setAutoWatermarkInterval(3 * 1000L);
// 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
}
}
);
// TODO 創(chuàng)建 內(nèi)置水位線生成策略
WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner((element,recordTimestamp) -> element.f1);
// 3.使用 內(nèi)置水位線生成策略
SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);
// 4.通過(guò) processFunction實(shí)例 查看生成的水位線
SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
process.print();
// 3.觸發(fā)程序執(zhí)行
env.execute();
}
}
查看運(yùn)行結(jié)果:
4.4、內(nèi)置Watermark生成器 - 亂序流水位線生成器
亂序流水位線生成器特點(diǎn):
? ? ? ? 基于處理時(shí)間,周期性生成?Watermark,可以這是最大亂序時(shí)間
適用場(chǎng)景:
? ? ? ? 大數(shù)量亂序流
代碼示例:
// TODO 內(nèi)置Watermark生成器 - 亂序流水位線生成器
public class UserForBoundedOutOfOrderness {
public static void main(String[] args) throws Exception {
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO 設(shè)置周期性生成水位線的時(shí)間間隔(默認(rèn)為200毫秒)
env.getConfig().setAutoWatermarkInterval(3 * 1000L);
// 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
}
}
);
// TODO 獲取 WatermarkStrategy實(shí)例
WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 設(shè)置最大亂序時(shí)間為1s
.withTimestampAssigner((element,recordTimestamp) -> element.f1);
// 3.使用 內(nèi)置水位線生成策略
SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);
// 4.通過(guò) processFunction實(shí)例 查看生成的水位線
SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());
process.print();
// 3.觸發(fā)程序執(zhí)行
env.execute();
}
}
查看運(yùn)行結(jié)果:
4.5、在 讀取數(shù)據(jù)源時(shí) 添加水位線
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.創(chuàng)建 Source 對(duì)象
Source source = DataGeneratorSource、KafkaSource...
// 3.讀取 source時(shí)添加水位線
env
.fromSource(source, WatermarkStrategy實(shí)例, "source name")
.print()
;
// 4.觸發(fā)程序執(zhí)行
env.execute();
5、水位線和窗口的關(guān)系?
窗口什么時(shí)候創(chuàng)建?
????????當(dāng)窗口內(nèi)的第一條數(shù)據(jù)到達(dá)時(shí)
窗口什么時(shí)候觸發(fā)計(jì)算?
????????當(dāng)閾值水位線到達(dá)窗口時(shí)
6、水位線在各個(gè)算子間的傳遞
????????下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值
測(cè)試代碼:
// TODO 測(cè)試水位線的傳遞
public class TransmitWaterMark {
public static void main(String[] args) throws Exception {
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
// 2.將socket作為數(shù)據(jù)源(開啟socket端口: nc -lk 9999)
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
source
.partitionCustom(
new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
if (key.equals("a")) {
return 0;
} else if (key.equals("b")) {
return 1;
} else {
return 2;
}
}
}, value -> value.split(",")[0]
)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2 map(String value) throws Exception {
return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
//.<Tuple2<String, Long>>forMonotonousTimestamps()
.<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy())
.withTimestampAssigner((element,recordTimestamp) -> element.f1)
.withIdleness(Duration.ofSeconds(5)) //空閑等待5s
)
.process(new ShowProcessFunction()).setParallelism(1)
.print();
env.execute();
}
}
6.1、測(cè)試用例 - 不設(shè)置?withIdleness 超時(shí)時(shí)間
現(xiàn)象:如果上游某一個(gè)子任務(wù)一直沒(méi)有數(shù)據(jù)更新,下游算子的水位線一直不會(huì)變化
6.2、測(cè)試用例 - 設(shè)置?withIdleness 超時(shí)時(shí)間
現(xiàn)象:如果上游某一個(gè)子任務(wù)`在指定時(shí)間內(nèi)`數(shù)據(jù)更新,下游算子的水位線將不受該子任務(wù)最小值的影響
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-720219.html
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-720219.html
到了這里,關(guān)于7.2、如何理解Flink中的水位線(Watermark)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!