目錄
分流
代碼示例
使用側(cè)輸出流
合流
聯(lián)合(Union)
連接(Connect)
簡單劃分的話,多流轉(zhuǎn)換可以分為“分流”和“合流”兩大類
目前分流的操作一般是通過側(cè)輸出流(side output)來實現(xiàn),而合流的算子比較豐富,根據(jù)不同的需求可以調(diào)用 union、connect、join 以及 coGroup 等接口進行連接合并操作
分流
將一條數(shù)據(jù)流拆分成完全獨立的兩條、甚至多條流。也就是基于一個DataStream
,得到完全平等的多個子DataStream
代碼示例
調(diào)用.filter()
方法進行篩選,將符合條件的數(shù)據(jù)揀選出來放到對應(yīng)的流里
public class SplitStreamByFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
// 篩選Mary的瀏覽行為放入MaryStream流中
DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Mary");
}
});
// 篩選Bob的購買行為放入BobStream流中
DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Bob");
}
});
// 篩選其他人的瀏覽行為放入elseStream流中
DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return !value.user.equals("Mary") && !value.user.equals("Bob") ;
}
});
MaryStream.print("Mary pv");
BobStream.print("Bob pv");
elseStream.print("else pv");
env.execute();
}
}
缺點:上述操作將原始流復(fù)制了三份,對每一份分別進行篩選,因此代碼冗余,不夠高效
解決:①.split()
方法(但限制了數(shù)據(jù)類型轉(zhuǎn)換,已經(jīng)廢棄)
②測輸出流
使用側(cè)輸出流
改進后的代碼如下:
public class SplitStreamByOutputTag {
// 定義輸出標(biāo)簽,側(cè)輸出流的數(shù)據(jù)類型為三元組(user, url, timestamp)
private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
if (value.user.equals("Mary")){
ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
} else if (value.user.equals("Bob")){
ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
} else {
out.collect(value);
}
}
});
processedStream.getSideOutput(MaryTag).print("Mary pv");
processedStream.getSideOutput(BobTag).print("Bob pv");
processedStream.print("else");
env.execute();
}
}
①定義OutputTag作為標(biāo)簽
②使用ctx.output
方法將符合篩選條件的數(shù)據(jù)寫入側(cè)輸出流
③使用getSideOutput
方法從側(cè)輸出流中獲得數(shù)據(jù)
合流
對于來源不同的多條流中的數(shù)據(jù)進行聯(lián)合處理(與分流相比,合流操作更為普遍)
聯(lián)合(Union)
直接將多條流合在一起(要求必須流中的數(shù)據(jù)類型必須相同),合并之后的新流會包括所有流中的元素,數(shù)據(jù)類型不變
操作:基于 DataStream 直接調(diào)用.union()
方法
參數(shù):其他 DataStream
返回值:一個 DataStream
stream1.union(stream2, stream3, ...)
水位線時效性:多流合并時處理的時效性是以最慢的那個流為準(zhǔn)的(多條流的合并,某種意義上也可以看作是多個并行任務(wù)向同一個下游任務(wù)匯合的過程)
代碼示例:
public class UnionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777)
.map(data -> {
String[] field = data.split(",");
return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream1.print("stream1");
SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777)
.map(data -> {
String[] field = data.split(",");
return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream2.print("stream2");
// 合并兩條流
stream1.union(stream2)
.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
out.collect("水位線:" + ctx.timerService().currentWatermark());
}
})
.print();
env.execute();
}
}
測試:
分別在兩臺機器上輸入以下數(shù)據(jù):
hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000
水位線的推進如下:
連接(Connect)
連接操作允許流的數(shù)據(jù)類型不同
連接流(ConnectedStreams)
連接流可以看成是兩條流形式上的“統(tǒng)一”,被放在了一個同一個流中;事實上內(nèi)部仍保持各自的數(shù)據(jù)形式不變,彼此之間是相互獨立的
要想得到新的 DataStream,還需要進一步定義一個“同處理”(co-process
)轉(zhuǎn)換操作,用來說明對于不同來源、不同類型的數(shù)據(jù),怎樣分別進行處理轉(zhuǎn)換、得到統(tǒng)一的輸出類型
代碼實現(xiàn)
①基于一條 DataStream 調(diào)用.connect()方法,傳入另外一條 DataStream 作為參數(shù),將兩條流連接起來,得到一個 ConnectedStreams
②調(diào)用同處理方法得到 DataStream(可以的調(diào)用的同處理方法有.map()
/.flatMap()
,以及.process()
方法)
public class ConnectTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Integer> stream1 = env.fromElements(1,2,3);
DataStream<Long> stream2 = env.fromElements(1L,2L,3L);
ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
@Override
public String map1(Integer value) {
return "Integer: " + value;
}
@Override
public String map2(Long value) {
return "Long: " + value;
}
});
result.print();
env.execute();
}
}
①ConnectedStreams有兩個類型參數(shù),分別是stream1和stream2的類型;
②map方法中實現(xiàn)了一個CoMapFunction
,表示分別對兩條流中的數(shù)據(jù)執(zhí)行 map 操作
類型參數(shù)<IN1, IN2, OUT>
,分別表示第一條流、第二條流,以及合并后的流中的數(shù)據(jù)類型
這里我們將一條 Integer 流和一條 Long 流合并,轉(zhuǎn)換成 String 輸出。所以當(dāng)遇到第一條流輸入的整型值時,調(diào)用.map1();而遇到第二條流輸入的長整型數(shù)據(jù)時,調(diào)用.map2():最終都轉(zhuǎn)換為字符串輸出,合并成了一條字符串流
③補充:ConnectedStreams 也可以直接調(diào)用.keyBy()
進行按鍵分區(qū)的操作,得到的還是一個 ConnectedStreams
connectedStreams.keyBy(keySelector1, keySelector2);
傳入的參數(shù)是兩條流中各自的鍵選擇器
這樣的操作就是把兩條流中key相同的數(shù)據(jù)放到了一起,然后針對來源的流各自進行處理;
同樣也可以在合并之前先使用KeyBy進行分區(qū),然后基于兩條KeyedStream進行連接操作;
要注意兩條流定義的鍵的類型必須相同,否則會拋出異常
CoProcessFunction
對于連接流 ConnectedStreams 的處理操作,需要分別定義對兩條流的處理轉(zhuǎn)換,因此接口中就會有兩個相同的方法需要實現(xiàn),用數(shù)字“1”“2”區(qū)分,在兩條流中的數(shù)據(jù)到來時分別調(diào)用。我們把這種接口叫作“協(xié)同處理函數(shù)”(co-process function)
例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction
CoProcessFunction源碼如下:
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
public abstract class Context {...}
...
}
簡單示例:實現(xiàn)一個實時對賬的需求,也就是app 的支付操作和第三方的支付操作的一個雙流 Join。App 的支付事件和第三方的支付事件將會互相等待 5 秒鐘,如果等不來對應(yīng)的支付事件,那么就輸出報警信息
// 實時對賬
public class BillCheckExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 來自app的支付日志
SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
})
);
// 來自第三方支付平臺的支付日志
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
@Override
public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
return element.f3;
}
})
);
// 檢測同一支付單在兩條流中是否匹配,不匹配就報警
appStream.connect(thirdpartStream)
.keyBy(data -> data.f0, data -> data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
// 自定義實現(xiàn)CoProcessFunction
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{
// 定義狀態(tài)變量,用來保存已經(jīng)到達的事件
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;
@Override
public void open(Configuration parameters) throws Exception {
appEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
);
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
);
}
@Override
public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
// 看另一條流中事件是否來過
if (thirdPartyEventState.value() != null){
out.collect("對賬成功:" + value + " " + thirdPartyEventState.value());
// 清空狀態(tài)
thirdPartyEventState.clear();
} else {
// 更新狀態(tài)
appEventState.update(value);
// 注冊一個5秒后的定時器,開始等待另一條流的事件
ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
if (appEventState.value() != null){
out.collect("對賬成功:" + appEventState.value() + " " + value);
// 清空狀態(tài)
appEventState.clear();
} else {
// 更新狀態(tài)
thirdPartyEventState.update(value);
// 注冊一個5秒后的定時器,開始等待另一條流的事件
ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 定時器觸發(fā),判斷狀態(tài),如果某個狀態(tài)不為空,說明另一條流中事件沒來
if (appEventState.value() != null) {
out.collect("對賬失敗:" + appEventState.value() + " " + "第三方支付平臺信息未到");
}
if (thirdPartyEventState.value() != null) {
out.collect("對賬失?。? + thirdPartyEventState.value() + " " + "app信息未到");
}
appEventState.clear();
thirdPartyEventState.clear();
}
}}
運行結(jié)果如下:
運行結(jié)果解析:
①在CoProcessFunction
的實現(xiàn)中,聲明了兩個狀態(tài)變量用來保存App的支付信息和第三方的支付信息
②App支付信息到達之后,觸發(fā)processElement1
中的操作,檢查第三方的支付信息是否已經(jīng)到達(如果先到達會保存在相應(yīng)的狀態(tài)變量中);如果已經(jīng)到達,則對賬成功;如果沒有到達,則等待5s,仍未到達則對賬失敗;
③第三方支付信息到達后,流程同②
④對于order-1,時間戳為1000的數(shù)據(jù)(App)到達后,第三方支付信息未到達,等待5s,接著時間戳未3000的數(shù)據(jù)(第三方)到達后,發(fā)現(xiàn)App支付信息已經(jīng)到達,因此對賬成功
⑤對于order-2和order-3,均是等待5s后沒有檢測到App(第三方)數(shù)據(jù)到達而發(fā)出報警信息
廣播連接流(BroadcastConnectedStream)
DataStream 調(diào)用.connect()
方法時可以傳入一個廣播流(BroadcastConnectedStream)
這種連接方式往往用在需要動態(tài)定義某些規(guī)則或配置的場景。因為規(guī)則是實時變動的,所以我們可以用一個單獨的流來獲取規(guī)則數(shù)據(jù);而這些規(guī)則或配置是對整個應(yīng)用全局有效的,所以不能只把這數(shù)據(jù)傳遞給一個下游并行子任務(wù)處理,而是要“廣播”(broadcast)給所有的并行子任務(wù)。而下游子任務(wù)收到廣播出來的規(guī)則,會把它保存成一個狀態(tài),這就是所謂的“廣播狀態(tài)”(broadcast state)
如何創(chuàng)建廣播流
基于DataStream調(diào)用.broadcast()
方法,傳入一個“映射狀態(tài)描述器”(MapStateDescriptor
),說明狀態(tài)的名稱和類型;
因為廣播狀態(tài)底層是用一個“映射”(map)結(jié)構(gòu)來保存的
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
數(shù)據(jù)流和廣播流的連接
得到“廣播連接流”(BroadcastConnectedStream),然后基于廣播連接流調(diào)用.process()
方法,就可以同時獲取規(guī)則和數(shù)據(jù),進行動態(tài)處理
DataStream<String> output = stream
.connect(ruleBroadcastStream)
.process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction
BroadcastProcessFunction 與 CoProcessFunction 類似,同樣是一個抽象類,需要實現(xiàn)兩個方法,針對合并的兩條流中元素分別定義處理操作。區(qū)別在于這里一條流是正常處理數(shù)據(jù),而另一條流則是要用新規(guī)則來更新廣播狀態(tài),所以對應(yīng)的兩個方法叫作.processElement()
和.processBroadcastElement()
文章來源:http://www.zghlxwxcb.cn/news/detail-821336.html
學(xué)習(xí)課程鏈接:【尚硅谷】Flink1.13實戰(zhàn)教程(涵蓋所有flink-Java知識點)_嗶哩嗶哩_bilibili?文章來源地址http://www.zghlxwxcb.cn/news/detail-821336.html
到了這里,關(guān)于Flink多流轉(zhuǎn)換(1)—— 分流&合流的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!