国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Flink實(shí)戰(zhàn)】Flink中的分流

這篇具有很好參考價(jià)值的文章主要介紹了【Flink實(shí)戰(zhàn)】Flink中的分流。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink中的分流

在Flink中將數(shù)據(jù)流切分為多個(gè)子數(shù)據(jù)流,子數(shù)據(jù)流稱(chēng)為”旁路輸出數(shù)據(jù)流“。

拆分流數(shù)據(jù)的方式

  • Split,已經(jīng)廢棄,不推薦使用
  • Fliter
  • SideOut,推薦使用

Fliter分流的Java實(shí)現(xiàn)

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 指標(biāo)明細(xì)
        DataStream<String> detailMessage = KafkaConfigUtil.buildSource(env)
                .map((MapFunction<String, String>) kafkaMessage -> {
                    JSONObject jsonobject = null;
                    try {
                        jsonobject = JSONObject.parseObject(kafkaMessage);
                    } catch (Exception e) {
                        LOG.warn("報(bào)文格式錯(cuò)誤:{}", kafkaMessage);
                    }
                    if (null == jsonobject || jsonobject.isEmpty()) {
                        LOG.warn("報(bào)文內(nèi)容不合法:{}", JSONObject.toJSONString(jsonobject));
                    } else {
                        if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
                                && !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
                            LOG.warn("報(bào)文所屬服務(wù)不存在:{}", JSONObject.toJSONString(jsonobject));
                        }
                    }
                    return JSONObject.toJSONString(jsonobject);
                });
        // 將原始流中包含demo的數(shù)據(jù)篩選出來(lái)
        DataStream<String> diagnosisMessages = detailMessage
                .filter((FilterFunction<String>) kafkaMessage -> (kafkaMessage.contains("demo")))
                .map((MapFunction<String, String>) sparkMessage -> {
                    // 為達(dá)到實(shí)驗(yàn)效果,進(jìn)行日志輸出
                    LOG.info("[is demo message]:{}", sparkMessage);
                    return sparkMessage;
                });

        env.execute("Flink Streaming Java API Skeleton");
    }

SideOut分流的Java實(shí)現(xiàn)

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("【SideOutputDemo】");
        
        // 指標(biāo)明細(xì)
        DataStream<String> mainMessage = KafkaConfigUtil.buildSource(env)
                .map((MapFunction<String, String>) kafkaMessage -> {
                    JSONObject jsonobject = null;
                    try {
                        jsonobject = JSONObject.parseObject(kafkaMessage);
                    } catch (Exception e) {
                        LOG.warn("報(bào)文格式錯(cuò)誤:{}", kafkaMessage);
                    }
                    if (null == jsonobject || jsonobject.isEmpty()) {
                        LOG.warn("報(bào)文內(nèi)容不合法:{}", JSONObject.toJSONString(jsonobject));
                    } else {
                        if (!EventsServiceEnum.MapReduce.getValue().equals(jsonobject.get("service"))
                                && !EventsServiceEnum.Spark.getValue().equals(jsonobject.get("service"))) {
                            LOG.warn("報(bào)文所屬服務(wù)不存在:{}", JSONObject.toJSONString(jsonobject));
                        }
                    }
                    return JSONObject.toJSONString(jsonobject);
                });

        // 定義一個(gè)切分(旁路輸出)
        final OutputTag<String> outputTag = new OutputTag<String>("Spark_END") {
        };

        SingleOutputStreamOperator<String> sp = mainMessage
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(
                            String s
                            , Context context
                            , Collector<String> collector) throws Exception {
                        // 向常規(guī)流(主流)中添加數(shù)據(jù)
                        collector.collect(s);
                        // 向旁路輸出流中添加數(shù)據(jù)
                        if (s.contains(AppPhaseEnum.Spark_APP_End.getValue())) {
                            context.output(outputTag, s);
                        }
                    }
                });
        sp.map((MapFunction<String, String>) sparkMessage -> {
            LOG.info("主流的數(shù)據(jù): {}", sparkMessage);
            return sparkMessage;
        });

        DataStream<String> tag = sp.getSideOutput(outputTag);
        tag.map((MapFunction<String, String>) sparkMessage -> {
            LOG.info("旁路[{}]的數(shù)據(jù): {}", outputTag.getId(), sparkMessage);
            return sparkMessage;
        });

        env.execute("Flink Streaming Java API Skeleton");
    }

SideOutPut 是 Flink 框架推薦的分流方法,在使用 SideOutPut 時(shí),需要按照以下步驟進(jìn)行:

  1. 為每個(gè)分支流定義一個(gè) SideOutPut。

  2. 為定義好的 SideOutPut發(fā)出數(shù)據(jù)。只有以下特定的函數(shù)才能通過(guò)Context上下文對(duì)象,向旁路輸出的SideOutPut發(fā)送數(shù)據(jù)。

    1. ProcessFunction:處理函數(shù),單流輸入函數(shù)
    2. KeyedProcessFunction:處理函數(shù),單流輸入函數(shù)
    3. CoProcessFunction:處理函數(shù),雙流流輸入函數(shù)
    4. KeyedCoProcessFunction:處理函數(shù),雙流流輸入函數(shù)
    5. ProcessWindowFunction:窗口函數(shù),全量計(jì)算函數(shù)
    6. ProcessAllWindowFunction:窗口函數(shù),全量計(jì)算函數(shù),它與 ProcessWindowFunction 類(lèi)似,但是它會(huì)對(duì)窗口中的所有數(shù)據(jù)進(jìn)行處理,而不是僅處理觸發(fā)窗口計(jì)算的數(shù)據(jù)。

    例子中使用ProcessFunction實(shí)現(xiàn)流拆分。

  3. 根據(jù)SideOutPut 的ID標(biāo)識(shí)獲取旁路輸出流,進(jìn)行數(shù)據(jù)繼續(xù)處理。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-687699.html

拆分方式 對(duì)比
Split 不支持鏈?zhǔn)讲鸱郑蟹值玫降牧?,是不能進(jìn)行再次切分的
Fliter 多分支流,需要多次遍歷原始流進(jìn)行篩選。浪費(fèi)集群的資源
SideOut 以多次進(jìn)行拆分的,支持鏈?zhǔn)讲鸱帧?/td>

到了這里,關(guān)于【Flink實(shí)戰(zhàn)】Flink中的分流的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)轉(zhuǎn)換算子(Transformation)【分流】

    所謂 “分流” ,就是將一條數(shù)據(jù)流拆分成完全獨(dú)立的兩條、甚至多條流。也就是基于一個(gè) DataStream ,定義一些篩選條件,將符合條件的數(shù)據(jù)揀選出來(lái)放到對(duì)應(yīng)的流里。 其實(shí)根據(jù)條件篩選數(shù)據(jù)的需求,本身非常容易實(shí)現(xiàn):只要針對(duì)同一條流多次獨(dú)立調(diào)用 .filter() 方法進(jìn)行篩選

    2024年01月24日
    瀏覽(17)
  • Flink分流,合流,狀態(tài),checkpoint和精準(zhǔn)一次筆記

    第8章 分流 1.使用側(cè)輸出流 2.合流 2.1 union :使用 ProcessFunction 處理合流后的數(shù)據(jù) 2.2 Connect : 兩條流的格式可以不一樣, map操作使用CoMapFunction,process 傳入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 進(jìn)行了按鍵分區(qū),那么要傳入的就是 KeyedBroadcastProcessFunction; 如果沒(méi)有按鍵分

    2024年02月12日
    瀏覽(22)
  • Flink---5、聚合算子、用戶自定義函數(shù)、物理分區(qū)算子、分流、合流

    Flink---5、聚合算子、用戶自定義函數(shù)、物理分區(qū)算子、分流、合流

    ?????????????????????? 星光下的趕路人star的個(gè)人主頁(yè) ?????????????????????? 欲買(mǎi)桂花同載酒,終不似,少年游 計(jì)算的結(jié)果不僅依賴(lài)當(dāng)前數(shù)據(jù),還跟之前的數(shù)據(jù)有關(guān),相當(dāng)于要把所有數(shù)據(jù)聚在一起進(jìn)行匯總合并—這就是

    2024年02月07日
    瀏覽(29)
  • 實(shí)戰(zhàn)Flink Java api消費(fèi)kafka實(shí)時(shí)數(shù)據(jù)落盤(pán)HDFS

    實(shí)戰(zhàn)Flink Java api消費(fèi)kafka實(shí)時(shí)數(shù)據(jù)落盤(pán)HDFS

    在Java api中,使用flink本地模式,消費(fèi)kafka主題,并直接將數(shù)據(jù)存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 為了完成 Flink 從 Kafka 消費(fèi)數(shù)據(jù)并實(shí)時(shí)寫(xiě)入 HDFS 的需求,通常需要啟動(dòng)以下組件: 確保 Zookeeper 在運(yùn)行,因?yàn)?Flink 的 Kafka Consumer 需要依賴(lài) Zookeeper。 確保 Kafka Serve

    2024年01月24日
    瀏覽(29)
  • 實(shí)戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫(kù)獲取增量變更數(shù)據(jù)

    目錄 前言: 1、springboot引入依賴(lài): 2、yml配置文件 3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽(tīng)器 4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對(duì)象 5、CDC 數(shù)據(jù)實(shí)體類(lèi) 6、自定義ApplicationContextUtil 7、自定義sink 交由spring管理,處理變更數(shù)據(jù) ? ? ? ? 我的場(chǎng)景是從SQL Server數(shù)據(jù)庫(kù)獲取指定表的增量數(shù)據(jù),查

    2024年02月10日
    瀏覽(24)
  • 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】

    尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程從入門(mén)到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實(shí)

    2024年02月09日
    瀏覽(51)
  • 【Flink實(shí)戰(zhàn)】Flink 商品銷(xiāo)量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X

    【Flink實(shí)戰(zhàn)】Flink 商品銷(xiāo)量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X

    ?? 作者 :“大數(shù)據(jù)小禪” ?? 文章簡(jiǎn)介 :Flink 商品銷(xiāo)量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X ?? 歡迎小伙伴們 點(diǎn)贊 ??、 收藏 ?、 留言 ?? Flink怎么操作Redis Flink怎么操作redis? 方式一:自定義sink 方式二:使用connector Redis Sink 核心是RedisMapper 是一個(gè)接口,使用時(shí)要

    2024年02月06日
    瀏覽(47)
  • 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記02【Flink部署】

    尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記02【Flink部署】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程從入門(mén)到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實(shí)

    2024年02月11日
    瀏覽(31)
  • 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記03【Flink運(yùn)行時(shí)架構(gòu)】

    尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記03【Flink運(yùn)行時(shí)架構(gòu)】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程從入門(mén)到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實(shí)戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實(shí)

    2024年02月16日
    瀏覽(44)
  • 大數(shù)據(jù)Flink(五十二):Flink中的批和流以及性能比較

    大數(shù)據(jù)Flink(五十二):Flink中的批和流以及性能比較

    文章目錄 Flink中的批和流以及性能比較 ??????????????一、Flink中的批和流

    2024年02月15日
    瀏覽(15)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包