6.2 水位線
6.2.1 概述
- 分類
- 有序流
- 無(wú)序流
判斷的時(shí)間延遲
- 延遲時(shí)間判定
6.2.2 水位線的設(shè)置
- 分析
DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本質(zhì)還是個(gè)算子,傳入的參數(shù)是WatermarkStrategy的生成策略
但是WatermarkStrategy是一個(gè)接口
- 有序流
因此調(diào)用靜態(tài)方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator
AscendingTimestampsWatermarks這個(gè)繼承自BoundOutOfOrdernessWatermarks
BoundOutOfOrdernessWatermarks這個(gè)類有onEvent和onPeriodicEmit這兩方法,因?yàn)閷?shí)現(xiàn)了WatermarkGenerator這個(gè)接口
然后在調(diào)用接口中的默認(rèn)方法withTimestampAssigner得到返回WatermarkStrategy,參數(shù)是new SerializableTimestampAssigner的對(duì)象,重寫(xiě)extractTimestamp方法,這個(gè)方法作用是怎么樣從數(shù)據(jù)里面提取時(shí)間戳
- 亂序流
因此調(diào)用靜態(tài)方法forBoundedOutOfOrderness(參數(shù)為最大亂序程度,也就是延遲時(shí)間)后new BoundOutOfOrdernessWatermarks返回 WatermarkGernerator
BoundOutOfOrdernessWatermarks這個(gè)類有onEvent和onPeriodicEmit這兩方法,因?yàn)閷?shí)現(xiàn)了WatermarkGenerator這個(gè)接口(跟上面一樣了)
后面也跟有序一樣,然后在調(diào)用接口中的默認(rèn)方法withTimestampAssigner得到返回WatermarkStrategy
- 關(guān)系圖
- 完整代碼
public class WatermarkTest {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.輸入
SingleOutputStreamOperator<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L))
// //有序流的watermark生成
// //forMonotonousTimestamps前指定泛型
// .assignTimestampsAndWatermarks(WatermarkStrategy
// .<Event>forMonotonousTimestamps()//得到WatermarkGenerator
// .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {//返回WatermarkStrategy
// @Override
// //參數(shù)是當(dāng)前傳過(guò)來(lái)的數(shù)據(jù)element,另一個(gè)傳出的recordTimestamp是時(shí)間戳
// public long extractTimestamp(Event element, long recordTimestamp) {
// return element.timestamp;
// }
// })
// )
.assignTimestampsAndWatermarks(WatermarkStrategy
//forMonotonousTimestamps前指定泛型
//forMonotonousTimestamps參數(shù)是最大亂序時(shí)間
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
env.execute();
}
}
6.2.3 自定義水位線
- 分析
或者直接new 一個(gè)接口WatermarkStrategy重寫(xiě)createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取時(shí)間戳分配器的方法(生成TimeStampAssigner)創(chuàng)建watermark
WatermarkGenerator是個(gè)接口,有兩個(gè)方法分別是onEvent方法,主要目標(biāo)是要發(fā)出一個(gè)WatermarkOutput,另一個(gè)是onperiodicEmit方法,表示周期性的生成,周期性生成時(shí)間默認(rèn)是2秒,env調(diào)用getConfig后調(diào)用setAutoWatermarkInterval后可以更改周期性生成時(shí)間
WatermarkOutput也是一個(gè)接口,調(diào)用emitWatermark就能發(fā)出一個(gè)watermark,
除了WatermarkGenerator接口還有TimeStampAssigner也是個(gè)接口,里面只有一個(gè)方法叫做extractTimestamp,目的是從當(dāng)前數(shù)據(jù)提取時(shí)間戳,同時(shí)也會(huì)作為WatermarkGenerator這個(gè)接口中onEvent方法中傳入的參數(shù)eventTimestamp時(shí)間戳
- 關(guān)系圖
這圖估計(jì)也就我自己能看的懂了。。。
- 代碼
- 正常水位線
// 自定義水位線的產(chǎn)生
public class CustomWatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.print();
env.execute();
}
//內(nèi)部靜態(tài)類
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
//createTimestampAssigner方法生成TimeStampAssigner
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
//extractTimestamp,目的是從當(dāng)前數(shù)據(jù)提取時(shí)間戳
public long extractTimestamp(Event element, long recordTimestamp)
{
return element.timestamp; // 告訴程序數(shù)據(jù)源里的時(shí)間戳是哪一個(gè)字段
}
};
}
@Override
//createWatermarkGenerator生成WatermarkGenerator
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomPeriodicGenerator();
}
}
//CustomPeriodicGenerator實(shí)現(xiàn)WatermarkGenerator接口,并重寫(xiě)方法
public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L; // 延遲時(shí)間
private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 觀察到的最大時(shí)間戳
@Override
//更新當(dāng)前時(shí)間戳,這邊不發(fā)送水位線,目的是保存時(shí)間戳
public void onEvent(Event event, long eventTimestamp, WatermarkOutput
output) {
// 每來(lái)一條數(shù)據(jù)就調(diào)用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大時(shí)間戳
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 發(fā)射水位線,默認(rèn) 200ms 調(diào)用一次
//-1毫秒都是為了貼切窗口閉合的時(shí)候左閉右開(kāi)設(shè)計(jì)
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
}
- 斷點(diǎn)水位線
在onevent根據(jù)條件觸發(fā),onPeriodicEmit這個(gè)方法中就不用做了
public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
@Override
public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的 itemId 時(shí),才發(fā)出水位線
if (r.user.equals("Mary")) {
output.emitWatermark(new Watermark(r.timestamp - 1));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 不需要做任何事情,因?yàn)槲覀冊(cè)?onEvent 方法中發(fā)射了水位線
}
}
- 在自定義數(shù)據(jù)源中發(fā)送水位線
使用 collectWithTimestamp 方法將數(shù)據(jù)發(fā)送出去,原來(lái)直接out.collect()的
參數(shù)是當(dāng)前數(shù)據(jù)還有當(dāng)前數(shù)據(jù)的時(shí)間戳,跟水位線生成中extractTimestamp(Event element, long recordTimestamp)這個(gè)類似,也是一個(gè)數(shù)據(jù)是什么,一個(gè)時(shí)間戳是啥
然后發(fā)送水位線,用emitWatermark方法生成
public class CustomWatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new ClickSourceWithWatermark()).print();
env.execute();
}
// 泛型是數(shù)據(jù)源中的類型
public static class ClickSourceWithWatermark implements SourceFunction<Event>
{
private boolean running = true;
@Override
public void run(SourceFunction.SourceContext<Event> sourceContext) throws Exception {
Random random = new Random();
String[] userArr = {"Mary", "Bob", "Alice"};
String[] urlArr = {"./home", "./cart", "./prod?id=1"};
while (running) {
long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒時(shí)間戳
String username = userArr[random.nextInt(userArr.length)];
String url = urlArr[random.nextInt(urlArr.length)];
Event event = new Event(username, url, currTs);
// 使用 collectWithTimestamp 方法將數(shù)據(jù)發(fā)送出去,并指明數(shù)據(jù)中的時(shí)間戳的字段
sourceContext.collectWithTimestamp(event, event.timestamp);
// 發(fā)送水位線
sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
6.2.4 水位線的傳遞
針對(duì)多個(gè)分區(qū),上游需要告訴下游水位線情況,采用的是廣播的方式給所有下游子任務(wù)
但是上游如果也是并行的,向下傳輸?shù)乃痪€可能有多個(gè),以上游發(fā)過(guò)來(lái)最小的時(shí)鐘為準(zhǔn),并且下游會(huì)有一個(gè)分區(qū)專門(mén)保存上游發(fā)過(guò)來(lái)的水位線最小的數(shù)據(jù)
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-413700.html
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-413700.html
到了這里,關(guān)于Flink-水位線的設(shè)置以及傳遞的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!