寫過Spark批處理的應(yīng)該都知道,有一個廣播變量broadcast
這樣的一個算子,可以優(yōu)化我們計算的過程,有效的提高效率;同樣在Flink中也有broadcast
,簡單來說和Spark中的類似,但是有所區(qū)別,首先Spark中的broadcast
是靜態(tài)的數(shù)據(jù),而Flink中的broadcast
是動態(tài)的,也就是源源不斷的數(shù)據(jù)流.在Flink中會將廣播的數(shù)據(jù)存到state
中.
在Flink中主流數(shù)據(jù)可以獲取state
中的所有狀態(tài)數(shù)據(jù),使用過window
的應(yīng)該都清楚,當(dāng)兩個streamData
中的數(shù)據(jù)到達窗口的時間剛好錯過時就會發(fā)生關(guān)聯(lián)不上的情況,如window
是2S
,sreamData1
到達窗口的時間剛好卡在這個2S
窗口的尾端,而streamData
到達窗口時,這個窗口已經(jīng)結(jié)束了,這種情況就算這兩條數(shù)據(jù)有相同id
也無法進行關(guān)聯(lián)了.
但是broadcast
會將到達的數(shù)據(jù)都存儲在state
中,這樣主流到達的每一條數(shù)據(jù)都可以和state
中的廣播流數(shù)據(jù)進行關(guān)聯(lián)比較.
流程圖內(nèi)容可能不夠準確,只是為了看起來方便理解.文章來源地址http://www.zghlxwxcb.cn/news/detail-649926.html
- 數(shù)據(jù)源
# 主流數(shù)據(jù) ? ~ nc -lk 1234 101,瀏覽商品,2023-08-02 102,瀏覽商品,2023-08-02 103,查看商品價格,2023-08-04 101,商品加入購物車,2023-08-03 101,從購物車刪除商品,2023-08-03 102,下單,2023-08-02 102,申請延期發(fā)貨,2023-08-03 103,點擊商品詳情頁,2023-08-04 104,點擊收藏,2023-08-05 104,下單,2023-08-05 104,付款,2023-08-06 105,瀏覽商品,2023-08-07 106,瀏覽商品,2023-08-07 106,加入購物車,2023-08-08 107,瀏覽商品,2023-08-10
# 廣播流數(shù)據(jù) ? ~ nc -lk 5678 101,小明 102,張麗 103,公孫飛天 104,王二虎 106,李四 108,趙屋面
- 代碼
import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; /** * @Author: J * @Version: 1.0 * @CreateTime: 2023/8/11 * @Description: 多流操作-廣播流 **/ public class FlinkBroadcast { public static void main(String[] args) throws Exception { // 構(gòu)建流環(huán)境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設(shè)置并行度 env.setParallelism(3); // 數(shù)據(jù)集源1作為主流數(shù)據(jù)(用戶行為日志[id,behavior,date]) DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234); // 將字符串切割處理 SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() { }); // 數(shù)據(jù)源2作為廣播流數(shù)據(jù)(用戶信息(id,name)) DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678); // 將字符串切割處理 SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() { }); // 將廣播流數(shù)據(jù)源進行廣播 /** *參數(shù)說明 * 這里需要我們傳入一個MapStateDescriptor,其實就是一個Map結(jié)構(gòu)的數(shù)據(jù)<k,v> * <String, Tuple2<String, String>>,第一個String類型就是廣播流和主流連接的字段,在這個代碼中就是id,由實際業(yè)務(wù)決定 * <String, Tuple2<String, String>>,第二個Tuple2<String, String>就是實際廣播數(shù)據(jù)流的數(shù)據(jù),由實際業(yè)務(wù)決定 * "userInfo"就是給一個名字,這個自定義無強制要求 **/ // 先構(gòu)建一個狀態(tài),后面也會使用 MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() { })); BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState); // 將主流數(shù)據(jù)和廣播流數(shù)據(jù)使用connect連接 /** * 我們將數(shù)據(jù)轉(zhuǎn)變成廣播流之后,在Flink中也不知哪個數(shù)據(jù)流需要使用這個廣播流(userInfoBroadStream), * 這個時候就需要我們自己將主流數(shù)據(jù)和該廣播流數(shù)據(jù)進行連接 **/ BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream); /** * 在process()中有兩類函數(shù)供我們選擇,KeyedBroadcastProcessFunction和BroadcastProcessFunction, * 這里要注意當(dāng)"connectedStream"是KeyedStream時選擇KeyedBroadcastProcessFunction * 當(dāng)"connectedStream"不是KeyedStream時選擇BroadcastProcessFunction就可以. * 使用keyBy算子返回的就是KeyedStream **/ SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() { // 這個方法寫主流數(shù)據(jù)處理邏輯 @Override public void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception { /** * 要注意,這里我們最好從ReadOnlyContext來獲取廣播狀態(tài)數(shù)據(jù),因為獲取只讀的狀態(tài)數(shù)據(jù)可以保證數(shù)據(jù)的安全性, * 如果是通過成員變量的方式獲取可修改的狀態(tài)數(shù)據(jù),就會存在數(shù)據(jù)不安全的問題,如在代碼邏輯中出現(xiàn)了對狀態(tài)數(shù)據(jù) * 修改的代碼,那么共享此狀態(tài)的并行算子可能看到的狀態(tài)數(shù)據(jù)不一致,就會導(dǎo)致數(shù)據(jù)錯誤或者代碼報錯. * 而使用ReadOnlyContext就可以保證processElement這個方法中我們只對狀態(tài)數(shù)據(jù)進行讀取. **/ ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState); if (broadcastState != null) { // 通過主流中的ID作為key獲取廣播變量中的用戶信息 Tuple2<String, String> userInfo = broadcastState.get(value.f0); // 輸出數(shù)據(jù)的形式(id,behavior,date,name) if (userInfo == null) { out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL"); } else { out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1); } } else { out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL"); } } // 這個方法寫廣播流數(shù)據(jù)處理邏輯 @Override public void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception { // 使用Context獲取狀態(tài) BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState); // 將數(shù)據(jù)存入到狀態(tài)中 broadcastState.put(value.f0, value); } }); // 打印結(jié)果 resultStream.print(); env.execute("Flink broadcast"); } }
- 結(jié)果
代碼內(nèi)容就不進行詳細解釋了,注釋基本都寫清楚了,如有疑問可評論提問,共同探討.3> 101,瀏覽商品,2023-08-02,小明 3> 101,商品加入購物車,2023-08-03,小明 3> 102,申請延期發(fā)貨,2023-08-03,張麗 3> 104,下單,2023-08-05,王二虎 3> 106,瀏覽商品,2023-08-07,李四 1> 102,瀏覽商品,2023-08-02,張麗 1> 101,從購物車刪除商品,2023-08-03,小明 1> 103,點擊商品詳情頁,2023-08-04,公孫飛天 1> 104,付款,2023-08-06,王二虎 1> 106,加入購物車,2023-08-08,李四 2> 103,查看商品價格,2023-08-04,公孫飛天 2> 102,下單,2023-08-02,張麗 2> 104,點擊收藏,2023-08-05,王二虎 2> 105,瀏覽商品,2023-08-07,NULL 2> 107,瀏覽商品,2023-08-10,NULL
文章來源:http://www.zghlxwxcb.cn/news/detail-649926.html
到了這里,關(guān)于Flink多流處理之Broadcast(廣播變量)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!