WordCount
需求:統(tǒng)計(jì)一段文字中,每個(gè)單詞出現(xiàn)的頻次
添加依賴
<properties>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
1.批處理
基本思路:先逐行讀入文件數(shù)據(jù),然后將每一行文字拆分成單詞;接著按照單詞分組,統(tǒng)計(jì)每組數(shù)據(jù)的個(gè)數(shù)。
1.1.數(shù)據(jù)準(zhǔn)備
resources目錄下新建一個(gè) input 文件夾,并在下面創(chuàng)建文本文件words.txt
words.txt
hello flink
hello world
hello java
1.2.代碼編寫(xiě)
public class BatchWordCount {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建執(zhí)行環(huán)境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 從文件讀取數(shù)據(jù) 按行讀取(存儲(chǔ)的元素就是每行的文本)
String filePath = Objects.requireNonNull(
BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
DataSource<String> lineDS = env.readTextFile(filePath);
// 3. 轉(zhuǎn)換數(shù)據(jù)格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
});
// 4. 按照 word 進(jìn)行分組
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
// 5. 分組內(nèi)聚合統(tǒng)計(jì)
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
// 6. 打印結(jié)果
sum.print();
}
}
打印結(jié)果如下:(結(jié)果正確)
上述代碼是基于 DataSet API 的,也就是對(duì)數(shù)據(jù)的處理轉(zhuǎn)換,是看作數(shù)據(jù)集來(lái)進(jìn)行操作的。
事實(shí)上 Flink 本身是流批統(tǒng)一
的處理架構(gòu),批量的數(shù)據(jù)集本質(zhì)上也是流,沒(méi)有必要用兩套不同的 API 來(lái)實(shí)現(xiàn)。從Flink 1.12 開(kāi)始,官方推薦的做法是直接使用 DataStream API,在提交任務(wù)時(shí)通過(guò)將執(zhí)行模式設(shè)為BATCH
來(lái)進(jìn)行批處理:
bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
2.流處理
DataStreamAPI可以直接處理批處理和流處理的所有場(chǎng)景
2.1讀取文件
還是上述words.txt文件
代碼實(shí)現(xiàn):
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建流式執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.讀取文件
String filePath = Objects.requireNonNull(
StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
DataStreamSource<String> lineStream = env.readTextFile(filePath);
// 3. 轉(zhuǎn)換、分組、求和,得到統(tǒng)計(jì)結(jié)果
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
}).keyBy(data -> data.f0)
.sum(1);
// 4. 打印
sum.print();
// 5. 執(zhí)行
env.execute();
}
}
與批處理程序BatchWordCount有幾點(diǎn)不同:
- 創(chuàng)建執(zhí)行環(huán)境的不同,流處理程序使用的是
StreamExecutionEnvironment
。 - 轉(zhuǎn)換處理之后,得到的數(shù)據(jù)對(duì)象類型不同。
- 分組操做調(diào)用的是 keyBy 方法,可以傳入一個(gè)匿名函數(shù)作為鍵選擇器(KeySelector),指定當(dāng)前分組的key。
- 最后執(zhí)行execute方法,開(kāi)始執(zhí)行任務(wù)。
2.2讀取Socket文件流
實(shí)際生產(chǎn)中,真正的數(shù)據(jù)多是無(wú)界的,需要持續(xù)地捕獲數(shù)據(jù)。為了模擬這種場(chǎng)景,可以監(jiān)聽(tīng) socket 端口
,然后向該端口不斷的發(fā)送數(shù)據(jù)。
- 簡(jiǎn)單改動(dòng),只需將StreamWordCount 代碼中讀取文件數(shù)據(jù)的
readTextFile
方法,替換成讀取socket文本流的方法socketTextStream
。
public class StreamSocketWordCount {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建流式執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.讀取文件
DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);
// 3. 轉(zhuǎn)換、分組、求和,得到統(tǒng)計(jì)結(jié)果
SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
}).keyBy(data -> data.f0)
.sum(1);
// 4. 打印
sum.print();
// 5. 執(zhí)行
env.execute();
}
}
- 在 Linux 環(huán)境的主機(jī) 124.222.253.33 上,執(zhí)行下列命令,發(fā)送數(shù)據(jù)進(jìn)行測(cè)試
nc -lk 7777
注意:要先啟動(dòng)端口,后啟動(dòng) StreamSocketWordCount 程序,否則會(huì)報(bào)超時(shí)連接異常。
- 從Linux發(fā)送數(shù)據(jù)
1、輸入“hello flink”,輸出如下內(nèi)容
2、再輸入“hello world”,輸出如下內(nèi)容
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-741672.html
Flink 還具有一個(gè)類型提取系統(tǒng),可以分析函數(shù)的輸入和返回類型,自動(dòng)獲取類型信息,從而獲得對(duì)應(yīng)的序列化器和反序列化器。但是,由于 Java 中泛型擦除
的存在,在某些特殊情況下(比如 Lambda 表達(dá)式中),自動(dòng)提取的信息是不夠精細(xì)的,對(duì)于 flatMap 里傳入的 Lambda 表達(dá)式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無(wú)法得到 Tuple2<String, Long>。需要顯式地告訴系統(tǒng)當(dāng)前的返回類型,才能正確地解析出完整數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-741672.html
到了這里,關(guān)于【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!