Flink Study Notes
Preface: today is day three of learning Flink. I covered the 11 important operators used in advanced API development, read a lot of material to understand how they work, and spent several hours coding to cement the principles.
Tips: progress is a little slow, but I hope to keep at it: keep forming hypotheses about how each API works, verify them by writing code, and the road into big data will only get better!
II. Flink Unified Stream-Batch API Development
2. Transformation
2.1 Map
Map transforms each element of a DataStream into another element, similar to word -> (word, 1) in the earlier wordcount example.
Example: use the map operation to read the string data in the apache.log file and convert each line into an ApacheLogEvent object.
# Log data
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.text.SimpleDateFormat;
/**
 * @author lql
 * @time 2024-02-13 19:44:52
 * @description TODO: use map to read the string data in apache.log and convert it into ApacheLogEvent objects
 */
public class MapDemo {
public static void main(String[] args) throws Exception {
/**
 * 1) Get the StreamExecutionEnvironment
 * 2) Use readTextFile to build the data source
 * 3) Create an ApacheLogEvent class
 * 4) Use map to perform the conversion
 * 5) Print to verify
 */
//TODO 1) Get the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2) Use readTextFile to build the data source
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache2.log");
//TODO 3) Create an ApacheLogEvent class (the ApacheEvent POJO below)
//TODO 4) Use map to perform the conversion
/**
 * String: input element type
 * ApacheEvent: return element type
 */
SingleOutputStreamOperator<ApacheEvent> apacheEventBean = lines.map(new MapFunction<String, ApacheEvent>() {
@Override
public ApacheEvent map(String line) throws Exception {
String[] elements = line.split(" ");
String ip = elements[0];
int userId = Integer.parseInt(elements[1]);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long timestamp = simpleDateFormat.parse(elements[2]).getTime();
String method = elements[3];
String path = elements[4];
return new ApacheEvent(ip, userId, timestamp, method, path);
}
});
//TODO 5) Print to verify
apacheEventBean.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class ApacheEvent{
String ip; // client IP
int userId; // user id
long timestamp; // access timestamp (epoch millis)
String method; // HTTP method
String path; // request path
}
}
# Printed output
2> MapDemo.ApacheEvent(ip=10.0.0.1, userId=10003, timestamp=1431829613000, method=POST, path=/presentations/logstash-monitorama-2013/css/print/paper.css)
Summary:
- 1- The result of env.readTextFile is declared here as DataStream rather than DataStreamSource; watch the generated type when using the IDE's .var shortcut, or the later assignment may not compile.
- 2- In the overridden map method, split the line and take the fields from the resulting array by index.
- 3- Integer.parseInt converts a string to an int.
- 4- Create a SimpleDateFormat instance for date parsing.
- 5- simpleDateFormat.parse(str).getTime() returns the epoch timestamp of a date string in the given format (a thread-safe alternative is sketched below).
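One caveat not in the original notes: SimpleDateFormat is mutable and not thread-safe, so sharing one instance across parallel operator instances is risky. A minimal sketch of the same parsing with the immutable java.time API, assuming the log format dd/MM/yyyy:HH:mm:ss and a UTC offset:
```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class LogTimeParse {
    // DateTimeFormatter is immutable, so one shared instance is safe
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss");

    public static long toEpochMillis(String raw) {
        // parse the log time field and convert it to an epoch timestamp
        return LocalDateTime.parse(raw, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("17/05/2015:10:06:53")); // 1431857213000 under UTC
    }
}
```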
2.2 FlatMap
FlatMap transforms each element of a DataStream into 0..n elements, similar to splitting lines into words on spaces in the wordcount example.
Example: read the data in the flatmap.log file
and turn each input line:
張三,蘋果手機,聯(lián)想電腦,華為平板
李四,華為手機,蘋果電腦,小米平板
into:
張三有蘋果手機
張三有聯(lián)想電腦
張三有華為平板
李四有…
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author lql
 * @time 2024-02-14 21:04:09
 * @description TODO: read the data in flatmap.log and turn each input record into three output records
 */
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
/**
 * Steps:
 * 1) Build the stream execution environment
 * 2) Build the data source
 * 3) Use flatMap to turn one record into three
 * 4) Split the fields on commas
 * 5) Build the three output records
 * 6) Print the result
 */
// TODO 1: build the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2: read the local data source
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\flatmap.log");
// TODO 3: use flatMap to turn one record into three
SingleOutputStreamOperator<String> result = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] elements = line.split(",");
collector.collect(elements[0] + "有" + elements[1]);
collector.collect(elements[0] + "有" + elements[2]);
collector.collect(elements[0] + "有" + elements[3]);
}
});
result.print();
env.execute();
}
}
Result:
8> 李四有華為手機
8> 李四有蘋果電腦
8> 李四有小米平板
5> 張三有蘋果手機
5> 張三有聯(lián)想電腦
5> 張三有華為平板
Summary: collector.collect can be called multiple times; each call emits one output record (a lambda version is sketched below).
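As a side note, the same flatMap can also be written as a lambda (a sketch, reusing the lines stream and the Collector import from the example above); because the lambda erases the Collector's type parameter, Flink needs an explicit returns(...) hint:
```java
import org.apache.flink.api.common.typeinfo.Types;

SingleOutputStreamOperator<String> result = lines
        .flatMap((String line, Collector<String> out) -> {
            String[] elements = line.split(",");
            // emit one record per item owned by the person in field 0
            for (int i = 1; i < elements.length; i++) {
                out.collect(elements[0] + "有" + elements[i]);
            }
        })
        .returns(Types.STRING); // type hint required for the lambda form
```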
2.3 Filter
Keeps only the elements that satisfy a condition.
Example: read the access-log data in apache.log and keep only the log lines whose client IP is 83.149.9.216.
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 21:20:30
 * @description TODO: read the access-log data in apache.log and keep only the lines whose client IP is 83.149.9.216
 */
public class FilterDemo {
public static void main(String[] args) throws Exception {
/**
 * 1) Get the StreamExecutionEnvironment
 * 2) Use readTextFile to build the data source
 * 3) Use filter to keep matching lines
 * 4) Print to verify
 */
//TODO 1) Get the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2) Use readTextFile to build the data source
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");
//TODO 3) Use filter to keep the lines containing 83.149.9.216
SingleOutputStreamOperator<String> result = lines.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
return line.contains("83.149.9.216");
}
});
result.print();
env.execute();
}
}
Result:
2> 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
2> 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
2> 83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
2> 83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
2> 83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
2> 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
Summary: String.contains is enough to implement the filter condition (a lambda sketch follows below).
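The same filter as a one-line lambda (a sketch against the lines stream above). Note that contains matches the substring anywhere in the line; startsWith is the stricter choice if only the leading IP field should match:
```java
SingleOutputStreamOperator<String> result =
        lines.filter(line -> line.startsWith("83.149.9.216 "));
```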
2.4 KeyBy
Stream processing has no groupBy; the equivalent is keyBy.
Example: read a local data source and count words per key.
package cn.itcast.day02.transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 21:29:52
 * @description TODO: read a local tuple data source and count per key
 */
public class KeyByDemo {
public static void main(String[] args) throws Exception {
// TODO 1: initialize the Flink environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2: read the local data source
DataStreamSource<Tuple2<String, Integer>> source = env.fromElements(
Tuple2.of("籃球", 1),
Tuple2.of("籃球", 2),
Tuple2.of("籃球", 3),
Tuple2.of("足球", 3),
Tuple2.of("足球", 2),
Tuple2.of("足球", 3)
);
// In stream processing, records are computed as they arrive: within each group the values are summed incrementally, so the last record printed per group carries the final sum
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(t -> t.f0).sum(1);
// Without grouping, the sum would be 1+2+3+3+2+3 = 14; with grouping it is 籃球 6 and 足球 8
sum.print();
env.execute();
}
}
Result:
4> (足球,3)
4> (足球,5)
4> (足球,8)
5> (籃球,1)
5> (籃球,3)
5> (籃球,6)
Summary:
- 1- keyBy is the streaming group-by
- 2- keyBy() accepts a key selector such as t -> t.f0, or a field index such as 0 (the index form is deprecated in recent Flink versions; see the sketch below)
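For reference, the key can be specified in several equivalent ways (a sketch against the source stream above; the positional-index and field-name variants are deprecated in recent Flink versions in favor of a KeySelector):
```java
import org.apache.flink.api.java.functions.KeySelector;

source.keyBy(t -> t.f0);  // KeySelector as a lambda (preferred)
source.keyBy(0);          // positional index (deprecated)
source.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> t) {
        return t.f0; // group by the word in the first tuple field
    }
});
```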
2.5 Reduce
Reduce aggregates a dataset or a group, folding all elements into a single result.
Example: read apache.log, count the page views (PV) per IP address, and use reduce to aggregate into a final result.
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 21:43:10
 * @description TODO: read apache.log, count the PV per IP address, and use reduce to aggregate into a final result
 */
public class ReduceDemo {
public static void main(String[] args) throws Exception {
/**
 * 1) Get the StreamExecutionEnvironment
 * 2) Use readTextFile to build the data source
 * 3) Use reduce to perform the aggregation
 * 4) Print to verify
 */
//TODO 1) Get the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2) Use readTextFile to build the data source
DataStream<String> lines = env.readTextFile("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\apache.log");
//TODO 3) Use reduce to perform the aggregation
SingleOutputStreamOperator<Tuple2<String, Integer>> ipAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] dataArray = line.split(" ");
return Tuple2.of(dataArray[0], 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = ipAndOne.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple1, Tuple2<String, Integer> tuple2) throws Exception {
return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1);
}
});
result.print();
env.execute();
}
}
Result:
3> (74.218.234.48,3)
3> (74.218.234.48,4)
3> (74.218.234.48,5)
3> (74.218.234.48,6)
Summary:
- 1- This reduce reproduces the built-in sum behavior (see the sketch below)
- 2- Mind the return value in the overridden method: return Tuple2.of(tuple1.f0, tuple1.f1 + tuple2.f1)
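In fact this reduce reproduces what the built-in rolling aggregation does; the following sketch should yield the same PV counts on the ipAndOne stream above:
```java
SingleOutputStreamOperator<Tuple2<String, Integer>> result =
        ipAndOne.keyBy(t -> t.f0).sum(1); // rolling sum over field f1 per IP
```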
2.6 minBy and maxBy
Get the maximum or minimum of a specified field.
2.6.1 Scenario 1:
Example: the Tuple2 case
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO: after grouping, find the extremes within each group
 */
public class MinMaxByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String line) throws Exception {
String[] fields = line.split(",");
String word = fields[0];
int count = Integer.parseInt(fields[1]);
return Tuple2.of(word, count);
}
});
KeyedStream<Tuple2<String, Integer>, String> keyd = wordAndCount.keyBy(t -> t.f0);
keyd.minBy(1).print("最小數(shù)據(jù)>>>");
keyd.maxBy(1).print("最大數(shù)據(jù)>>>");
env.execute();
}
}
Result:
最大數(shù)據(jù)>>>:1> (spark,2)
最小數(shù)據(jù)>>>:1> (spark,2)
最小數(shù)據(jù)>>>:1> (spark,2)
最大數(shù)據(jù)>>>:1> (spark,5)
最大數(shù)據(jù)>>>:8> (hadoop,7)
最大數(shù)據(jù)>>>:8> (hadoop,7)
最小數(shù)據(jù)>>>:8> (hadoop,3)
最小數(shù)據(jù)>>>:8> (hadoop,3)
2.6.2 Scenario 2
Example: the Tuple3 case
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 21:57:18
 * @description TODO: after grouping, find the extremes within each group
 */
public class MinMaxByDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Create the DataStream from a socket source; sample input:
//遼寧,沈陽,1000
//北京,朝陽,8000
//遼寧,朝陽,1000
//遼寧,朝陽,1000
//遼寧,沈陽,2000
//北京,朝陽,1000
//遼寧,大連,3000
//遼寧,鐵嶺,500
DataStream<String> lines = env.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
@Override
public Tuple3<String, String, Double> map(String value) throws Exception {
String[] fields = value.split(",");
String province = fields[0];
String city = fields[1];
double money = Double.parseDouble(fields[2]);
return Tuple3.of(province, city, money);
}
});
KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);
// the second argument controls tie-breaking: true returns the first element holding the minimum, false returns the most recent one
SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);
res.print();
env.execute();
}
}
Result:
5> (遼寧,沈陽,1000.0)
4> (北京,朝陽,8000.0)
5> (遼寧,朝陽,1000.0)
5> (遼寧,朝陽,1000.0)
5> (遼寧,朝陽,1000.0)
4> (北京,朝陽,1000.0)
5> (遼寧,朝陽,1000.0)
5> (遼寧,鐵嶺,500.0)
2.7 Differences between min/max and minBy/maxBy
package cn.itcast.day02.transformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 22:52:36
 * @description TODO: compare min with minBy
 */
public class MinVSMinByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromElements(
Tuple3.of(1, 3, 2),
Tuple3.of(1, 1, 2),
Tuple3.of(1, 2, 3),
Tuple3.of(1, 111, 1),
Tuple3.of(1, 1, 1),
Tuple3.of(1, 2, 0),
Tuple3.of(1, 33, 2)
);
source.keyBy(t -> t.f0).min(2).print("min>>>");
source.keyBy(t->t.f0).minBy(2).printToErr("minBy>>>");
env.execute();
}
}
Result:
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,3,2)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,111,1)
minBy>>>:6> (1,2,0)
minBy>>>:6> (1,2,0)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,2)
min>>>:6> (1,3,1)
min>>>:6> (1,3,1)
min>>>:6> (1,3,0)
min>>>:6> (1,3,0)
Summary:
- 1- minBy and maxBy return the whole element holding the extreme value (including its other fields)
- 2- min and max only update the compared field; the other fields keep the values of the first element seen
2.8 Union
Union merges multiple DataStreams into one; all streams passed to union must have the same element type.
package cn.itcast.day02.transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 23:06:20
 * @description TODO:
 * * Use union to merge the following data sets
 * * Data set 1:
 * * "hadoop", "hive", "flume"
 * * Data set 2:
 * * "hadoop", "hive", "spark"
 * *
 * * Notes:
 * * 1: the merged data is not de-duplicated
 * * 2: the element types must be identical
 * */
public class UnionDemo {
public static void main(String[] args) throws Exception {
/**
 * Steps:
 * 1) Initialize the Flink stream execution environment
 * 2) Load/create the data sources
 * 3) Process the data
 * 4) Print the output
 * 5) Submit the job
 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds1 = env.fromElements("hadoop", "hive", "flume");
DataStreamSource<String> ds2 = env.fromElements("hadoop","hive","spark");
DataStream<String> result = ds1.union(ds2);
result.printToErr();
env.execute();
}
}
Result:
2> hive
6> flume
3> spark
1> hadoop
4> hadoop
5> hive
Summary:
- 1- Union merges data streams; the element types must match (union is also variadic, see the sketch below)
- 2- Union does not de-duplicate
- 3- The merged output is unordered
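Since union is variadic, more than two streams of the same type can be merged in one call; a minimal sketch (ds3 is a hypothetical third source):
```java
DataStreamSource<String> ds3 = env.fromElements("kafka", "hbase");
DataStream<String> all = ds1.union(ds2, ds3); // all three merged into one stream
```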
2.9 Connect
DataStream, DataStream → ConnectedStreams: the two streams stay independent; by contrast, union really merges them into one stream.
package cn.itcast.day02.transformation;
/**
 * @author lql
 * @time 2024-02-14 23:10:14
 * @description TODO: merge two streams of different types with connect
 */
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.concurrent.TimeUnit;
/**
 * Read two data streams (of different element types) and merge them with connect.
 * Similar to union, but connect joins exactly two streams, the two streams may have
 * different element types, and each stream can be processed with its own logic.
 */
public class ConnectDemo {
public static void main(String[] args) throws Exception {
/**
 * Steps:
 * 1) Initialize the Flink stream execution environment
 * 2) Build two data streams of different element types
 * 3) Apply the business logic to the connected stream
 * 4) Print the output
 * 5) Start the job
 */
//TODO 1) Initialize the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2) Build two data streams of different element types
DataStream<Long> longDataStreamSource = env.addSource(new MyNoParallelSource());
DataStream<Long> longDataStreamSource2 = env.addSource(new MyNoParallelSource());
//TODO 3) Apply the business logic to the connected stream
SingleOutputStreamOperator<String> strDataStreamSource = longDataStreamSource2.map(new MapFunction<Long, String>() {
@Override
public String map(Long aLong) throws Exception {
return "str_" + aLong;
}
});
ConnectedStreams<Long, String> connectedStreams = longDataStreamSource.connect(strDataStreamSource);
//apply a separate piece of logic to each side of the connected stream
SingleOutputStreamOperator<Object> result = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
return value;
}
@Override
public Object map2(String value) throws Exception {
return value;
}
});
//TODO 4) Print the output
result.print();
//TODO 5) Start the job
env.execute();
}
public static class MyNoParallelSource implements SourceFunction<Long> {
//flag controlling whether to keep generating data
private boolean isRunning = true;
private Long count = 0L;
/**
 * The main method: starts the source and implements the data generation
 * @param ctx
 * @throws Exception
 */
@Override
public void run(SourceContext<Long> ctx) throws Exception {
//keep generating data
while (isRunning){
count+=1;
//emit the record
ctx.collect(count);
//generate one record per second
TimeUnit.SECONDS.sleep(1);
}
}
/**
 * Cancels data generation
 */
@Override
public void cancel() {
isRunning = false;
}
}
}
Result:
3> 1
5> str_1
4> 2
6> str_2
5> 3
7> str_3
Summary:
- The two connected streams may have different element types (other Co* functions besides CoMapFunction are available; see the sketch below)
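Besides CoMapFunction, ConnectedStreams also supports flatMap with a CoFlatMapFunction, one callback per input stream; a minimal sketch against the connectedStreams above:
```java
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

SingleOutputStreamOperator<String> flat = connectedStreams
        .flatMap(new CoFlatMapFunction<Long, String, String>() {
            @Override
            public void flatMap1(Long value, Collector<String> out) {
                out.collect("long_" + value); // logic for the first stream
            }
            @Override
            public void flatMap2(String value, Collector<String> out) {
                out.collect(value);           // logic for the second stream
            }
        });
```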
2.10 split, select, and Side Outputs
Split divides one DataStream into two or more DataStreams.
Select retrieves the sub-stream carrying a given tag after a split.
Tips:
- Roughly speaking, Split tags each record and Select picks records by tag to form separate streams. The effect resembles splitting with keyBy, but it is freer: you choose the tags and the routing yourself.
- Side Outputs: split is deprecated. Use a process function instead and route records into different OutputTags according to the processing result.
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * @author lql
 * @time 2024-02-14 23:25:38
 * @description TODO: split a stream with process and side outputs
 */
public class StreamSplitDemo {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// With AUTOMATIC, Flink chooses batch or streaming execution automatically, based on whether the sources are bounded.
//TODO 1.source
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//TODO 2.transformation
//Goal: split the stream into odd and even numbers, then select each side
OutputTag<Integer> oddTag = new OutputTag<>("奇數(shù)", TypeInformation.of(Integer.class));
OutputTag<Integer> evenTag = new OutputTag<>("偶數(shù)", TypeInformation.of(Integer.class));
SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
//records emitted via out stay in the main stream; ctx.output routes records into the chosen OutputTag
if (value % 2 == 0) {
ctx.output(evenTag, value);
} else {
ctx.output(oddTag, value);
}
}
});
DataStream<Integer> oddResult = result.getSideOutput(oddTag);
DataStream<Integer> evenResult = result.getSideOutput(evenTag);
//TODO 3.sink
System.out.println(oddTag);//OutputTag(Integer, 奇數(shù))
System.out.println(evenTag);//OutputTag(Integer, 偶數(shù))
oddResult.print("奇數(shù):");
evenResult.print("偶數(shù):");
//TODO 4.execute
env.execute();
}
}
Result:
OutputTag(Integer, 奇數(shù))
OutputTag(Integer, 偶數(shù))
奇數(shù)::3> 1
偶數(shù)::8> 6
偶數(shù)::6> 4
偶數(shù)::4> 2
奇數(shù)::5> 3
奇數(shù)::1> 7
奇數(shù)::7> 5
偶數(shù)::2> 8
偶數(shù)::4> 10
奇數(shù)::3> 9
Summary:
- 1- An OutputTag defines the name and element type of a side output (an alternative construction is sketched below)
- 2- A process function can split the stream
- 3- Retrieve a side output with the getSideOutput method
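An alternative way to build the tags, instead of passing TypeInformation explicitly, is to declare each OutputTag as an anonymous subclass so the generic type is captured from the class signature (a sketch):
```java
OutputTag<Integer> oddTag = new OutputTag<Integer>("奇數(shù)") {};  // note the trailing {}
OutputTag<Integer> evenTag = new OutputTag<Integer>("偶數(shù)") {};
```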
2.11 Iterate
Creates a "feedback" loop in the stream by redirecting the output of one operator back to an earlier operator.
Data flow of an iteration: DataStream → IterativeStream → DataStream
package cn.itcast.day02.transformation;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-14 23:34:23
 * @description TODO: iterative stream computation with Iterate
 */
public class IterateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//sample socket input: e.g. 10
DataStreamSource<String> strs = env.socketTextStream("node1", 9999);
DataStream<Long> numbers = strs.map(Long::parseLong);
//call iterate: DataStream -> IterativeStream
//iterate over the numbers (integers keep arriving from the socket)
IterativeStream<Long> iteration = numbers.iterate();
//IterativeStream -> DataStream
//the iteration body: the update step applied to every input element
DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("iterate input =>" + value);
return value -= 2;
}
});
//any value > 0 forms a feedback loop: the body's output is fed back in as new input and processed again
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
//register the feedback stream (the loop condition)
iteration.closeWith(feedback);
//elements that no longer satisfy the loop condition leave the loop as output
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
//final results
output.printToErr("output value:");
env.execute();
}
}
Result:
iterate input =>7
iterate input =>5
iterate input =>3
iterate input =>1
output value::2> -1
iterate input =>6
iterate input =>4
output value::3> 0
iterate input =>2
Summary:
- 1- Typical use: iterative updates of a model or its parameters
- 2- Operator iteration takes some effort to understand and apply (note that iterate() also accepts a timeout; see the sketch below)
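One practical detail: iterate() also has an overload that takes a timeout in milliseconds; if no feedback element arrives within that window, the iteration head terminates instead of waiting forever. A sketch with an assumed 5-second timeout, on the numbers stream above:
```java
IterativeStream<Long> iteration = numbers.iterate(5000L); // close the loop after 5 s without feedback
```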