国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作

這篇具有很好參考價值的文章主要介紹了flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink學習筆記

前言:今天是學習 flink 第三天啦,學習了高級 api 開發(fā)中11 中重要算子,查找了好多資料理解其中的原理,以及敲了好幾個小時代碼抓緊理解原理。
Tips:雖然學習進度有點慢,希望自己繼續(xù)努力,不斷猜想 api 原理,通過敲代碼不斷印證自己的想法,轉碼大數(shù)據(jù)之路一定會越來越好的!

二、Flink 流批一體 API 開發(fā)

2. Transfromation

2.1 Map

將 DataStream 中的每一個元素轉化為另一個元素,類似于之前 wordcount 案例中 word—> (word,1)

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

案例:使用map操作,讀取 apache.log 文件中的字符串數(shù)據(jù)轉換成 ApacheLogEvent 對象

# 日志數(shù)據(jù)
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-02-13 19:44:52
 * @description TODO:使用map操作,讀取apache.log文件中的字符串數(shù)據(jù)轉換成ApacheLogEvent對象
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * ?獲取ExecutionEnvironment運行環(huán)境
         * ?使用readTextFile讀取數(shù)據(jù)構建數(shù)據(jù)源
         * ?創(chuàng)建一個ApacheLogEvent類
         * ?使用map操作執(zhí)行轉換
         * ?打印測試
         */

        //TODO ?獲取ExecutionEnvironment運行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO ?使用readTextFile讀取數(shù)據(jù)構建數(shù)據(jù)源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache2.log");

        //TODO ?創(chuàng)建一個ApacheLogEvent類
        //TODO ?使用map操作執(zhí)行轉換
        /**
         * String:傳入值類型
         * ApacheEvent:返回值類型
         */
        SingleOutputStreamOperator<ApacheEvent> apacheEventBean = lines.map(new MapFunction<String, ApacheEvent>() {
            @Override
            public ApacheEvent map(String line) throws Exception {
                String[] elements = line.split(" ");
                String ip = elements[0];
                int userId = Integer.parseInt(elements[1]);
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                long timestamp = simpleDateFormat.parse(elements[2]).getTime();
                String method = elements[3];
                String path = elements[4];
                return new ApacheEvent(ip, userId, timestamp, method, path);
            }
        });

        //TODO ?打印測試
        apacheEventBean.print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ApacheEvent{
        String ip;      // 訪問ip
        int userId;     // 用戶id
        long timestamp; // 訪問時間戳
        String method;  // 訪問方法
        String path;    // 訪問路徑
    }
}
# 打印數(shù)據(jù)
2> MapDemo.ApacheEvent(ip=10.0.0.1, userId=10003, timestamp=1431829613000, method=POST, path=/presentations/logstash-monitorama-2013/css/print/paper.css)

總結:

  • 1- env.readTextFile 返回的類型是:DataStream,而不是 DataStreamSource 類型,不然會報錯,這里用 var 快捷鍵需要注意!
  • 2- 重寫 map 方法,切割后列表形式,以腳標形式取值
  • 3- Intger.parseInt 可以將字符串轉化為整數(shù)類型
  • 4- new 一個 SimpleDateFormat()進行日期格式化處理
  • 5- simpleDateFormat.parse(字符串).getTime() 可以獲取指定格式的日期
2.2 FlatMap

將 DataStream 中的每一個元素轉化為 0……n 個元素,類似于 wordcount 案例中以空格切割單詞

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

實例:讀取 flatmap.log 文件中的數(shù)據(jù)

將數(shù)據(jù):
張三,蘋果手機,聯(lián)想電腦,華為平板
李四,華為手機,蘋果電腦,小米平板

轉化為:
張三有蘋果手機
張三有聯(lián)想電腦
張三有華為平板
李四有…
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-14 21:04:09
 * @description TODO:讀取 flatmap.log文件中的數(shù)據(jù),以上數(shù)據(jù)為一條轉換為三條
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 開發(fā)步驟:
         * ?構建批處理運行環(huán)境
         * ?構建本地集合數(shù)據(jù)源
         * ?使用flatMap將一條數(shù)據(jù)經(jīng)過處理轉換為三條數(shù)據(jù)
         * ?使用逗號分隔字段
         * ?分別構建三條數(shù)據(jù)
         * ?打印輸出
         */
        // TODO 1: 構建 flink 流處理環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: 獲取本地數(shù)據(jù)源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\flatmap.log");

        // TODO 3: 使用flatMap將一條數(shù)據(jù)經(jīng)過處理轉換為三條數(shù)據(jù)
        SingleOutputStreamOperator<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] elements = line.split(",");
                collector.collect(elements[0] + "有" + elements[1]);
                collector.collect(elements[0] + "有" + elements[2]);
                collector.collect(elements[0] + "有" + elements[3]);
            }
        });
        result.print();
        env.execute();
    }
}

結果:

8> 李四有華為手機
8> 李四有蘋果電腦
8> 李四有小米平板
5> 張三有蘋果手機
5> 張三有聯(lián)想電腦
5> 張三有華為平板

總結:collect 可以多行書寫

2.3 Filter

過濾出來符合條件的元素

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

實例:讀取 apache.log 文件中的訪問日志數(shù)據(jù),過濾出來以下訪問IP是 83.149.9.216 的訪問日志。

83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:20:30
 * @description TODO:讀取apache.log文件中的訪問日志數(shù)據(jù),過濾出來以下訪問IP是83.149.9.216的訪問日志。
 */
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        /**
         * ?獲取ExecutionEnvironment運行環(huán)境
         * ?使用fromCollection構建數(shù)據(jù)源
         * ?使用filter操作執(zhí)行過濾
         * 打印測試
         */
        //TODO ?獲取ExecutionEnvironment運行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO ?使用fromCollection構建數(shù)據(jù)源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");

        //TODO ?使用filter操作執(zhí)行過濾(66.249.73.135)
        SingleOutputStreamOperator<String> result = lines.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                return line.contains("83.149.9.216");
            }
        });
        result.print();
        env.execute();
    }
}

結果:

2> 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
2> 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
2> 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
2> 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
2> 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
2> 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png

總結:contains 方法可以達到過濾效果

2.4 KeyBy

流處理中沒有 groupBy,而是 keyBy

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

實例:讀取本地數(shù)據(jù)源, 進行單詞的計數(shù)

package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:29:52
 * @description TODO:讀取本地元組數(shù)據(jù)源, 進行單詞的計數(shù)
 */
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1: 初始化 Fink 環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: 讀取本地數(shù)據(jù)源
        DataStreamSource<Tuple2<String, Integer>> source  = env.fromElements(
                Tuple2.of("籃球", 1),
                Tuple2.of("籃球", 2),
                Tuple2.of("籃球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );

        // 在流計算內,來一條算一條,就是每個組的數(shù)據(jù),挨個進行計算,求和累加,所以結果中最后一個打印的數(shù)據(jù)才是最終的求和結果
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(t -> t.f0).sum(1);

        // 如果不分組的話, sum的結果是 1+2+3+3+2+3 = 14 分組后是 籃球 6  足球 8
        sum.print();
        env.execute();
    }
}

結果:

4> (足球,3)
4> (足球,5)
4> (足球,8)
5> (籃球,1)
5> (籃球,3)
5> (籃球,6)

總結:

  • 1- keyBy 是流式分組
  • 2- keyBy () 可以填寫 t -> f0, 也可以直接填 0
2.5 Reduce

可以對一個 dataset 或者一個 group 來進行聚合計算,最終聚合成一個元素

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

實例:讀取 apache.log 日志,統(tǒng)計ip地址訪問pv數(shù)量,使用 reduce 操作聚合成一個最終結果

83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:43:10
 * @description TODO: 讀取apache.log日志,統(tǒng)計ip地址訪問pv數(shù)量,使用 reduce 操作聚合成一個最終結果
 */
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * ?獲取 ExecutionEnvironment 運行環(huán)境
         * ?使用 readTextFile 構建數(shù)據(jù)源
         * ?使用 reduce 執(zhí)行聚合操作
         * ?打印測試
         */
        //TODO ?獲取 ExecutionEnvironment 運行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO ?使用 readTextFile 構建數(shù)據(jù)源
        DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");

        //TODO ?使用 reduce 執(zhí)行聚合操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> ipAndOne  = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] dataArray = line.split(" ");
                return Tuple2.of(dataArray[0], 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) throws Exception {
                return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1);
            }
        });

        result.print();
        env.execute();
    }
}

結果:

3> (74.218.234.48,3)
3> (74.218.234.48,4)
3> (74.218.234.48,5)
3> (74.218.234.48,6)

總結:

  • 1- reduce 類似于 sum 操作
  • 2- 重寫方法注意返回值寫法:return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1)
2.6 minBy 和 maxBy

獲取指定字段的最大值、最小值

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫
flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

2.6.1 場景一:

實例:Tuple2 情況

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO:分組后,求組內最值
 */
public class MinMaxByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] fields = line.split(",");
                String word = fields[0];
                int count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyd = wordAndCount.keyBy(t -> t.f0);
        keyd.minBy(1).print("最小數(shù)據(jù)>>>");
        keyd.maxBy(1).print("最大數(shù)據(jù)>>>");
        env.execute();
    }
}

結果:

最大數(shù)據(jù)>>>:1> (spark,2)
最小數(shù)據(jù)>>>:1> (spark,2)
最小數(shù)據(jù)>>>:1> (spark,2)
最大數(shù)據(jù)>>>:1> (spark,5)
最大數(shù)據(jù)>>>:8> (hadoop,7)
最大數(shù)據(jù)>>>:8> (hadoop,7)
最小數(shù)據(jù)>>>:8> (hadoop,3)
最小數(shù)據(jù)>>>:8> (hadoop,3)
2.6.2 場景二

實例:Tuple3 情況

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO:分組后,求組內最值
 */
public class MinMaxByDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //調用Source創(chuàng)建DataStream
        //遼寧,沈陽,1000
        //北京,朝陽,8000
        //遼寧,朝陽,1000
        //遼寧,朝陽,1000
        //遼寧,沈陽,2000
        //北京,朝陽,1000
        //遼寧,大連,3000
        //遼寧,鐵嶺,500
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {

            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });

        KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);

        // considerTimestamps 設置為 false,則 Flink 在比較時不會考慮元素的時間戳,而只會根據(jù)指定的字段
        SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);

        res.print();
        env.execute();
    }
}

結果:

5> (遼寧,沈陽,1000.0)
4> (北京,朝陽,8000.0)
5> (遼寧,朝陽,1000.0)
5> (遼寧,朝陽,1000.0)
5> (遼寧,朝陽,1000.0)
4> (北京,朝陽,1000.0)
5> (遼寧,朝陽,1000.0)
5> (遼寧,鐵嶺,500.0)
2.7 min max 和 minBy maxBy 的區(qū)別
package cn.itcast.day02.transformation;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.awt.event.TextEvent;

/**
 * @author lql
 * @time 2024-02-14 22:52:36
 * @description TODO
 */
public class MinVSMinByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromElements(
                Tuple3.of(1, 3, 2),
                Tuple3.of(1, 1, 2),
                Tuple3.of(1, 2, 3),
                Tuple3.of(1, 111, 1),
                Tuple3.of(1, 1, 1),
                Tuple3.of(1, 2, 0),
                Tuple3.of(1, 33, 2)
        );
        source.keyBy(t -> t.f0).min(2).print("min>>>");
        source.keyBy(t->t.f0).minBy(2).printToErr("minBy>>>");

        env.execute();
    }
}

結果:

minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,2,0)
minBy>>>:6> (1,2,0)

min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,1)
min>>>:6> (1,3,1)
min>>>:6> (1,3,0)
min>>>:6> (1,3,0)

總結:

  • 1- minBy 和 maxBy 會返回整個對象數(shù)據(jù)(包括最小值所在的前綴
  • 2- min 和 max 只會返回最小值以及第一次最小值的前綴
2.8 Union

將多個DataSet合并成一個DataSet,union合并的DataSet的類型必須是一致的

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

package cn.itcast.day02.transformation;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:06:20
 * @description TODO:
 *  * 使用union實現(xiàn)
 *  * 將以下數(shù)據(jù)進行取并集操作
 *  * 數(shù)據(jù)集1
 *  * "hadoop", "hive","flume"
 *  * 數(shù)據(jù)集2
 *  * "hadoop","hive","spark"
 *  *
 *  * 注意:
 *  * 1:合并后的數(shù)據(jù)不會自動去重
 *  * 2:要求數(shù)據(jù)類型必須一致
 *  */

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 實現(xiàn)步驟:
         * 1)初始化flink的流處理的運行環(huán)境
         * 2)加載/創(chuàng)建數(shù)據(jù)源
         * 3)處理數(shù)據(jù)
         * 4)打印輸出
         * 5)遞交執(zhí)行作業(yè)
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
        DataStreamSource<String> ds2 = env.fromElements("hadoop","hive","spark");
        DataStream<String> result = ds1.union(ds2);
        result.printToErr();
        env.execute();
    }
}

結果:

2> hive
6> flume
3> spark
1> hadoop
4> hadoop
5> hive

總結:

  • 1- Uinon 合并 dataset, 數(shù)據(jù)集類型必須一致
  • 2- Union 合并不會去除
  • 3- Union 合并出來的數(shù)據(jù)集是亂序的
2.9 Connect

DataStream,DataStream → ConnectedStreams,流相互獨立, 作為對比Union后是真的變成一個流了

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

package cn.itcast.day02.transformation;

/**
 * @author lql
 * @time 2024-02-14 23:10:14
 * @description TODO
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.TimeUnit;

/**
 * 讀取兩個數(shù)據(jù)流(生成兩個不同類型的數(shù)據(jù)流),使用connect進行合并輸出
 * 和union類似,但是connect只能連接兩個流,兩個流之間的數(shù)據(jù)類型可以不同,對兩個流的數(shù)據(jù)可以分別應用不同的處理邏輯
 */
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 實現(xiàn)步驟:
         * 1)初始化flink流處理的運行環(huán)境
         * 2)構建兩個不同類型數(shù)據(jù)的數(shù)據(jù)流
         * 3)對連接后的流數(shù)據(jù)進行業(yè)務處理
         * 4)打印輸出
         * 5)啟動作業(yè)
         */

        //TODO 1)初始化flink流處理的運行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2)構建兩個不同類型數(shù)據(jù)的數(shù)據(jù)流
        DataStream<Long> longDataStreamSource = env.addSource(new MyNoParallelSource());
        DataStream<Long> longDataStreamSource2 = env.addSource(new MyNoParallelSource());

        //TODO 3)對連接后的流數(shù)據(jù)進行業(yè)務處理
        SingleOutputStreamOperator<String> strDataStreamSource = longDataStreamSource2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long aLong) throws Exception {
                return "str_" + aLong;
            }
        });

        ConnectedStreams<Long, String> connectedStreams = longDataStreamSource.connect(strDataStreamSource);
        //對連接后的流應用不同的業(yè)務邏輯
        SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        //TODO 4)打印輸出
        result.print();

        //TODO 5)啟動作業(yè)
        env.execute();
    }

    public static class MyNoParallelSource implements SourceFunction<Long> {
        //定義一個變量,是否循環(huán)生成數(shù)據(jù)
        private boolean isRunning = true;
        private Long count = 0L;

        /**
         * 這是主要的方法,啟動一個數(shù)據(jù)源
         * 實現(xiàn)數(shù)據(jù)的生成操作
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            //不斷生成訂單數(shù)據(jù)
            while (isRunning){
                count+=1;
                //收集數(shù)據(jù)返回
                ctx.collect(count);

                //每隔一秒鐘生成一條訂單數(shù)據(jù)
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /**
         * 取消數(shù)據(jù)的生成操作
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

結果:

3> 1
5> str_1
4> 2
6> str_2
5> 3
7> str_3

總結:

  • Connect 兩個流可以類型不一樣
2.10 split、select 和 Side Outputs

flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作,Flink重溫筆記,flink,筆記,大數(shù)據(jù),學習方法,數(shù)據(jù)倉庫

Split 就是將一個 DataStream 分成兩個或者多個 DataStream

Select 就是獲取分流后對應的數(shù)據(jù)

Tips:

  • 簡單認為就是, Split會給數(shù)據(jù)打上標記,然后通過Select, 選擇標記來劃分出不同的Stream,效果類似KeyBy分流,但是比KeyBy更自由些,可以自由打標記并進行分流。
  • Side Outputs:split 過期啦,可以使用process方法對流中數(shù)據(jù)進行處理,并針對不同的處理結果將數(shù)據(jù)收集到不同的OutputTag中
package cn.itcast.day02.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author lql
 * @time 2024-02-14 23:25:38
 * @description TODO
 */
public class StreamSplitDemo {
    public static void main(String[] args) throws Exception {

        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 當設置為 AUTOMATIC 時,F(xiàn)link 會自動選擇最佳的并行度來執(zhí)行作業(yè)。

        //TODO 1.source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        //TODO 2.transformation
        //需求:對流中的數(shù)據(jù)按照奇數(shù)和偶數(shù)拆分并選擇
        OutputTag<Integer> oddTag = new OutputTag<>("奇數(shù)", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag("偶數(shù)", TypeInformation.of(Integer.class));

        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                //out收集完的還是放在一起的,ctx可以將數(shù)據(jù)放到不同的OutputTag
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }

            }
        });

        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);

        //TODO 3.sink
        System.out.println(oddTag);//OutputTag(Integer, 奇數(shù))
        System.out.println(evenTag);//OutputTag(Integer, 偶數(shù))
        oddResult.print("奇數(shù):");
        evenResult.print("偶數(shù):");

        //TODO 4.execute
        env.execute();
    }
}

結果:

OutputTag(Integer, 奇數(shù))
OutputTag(Integer, 偶數(shù))
奇數(shù)::3> 1
偶數(shù)::8> 6
偶數(shù)::6> 4
偶數(shù)::4> 2
奇數(shù)::5> 3
奇數(shù)::1> 7
奇數(shù)::7> 5
偶數(shù)::2> 8
偶數(shù)::4> 10
奇數(shù)::3> 9

總結:

  • 1- OutputTag 對象用于定義輸出類型
  • 2- process 可以分流
  • 3- 引流數(shù)據(jù)使用:getSideOutput 方法
2.11 Iterate

在流中創(chuàng)建“反饋(feedback)”循環(huán),通過將一個算子的輸出重定向到某個先前的算子。

迭代的數(shù)據(jù)流向:DataStream → IterativeStream → DataStream

package cn.itcast.day02.transformation;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-14 23:34:23
 * @description TODO:Iterate迭代流式計算
 */
public class IterateDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //10
        DataStreamSource<String> strs = env.socketTextStream("node1", 9999);

        DataStream<Long> numbers = strs.map(Long::parseLong);

        //調用iterate方法 DataStream -> IterativeStream
        //對Nums進行迭代(不停的輸入int的數(shù)字)
        IterativeStream<Long> iteration = numbers.iterate();

        //IterativeStream -> DataStream
        //對迭代出來的數(shù)據(jù)進行運算 //對輸入的數(shù)據(jù)應用更新模型,即輸入數(shù)據(jù)的處理邏輯
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("iterate input =>" + value);
                return value -= 2;
            }
        });
        //只要滿足value > 0的條件,就會形成一個回路,重新的迭代,即將前面的輸出作為輸入,在進行一次應用更新模型,即輸入數(shù)據(jù)的處理邏輯
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });
        //傳入迭代的條件
        iteration.closeWith(feedback);

        //不滿足迭代條件的最后要輸出
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });

        //數(shù)據(jù)結果
        output.printToErr("output value:");
        env.execute();
    }

}

結果:

iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2

總結:

  • 1- 更新模型,更新參數(shù)較為常見
  • 2- 算子迭代,需要理解應用

ction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**

  • @author lql

  • @time 2024-02-14 23:34:23

  • @description TODO:Iterate迭代流式計算
    */
    public class IterateDemo {
    public static void main(String[] args) throws Exception {

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
     //10
     DataStreamSource<String> strs = env.socketTextStream("node1", 9999);
    
     DataStream<Long> numbers = strs.map(Long::parseLong);
    
     //調用iterate方法 DataStream -> IterativeStream
     //對Nums進行迭代(不停的輸入int的數(shù)字)
     IterativeStream<Long> iteration = numbers.iterate();
    
     //IterativeStream -> DataStream
     //對迭代出來的數(shù)據(jù)進行運算 //對輸入的數(shù)據(jù)應用更新模型,即輸入數(shù)據(jù)的處理邏輯
     DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
         @Override
         public Long map(Long value) throws Exception {
             System.out.println("iterate input =>" + value);
             return value -= 2;
         }
     });
     //只要滿足value > 0的條件,就會形成一個回路,重新的迭代,即將前面的輸出作為輸入,在進行一次應用更新模型,即輸入數(shù)據(jù)的處理邏輯
     DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
         @Override
         public boolean filter(Long value) throws Exception {
             return value > 0;
         }
     });
     //傳入迭代的條件
     iteration.closeWith(feedback);
    
     //不滿足迭代條件的最后要輸出
     DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
         @Override
         public boolean filter(Long value) throws Exception {
             return value <= 0;
         }
     });
    
     //數(shù)據(jù)結果
     output.printToErr("output value:");
     env.execute();
    

    }

}


結果:

```java
iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2

總結:文章來源地址http://www.zghlxwxcb.cn/news/detail-827393.html

  • 1- 更新模型,更新參數(shù)較為常見
  • 2- 算子迭代,需要理解應用

到了這里,關于flink重溫筆記(三):Flink 流批一體 API 開發(fā)——Transformation 重要算子操作的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Flink流批一體計算(16):PyFlink DataStream API

    Flink流批一體計算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構建健壯的、有狀態(tài)的流式應用程序。它提供了對狀態(tài)和時間細粒度控制,從而允許實現(xiàn)高級事件驅動系統(tǒng)。 用戶實現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計算(10):PyFlink Tabel API

    簡述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它構建可擴展的批處理和流處理任務,例如實時數(shù)據(jù)處理管道、大規(guī)模探索性數(shù)據(jù)分析、機器學習( ML )管道和 ETL 處理。 如果你對 Python 和 Pandas 等庫已經(jīng)比較熟悉,那么 PyFlink 可以讓你更輕松地利用 Flink 生態(tài)系統(tǒng)的全部功

    2024年02月11日
    瀏覽(27)
  • Flink流批一體計算(20):DataStream API和Table API互轉

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴展了TableEnvironment。 下面代碼演示兩種API如何互轉 TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個 Flink Python DataSt

    2024年02月11日
    瀏覽(55)
  • Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment

    目錄 概述 設置重啟策略 什么是flink的重啟策略(Restartstrategy) flink的重啟策略(Restartstrategy)實戰(zhàn) flink的4種重啟策略 FixedDelayRestartstrategy(固定延時重啟策略) FailureRateRestartstrategy(故障率重啟策略) NoRestartstrategy(不重啟策略) 配置State Backends 以及 Checkpointing Checkpoint 啟用和配置

    2024年02月13日
    瀏覽(47)
  • Flink流批一體計算(12):PyFlink Tabel API之構建作業(yè)

    目錄 1.創(chuàng)建源表和結果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通過 DDL 語句來注冊源表和結果表 2. 創(chuàng)建一個作業(yè) 3. 提交作業(yè)Submitting PyFlink Jobs 1.創(chuàng)建源表和結果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    瀏覽(21)
  • Flink流批一體計算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實現(xiàn)了一個簡單的計數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時進行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計算(14):PyFlink Tabel API之SQL查詢

    舉個例子 查詢 source 表,同時執(zhí)行計算 Table API 查詢 Table 對象有許多方法,可以用于進行關系操作。 這些方法返回新的 Table 對象,表示對輸入 Table 應用關系操作之后的結果。 這些關系操作可以由多個方法調用組成,例如 table.group_by(...).select(...)。 Table API 文檔描述了流和批

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊成臨時表 ·執(zhí)行 SQL 查詢 ·注冊用戶自定義的 (標量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    目錄 1. 在上節(jié)數(shù)據(jù)流上執(zhí)行轉換操作,或者使用 sink 將數(shù)據(jù)寫入外部系統(tǒng)。 2. File Sink File Sink Format Types? Row-encoded Formats? Bulk-encoded Formats? 桶分配 滾動策略 3. 如何輸出結果 Print 集合數(shù)據(jù)到客戶端,execute_and_collect方法將收集數(shù)據(jù)到客戶端內存 將結果發(fā)送到DataStream sink conne

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包