端午假期,夏日炎炎,溫度連續(xù)40度以上,在家學習Flink相關(guān)知識,記錄下來,方便備查。
開發(fā)工具:IntelliJ Idea
Flink版本:1.13.0
本次主要用Flink實現(xiàn)批處理(DataSet API) 和 流處理(DataStream API)簡單實現(xiàn)。
第一步、創(chuàng)建項目與添加依賴
1)新建項目
打開Idea,新建Maven項目,包和項目命名,點擊確定進入項目。
2)引入依賴
在pom.xml文件中添加依賴,即Flink-java、flink-streaming、slf4j等, 可參考以下代碼。
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.2</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.16.0</version>
</dependency>
</dependencies>
3)添加日志文件
在resource目錄下添加日志文件log4j.properties,內(nèi)容如下所示。
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=@-4r [%t] %-5p %c %x - %m%n
第二步、構(gòu)造數(shù)據(jù)集
在項目下新建 input 文件夾,用于存放數(shù)據(jù)集,在其下新建 words.txt 文件,即測試的數(shù)據(jù)集,如下圖所示。
第三步、編寫業(yè)務代碼
讀取數(shù)據(jù)集中內(nèi)容,并進行單詞的字數(shù)統(tǒng)計。新建 BatchWordCout 類,引入分6個步驟實現(xiàn)數(shù)據(jù)集的讀取與打印。
方式一、批處理 DataSet API
主要處理步驟為
1)創(chuàng)建執(zhí)行環(huán)境;
2)從環(huán)境中讀取數(shù)據(jù);
3)將每行數(shù)據(jù)進行分詞,轉(zhuǎn)化成二元組類型 扁平映射;
4)按照word進行分組;
5)分組內(nèi)進行聚合統(tǒng)計;
6)打印結(jié)果
批處理 DataSet API 寫法如下所示。
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
//1、創(chuàng)建執(zhí)行環(huán)境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2、從環(huán)境中讀取數(shù)據(jù)
DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
// 3、將每行數(shù)據(jù)進行分詞,轉(zhuǎn)化成二元組類型 扁平映射
FlatMapOperator<String,Tuple2<String,Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String,Long>> out) -> {
// 將每行文本進行拆分
String[] words = line.split(" ");
// 將每個單詞轉(zhuǎn)化成二元組
for(String word : words){
out.collect(Tuple2.of(word,1L));
}
}).returns(Types.TUPLE(Types.STRING,Types.LONG));
// 4、按照word進行分組
UnsortedGrouping<Tuple2<String,Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
// 5、分組內(nèi)進行聚合統(tǒng)計
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
// 6、打印結(jié)果
sum.print();
}
控制臺打印效果如下圖所示。
在Flink 1.12 版本后,官方推薦做法是直接使用 DataSet API 即提交任務時將執(zhí)行模式更改為BATCH來進行批處理
$bin/flink run -Dexecution.runtime-mode=BATCH batchWordCount.jar
方式二、流處理 DataStream API
流處理的處理步驟與批處理流程類似,主要區(qū)別是執(zhí)行環(huán)境不一樣。
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BatchSteamWordCount {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建流式執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、讀取文件
DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
// 3、轉(zhuǎn)換計算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
// 將每行文本進行拆分
String[] words = line.split(" ");
// 將每個單詞轉(zhuǎn)化成二元組
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4、分組
KeyedStream<Tuple2<String, Long>, Object> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
// 5、求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
// 6、打印結(jié)果
sum.print();
// 7、啟動執(zhí)行
env.execute();
}
}
控制臺輸出結(jié)果如下圖所示。
從打印結(jié)果可以看出 多線程執(zhí)行,結(jié)果是無序;第一列數(shù)字與本地運行環(huán)境的CPU核數(shù)有關(guān);文章來源:http://www.zghlxwxcb.cn/news/detail-499892.html
參考文檔
【1】https://www.bilibili.com/video/BV133411s7Sa?p=9&vd_source=c8717efb4869aaa507d74b272c5d90be文章來源地址http://www.zghlxwxcb.cn/news/detail-499892.html
到了這里,關(guān)于初探Flink的Java實現(xiàn)流處理和批處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!