Flink Study Notes
Preface: Today is day seven of learning Flink! I studied the sink part of Flink, which mainly answers the question of where the data goes after it has been processed. Flink's concepts can be hard to grasp, but once the code runs, the logic turns out to be quite fun!
Tips: Chairman Mao said, "With power and to spare we must pursue the tottering foe, and not ape Xiang Yu the conqueror seeking idle fame!" Tomorrow is Sunday: besides reviewing the earlier topics, I will keep working through the next ones. Keep it up!
II. Flink Unified Stream and Batch API Development
4. Data Output: Sink
4.1 print
print is the simplest sink; it is usually used for experiments and testing.
Example: a socket source plus a custom print sink, showing which subtask each record passes through.
```java
package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-17 22:27:48
 * @description TODO: print
 */
public class PrintSinkDemo {
    public static void main(String[] args) throws Exception {
        // in local mode the default parallelism is the number of logical cores on this machine
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);

        // socket source
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // get the parallelism of the lines source
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism);

        lines.print();
        lines.addSink(new MyPrintSink()).name("my-print-sink");
        env.execute();
    }

    private static class MyPrintSink extends RichSinkFunction<String> {
        // declaring this as a field matters: otherwise indexOfThisSubtask
        // would only be usable inside a single method!
        private int indexOfThisSubtask;

        @Override
        public void open(Configuration parameters) throws Exception {
            // assign to the field (redeclaring it as a local variable here would leave the field at 0)
            indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println((indexOfThisSubtask + 1) + "> " + value);
        }
    }
}
```
Result:
Default parallelism of the execution environment: 8
Parallelism of the socket source: 1
6> hadoop
1> hadoop
1> hadoop
7> hadoop
Summary:
- Printing to the console is itself just another sink.
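As a side note, print() also accepts an identifier string, which helps tell several print sinks apart in one console. A minimal sketch, continuing from the lines stream above (the "raw" and "upper" identifiers are just examples):

```java
// print(String) prefixes each record with the given identifier,
// e.g. "raw:3> hadoop" vs. "upper:3> HADOOP" when parallelism > 1
lines.print("raw");
lines.map(String::toUpperCase).print("upper");
```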
4.2 writeAsText: Output in Text Format
This method writes the data into the given directory as plain text in real time; under the hood it uses TextOutputFormat.
Example: a socket source whose data is written out as text files.
```java
package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 22:40:48
 * @description TODO: writeAsText
 */
public class WriteSinkDemo {
    public static void main(String[] args) throws Exception {
        // in local mode the default parallelism is the number of logical cores on this machine
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // get the parallelism of the DataStream
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism);

        lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}
```
Result:
Files named by number appear under the output folder.
Their content is the records from the socket source, each followed by a \n newline.
Each file in the directory is named after the index of the sink subtask that wrote it, plus 1.
Summary:
- 1- writeAsText writes the data as small files, each named after its subtask index + 1.
- 2- FileSystem.WriteMode has two options: OVERWRITE, which replaces an existing file of the same name, and NO_OVERWRITE, which fails with an error if the file already exists.
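Note that writeAsText is marked deprecated in newer Flink releases in favor of the unified file sink. A minimal sketch of that replacement, continuing from the lines stream above and assuming Flink 1.12+ with the flink-connector-files dependency (the path is just an example):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// row-format file sink: every record becomes one UTF-8 encoded line
FileSink<String> sink = FileSink
        .forRowFormat(new Path("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();

// sinkTo attaches a unified Sink (addSink is for the older SinkFunction)
lines.sinkTo(sink);
```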
4.3 writeAsCsv: Output in CSV Format
This method writes the data to the given location in CSV format; under the hood it uses CsvOutputFormat.
Example: writing a DataSet of tuples out to a CSV file (note this demo uses the batch DataSet API rather than a socket source).
```java
package cn.itcast.day06.sink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.core.fs.FileSystem;

/**
 * @author lql
 * @time 2024-02-17 22:52:12
 * @description TODO: write a DataSet to a CSV file
 */
public class CsvSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // target file path, down to the .csv file name
        String filePath = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";

        // sample record
        Tuple7<Integer, String, Integer, Integer, String, String, Long> row =
                new Tuple7<>(15, "zhangsan", 40, 1, "CN", "2020-09-08 00:00:00", 1599494400000L);

        // turn it into a DataSet; fromElements accepts individual elements such as tuples
        DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>> dataSet = env.fromElements(row);

        // write the content to the file; an existing file of the same name is overwritten
        dataSet.writeAsCsv(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute();
    }
}
```
Result:
CSV data is generated at the specified file path.
Summary:
- 1- First define where the data is stored, down to the .csv file name.
- 2- Finally, set the parallelism to 1 so that a single complete file is produced; with a higher parallelism the path becomes a directory of per-subtask files.
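writeAsCsv also has an overload taking explicit row and field delimiters, which is handy when the default comma layout does not fit. A minimal sketch, continuing from the dataSet above (the pipe delimiter is just an example):

```java
// overload with explicit delimiters: one record per line, fields separated by '|'
dataSet.writeAsCsv(filePath, "\n", "|", FileSystem.WriteMode.OVERWRITE)
       .setParallelism(1);
```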
4.4 writeUsingOutputFormat: Output in a Specified Format
This method writes the data into the given directory using a specified OutputFormat.
```java
package cn.itcast.day06.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 23:03:24
 * @description TODO: write data to a directory using a specified OutputFormat
 */
public class WriteUsingOutputFormatSink {
    public static void main(String[] args) throws Exception {
        // 1. set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // fromElements creates a non-parallel DataStreamSource
        DataStreamSource<String> words = env.fromElements(
                "hadoop", "spark", "flink", "hbase", "flink", "spark"
        );

        // count each word once
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
        result.writeUsingOutputFormat(new TextOutputFormat<>(new Path("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
        env.execute();
    }
}
```
Result:
n text files are generated under the specified directory, where n is the machine's default parallelism (one file per subtask).
Summary:
- 1- writeAsText and writeAsCsv both call writeUsingOutputFormat under the hood.
- 2- This method is more flexible, since any OutputFormat implementation can be plugged in; see the sketch below.
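To illustrate that flexibility, here is a minimal sketch of a hand-rolled OutputFormat that simply prints each record with a subtask tag. The class name TaggedPrintFormat is made up for this example; only the org.apache.flink.api.common.io.OutputFormat interface is real:

```java
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

// a toy OutputFormat: writes every record to stdout, tagged with its subtask number
public class TaggedPrintFormat<T> implements OutputFormat<T> {
    private int taskNumber;

    @Override
    public void configure(Configuration parameters) {
        // this toy format needs no configuration
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        this.taskNumber = taskNumber; // remember which parallel instance we are
    }

    @Override
    public void writeRecord(T record) {
        System.out.println((taskNumber + 1) + "> " + record);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```

It would then be attached just like TextOutputFormat above: result.writeUsingOutputFormat(new TaggedPrintFormat<>());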
4.5 writeToSocket: Output to a Network Port
This method sends the data to the given socket address and port.
Example: reading from socket node1:9999 and writing the data to node1:8888.
```java
package cn.itcast.day06.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 23:12:03
 * @description TODO: writeToSocket
 */
public class WriteToSocketDemo {
    public static void main(String[] args) throws Exception {
        // in local mode the default parallelism is the number of logical cores on this machine
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // get the parallelism of the DataStream
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism);

        // the third argument is the SerializationSchema used to serialize the outgoing data
        lines.writeToSocket("node1", 8888, new SimpleStringSchema());
        env.execute();
    }
}
```
Result:
node1:8888 receives the data written from node1:9999 in real time.
Summary:
- The target port must already be listening before the job starts (for example via `nc -lk 8888` on node1).
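One gotcha: SimpleStringSchema does not append a line break, so consecutive records arrive glued together on the receiving side. A minimal sketch of a custom SerializationSchema that adds one (the class name LineStringSchema is made up for this example):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;

// serializes each record as UTF-8 text followed by a newline,
// so the receiver sees exactly one record per line
public class LineStringSchema implements SerializationSchema<String> {
    @Override
    public byte[] serialize(String element) {
        return (element + "\n").getBytes(StandardCharsets.UTF_8);
    }
}
```

It would be used in place of SimpleStringSchema: lines.writeToSocket("node1", 8888, new LineStringSchema());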
4.6 Sinks Based on Local Collections
Data can be sent to different local destinations.
Example: data can be printed to stdout, printed to stderr, or collected into a local collection (a sketch of the collection variant follows the summary below).
```java
package cn.itcast.day06.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 23:18:12
 * @description TODO: data can go to stdout, stderr, or be collected into a local collection
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, String>> dataSource = env.fromElements(
                Tuple2.of(1, "zhangsan"),
                Tuple2.of(2, "lisi"),
                Tuple2.of(3, "wangwu"),
                Tuple2.of(4, "zhaoliu")
        );

        // 2. sink
        dataSource.print();        // stdout
        dataSource.printToErr();   // stderr
        env.execute();
    }
}
```
Result:
Printed in black (stdout):
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)
Printed in red (stderr):
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)
Summary:
- 1- printToErr allows output to be separated into a different stream.
- 2- With a parallelism of 1, a file sink outputs a single file.
- 3- With a parallelism of n, a file sink outputs a folder of files.
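The example above covers stdout and stderr but not the local-collection variant. A minimal sketch using DataStreamUtils.collect, continuing from the dataSource above (in recent Flink versions, dataSource.executeAndCollect() plays the same role):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;

// pull the stream's elements back into a local List on the client;
// collect() triggers job execution itself, so no separate env.execute() is needed
Iterator<Tuple2<Integer, String>> it = DataStreamUtils.collect(dataSource);
List<Tuple2<Integer, String>> local = new ArrayList<>();
it.forEachRemaining(local::add);
System.out.println(local);
```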