Series Contents
[Learn Apache Flink with Xiaojia] Part 1: Introduction to Apache Flink
[Learn Apache Flink with Xiaojia] Part 2: Getting Started with Flink
1. Create the Project
1.1 Create a Maven Project
Create a Maven project and add the following dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.xiaojia</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <properties>
        <maven.compiler.source>19</maven.compiler.source>
        <maven.compiler.target>19</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>
    </dependencies>
</project>
1.2 log4j Configuration
Create a log4j.properties file in the resources directory (src/main/resources) with the following content:
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2. Batch Word Count (DataSet API)
2.1 Create the BatchWordCount Class
package org.xiaojia.demo.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
    public static void main(String[] args) {
        // 1. Create the execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data from a file, one element per line
        DataSource<String> lineDataSource = executionEnvironment.readTextFile("input/words.txt");
        // 3. Split each line into words and turn every word into a (word, 1L) tuple
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG)); // type erasure hides the lambda's Tuple2 type, so declare it explicitly
        // 4. Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
        // 5. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        try {
            // 6. Print the result; in the DataSet API, print() also triggers job execution
            sum.print();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
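To make the transformation steps concrete, here is a plain-Java sketch (no Flink involved) that reproduces flatMap, then groupBy(0), then sum(1) with java.util.stream on an in-memory list. The class name BatchWordCountSketch and the input lines are hypothetical, since the article does not show the contents of input/words.txt. Like the DataSet job, it yields exactly one final count per word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class BatchWordCountSketch {
    // Hypothetical helper mirroring BatchWordCount's steps 3 to 5 on an in-memory list.
    public static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                // Step 3: split each line on a single space, like line.split(" ")
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // Steps 4 and 5: group by the word and count its occurrences
                // (TreeMap only makes the printed order predictable)
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical input lines, standing in for input/words.txt
        List<String> lines = List.of("hello flink", "hello world", "hello java");
        System.out.println(wordCount(lines)); // prints {flink=1, hello=3, java=1, world=1}
    }
}
```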
2.2 Run Result
In fact, Flink has already unified stream and batch processing. The official recommendation is to write jobs with the DataStream API and, when submitting a job, set its execution mode to BATCH to run it as a batch job:
bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
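The execution mode changes what keyed aggregations emit. According to Flink's documentation, rolling operations such as sum() emit an updated result for every incoming record in STREAMING mode, while in BATCH mode they emit only the final result per key. (The mode can also be set in code with env.setRuntimeMode(RuntimeExecutionMode.BATCH), although the documentation recommends the command-line setting so the same program can run in either mode.) The plain-Java sketch below only imitates those two output behaviors; the class and method names are hypothetical, and it is not how Flink executes either mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExecutionModeSketch {
    // STREAMING-style rolling sum: emit an updated (word,count) for every incoming word.
    public static List<String> rollingSum(List<String> words) {
        Map<String, Long> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            long count = state.merge(word, 1L, Long::sum); // new running count for this word
            emitted.add("(" + word + "," + count + ")");
        }
        return emitted;
    }

    // BATCH-style sum: the input is bounded, so emit only the final count per word.
    // Key order here is first-seen order; Flink itself does not guarantee an output order.
    public static List<String> finalSum(List<String> words) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (String word : words) {
            state.merge(word, 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        state.forEach((word, count) -> emitted.add("(" + word + "," + count + ")"));
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "flink", "hello", "world");
        System.out.println("STREAMING: " + rollingSum(words)); // [(hello,1), (flink,1), (hello,2), (world,1)]
        System.out.println("BATCH:     " + finalSum(words));   // [(hello,2), (flink,1), (world,1)]
    }
}
```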
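3. Stream Word Count (DataStream API)
The DataSet API makes batch processing easy to implement. For Flink, however, stream processing is the core of its processing model, so the DataStream API, which unifies stream and batch processing, is more powerful: it can handle every batch and streaming scenario directly.
From Flink's point of view, all data can be treated as a stream: streaming data is an unbounded stream, and batch data is a bounded stream. Batch processing is therefore simply the processing of a bounded stream.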
3.1 Reading a File Stream
3.1.1 The Deprecated Approach
package org.xiaojia.demo.wc.stream;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BoundStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the file (readTextFile is deprecated, see 3.1.4)
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.readTextFile("input/words.txt");
        // 3. Split each line into words and turn every word into a (word, 1L) tuple
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG)); // declare the output type lost to type erasure
        // 4. Key the stream by the word, so all tuples of one word reach the same sum instance
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);
        // 5. Keep a running sum of field 1 for each word
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);
        // 6. Print every updated result
        sum.print();
        // 7. Submit the job; execute() blocks until the bounded input has been processed
        streamExecutionEnvironment.execute();
    }
}
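Step 4 matters because sum() keeps one running count per key: keyBy sends every tuple with the same word to the same parallel instance of the operator. The sketch below illustrates that routing idea with a deliberately simplified rule, plain hashCode modulo parallelism, which is an assumption for illustration only; Flink actually hashes keys into key groups (bounded by the maximum parallelism) before mapping them to subtasks. The class and method names are hypothetical.

```java
import java.util.List;

public class KeyByPartitionSketch {
    // Simplified key -> subtask routing (NOT Flink's real formula, which uses key groups).
    // The only property illustrated: the same key always lands on the same subtask.
    public static int subtaskFor(String key, int parallelism) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String word : List.of("hello", "java", "hello", "flink", "hello")) {
            // Every "hello" prints the same subtask index
            System.out.println(word + " -> subtask " + subtaskFor(word, parallelism));
        }
    }
}
```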
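3.1.2 Handling a Runtime Error
Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @7ce6a65d
This error comes from the Java module system's strong encapsulation on newer JDKs (this project compiles for Java 19, while Flink 1.17 officially supports Java 8 and 11). If you see a similar error, add the following VM options to open the affected java.base packages to the unnamed module: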
--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED
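To confirm that the VM options actually reached the JVM (for example, that they were entered as VM options rather than program arguments in the IDE run configuration), a small check like the one below can be run with the same settings. It is a hypothetical helper, not part of Flink; it relies only on java.lang.Module.isOpen from the JDK and assumes the class is loaded from the classpath, so that it lives in an unnamed module.

```java
public class AddOpensCheck {
    // Returns true if java.base opens java.lang to this class's module.
    // Loaded from the classpath, that module is unnamed, so the result reflects
    // whether --add-opens java.base/java.lang=ALL-UNNAMED took effect.
    public static boolean javaLangOpenToUnnamed() {
        Module javaBase = Object.class.getModule();
        Module self = AddOpensCheck.class.getModule();
        return javaBase.isOpen("java.lang", self);
    }

    public static void main(String[] args) {
        System.out.println("java.lang open to unnamed module: " + javaLangOpenToUnnamed());
    }
}
```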
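3.1.3 Run Result
3.1.4 The readTextFile Deprecation
As the deprecation hint suggests, use FileSource instead, for example FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build(), attached to the job with env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source"). FileSource belongs to Flink's file connector, so add the following dependency: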
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.17.1</version>
</dependency>
3.2 Reading a Socket Stream
3.2.1 Socket Stream Code
package org.xiaojia.demo.wc.stream;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the socket stream (an unbounded source)
        String hostname = "127.0.0.1";
        int port = 8888;
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);
        // 3. Split each line into words and turn every word into a (word, 1L) tuple
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Key the stream by the word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);
        // 5. Keep a running sum of field 1 for each word
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);
        // 6. Print every updated result
        sum.print();
        // 7. Submit the job; with a socket source it keeps running until it is cancelled
        streamExecutionEnvironment.execute();
    }
}
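3.2.2 Listening on the Port with nc
Start nc before launching the job, because the socket source connects to this port when the job starts.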
(base) xiaojiadeMacBook-Pro:~ xiaojia$ nc -lk 8888
hello java
hello flink
hello world
3.2.3 Run Result
From now on, every line that arrives is counted immediately, and the updated counts are printed.
3.2.4 Reading the Hostname and Port from Command-Line Arguments
package org.xiaojia.demo.wc.stream;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read host and port from arguments such as --host 127.0.0.1 --port 8888, then read the socket stream
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);
        // 3. Split each line into words and turn every word into a (word, 1L) tuple
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Key the stream by the word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);
        // 5. Keep a running sum of field 1 for each word
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);
        // 6. Print every updated result
        sum.print();
        // 7. Submit the job; with a socket source it keeps running until it is cancelled
        streamExecutionEnvironment.execute();
    }
}
Pass the hostname and port as program arguments, for example: --host 127.0.0.1 --port 8888
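ParameterTool.fromArgs turns "--key value" pairs into a key/value lookup, which is why get("host") and getInt("port") find the values passed above. The JDK-only parser below is a hypothetical, simplified illustration of that mapping, not Flink's implementation; the real ParameterTool handles more cases, such as single-dash keys.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
    // Hypothetical stand-in for ParameterTool.fromArgs: expects strictly alternating "--key value" pairs.
    public static Map<String, String> parseArgs(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Expected --key value pairs");
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected a --key, got: " + args[i]);
            }
            // Strip the leading "--" and store the following token as the value
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parseArgs(new String[] {"--host", "127.0.0.1", "--port", "8888"});
        String hostname = params.get("host");
        int port = Integer.parseInt(params.get("port"));
        System.out.println(hostname + ":" + port); // prints 127.0.0.1:8888
    }
}
```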