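Flink Study Notes
Preface: Today is day two! I started on Flink's unified stream/batch API, focusing on how to ingest data from the various kinds of sources. I find that learning to program works best when I group things by category, guess what an API does, and then write code to check the guess; that is how I come to understand and apply each API.
Tip: I find learning Flink quite fun. My progress is a bit slow, but I now understand data sources clearly, and I trust things will keep getting better from here!
II. Flink Unified Stream/Batch API Development
1. Input Data Sets: Data Source
1.1 Predefined Sources
1.1.1 Sources Based on Local Collections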
- (1) env.fromElements()
# Two kinds of input: plain elements and tuples
DataStreamSource<String> ds1 = env.fromElements("hadoop", "spark", "spark", "flink");
List<Tuple2<String,Long>> tuple2List = new ArrayList<>();
tuple2List.add(Tuple2.of("hadoop",1L));
tuple2List.add(Tuple2.of("spark", 2L));
tuple2List.add(Tuple2.of("flink", 3L));
# The whole list is passed as a single element, so ds2 holds exactly one record
DataStreamSource<List<Tuple2<String, Long>>> ds2 = env.fromElements(tuple2List);
# Output 1
6> spark
4> hadoop
5> spark
7> flink
# Output 2
6> [(hadoop,1), (spark,2), (flink,3)]
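Output 2 contains a single record because fromElements takes varargs: a List passed as the only argument counts as one element. The following is a minimal plain-Java sketch of the same effect, with no Flink dependency; the helper `elements` is a hypothetical stand-in written for this note, not a Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class VarargsSketch {
    // Hypothetical stand-in for a varargs factory such as env.fromElements(T...)
    @SafeVarargs
    static <T> List<T> elements(T... data) {
        return new ArrayList<>(Arrays.asList(data));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hadoop", "spark", "flink");
        // Three separate arguments: three elements
        System.out.println(elements("hadoop", "spark", "flink").size());
        // One List argument: T is inferred as List<String>, so there is one element
        System.out.println(elements(words).size());
    }
}
```

To get one record per tuple, the list can be handed to env.fromCollection(tuple2List) instead, which is the method covered in (2).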
- (2) env.fromCollection()
# Pass in a list
DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));
# Output 3
8> hadoop
6> spark
7> flink
# fromParallelCollection: a sequence that can be read in parallel (closed interval 0 to 10)
DataStreamSource<Long> parallelCollection = env.fromParallelCollection(
new NumberSequenceIterator(0L, 10L),
TypeInformation.of(Long.TYPE)
).setParallelism(3);
# Output of parallelCollection (unordered)
8> 8
2> 10
8> 7
6> 3
6> 5
3> 0
7> 6
1> 9
5> 2
5> 4
4> 1
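Each value from 0 to 10 appears exactly once because the sequence is divided among the parallel source instances. Below is a rough sketch of how a closed range can be cut into contiguous chunks. The method `split` and its even-chunk strategy are assumptions made for illustration and may differ from what NumberSequenceIterator does internally.

```java
import java.util.ArrayList;
import java.util.List;

class RangeSplitSketch {
    // Splits the closed range [from, to] into at most `parts` contiguous closed sub-ranges.
    // Each sub-range is returned as {start, end}; the first chunks absorb the remainder.
    static List<long[]> split(long from, long to, int parts) {
        long total = to - from + 1;
        long base = total / parts;
        long extra = total % parts;
        List<long[]> result = new ArrayList<>();
        long start = from;
        for (int i = 0; i < parts; i++) {
            long size = base + (i < extra ? 1 : 0);
            if (size > 0) {
                result.add(new long[] {start, start + size - 1});
            }
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [0, 3], [4, 7], [8, 10] on separate lines
        for (long[] range : split(0L, 10L, 3)) {
            System.out.println("[" + range[0] + ", " + range[1] + "]");
        }
    }
}
```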
- (3) env.generateSequence()
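# Pass in a numeric range (closed interval: both 1 and 10 are emitted); generateSequence is deprecated in newer Flink releases in favor of fromSequence
DataStreamSource<Long> ds4 = env.generateSequence(1, 10);
# Output 4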
8> 8
3> 3
2> 2
5> 5
1> 1
1> 9
7> 7
6> 6
4> 4
2> 10
- (4) env.fromSequence()
# Pass in a numeric range (closed interval: both 1 and 10 are emitted)
DataStreamSource<Long> ds5 = env.fromSequence(1, 10);
# Output 5
1> 8
7> 6
6> 10
2> 5
3> 1
3> 2
8> 7
4> 9
5> 3
5> 4
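As the output shows, fromSequence(1, 10) emits both endpoints. The same closed-interval semantics can be checked in plain Java; this sketch uses LongStream from the JDK as an analogy and does not involve Flink.

```java
import java.util.Arrays;
import java.util.stream.LongStream;

class ClosedRangeSketch {
    // Closed interval: both `from` and `to` are included
    static long[] closed(long from, long to) {
        return LongStream.rangeClosed(from, to).toArray();
    }

    // Half-open interval for comparison: `to` is excluded
    static long[] halfOpen(long from, long to) {
        return LongStream.range(from, to).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(closed(1, 10)));   // ten values, 1 through 10
        System.out.println(Arrays.toString(halfOpen(1, 10))); // nine values, 1 through 9
    }
}
```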
1.1.2 File-Based Sources
- (1) Read a text file in batch fashion: env.readTextFile(path)
package cn.itcast.day02.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-12 23:47:53
 * @description TODO: read a file in batch fashion
 */
public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // Configure the port of the web UI
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        // Create a local environment with the web UI enabled
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // Read the data source
        String path = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt";
        DataStreamSource<String> lines = env.readTextFile(path);
        // Parallelism of the source
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the DataStream created by readTextFile: " + parallelism);
        lines.print();
        env.execute();
    }
}
- (2) Read a text file in streaming fashion: env.readFile()
- Detail: in PROCESS_CONTINUOUSLY mode, the file is re-read and printed again only when it has been modified
package cn.itcast.day02.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * @author lql
 * @time 2024-02-13 15:34:11
 * @description TODO: read a data source in streaming fashion (unbounded stream)
 */
public class StreamFromFile {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        String path = "./data/input/wordcount.txt";
        // new TextInputFormat(null): the constructor argument is the file path, not the encoding;
        // null is fine here because readFile receives the path separately. The charset defaults to UTF-8.
        // FileProcessingMode.PROCESS_ONCE: read the file once and then finish
        // 2000 is the interval in milliseconds between scans of the path
        DataStreamSource<String> lines1 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_ONCE, 2000
        );
        // FileProcessingMode.PROCESS_CONTINUOUSLY: keep monitoring the path and never stop
        DataStreamSource<String> lines2 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000
        );
        // Check the parallelism
        System.out.println("Parallelism of lines1: " + lines1.getParallelism());
        System.out.println("Parallelism of lines2: " + lines2.getParallelism());
        //lines1.print();
        lines2.print();
        env.execute();
    }
}
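The re-read behaviour of PROCESS_CONTINUOUSLY can be pictured as a periodic scan that compares the file's modification time with the one seen last time and, on a change, reads the whole file again. The sketch below shows that idea in plain Java. The class `FileRescanSketch` and its `poll` method are hypothetical names for this note, and the logic is an assumption about the general approach, not Flink's actual monitoring code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FileRescanSketch {
    // Modification time observed at the previous poll (-1 means "never polled")
    private long lastSeenMillis = -1L;

    // Returns every line of the file if it changed since the last poll, else an empty list
    List<String> poll(Path file) {
        try {
            long modified = Files.getLastModifiedTime(file).toMillis();
            if (modified == lastSeenMillis) {
                return Collections.emptyList();
            }
            lastSeenMillis = modified;
            return Files.readAllLines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Polls a temporary file three times and returns how many lines each poll produced
    static int[] demo() {
        try {
            Path file = Files.createTempFile("wordcount", ".txt");
            Files.write(file, Arrays.asList("hello flink"));
            FileRescanSketch monitor = new FileRescanSketch();
            int first = monitor.poll(file).size();   // first scan: whole file
            int second = monitor.poll(file).size();  // unchanged: nothing
            Files.write(file, Arrays.asList("hello flink", "hello spark"));
            // Force a clearly different timestamp so coarse-grained file systems also see the change
            Files.setLastModifiedTime(file, FileTime.fromMillis(System.currentTimeMillis() + 5000));
            int third = monitor.poll(file).size();   // modified: whole file again
            Files.delete(file);
            return new int[] {first, second, third};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [1, 0, 2]
    }
}
```

Note that the third poll returns both lines, not only the new one: the unchanged first line is emitted again, which matches the "printed again" behaviour described above.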
1.1.3 Socket-Based Source
- Observation: the socket source has a parallelism of 1 (it is a non-parallel source)
- Detail: start the socket server on the virtual machine with nc -lk 8888
package cn.itcast.day02.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-13 16:00:47
 * @description TODO: socket-based data source
 */
public class StreamSocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
        int parallelism1 = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism1);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                // Split each line on spaces and emit one record per word
                String[] tokens = line.split(" ");
                for (String token : tokens) {
                    out.collect(token);
                }
            }
        });
        int parallelism2 = words.getParallelism();
        System.out.println("Parallelism of the DataStream after flatMap: " + parallelism2);
        words.print();
        env.execute();
    }
}
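One thing to watch in the flatMap step: line.split(" ") produces empty strings when a line typed into nc contains consecutive spaces. The sketch below compares that with splitting on runs of whitespace. `tokenize` is a hypothetical helper for illustration and is not part of the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TokenizeSketch {
    // Splits a line on runs of whitespace and drops empty tokens
    static List<String> tokenize(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(word);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String line = "hello  flink";
        // Splitting on a single space keeps the empty token between the two spaces
        System.out.println(Arrays.asList(line.split(" ")).size()); // prints 3
        // Splitting on whitespace runs yields just the two words
        System.out.println(tokenize(line));                        // prints [hello, flink]
    }
}
```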
1.2 Custom Sources
1.2.1 Randomly Generated Data Source
- (1) Implement the SourceFunction interface
- Example: a custom source that generates one random order per second (order ID, user ID, order amount, timestamp)
- Requirements: random order ID (UUID), user ID (0-2), order amount (0-100), timestamp set to the current system time
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.UUID;

/**
 * @author lql
 * @time 2024-02-13 16:21:31
 * @description TODO: custom source without parallelism
 */
public class CustomerSourceWithoutParallelDemo {
    /**
     * Custom Java bean class
     * @Data: generates getters, setters, toString, equals, and hashCode for the class.
     * @AllArgsConstructor: generates a constructor that takes every field.
     * @NoArgsConstructor: generates a no-argument constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // Order ID
        private String id;
        // User ID
        private String userId;
        // Order amount
        private int money;
        // Timestamp
        private Long timestamp;
    }

    /**
     * Main entry point
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // todo 1) Get the Flink streaming execution environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("Parallelism of the initial environment: " + env.getParallelism());
        // todo 2) Plug in the custom source
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("Parallelism of streamSource: " + streamSource.getParallelism());
        // todo 3) Print the output
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source that generates one order per second
     */
    private static class MySource implements SourceFunction<Order> {
        // Flag that keeps the generating loop alive; volatile because cancel() runs on another thread
        private volatile boolean isRunning = true;

        /**
         * Core method: generate the data
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // Order ID
                String orderID = UUID.randomUUID().toString();
                // User ID in 0-2
                String userID = String.valueOf(random.nextInt(3));
                // Order amount in 0-100 (the bound of nextInt is exclusive)
                int money = random.nextInt(101);
                // Current time
                long time = System.currentTimeMillis();
                // Emit the record
                sourceContext.collect(new Order(orderID, userID, money, time));
                // One order per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
Result: the default execution environment has a parallelism of 8; the custom streamSource has a parallelism of 1
Summary:
- 1- env.addSource(new MySource()) plugs in the custom source (MySource is a private static nested class):
  - Write a class that implements the SourceFunction interface and overrides its core methods, run and cancel
- 2- Got to know Java bean classes and what @Data, @NoArgsConstructor, and @AllArgsConstructor do
- 3- The UUID utility class generates random IDs; to draw random numbers, create a Random instance first; random.nextInt(n) covers the half-open range [0, n)
- 4- String.valueOf() converts a value to a String; the while loop needs a boolean flag so that cancel() can stop it
- 5- collect() emits a record downstream
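Points 3 and 4 can be tried out without Flink. The sketch below generates the random order fields in plain Java; the class and method names are hypothetical. Because the bound of nextInt is exclusive, nextInt(3) yields 0 to 2 and an amount range that includes 100 needs nextInt(101).

```java
import java.util.Random;
import java.util.UUID;

class OrderFieldsSketch {
    // Random order ID in the canonical 36-character UUID form
    static String randomOrderId() {
        return UUID.randomUUID().toString();
    }

    // User ID as a String, drawn from [0, 3), that is 0, 1, or 2
    static String randomUserId(Random random) {
        return String.valueOf(random.nextInt(3));
    }

    // Order amount drawn from [0, 101), that is 0 to 100 inclusive
    static int randomMoney(Random random) {
        return random.nextInt(101);
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println(randomOrderId() + " user=" + randomUserId(random)
                + " money=" + randomMoney(random)
                + " time=" + System.currentTimeMillis());
    }
}
```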
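- (2) Implement ParallelSourceFunction to create a parallel source
DataStreamSource<Order> mySource = env.addSource(new MySource()).setParallelism(6);
# The MySource above implements plain SourceFunction, so it is non-parallel: setting a parallelism other than 1 throws an IllegalArgumentException. Implementing ParallelSourceFunction instead allows several parallel instances.
- (3) Extend RichParallelSourceFunction to create a parallel source with Rich features (lifecycle methods such as open and close)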
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;

/**
 * @author lql
 * @time 2024-02-13 16:58:49
 * @description TODO: custom source with parallelism greater than 1
 */
public class RichParallelismDemo {
    /**
     * Custom Java bean class
     *
     * @Data: generates getters, setters, toString, equals, and hashCode for the class.
     * @AllArgsConstructor: generates a constructor that takes every field.
     * @NoArgsConstructor: generates a no-argument constructor.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // Order ID
        private String id;
        // User ID
        private String userId;
        // Order amount
        private int money;
        // Timestamp
        private Long timestamp;
    }

    /**
     * Main entry point
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // todo 1) Get the Flink streaming execution environment
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("Parallelism of the initial environment: " + env.getParallelism());
        // todo 2) Plug in the custom source
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("Parallelism of streamSource: " + streamSource.getParallelism());
        // todo 3) Print the output
        streamSource.printToErr();
        env.execute();
    }

    /**
     * Custom source that generates one order per second in each parallel instance
     */
    private static class MySource extends RichParallelSourceFunction<Order> {
        // Flag that keeps the generating loop alive; volatile because cancel() runs on another thread
        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void cancel() {
            // Stop the loop in run()
            isRunning = false;
        }

        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // Order ID
                String orderID = UUID.randomUUID().toString();
                // User ID in 0-2
                String userID = String.valueOf(random.nextInt(3));
                // Order amount in 0-100 (the bound of nextInt is exclusive)
                int money = random.nextInt(101);
                // Current time
                long time = System.currentTimeMillis();
                // Emit the record
                sourceContext.collect(new Order(orderID, userID, money, time));
                // One order per second
                Thread.sleep(1000);
            }
        }
    }
}
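Result: a custom source that extends RichParallelSourceFunction supports a parallelism greater than 1
Summary: RichParallelSourceFunction is an abstract class to extend; run and cancel must be implemented, while open and close can be overridden for setup and cleanup.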
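Both custom sources rely on the same stop mechanism: run() loops on a boolean flag and cancel(), called from a different thread, clears it. The sketch below reproduces that pattern with a plain Java thread. The class name and timings are hypothetical choices for this note, and the flag is volatile so the change made by the cancelling thread is visible to the loop.

```java
class CancelFlagSketch implements Runnable {
    // Cleared by cancel() from another thread, hence volatile
    private volatile boolean isRunning = true;

    @Override
    public void run() {
        while (isRunning) {
            try {
                // Stand-in for "emit one record, then wait"
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Starts the loop, cancels it, and reports whether the worker thread ended
    static boolean stopsAfterCancel() {
        CancelFlagSketch source = new CancelFlagSketch();
        Thread worker = new Thread(source);
        worker.start();
        try {
            Thread.sleep(50);
            source.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped: " + stopsAfterCancel());
    }
}
```

If cancel() is left empty, the flag never changes and the loop keeps running after the job has been cancelled.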
1.2.2 MySQL-Based Source
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壯');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');
SET FOREIGN_KEY_CHECKS = 1;
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 17:14:06
 * @description TODO: custom MySQL data source
 */
public class MysqlSource {
    public static void main(String[] args) throws Exception {
        // TODO 1: Get the Flink streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2: Plug in the custom source
        DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
        System.out.println("Parallelism of MysqlSourceFunction: " + streamSource.getParallelism());
        // todo 3) Print the output
        streamSource.print();
        // todo 4) Start the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserInfo {
        private int id;
        private String username;
        private String password;
        private String name;
    }

    /**
     * Custom source: reads data from MySQL
     */
    private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {
        // Flag that keeps the polling loop alive; cleared by cancel()
        private volatile boolean isRunning = true;
        // MySQL connection
        private Connection connection = null;
        // MySQL prepared statement
        private PreparedStatement statement = null;

        /**
         * Runs once per instance; with a parallelism greater than 1 it runs once in each parallel instance.
         * Typically used to initialize resources.
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register the driver (legacy class name; Connector/J 8.x uses com.mysql.cj.jdbc.Driver)
            Class.forName("com.mysql.jdbc.Driver");
            // Create the MySQL connection
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root");
            // Create the statement
            statement = connection.prepareStatement("select * from test.user");
        }

        @Override
        public void close() throws Exception {
            super.close();
            // Release the JDBC resources, statement first, then connection
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }

        @Override
        public void run(SourceContext<UserInfo> sourceContext) throws Exception {
            while (isRunning) {
                // try-with-resources closes the result set after each poll
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        int id = resultSet.getInt("id");
                        String username = resultSet.getString("username");
                        String password = resultSet.getString("password");
                        String name = resultSet.getString("name");
                        sourceContext.collect(new UserInfo(id, username, password, name));
                    }
                }
                // Poll once per second
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
Result: the custom MySQL source can run with a parallelism greater than 1; each parallel instance runs the same query, so every row is emitted once per instance
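Summary:
- 1- The Java bean class mirrors the columns of the MySQL table
- 2- Declare the connection and statement fields and initialize them to null
- 3- Override the open method:
  - Register the MySQL driver: Class.forName("com.mysql.jdbc.Driver")
  - Create the connection: DriverManager.getConnection()
  - Create the statement: connection.prepareStatement(), which holds the SQL query
- 4- Override the core run method:
  - Nested loops: the outer loop calls statement.executeQuery(), closes the result set, and pauses between polls; the inner loop walks the result set and reads each column with the getter for its type
  - After reading the columns, wrap them in the entity class and emit it with collect()
- 5- Pause between polls with TimeUnit.SECONDS.sleep()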
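JDBC objects created in open() should also be released in close(), the statement before the connection it came from. The sketch below shows that ordering with try-with-resources, which closes resources in the reverse order of their declaration. `Resource` is a hypothetical stand-in for Connection and PreparedStatement so that the example runs without a database.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderSketch {
    // Hypothetical stand-in for a JDBC resource; records when it is closed
    static class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    // Opens a "connection" and a "statement", uses them, and returns the event log
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Resource connection = new Resource("connection", log);
             Resource statement = new Resource("statement", log)) {
            log.add("query");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [query, close statement, close connection]
    }
}
```

In a Rich source the resources live across method calls, so they are fields closed by hand in close(); the order to follow is the same as the one printed here.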