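Two-stream join (Join): joins data from two streams on the value of some field, so that matching records are paired up and processed together
Window Join
A window join defines a time window and pairs up, inside that window, the elements of the two streams that share a common key
Code logic
First call .join() on a DataStream to combine the two streams, which yields a JoinedStreams; then use .where() and .equalTo() to specify the join key of the first and second stream respectively; then open a window with .window() and call .apply(), passing in the join function that performs the computation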
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
The JoinFunction interface is defined as:
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT join(IN1 first, IN2 second) throws Exception;
}
To use it, implement its join method. The two parameters are a matched pair of elements, one from each stream
JoinFunction is not a true "window function"; it only defines how each matched pair is processed when the window function is invoked
How it works
When elements of the two streams arrive, they are first grouped by key and stored in the corresponding window. When the window end time is reached, the operator enumerates all combinations of the two streams' elements for each key in the window, i.e. it builds the Cartesian product of the two sides (comparable to a cross join of tables). It then iterates over the combinations and passes each matched pair as the arguments (first, second) to the JoinFunction's .join() method. So for every pair in the window that is joined successfully, .join() is called once and emits one result
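The flow above can be imitated in plain Java without Flink. The following is only a sketch under stated assumptions: the class WindowJoinSketch, the method crossJoin and the {key, payload} string arrays are hypothetical names chosen for illustration, and the code models a single window that has already fired rather than Flink's real operator. It groups one side by key and then emits every (first, second) combination that shares a key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of what happens when one join window fires.
class WindowJoinSketch {

    // Each element is {key, payload}; both lists are assumed to belong to the same window.
    static List<String> crossJoin(List<String[]> first, List<String[]> second) {
        // Group the second input by key so each first element can find its partners.
        Map<String, List<String[]>> secondByKey = new LinkedHashMap<>();
        for (String[] s : second) {
            secondByKey.computeIfAbsent(s[0], k -> new ArrayList<>()).add(s);
        }
        List<String> out = new ArrayList<>();
        for (String[] f : first) {
            List<String[]> partners = secondByKey.get(f[0]);
            if (partners == null) {
                continue; // no element with this key on the other side: nothing is emitted
            }
            for (String[] s : partners) {
                // Stands in for one join(first, second) call per matched pair.
                out.add("(" + f[0] + "," + f[1] + ")=>(" + s[0] + "," + s[1] + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> first = Arrays.asList(
                new String[]{"a", "1000"}, new String[]{"b", "1000"}, new String[]{"a", "2000"});
        List<String[]> second = Arrays.asList(
                new String[]{"a", "3000"}, new String[]{"c", "3000"});
        crossJoin(first, second).forEach(System.out::println);
    }
}
```

In this sketch the keys "b" and "c" appear on one side only and therefore produce no output, which mirrors the inner-join behaviour of a window join. The order of the emitted pairs is a property of the sketch, not a statement about Flink's output order.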
Example
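// Window-based join
// Imports below assume Flink 1.13 (the version the course uses)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;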
public class WindowJoinTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Long>> stream1 = env
.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
DataStream<Tuple2<String, Long>> stream2 = env
.fromElements(
Tuple2.of("a", 3000L),
Tuple2.of("b", 3000L),
Tuple2.of("a", 4000L),
Tuple2.of("b", 4000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
stream1
.join(stream2)
.where(r -> r.f0)
.equalTo(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
return left + "=>" + right;
}
})
.print();
env.execute();
}
}
Run result: [output screenshot not included]
Interval Join
To analyze how data matches within a period of time, tumbling or sliding windows are not a good fit, because two elements that should match may happen to sit on opposite sides of a window boundary
An interval join is used instead: for each element of one stream, open a time interval before and after its timestamp and check whether elements from the other stream fall inside it
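A small calculation makes the boundary problem concrete. The sketch below assumes 5-second tumbling windows aligned at timestamp 0 and non-negative timestamps in milliseconds; the class, the method names and the two sample timestamps are illustrative and not part of the Flink API. Two events only 2 ms apart land in different windows, so a window join could never pair them, while a check on their time difference still accepts them.

```java
// Hypothetical helper showing how close events can be split by a window boundary.
class WindowEdgeSketch {

    // Start of the tumbling window that contains ts (windows aligned at 0, ts >= 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // True when both timestamps fall into the same tumbling window.
    static boolean sameWindow(long t1, long t2, long size) {
        return windowStart(t1, size) == windowStart(t2, size);
    }

    public static void main(String[] args) {
        long size = 5000L;  // 5-second window, in milliseconds
        long order = 4999L; // assumed event just before the boundary
        long click = 5001L; // assumed event just after the boundary
        System.out.println(windowStart(order, size));       // 0
        System.out.println(windowStart(click, size));       // 5000
        System.out.println(sameWindow(order, click, size)); // false
        System.out.println(Math.abs(click - order) <= 10);  // true
    }
}
```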
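Principle
Specify two time offsets, called the upper bound (upperBound) and the lower bound (lowerBound) of the interval;
Then for any element a of one stream (call it A), an interval can be opened: [a.timestamp + lowerBound, a.timestamp + upperBound], that is, a closed interval anchored at a's timestamp that reaches down to the lower bound and up to the upper bound. This interval serves as the "window" in which elements of the other stream can match
So for an element b of the other stream (call it B) with the same key, if its timestamp falls inside this interval, a and b are paired successfully and passed on for computation and output
The match condition is: a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
Consider the following example:
Stream A (the lower stream in the figure) interval-joins stream B (the upper stream), so an interval can be opened for every element of A. Here the lower bound is set to -2 ms and the upper bound to 1 ms. For the A element with timestamp 2, the matching interval is [0, 3]. Stream B has two elements inside this range, with timestamps 0 and 1, which gives the matched pairs (2, 0) and (2, 1). Likewise, for the A element with timestamp 3 the matching interval is [1, 4]. Only the B element with timestamp 1 matches, which gives the pair (3, 1)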
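The matching rule can be checked with a few lines of plain Java. This is a minimal sketch that assumes all elements share one key and ignores arrival order and watermarks; the class IntervalMatchSketch and its methods are hypothetical names. It uses only the timestamps discussed above: 2 and 3 in stream A, 0 and 1 in stream B.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free check of the interval join match condition.
class IntervalMatchSketch {

    // Closed-interval condition: a + lowerBound <= b <= a + upperBound.
    static boolean matches(long a, long b, long lowerBound, long upperBound) {
        return a + lowerBound <= b && b <= a + upperBound;
    }

    // Lists every (a, b) pair that satisfies the condition, in input order.
    static List<String> pairs(long[] streamA, long[] streamB, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long a : streamA) {
            for (long b : streamB) {
                if (matches(a, b, lowerBound, upperBound)) {
                    out.add("(" + a + ", " + b + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] streamA = {2L, 3L}; // the two A timestamps from the example
        long[] streamB = {0L, 1L}; // the two B timestamps from the example
        System.out.println(pairs(streamA, streamB, -2L, 1L)); // [(2, 0), (2, 1), (3, 1)]
    }
}
```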
Code logic
The interval join operates on KeyedStreams:
① Call keyBy on each DataStream to obtain a KeyedStream
② Call .intervalJoin() to combine the two streams; the result is of type IntervalJoin
③ Use .between() to specify the lower and upper bounds of the interval, then call .process() to define how each matched pair is handled
④ .process() takes a processing function as its argument: ProcessJoinFunction
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction<Integer, Integer, String>() {
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});
Example
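// Interval-based join
// Imports below assume Flink 1.13; Event is the course's own POJO (not shown here)
// with the public fields user and timestamp that this example reads
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;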
public class IntervalJoinTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
Tuple3.of("Mary", "order-1", 5000L),
Tuple3.of("Alice", "order-2", 5000L),
Tuple3.of("Bob", "order-3", 20000L),
Tuple3.of("Alice", "order-4", 20000L),
Tuple3.of("Cary", "order-5", 51000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
})
);
SingleOutputStreamOperator<Event> clickStream = env.fromElements(
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Alice", "./prod?id=200", 3500L),
new Event("Bob", "./prod?id=2", 2500L),
new Event("Alice", "./prod?id=300", 36000L),
new Event("Bob", "./home", 30000L),
new Event("Bob", "./prod?id=1", 23000L),
new Event("Bob", "./prod?id=3", 33000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
orderStream.keyBy(data -> data.f0)
.intervalJoin(clickStream.keyBy(data -> data.user))
.between(Time.seconds(-5), Time.seconds(10))
.process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
@Override
public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
out.collect(right + " => " + left);
}
})
.print();
env.execute();
}
}
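Run result: [output screenshot not included]
Window CoGroup
Its usage is very similar to window join: the two streams are combined and the matching elements are processed in a window. In the call chain, simply replace .join() with .coGroup()
Code logic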
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
The difference from window join is that the function passed to .apply() to define the actual operation is a CoGroupFunction:
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
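The coGroup method is similar to the .join() method of FlatJoinFunction (the JoinFunction variant that emits results through a Collector). Its three parameters are again the data from the two streams and a collector for output, but the first two parameters are no longer a single matched pair. They are iterable collections of elements;
So all the elements collected for a key in the window are passed in at once and the pairing rule is defined by the user. The Cartesian product of the two streams' data in the window is not computed for you;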
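Because both sides arrive as whole collections, pairings that a plain window join cannot express become possible, for example a left outer join. The following plain-Java sketch only imitates the shape of coGroup(first, second, out) for one key and one window. The class CoGroupSketch, the method leftOuterJoin and the use of a List in place of Flink's Collector are assumptions made for illustration. It relies on the fact that the second collection may be empty when only the first stream has elements for that key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, Flink-free imitation of a coGroup-style function.
class CoGroupSketch {

    // Both sides are handed over as whole collections; the pairing rule is ours to choose.
    static void leftOuterJoin(Iterable<String> first, Iterable<String> second, List<String> out) {
        for (String f : first) {
            boolean matched = false;
            for (String s : second) {
                out.add(f + "=>" + s);
                matched = true;
            }
            if (!matched) {
                out.add(f + "=>null"); // keep left elements that found no partner
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        leftOuterJoin(Arrays.asList("a1", "a2"), Arrays.asList("x"), out);
        leftOuterJoin(Arrays.asList("b1"), Collections.<String>emptyList(), out);
        System.out.println(out); // [a1=>x, a2=>x, b1=>null]
    }
}
```

An inner join, a right outer join or a full outer join can be written the same way by changing what is emitted when one side is empty.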
Example
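// Window-based coGroup
// Imports below assume Flink 1.13 (the version the course uses)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;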
public class CoGroupTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Long>> stream1 = env
.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
DataStream<Tuple2<String, Long>> stream2 = env
.fromElements(
Tuple2.of("a", 3000L),
Tuple2.of("b", 3000L),
Tuple2.of("a", 4000L),
Tuple2.of("b", 4000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
stream1
.coGroup(stream2)
.where(r -> r.f0)
.equalTo(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
collector.collect(iter1 + "=>" + iter2);
}
})
.print();
env.execute();
}
}
Run result: [output screenshot not included]
Course followed: 尚硅谷 Flink 1.13 hands-on tutorial (covers all Flink Java topics), on bilibili