?一、介紹
Join大體分類只有兩種:Window Join和Interval Join
Window Join有可以根據(jù)Window的類型細分出3種:Tumbling(滾動) Window Join、Sliding(滑動) Window Join、Session(會話) Widnow Join。
??????????Window 類型的join都是利用window的機制,先將數(shù)據(jù)緩存在Window State中,當窗口觸發(fā)計算時,執(zhí)行join操作。
??????????Interval join也是利用state存儲數(shù)據(jù)再處理,區(qū)別在于state中的數(shù)據(jù)有失效機制,依靠數(shù)據(jù)觸發(fā)數(shù)據(jù)清理,目前Stream join的結果是數(shù)據(jù)的卡爾積。
二、Window Join
?Tumbling Window Join
????????執(zhí)行翻滾窗口聯(lián)接時,具有公共鍵和公告翻滾窗口的所有元素將成對組合聯(lián)接,并傳遞JoinFunction或FlatJoinFunction。因為它的行為類似于內部連接,所以一個流中的元素在其滾動窗口中沒有來自另一個流的元素,因此不會被發(fā)射。
????????如圖所示,我們定義了一個為2毫秒的翻滾窗口,結果窗口的形式為[0,1]、[2,3]..............該圖顯示了每個窗口中所以元素的成對組合,這些元素將傳遞給JoinFunction。注意在翻滾窗口[6,7]中沒有發(fā)射任何東西,因為綠色流中不存在與橙色元素⑥和⑦結合的元素。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
?Sliding Window Join
? ? ? ? 在執(zhí)行滑動窗口聯(lián)接時,具有公共鍵和公共滑動窗口的所以元素將作為成對組合聯(lián)接,并傳遞JoinFunction或FlatJoinFunction。在當前滑動窗口中,一個流的元素沒有來自另一個流的元素,則不會發(fā)射!請注意,某些元素可能會聯(lián)接到一個滑動窗口中,但不會聯(lián)接到另一個滑動窗口中!
? ? ? ? 在本例中,我們使用大小為2毫秒的滑動窗口,并將其滑動1毫秒,從而產生滑動窗口[-1,0],[1,2],[2,3]...........x軸下方的連續(xù)元素時傳遞給每個滑動窗口的Join Function的元素。在這里,你還可以看到,例如在窗口[2,3]中,橙色②和綠色③連接,但在窗口[1,2]中沒有與任何對象連接。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
?Session Window Join
? ? ? ? 在執(zhí)行會話窗口聯(lián)接時,具有相同鍵(當“組合”滿足會話條件)的所有元素以成對組合方式聯(lián)接,并傳遞給JoinFunction或FlatJoinFunction。同樣,這執(zhí)行一個內部連接,所以如果有一個會話窗口只包含來自一個流的元素,則不會發(fā)出任何輸出!
? ? ? ? 這里,我們定義一個會話窗口連接,其中每個會話被至少1毫秒的時間分割。有三個會話,在前兩個會話中,來自兩個流的連接元素被傳遞給JoinFunction。在第三個會話中,綠色流中沒有元素,所以⑧和⑨沒有連接!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
三、Interval Join
????????前面學習的Window Join必須要在一個Window中進行Join,那如果沒有Window如何處理呢?interval join也是使用相同的key來join兩個流(流A、流B),并且流B中的元素中的時間戳,和流A元素的時間戳,有一個時間間隔。
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]?or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
也就是:流B的元素的時間戳 ≥ 流A的元素時間戳 + 下界,且,流B的元素的時間戳 ≤ 流A的元素時間戳
?
在上面的示例中,我們將兩個流“orange”和“green”連接起來,其下限為-2毫秒,上限為+1毫秒。默認情況下,這些邊界是包含的,但是可以應用.lowerBoundExclusive()和.upperBoundExclusive來更改行為orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound?文章來源:http://www.zghlxwxcb.cn/news/detail-835296.html
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
?文章來源地址http://www.zghlxwxcb.cn/news/detail-835296.html
到了這里,關于Flink雙流(join)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!