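Contents
1. Background
2. Source analysis of the coGroup operator
2.1 The complete coGroup call chain
2.2 The coGroup entry point
2.3 The CoGroupedStreams object
2.4 The WithWindow inner class
2.5 The CoGroupWindowFunction function
3. Modifying the source to expose late data through a side output
3.1 Copy CoGroupedStreams
3.2 Add the WithWindow.sideOutputLateData method
3.3 Add a WithWindow constructor
3.4 Modify the apply method
3.5 Make the UnionTypeInfo class public
3.6 Build the flink-streaming-java module from the Flink source
3.7 Check in the project that Maven has picked up the new code
4. Testing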
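1. Background
Once a coGroup window has reached its end time and closed, late data can no longer be retrieved through a side output. The intervalJoin operator provides an API for this, but the window join operator is built on top of coGroup, so join cannot do it either.
Flink version: v1.17.1
2. Source analysis of the coGroup operator
2.1 The complete coGroup call chain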
input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness)
.apply(cgroupFunction)
As the code above shows, the chain offers no sideOutputLateData method for retrieving late data that arrives after the window has closed.
2.2 The coGroup entry point
This method creates a CoGroupedStreams object:
/**
* Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
* window can be specified.
*/
public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
return new CoGroupedStreams<>(this, otherStream);
}
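2.3 The CoGroupedStreams object
CoGroupedStreams can be thought of as a Builder class in the builder design pattern. The where method configures the KeySelector of the first stream and returns Where, an inner class of CoGroupedStreams. The equalTo method configures the KeySelector of the second stream and returns the inner class EqualTo. The window method configures the window assigner and returns the inner class WithWindow. The calls that follow (trigger, evictor, allowedLateness) configure window parameters, and apply finally passes in the user's business function.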
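The staged-builder idea can be sketched in plain Java. This is a simplified model and not Flink code: the class names CoGroupSketch and WithWindowSketch, the use of java.util.function.Function in place of KeySelector, and a long window size in place of a WindowAssigner are all assumptions made for illustration.

```java
import java.util.function.Function;

/** Simplified stand-in for CoGroupedStreams: each stage exposes only the next legal step. */
final class CoGroupSketch<T1, T2> {

    /** Stage 1: configure the key selector of the first input. */
    <K> Where<K> where(Function<T1, K> keySelector1) {
        return new Where<>(keySelector1);
    }

    final class Where<K> {
        private final Function<T1, K> keySelector1;

        Where(Function<T1, K> keySelector1) {
            this.keySelector1 = keySelector1;
        }

        /** Stage 2: the second key selector must produce the same key type K. */
        EqualTo equalTo(Function<T2, K> keySelector2) {
            return new EqualTo(keySelector2);
        }

        final class EqualTo {
            private final Function<T2, K> keySelector2;

            EqualTo(Function<T2, K> keySelector2) {
                this.keySelector2 = keySelector2;
            }

            /** Stage 3: choosing a window yields the object that holds every setting. */
            WithWindowSketch<T1, T2, K> window(long windowSizeMillis) {
                return new WithWindowSketch<>(keySelector1, keySelector2, windowSizeMillis, 0L);
            }
        }
    }
}

/** Immutable holder of the full configuration; each setter returns a new copy. */
final class WithWindowSketch<T1, T2, K> {
    final Function<T1, K> keySelector1;
    final Function<T2, K> keySelector2;
    final long windowSizeMillis;
    final long allowedLatenessMillis;

    WithWindowSketch(
            Function<T1, K> keySelector1,
            Function<T2, K> keySelector2,
            long windowSizeMillis,
            long allowedLatenessMillis) {
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
        this.windowSizeMillis = windowSizeMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    WithWindowSketch<T1, T2, K> allowedLateness(long millis) {
        return new WithWindowSketch<>(keySelector1, keySelector2, windowSizeMillis, millis);
    }
}
```

Because each stage exposes only the next step, a chain in this sketch that skips equalTo and goes straight from where to window does not compile.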
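2.4 The WithWindow inner class
WithWindow is the inner class that finally holds all of the configuration: the two streams, the window settings, and the key selectors. The co-group logic starts when the user calls apply. Inside apply, the two streams are combined with union, converted to a KeyedStream with keyBy, and given the window configuration; finally the windowed stream's apply method is called with a WindowFunction that carries out both the co-group logic and the user's logic.
The code is shown below with comments added: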
/**
* A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
* well as a {@link WindowAssigner}.
*
* @param <T1> Type of the elements from the first input
* @param <T2> Type of the elements from the second input
* @param <KEY> Type of the key. This must be the same for both inputs
* @param <W> Type of {@link Window} on which the co-group operation works.
*/
@Public
public static class WithWindow<T1, T2, KEY, W extends Window> {
// First stream
private final DataStream<T1> input1;
// Second stream
private final DataStream<T2> input2;
// Key selector for the first stream
private final KeySelector<T1, KEY> keySelector1;
// Key selector for the second stream
private final KeySelector<T2, KEY> keySelector2;
// Type of the key
private final TypeInformation<KEY> keyType;
// Window assigner
private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
// Window trigger
private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
private final Time allowedLateness;
private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
// Constructor that assigns the fields above
protected WithWindow(
DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
Time allowedLateness) {
this.input1 = input1;
this.input2 = input2;
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
this.keyType = keyType;
this.windowAssigner = windowAssigner;
this.trigger = trigger;
this.evictor = evictor;
this.allowedLateness = allowedLateness;
}
/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
* parallelism.
*/
public <T> DataStream<T> apply(
CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
// clean the closure
function = input1.getExecutionEnvironment().clean(function);
// Build the common TypeInfo for the two streams; the map operators below convert the elements of input1 and input2 to this type
UnionTypeInfo<T1, T2> unionType =
new UnionTypeInfo<>(input1.getType(), input2.getType());
// Wrap both key selectors in a KeySelector over the union type
UnionKeySelector<T1, T2, KEY> unionKeySelector =
new UnionKeySelector<>(keySelector1, keySelector2);
// Map the elements of input1 to TaggedUnion<T1, T2>, described by UnionTypeInfo<T1, T2>
SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
input1.map(new Input1Tagger<T1, T2>());
taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
taggedInput1.returns(unionType);
// Map the elements of input2 to TaggedUnion<T1, T2>, described by UnionTypeInfo<T1, T2>
SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
input2.map(new Input2Tagger<T1, T2>());
taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
taggedInput2.returns(unionType);
// Union the two streams
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
// keyBy and open the window
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(
unionStream, unionKeySelector, keyType)
.window(windowAssigner);
// Configure the window trigger
if (trigger != null) {
windowedStream.trigger(trigger);
}
// Configure the evictor
if (evictor != null) {
windowedStream.evictor(evictor);
}
// Configure allowedLateness
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}
// Create the CoGroupWindowFunction and pass the user function into it
return windowedStream.apply(
new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
* TypeInformation)} method has the wrong return type and hence does not allow one to set an
* operator-specific parallelism
*
* @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
* TypeInformation)} method is fixed in the next major version of Flink (2.0).
*/
@PublicEvolving
@Deprecated
public <T> SingleOutputStreamOperator<T> with(
CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}
@VisibleForTesting
Time getAllowedLateness() {
return allowedLateness;
}
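// Returns the wrapped windowed stream. It is marked @VisibleForTesting and is package-private, so user code cannot call it; if it were callable, the windowed stream obtained here could be used to get the late-data side output.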
@VisibleForTesting
WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
return windowedStream;
}
}
2.5 The CoGroupWindowFunction function
CoGroupWindowFunction is also an inner class of CoGroupedStreams. It performs the co-group logic and finally hands the prepared data to the user function (the cgroupFunction passed to apply in 2.1).
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
private static final long serialVersionUID = 1L;
public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
super(userFunction);
}
@Override
public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
throws Exception {
// Buffer the elements of stream 1 in the current window
List<T1> oneValues = new ArrayList<>();
// Buffer the elements of stream 2 in the current window
List<T2> twoValues = new ArrayList<>();
for (TaggedUnion<T1, T2> val : values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
// Pass both lists to the user function
wrappedFunction.coGroup(oneValues, twoValues, out);
}
}
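The flow from 2.4 and 2.5 (tag, union, key, split, call the user function) can be sketched without Flink. This is a plain-Java model under simplifying assumptions: everything is treated as one window, lists stand in for streams, and the names Tagged and CoGroupModel are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Minimal stand-in for TaggedUnion: exactly one of the two fields is set. */
final class Tagged<T1, T2> {
    final T1 one;
    final T2 two;
    final boolean isOne;

    private Tagged(T1 one, T2 two, boolean isOne) {
        this.one = one;
        this.two = two;
        this.isOne = isOne;
    }

    static <T1, T2> Tagged<T1, T2> one(T1 value) {
        return new Tagged<>(value, null, true);
    }

    static <T1, T2> Tagged<T1, T2> two(T2 value) {
        return new Tagged<>(null, value, false);
    }
}

final class CoGroupModel {
    /** Tags both inputs, unions them, groups by key, then splits each group back into two lists. */
    static <T1, T2, K, R> Map<K, R> coGroup(
            List<T1> input1,
            List<T2> input2,
            Function<T1, K> key1,
            Function<T2, K> key2,
            BiFunction<List<T1>, List<T2>, R> userFunction) {
        // Tag every element and union both inputs into a single list.
        List<Tagged<T1, T2>> union = new ArrayList<>();
        for (T1 v : input1) {
            union.add(Tagged.<T1, T2>one(v));
        }
        for (T2 v : input2) {
            union.add(Tagged.<T1, T2>two(v));
        }

        // Key the union by picking the selector that matches the tag.
        Map<K, List<Tagged<T1, T2>>> byKey = new LinkedHashMap<>();
        for (Tagged<T1, T2> t : union) {
            K k = t.isOne ? key1.apply(t.one) : key2.apply(t.two);
            byKey.computeIfAbsent(k, x -> new ArrayList<>()).add(t);
        }

        // Per key, split by tag and hand both lists to the user function.
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, List<Tagged<T1, T2>>> e : byKey.entrySet()) {
            List<T1> ones = new ArrayList<>();
            List<T2> twos = new ArrayList<>();
            for (Tagged<T1, T2> t : e.getValue()) {
                if (t.isOne) {
                    ones.add(t.one);
                } else {
                    twos.add(t.two);
                }
            }
            result.put(e.getKey(), userFunction.apply(ones, twos));
        }
        return result;
    }
}
```

Note that a key present in only one input still reaches the user function, with an empty list for the other side; this is what distinguishes coGroup from an inner join.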
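3. Modifying the source to expose late data through a side output
Approach: copy CoGroupedStreams into a new class, NewCoGroupedStreams, and add a sideOutputLateData method to its WithWindow class so that the user can pass in an OutputTag and retrieve, as a side output, the data that arrives after the window has closed.
3.1 Copy CoGroupedStreams
3.2 Add the WithWindow.sideOutputLateData method
Add the method below, which takes the outputTag. The WithWindow constructor it calls is the one added in 3.3.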
@PublicEvolving
public WithWindow<T1, T2, KEY, W> sideOutputLateData(
OutputTag<TaggedUnion<T1, T2>> outputTag) {
return new WithWindow<>(
input1,
input2,
keySelector1,
keySelector2,
keyType,
windowAssigner,
trigger,
evictor,
allowedLateness,
outputTag
);
}
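3.3 Add a WithWindow constructor
Add a field, lateDataOutputTag, to hold the laterOutputTag passed to the constructor. Because the field is assigned after the this(...) call, it cannot be declared final.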
protected WithWindow(
DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
KeySelector<T2, KEY> keySelector2,
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
Time allowedLateness,
OutputTag<TaggedUnion<T1, T2>> laterOutputTag
) {
this(
input1,
input2,
keySelector1,
keySelector2,
keyType,
windowAssigner,
trigger,
evictor,
allowedLateness);
this.lateDataOutputTag = laterOutputTag;
}
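One thing worth checking in the copied class: in Flink 1.17.1 the existing trigger, evictor, and allowedLateness methods of WithWindow each return a new WithWindow built through the original nine-argument constructor. If those methods are left as copied, calling any of them after sideOutputLateData would return an object without the tag. The plain-Java model below illustrates this; the class WindowConfig, its two fields, and the method names are hypothetical, and the "forwarding" variant is a suggested fix, not something from the original change.

```java
/** Plain-Java model of WithWindow reduced to two settings; not Flink code. */
final class WindowConfig {
    final long allowedLatenessMillis;
    final String lateDataOutputTag; // null means no late-data side output was requested

    /** Models the original constructor, which knows nothing about the tag. */
    WindowConfig(long allowedLatenessMillis) {
        this(allowedLatenessMillis, null);
    }

    /** Models the constructor added in 3.3. */
    WindowConfig(long allowedLatenessMillis, String lateDataOutputTag) {
        this.allowedLatenessMillis = allowedLatenessMillis;
        this.lateDataOutputTag = lateDataOutputTag;
    }

    /** Models sideOutputLateData from 3.2: copies every setting and adds the tag. */
    WindowConfig sideOutputLateData(String tag) {
        return new WindowConfig(allowedLatenessMillis, tag);
    }

    /** As copied from the original class: rebuilds through the old constructor, so the tag is lost. */
    WindowConfig allowedLatenessAsCopied(long millis) {
        return new WindowConfig(millis);
    }

    /** Suggested variant (an assumption): forward the tag so the call order no longer matters. */
    WindowConfig allowedLatenessForwardingTag(long millis) {
        return new WindowConfig(millis, lateDataOutputTag);
    }
}
```

With the methods left as copied, sideOutputLateData therefore has to be the last configuration call before apply or with, which is the order used in the test in section 4.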
3.4 Modify the apply method
Check whether lateDataOutputTag is null; if it is not, call sideOutputLateData on the windowedStream to set the late-data tag.
/**
* Completes the co-group operation with the user function that is executed for windowed
* groups.
*
* <p>Note: This method's return type does not support setting an operator-specific
* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
* parallelism.
*/
public <T> DataStream<T> apply(
CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
// clean the closure
function = input1.getExecutionEnvironment().clean(function);
UnionTypeInfo<T1, T2> unionType =
new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector =
new UnionKeySelector<>(keySelector1, keySelector2);
SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
input1.map(new Input1Tagger<T1, T2>());
taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
taggedInput1.returns(unionType);
SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
input2.map(new Input2Tagger<T1, T2>());
taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
taggedInput2.returns(unionType);
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
// we explicitly create the keyed stream to manually pass the key type information in
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(
unionStream, unionKeySelector, keyType)
.window(windowAssigner);
if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}
// If lateDataOutputTag is not null, call sideOutputLateData on the windowedStream
// with lateDataOutputTag so that late data is emitted to the side output
if (lateDataOutputTag != null) {
windowedStream.sideOutputLateData(lateDataOutputTag);
}
return windowedStream.apply(
new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
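3.5 Make the UnionTypeInfo class public
This class describes the common type produced by the union: oneType is the element type of the input1 stream and twoType is the element type of the input2 stream. It has to be public so that user code can construct the type information for the OutputTag, as the test in section 4 does.
3.6 Build the flink-streaming-java module from the Flink source
Change to the flink-streaming-java directory on disk and run the following command to build:
mvn clean install -DskipTests -Dfast
The build succeeds.
3.7 Check in the project that Maven has picked up the new code
After the build, the imported Maven artifact contains the new NewCoGroupedStreams class. Note that the Flink version in the project's Maven dependencies must match the version of the source that was built; otherwise the new class will not be picked up.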
4. Testing
Create two streams and construct the object with new NewCoGroupedStreams. After allowedLateness, set the outputTag with sideOutputLateData, then start the logic with the with method. Under the hood, with also calls apply; it only casts the returned stream to SingleOutputStreamOperator, which can be used to retrieve side outputs. Finally, with.getSideOutput(outputTag) retrieves the side output, and a map converts it to Tuple2<Integer, WaterSensor> for printing.
OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag = new OutputTag<>("later",
new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));
NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream = new NewCoGroupedStreams<>(ds1, ds2);
SingleOutputStreamOperator<String> with = newCgroupStream.where((x) -> x.getId()).equalTo(x -> x.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(outputTag)
.with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
@Override
public void coGroup(Iterable<WaterSensor> first, Iterable<WaterSensor> second, Collector<String> out) throws Exception {
out.collect(first.toString() + "======" + second.toString());
}
});
with.print();
with.getSideOutput(outputTag).map(new MapFunction<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>, Tuple2<Integer, WaterSensor>>() {
@Override
public Tuple2<Integer, WaterSensor> map(NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value) throws Exception {
return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());
}
}).print();
In the output, ts is the timestamp. The first line printed comes from the RichCoGroupFunction and indicates that the window covering the first 10 s has closed. Entering WaterSensor{id='a', ts=1, vc=1} after that prints it as a 2-tuple through the side output.
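Why a ts=1 record ends up in the side output can be checked with a small plain-Java model. It assumes that ts is in seconds and is converted to milliseconds for the event timestamp, that the window offset is 0, and that the watermark values are chosen only for illustration; the class and method names are hypothetical. The rule it encodes, that a window counts as late once its maximum timestamp plus the allowed lateness is at or below the watermark, follows the cleanup-time check in Flink's WindowOperator for event-time windows.

```java
/** Plain-Java model of the lateness check for event-time tumbling windows; not Flink code. */
final class LateCheck {

    /** Start of the tumbling window containing the timestamp (offset 0, non-negative timestamps). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** True when the element's window has already been cleaned up at the given watermark. */
    static boolean isLate(
            long timestampMillis,
            long sizeMillis,
            long allowedLatenessMillis,
            long watermarkMillis) {
        // The last timestamp that still belongs to the window is end - 1.
        long maxTimestamp = windowStart(timestampMillis, sizeMillis) + sizeMillis - 1;
        return maxTimestamp + allowedLatenessMillis <= watermarkMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L;    // TumblingEventTimeWindows.of(Time.seconds(10))
        long lateness = 3_000L; // allowedLateness(Time.seconds(3))
        long ts = 1_000L;       // WaterSensor with ts=1, assumed to mean 1 s

        // Watermark just before and at the cleanup time of the window [0 s, 10 s).
        System.out.println(isLate(ts, size, lateness, 12_998L));
        System.out.println(isLate(ts, size, lateness, 12_999L));
    }
}
```

Under these assumptions the record is still accepted into the window while the watermark is below 12,999 ms and is routed to the side output from that point on.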