国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink多流處理之Broadcast(廣播變量)

這篇具有很好參考價值的文章主要介紹了Flink多流處理之Broadcast(廣播變量)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

寫過Spark批處理的應(yīng)該都知道,有一個廣播變量broadcast這樣的一個算子,可以優(yōu)化我們計算的過程,有效的提高效率;同樣在Flink中也有broadcast,簡單來說和Spark中的類似,但是有所區(qū)別,首先Spark中的broadcast是靜態(tài)的數(shù)據(jù),而Flink中的broadcast是動態(tài)的,也就是源源不斷的數(shù)據(jù)流.在Flink中會將廣播的數(shù)據(jù)存到state中.
Flink多流處理之Broadcast(廣播變量),flink,大數(shù)據(jù),java
在Flink中主流數(shù)據(jù)可以獲取state中的所有狀態(tài)數(shù)據(jù),使用過window的應(yīng)該都清楚,當(dāng)兩個streamData中的數(shù)據(jù)到達窗口的時間剛好錯過時就會發(fā)生關(guān)聯(lián)不上的情況,如window2S,sreamData1到達窗口的時間剛好卡在這個2S窗口的尾端,而streamData到達窗口時,這個窗口已經(jīng)結(jié)束了,這種情況就算這兩條數(shù)據(jù)有相同id也無法進行關(guān)聯(lián)了.
但是broadcast會將到達的數(shù)據(jù)都存儲在state中,這樣主流到達的每一條數(shù)據(jù)都可以和state中的廣播流數(shù)據(jù)進行關(guān)聯(lián)比較.
Flink多流處理之Broadcast(廣播變量),flink,大數(shù)據(jù),java
流程圖內(nèi)容可能不夠準確,只是為了看起來方便理解.文章來源地址http://www.zghlxwxcb.cn/news/detail-649926.html

  • 數(shù)據(jù)源
    # 主流數(shù)據(jù)
    ?  ~ nc -lk 1234
    101,瀏覽商品,2023-08-02
    102,瀏覽商品,2023-08-02
    103,查看商品價格,2023-08-04
    101,商品加入購物車,2023-08-03
    101,從購物車刪除商品,2023-08-03
    102,下單,2023-08-02
    102,申請延期發(fā)貨,2023-08-03
    103,點擊商品詳情頁,2023-08-04
    104,點擊收藏,2023-08-05
    104,下單,2023-08-05
    104,付款,2023-08-06
    105,瀏覽商品,2023-08-07
    106,瀏覽商品,2023-08-07
    106,加入購物車,2023-08-08
    107,瀏覽商品,2023-08-10
    
    # 廣播流數(shù)據(jù)
    ?  ~ nc -lk 5678
    101,小明
    102,張麗
    103,公孫飛天
    104,王二虎
    106,李四
    108,趙屋面
    
  • 代碼
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/11
     * @Description: 多流操作-廣播流
     **/
    public class FlinkBroadcast {
        public static void main(String[] args) throws Exception {
            // 構(gòu)建流環(huán)境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 設(shè)置并行度
            env.setParallelism(3);
            // 數(shù)據(jù)集源1作為主流數(shù)據(jù)(用戶行為日志[id,behavior,date])
            DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);
            // 將字符串切割處理
            SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {
            });
            // 數(shù)據(jù)源2作為廣播流數(shù)據(jù)(用戶信息(id,name))
            DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);
            // 將字符串切割處理
            SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // 將廣播流數(shù)據(jù)源進行廣播
            /**
             *參數(shù)說明
             * 這里需要我們傳入一個MapStateDescriptor,其實就是一個Map結(jié)構(gòu)的數(shù)據(jù)<k,v>
             * <String, Tuple2<String, String>>,第一個String類型就是廣播流和主流連接的字段,在這個代碼中就是id,由實際業(yè)務(wù)決定
             * <String, Tuple2<String, String>>,第二個Tuple2<String, String>就是實際廣播數(shù)據(jù)流的數(shù)據(jù),由實際業(yè)務(wù)決定
             * "userInfo"就是給一個名字,這個自定義無強制要求
             **/
            // 先構(gòu)建一個狀態(tài),后面也會使用
            MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
            }));
            BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);
    
            // 將主流數(shù)據(jù)和廣播流數(shù)據(jù)使用connect連接
            /**
             * 我們將數(shù)據(jù)轉(zhuǎn)變成廣播流之后,在Flink中也不知哪個數(shù)據(jù)流需要使用這個廣播流(userInfoBroadStream),
             * 這個時候就需要我們自己將主流數(shù)據(jù)和該廣播流數(shù)據(jù)進行連接
             **/
            BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);
    
            /**
             * 在process()中有兩類函數(shù)供我們選擇,KeyedBroadcastProcessFunction和BroadcastProcessFunction,
             * 這里要注意當(dāng)"connectedStream"是KeyedStream時選擇KeyedBroadcastProcessFunction
             * 當(dāng)"connectedStream"不是KeyedStream時選擇BroadcastProcessFunction就可以.
             * 使用keyBy算子返回的就是KeyedStream
             **/
            SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {
    
                // 這個方法寫主流數(shù)據(jù)處理邏輯
                @Override
                public void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    /**
                     * 要注意,這里我們最好從ReadOnlyContext來獲取廣播狀態(tài)數(shù)據(jù),因為獲取只讀的狀態(tài)數(shù)據(jù)可以保證數(shù)據(jù)的安全性,
                     * 如果是通過成員變量的方式獲取可修改的狀態(tài)數(shù)據(jù),就會存在數(shù)據(jù)不安全的問題,如在代碼邏輯中出現(xiàn)了對狀態(tài)數(shù)據(jù)
                     * 修改的代碼,那么共享此狀態(tài)的并行算子可能看到的狀態(tài)數(shù)據(jù)不一致,就會導(dǎo)致數(shù)據(jù)錯誤或者代碼報錯.
                     * 而使用ReadOnlyContext就可以保證processElement這個方法中我們只對狀態(tài)數(shù)據(jù)進行讀取.
                     **/
                    ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);
                    if (broadcastState != null) {
                        // 通過主流中的ID作為key獲取廣播變量中的用戶信息
                        Tuple2<String, String> userInfo = broadcastState.get(value.f0);
                        // 輸出數(shù)據(jù)的形式(id,behavior,date,name)
                        if (userInfo == null) {
                            out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");
                        } else {
                            out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);
                        }
                    } else {
                        out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");
                    }
    
                }
    
                // 這個方法寫廣播流數(shù)據(jù)處理邏輯
                @Override
                public void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {
                    // 使用Context獲取狀態(tài)
                    BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);
    
                    // 將數(shù)據(jù)存入到狀態(tài)中
                    broadcastState.put(value.f0, value);
                }
            });
            // 打印結(jié)果
            resultStream.print();
    
            env.execute("Flink broadcast");
        }
    }
    
  • 結(jié)果
    3> 101,瀏覽商品,2023-08-02,小明
    3> 101,商品加入購物車,2023-08-03,小明
    3> 102,申請延期發(fā)貨,2023-08-03,張麗
    3> 104,下單,2023-08-05,王二虎
    3> 106,瀏覽商品,2023-08-07,李四
    1> 102,瀏覽商品,2023-08-02,張麗
    1> 101,從購物車刪除商品,2023-08-03,小明
    1> 103,點擊商品詳情頁,2023-08-04,公孫飛天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入購物車,2023-08-08,李四
    2> 103,查看商品價格,2023-08-04,公孫飛天
    2> 102,下單,2023-08-02,張麗
    2> 104,點擊收藏,2023-08-05,王二虎
    2> 105,瀏覽商品,2023-08-07,NULL
    2> 107,瀏覽商品,2023-08-10,NULL
    
    代碼內(nèi)容就不進行詳細解釋了,注釋基本都寫清楚了,如有疑問可評論提問,共同探討.

到了這里,關(guān)于Flink多流處理之Broadcast(廣播變量)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink多流處理之connect拼接流

    Flink多流處理之connect拼接流

    Flink中的拼接流 connect 的使用其實非常簡單,就是 leftStream.connect(rightStream) 的方式,但是有一點我們需要清楚,使用 connect 后并不是將兩個流給串聯(lián)起來了,而是將左流和右流建立一個聯(lián)系,作為一個大的流,并且這個大的流可以使用相同的邏輯處理 leftStream 和 rightStream ,也可以使用不

    2024年02月13日
    瀏覽(37)
  • Flink學(xué)習(xí)——處理函數(shù)ProcessFunction及多流轉(zhuǎn)換

    Flink學(xué)習(xí)——處理函數(shù)ProcessFunction及多流轉(zhuǎn)換

    ? ? ? ? 在DataStream的更底層,我們可以不定義任何具體的算子(如map(),filter()等)二只提煉出一個統(tǒng)一的“處理”(process)操作?。它是所有轉(zhuǎn)換算子的概括性的表達??梢宰远x處理邏輯。 ? ? ? ? 所以這一層接口就被叫做“ 處理函數(shù) ”( process function ) ? ? ? ? 處理

    2024年02月14日
    瀏覽(17)
  • 大數(shù)據(jù)-Spark批處理實用廣播Broadcast構(gòu)建一個全局緩存Cache

    大數(shù)據(jù)-Spark批處理實用廣播Broadcast構(gòu)建一個全局緩存Cache

    在Spark中,broadcast是一種優(yōu)化技術(shù),它可以將一個只讀變量緩存到每個節(jié)點上,以便在執(zhí)行任務(wù)時使用。這樣可以避免在每個任務(wù)中重復(fù)傳輸數(shù)據(jù)。

    2024年02月15日
    瀏覽(27)
  • 【flink番外篇】4、flink的sink(內(nèi)置、mysql、kafka、redis、clickhouse、分布式緩存、廣播變量)介紹及示例(8) - 完整版

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(31)
  • 【flink番外篇】4、flink的sink(內(nèi)置、mysql、kafka、redis、clickhouse、分布式緩存、廣播變量)介紹及示例(5) - kafka

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(34)
  • 【flink番外篇】4、flink的sink(內(nèi)置、mysql、kafka、redis、clickhouse、分布式緩存、廣播變量)介紹及示例(2) - jdbc/mysql

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(28)
  • 【flink番外篇】4、flink的sink(內(nèi)置、mysql、kafka、redis、clickhouse、分布式緩存、廣播變量)介紹及示例(1) - File、Socket、console

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月01日
    瀏覽(32)
  • Flink (八) --------- 多流轉(zhuǎn)換

    Flink (八) --------- 多流轉(zhuǎn)換

    無論是基本的簡單轉(zhuǎn)換和聚合,還是基于窗口的計算,我們都是針對一條流上的數(shù)據(jù)進行處理的。而在實際應(yīng)用中,可能需要將不同來源的數(shù)據(jù)連接合并在一起處理,也有可能需要將一條流拆分開,所以經(jīng)常會有對多條流進行處理的場景。本章我們就來討論 Flink 中對多條流進

    2023年04月09日
    瀏覽(23)
  • Flink多流轉(zhuǎn)換(1)—— 分流&合流

    Flink多流轉(zhuǎn)換(1)—— 分流&合流

    目錄 分流 代碼示例 使用側(cè)輸出流 合流 聯(lián)合(Union) 連接(Connect) 簡單劃分的話,多流轉(zhuǎn)換可以分為“分流”和“合流”兩大類 目前分流的操作一般是通過側(cè)輸出流(side output)來實現(xiàn),而合流的算子比較豐富,根據(jù)不同的需求可以調(diào)用 union、connect、join 以及 coGroup 等接口

    2024年01月24日
    瀏覽(22)
  • Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié)

    Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié)

    雙流連結(jié)(Join):根據(jù)某個字段的值將數(shù)據(jù)聯(lián)結(jié)起來,“配對”去做處理 可以 定義時間窗口 ,并將兩條流中 共享一個公共鍵 (key)的數(shù)據(jù)放在窗口中進行配對處理 首先需要調(diào)用 DataStream 的 .join() 方法來合并兩條流,得到一個 JoinedStreams;接著通過 .where() 和 .equalTo() 方法指

    2024年02月19日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包