Flink DataStream API的基本使用
前言
Flink DataStream API主要用于處理無(wú)界和有界數(shù)據(jù)流 。無(wú)界數(shù)據(jù)流
是一個(gè)持續(xù)生成數(shù)據(jù)的數(shù)據(jù)源,它沒(méi)有明確的結(jié)束點(diǎn),例如實(shí)時(shí)的交易數(shù)據(jù)或傳感器數(shù)據(jù)。這種類型的數(shù)據(jù)流需要使用Apache Flink的實(shí)時(shí)處理功能來(lái)連續(xù)地處理和分析。
有界數(shù)據(jù)流
是一個(gè)具有明確開(kāi)始和結(jié)束點(diǎn)的數(shù)據(jù)集,例如一個(gè)文件或數(shù)據(jù)庫(kù)表。這種類型的數(shù)據(jù)流通常在批處理場(chǎng)景中使用,其中所有數(shù)據(jù)都已經(jīng)可用,并可以一次性處理。
Flink的DataStream API提供了一套豐富的操作符,如map、filter、reduce、aggregations、windowing、join
等,以支持各種復(fù)雜的數(shù)據(jù)處理和分析需求。此外,DataStream API還提供了容錯(cuò)保證,能確保在發(fā)生故障時(shí),應(yīng)用程序能從最近的檢查點(diǎn)(checkpoint)恢復(fù),從而實(shí)現(xiàn)精確一次(exactly-once)的處理語(yǔ)義。
1. 基本使用方法
-
創(chuàng)建執(zhí)行環(huán)境:
每一個(gè)Flink程序都需要?jiǎng)?chuàng)建一個(gè)
StreamExecutionEnvironment
(執(zhí)行環(huán)境),它可以被用來(lái)設(shè)置參數(shù)和創(chuàng)建從外部系統(tǒng)讀取數(shù)據(jù)的流。final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
創(chuàng)建數(shù)據(jù)流:
你可以從各種數(shù)據(jù)源中創(chuàng)建數(shù)據(jù)流,如本地集合,文件,socket等。下面的代碼是從本地集合創(chuàng)建數(shù)據(jù)流的示例:
DataStream<String> dataStream = env.fromElements("hello", "flink");
-
轉(zhuǎn)換操作:
Flink提供了豐富的轉(zhuǎn)換操作,如
map
,filter
,reduce
等。以下代碼首先將每個(gè)字符串映射為其長(zhǎng)度,然后過(guò)濾出長(zhǎng)度大于5的元素:DataStream<Integer> transformedStream = dataStream .map(s -> s.length()) .filter(l -> l > 5);
-
數(shù)據(jù)輸出:
Flink支持將數(shù)據(jù)流輸出到各種存儲(chǔ)系統(tǒng),如文件,socket,數(shù)據(jù)庫(kù)等。下面的代碼將數(shù)據(jù)流輸出到標(biāo)準(zhǔn)輸出:
transformedStream.print();
-
執(zhí)行程序:
將上述所有步驟放在main函數(shù)中,并在最后調(diào)用
env.execute()
方法來(lái)啟動(dòng)程序。Flink程序是懶加載的,只有在調(diào)用execute
方法時(shí)才會(huì)真正開(kāi)始執(zhí)行。env.execute("Flink Basic API Usage");
2. 核心示例代碼
使用Flink DataStream API構(gòu)建一個(gè)實(shí)時(shí)Word Count程序,它會(huì)從一個(gè)socket端口讀取文本數(shù)據(jù),統(tǒng)計(jì)每個(gè)單詞的出現(xiàn)次數(shù),并將結(jié)果輸出到標(biāo)準(zhǔn)輸出。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建執(zhí)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 創(chuàng)建數(shù)據(jù)流,從socket接收數(shù)據(jù),需要在本地啟動(dòng)一個(gè)端口為9000的socket服務(wù)器
DataStream<String> textStream = env.socketTextStream("localhost", 9000);
// 3. 轉(zhuǎn)換操作
DataStream<Tuple2<String, Integer>> wordCountStream = textStream
.flatMap(new LineSplitter()) // 將文本行切分為單詞
.keyBy(0) // 按單詞分組
.sum(1); // 對(duì)每個(gè)單詞的計(jì)數(shù)求和
// 4. 數(shù)據(jù)輸出
wordCountStream.print();
// 5. 執(zhí)行程序
env.execute("Socket Word Count Example");
}
// 自定義一個(gè)FlatMapFunction,將輸入的每一行文本切分為單詞,并輸出為Tuple2,第一個(gè)元素是單詞,第二個(gè)元素是計(jì)數(shù)(初始值為1)
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
3. 完成工程代碼
下面是一個(gè)基于Apache Flink的實(shí)時(shí)單詞計(jì)數(shù)應(yīng)用程序的完整工程代碼,包括Pom.xml文件和所有Java類。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-wordcount-example</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<flink.version>1.13.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
WordCountExample
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountExample {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建執(zhí)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 創(chuàng)建數(shù)據(jù)流,從socket接收數(shù)據(jù),需要在本地啟動(dòng)一個(gè)端口為9000的socket服務(wù)器
DataStream<String> textStream = env.socketTextStream("localhost", 9000);
// 3. 轉(zhuǎn)換操作
DataStream<Tuple2<String, Integer>> wordCountStream = textStream
.flatMap(new LineSplitter()) // 將文本行切分為單詞
.keyBy(0) // 按單詞分組
.sum(1); // 對(duì)每個(gè)單詞的計(jì)數(shù)求和
// 4. 數(shù)據(jù)輸出
wordCountStream.print();
// 5. 執(zhí)行程序
env.execute("Socket Word Count Example");
}
// 自定義一個(gè)FlatMapFunction,將輸入的每一行文本切分為單詞,并輸出為Tuple2,第一個(gè)元素是單詞,第二個(gè)元素是計(jì)數(shù)(初始值為1)
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
現(xiàn)在,你可以使用Maven編譯并運(yùn)行這個(gè)程序。在啟動(dòng)程序之前,你需要在本地啟動(dòng)一個(gè)端口為9000的Socket服務(wù)器。這可以通過(guò)使用Netcat工具 (nc -lk 9000
) 或者其他任何能打開(kāi)端口的工具實(shí)現(xiàn)。然后,你可以輸入文本行,F(xiàn)link程序會(huì)統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù),并實(shí)時(shí)打印結(jié)果。
測(cè)試驗(yàn)證
用py在本地啟動(dòng)一個(gè)socket服務(wù)器,監(jiān)聽(tīng)9000端口,
python比較簡(jiǎn)單實(shí)現(xiàn)一個(gè)socket通信 。寫(xiě)一個(gè)Python來(lái)驗(yàn)證上面寫(xiě)的例子。
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)
print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)
while True:
data = input("Enter text: ")
client_socket.sendall(data.encode())
運(yùn)行Flink程序和Python socket服務(wù)器,然后在Python程序中輸入文本, 會(huì)看到Flink程序?qū)崟r(shí)統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)并輸出到控制臺(tái)。
4. Stream 執(zhí)行環(huán)境
開(kāi)發(fā)學(xué)習(xí)過(guò)程中,不需要關(guān)注。每個(gè) Flink 應(yīng)用都需要有執(zhí)行環(huán)境,在該示例中為 env。流式應(yīng)用需要用到 StreamExecutionEnvironment。
DataStream API 將你的應(yīng)用構(gòu)建為一個(gè) job graph,并附加到 StreamExecutionEnvironment 。當(dāng)調(diào)用 env.execute() 時(shí)此 graph 就被打包并發(fā)送到 JobManager 上,后者對(duì)作業(yè)并行處理并將其子任務(wù)分發(fā)給 Task Manager 來(lái)執(zhí)行。每個(gè)作業(yè)的并行子任務(wù)將在 task slot 中執(zhí)行。
注意,如果沒(méi)有調(diào)用 execute(),應(yīng)用就不會(huì)運(yùn)行。
Flink runtime: client, job manager, task managers
此分布式運(yùn)行時(shí)取決于你的應(yīng)用是否是可序列化的。它還要求所有依賴對(duì)集群中的每個(gè)節(jié)點(diǎn)均可用。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-735389.html
5. 參考文檔
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-735389.html
到了這里,關(guān)于【Apache Flink】Flink DataStream API的基本使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!