1、Flink概述
1.1 Flink是什么
Flink的官網(wǎng)主頁地址:https://flink.apache.org/
Flink的核心目標(biāo)是“數(shù)據(jù)流上有狀態(tài)的計(jì)算”(Stateful Computations over Data Streams)。
具體說明:Apache Flink是一個(gè)“框架和分布式處理引擎”,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。
1.1.1 無界數(shù)據(jù)流
- 有定義流的開始,但是沒有定義流的結(jié)束
- 它們會(huì)無休止的產(chǎn)生數(shù)據(jù)
- 無界流的數(shù)據(jù)必須持續(xù)處理,即數(shù)據(jù)被攝取后需要立即處理。我們不能等到所有數(shù)據(jù)都到達(dá)再處理,因?yàn)檩斎霑r(shí)無限的。
1.1.2 有界數(shù)據(jù)流
- 有定義流的開始,也有定義流的結(jié)束
- 有界流可以在攝取所有數(shù)據(jù)后再進(jìn)行計(jì)算
- 有界流所有的數(shù)據(jù)可以被排序,所有并不需要有序攝取
- 有界流處理通常被稱為批處理
1.1.3 有狀態(tài)流處理
把流處理需要的額外數(shù)據(jù)保存成一個(gè)“狀態(tài)”,然后針對(duì)這條數(shù)據(jù)進(jìn)行處理,并且更新狀態(tài),這就是所謂的“有狀態(tài)的流處理”。
- 狀態(tài)在內(nèi)存中:優(yōu)點(diǎn):速度快;缺點(diǎn):可靠性差
- 狀態(tài)在分布式系統(tǒng)中:優(yōu)點(diǎn):可靠性高;缺點(diǎn):速度慢
1.1.4 Flink發(fā)展歷史
1.2 Flink特點(diǎn)
我們處理數(shù)據(jù)的目標(biāo)是:低延遲、高吞吐、結(jié)果的準(zhǔn)確性和良好的容錯(cuò)性。
Flink主要特點(diǎn)如下:
- 高吞吐和低延遲:每秒處理數(shù)百萬個(gè)事件,毫秒級(jí)延遲
- 結(jié)果的準(zhǔn)確性:Flink提供了事件時(shí)間(event-time)和處理時(shí)間(processing-time)語義。對(duì)于亂序事件流,事件時(shí)間語義仍然能提供一致且準(zhǔn)確的結(jié)果。
- 精確一次(exactly-once)的狀態(tài)一致性保證
- 可以連接到最常用的外部系統(tǒng),如kafka、Hive、JDBC、HDFS、Redis等
- 高可用:本身高可用的設(shè)置,加上K8S,Yarn和Mesos的緊密集成,再加上從故障中快速恢復(fù)和動(dòng)態(tài)擴(kuò)展任務(wù)的能力,F(xiàn)link能做到以極少的停機(jī)時(shí)間7x24全天候運(yùn)行。
1.3 Flink和SparkStreaming(說實(shí)話沒有比較的必要)
1、Spark是以批處理為根本。
2、Flink是以流處理為根本。
1.4 Flink的應(yīng)用場(chǎng)景
1、電商和市場(chǎng)營銷
2、物聯(lián)網(wǎng)(IOT)
3、物流配送和服務(wù)業(yè)
4、銀行和金融業(yè)
1.5 Flink分層API
- 有狀態(tài)流處理:通過底層API(處理函數(shù)),對(duì)原始數(shù)據(jù)加工處理。底層API和DataStreamAPI相集成,可以處理復(fù)雜的計(jì)算。
- DataStreamAPI(流處理)和DataSetAPI(批處理)封裝了底層處理函數(shù),提供了通用的模塊,比如轉(zhuǎn)換(transformations,包括map,flatMap等),連接(joins),聚合(aggregations),窗口(Windows)操作等。注意:Flink1.12后,DataStreamAPI已經(jīng)實(shí)現(xiàn)真正的流批一體,所以DataSetAPI已經(jīng)過時(shí)。
- TableAPI是以表為中心的聲明式編程,其中表可能會(huì)動(dòng)態(tài)變化。TableAPI遵循關(guān)系模型;表有二維數(shù)據(jù)結(jié)構(gòu),類似于關(guān)系數(shù)據(jù)庫中的表,同時(shí)API提供可比較的操作,例如select、project、join、group by、aggregate等。我們可以在表與DataStream/DataSet之間無縫切換,以允許程序?qū)ableAPI與DataStream以及DataSet混用。
- SQL這一層在語法與表達(dá)能力上與TableAPI類似,但是是以SQL查詢表達(dá)式的形式表現(xiàn)程序。SQL抽象與TableAPI交互密切,同時(shí)SQL查詢可以直接在TableAPI定義的表上執(zhí)行。
2、Flink快速上手
2.1 創(chuàng)建項(xiàng)目
在準(zhǔn)備好所有的開發(fā)環(huán)境之后,我們就可以開始開發(fā)自己的第一個(gè)Flink程序了。首先我們要做的,就是在IDEA中搭建一個(gè)Flink項(xiàng)目的骨架。我們會(huì)使用Java項(xiàng)目中常見的Maven來進(jìn)行依賴管理。
1、創(chuàng)建工程
(1)打開IntelliJ IDEA,創(chuàng)建一個(gè)Maven工程。
2、添加項(xiàng)目依賴
<properties>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2.2 WordCount代碼編寫(大數(shù)據(jù)常用的例子)
需求:統(tǒng)計(jì)一段文字中,每個(gè)單詞出現(xiàn)的頻次
環(huán)境準(zhǔn)備:創(chuàng)建一個(gè)com.zhm.wordcount包
2.2.1 批處理
批處理的基本思路:先逐行讀入文件數(shù)據(jù),然后將每一行文子拆分成單詞;接著按照單詞分組,統(tǒng)計(jì)每組數(shù)據(jù)的個(gè)數(shù),就是對(duì)應(yīng)單詞的頻次。
1、數(shù)據(jù)準(zhǔn)備
(1)在工程根目錄下新建一個(gè)data文件夾,并在下面創(chuàng)建文本文件words.txt
(2)在文件中輸入一些單詞
hello hello hello
world world
hello world
2、代碼編寫
(1)在com.zhm.wordcount包下新建一個(gè)Demo01_BatchProcess類
/**
* @ClassName Batch
* @Description 利用Flink批處理單詞統(tǒng)計(jì)
* @Author Zouhuiming
* @Date 2023/9/3 9:58
* @Version 1.0
*/
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
計(jì)算的套路:
(1) 計(jì)算的環(huán)境
Spark:SparkContext
MR:Driver
Flink:ExecutionEnvironment
(2) 把要計(jì)算的數(shù)據(jù)封裝為計(jì)算模型
Spark:RDD(Spark Core)
DateFrame|DataSet(SparkSQL)
DStream(SparkStream)
MR:k-V
Flink:DataSource
(3)調(diào)用計(jì)算API
RDD.轉(zhuǎn)換算子()
MR:自己去編寫Mapper、Reducer
Flink:DataSource.算子()
*/
public class Demo01_BatchProcess {
public static void main(String[] args) throws Exception {
//創(chuàng)建支持Flink計(jì)算的環(huán)境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//使用環(huán)境去讀取數(shù)據(jù),封裝為計(jì)算模型
DataSource<String> dataSource = env.readTextFile("data/words.txt");
//調(diào)用計(jì)算API
dataSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(new Tuple2<String,Integer>(s1,1));
}
}
}).groupBy(0)
.sum(1)
.print();
}
}
運(yùn)行結(jié)果:
注意:這種實(shí)現(xiàn)是基于DataSetAPI的,也就是我們對(duì)數(shù)據(jù)的處理轉(zhuǎn)換,是看作數(shù)據(jù)集來進(jìn)行操作的。事實(shí)上Flink本身是流批統(tǒng)一的處理架構(gòu),批量的數(shù)據(jù)集本質(zhì)上也是流,沒有必要用兩套不同的API來實(shí)現(xiàn)。所以從Flink1.12開始,官方推薦的做法是直接使用DataStreamAPI,在提交任務(wù)時(shí)通過將執(zhí)行模式設(shè)為BATCH來進(jìn)行批處理;
bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
這樣,DataSetAPI就沒有用了,在實(shí)際應(yīng)用中我們只要維護(hù)一套DataStreamAPI就可以。這里只是為了方便大家理解,我們依然用DataSetAPI做了批處理的實(shí)現(xiàn)。
2.2.2 流處理
對(duì)于Flink而言,流才是整個(gè)處理邏輯的底層核心,所以流批一體之后的DataStreamAPI更加強(qiáng)大,可以直接處理批處理和流處理的所有場(chǎng)景。
下面我們就針對(duì)不同類型的的輸入數(shù)據(jù)源,用具體的代碼來實(shí)現(xiàn)流處理。
1、讀取文件(有界流)
我們同樣試圖讀取文檔words.txt中的數(shù)據(jù),并統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的頻次。整體思路與之前的批處理非常類似,代碼模式也基本一致。
在com.zhm.wordcount包下新建一個(gè)Demo02_BoundedStreamProcess類
package com.zhm.wordcount;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName Demo02_BoundedStreamProcess
* @Description 有界流
* @Author Zouhuiming
* @Date 2023/9/3 10:26
* @Version 1.0
*/
public class Demo02_BoundedStreamProcess {
public static void main(String[] args) throws Exception {
//1、創(chuàng)建支持Flink計(jì)算的環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.1 設(shè)置一個(gè)線程處理這個(gè)流(默認(rèn)是根據(jù)你的cpu數(shù)和單詞種類個(gè)數(shù),取最小值)
// env.setParallelism(1);
//2、獲取數(shù)據(jù)源
FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/words.txt")).build();
//3、利用環(huán)境將數(shù)據(jù)源的數(shù)據(jù)封裝為計(jì)算模型
DataStreamSource<String> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "myfile");
//4、調(diào)用API對(duì)數(shù)據(jù)進(jìn)行計(jì)算
//4.1 將每行數(shù)據(jù)按照給定的分割符拆分為Tuple2類型的數(shù)據(jù)模型(word,1)
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(new Tuple2<>(s1,1));
}
}
//4.2 根據(jù)word分組
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
//4.3 根據(jù)分組之后,按照元組中的第二列聚相加
}).sum(1)
// 4.4 打印結(jié)果
.print();
//5、提交job
env.execute();
}
}
運(yùn)行結(jié)果:
和批處理程序BatchWordCount的不同:
- 創(chuàng)建執(zhí)行環(huán)境的不同,流處理程序使用的是StreamExecutionEnvironment。
- 轉(zhuǎn)換處理之后,得到的數(shù)據(jù)對(duì)象類型不同
- 分組操作調(diào)用的方法是keyBy方法,可以傳入一個(gè)匿名函數(shù)作為鍵選擇器(KeySelector),指定當(dāng)前分組的key是什么。
- 代碼末尾需要調(diào)用env的execute方法,開始執(zhí)行任務(wù)。
2、讀取Socket文本流(無界流)
在實(shí)際的生產(chǎn)環(huán)境中,真正的數(shù)據(jù)流其實(shí)是無界的,有開始卻沒有結(jié)束,這就要求我們需要持續(xù)的處理捕獲的數(shù)據(jù)。為了模擬這種場(chǎng)景,可以監(jiān)聽Socket端口,然后向該端口不斷地發(fā)生數(shù)據(jù)。
(1)將StreamWordCount代碼中讀取文件數(shù)據(jù)的readTextFile方法,替換成讀取Socket文本流的方法socketTextStream。具體代碼實(shí)現(xiàn)如下:
package com.zhm.wordcount;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @ClassName Demo03_UnBoundedStreamProcess
* @Description 無界流
* @Author Zouhuiming
* @Date 2023/9/3 10:39
* @Version 1.0
*/
public class Demo03_UnBoundedStreamProcess {
public static void main(String[] args) throws Exception {
//1、創(chuàng)建支持Flink計(jì)算的環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.1 設(shè)置一個(gè)線程處理這個(gè)流
env.setParallelism(1);
//2、獲取數(shù)據(jù)源
DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);
//3.1 將每行數(shù)據(jù)按照給定的分割符拆分為Tuple2類型的數(shù)據(jù)模型(word,1)
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(new Tuple2<>(s1,1));
}
}
//3.2 根據(jù)word分組
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
//3.3 根據(jù)分組之后,按照元組中的第二列聚相加
}).sum(1)
// 3.4 打印結(jié)果
.print();
//4、提交job
env.execute();
}
}
(2)在Linux環(huán)境的主機(jī)hadoop102上,執(zhí)行下列命令,發(fā)送數(shù)據(jù)進(jìn)行測(cè)試(前提是要安裝netcat)
nc -lk hadoop102 9999
(3)啟動(dòng)Demo03_UnBoundedStreamProcess程序
我們會(huì)發(fā)現(xiàn)程序啟動(dòng)之后沒有任何輸出、也不會(huì)退出。這是正常的,因?yàn)镕link的流處理是事件驅(qū)動(dòng)的,當(dāng)前程序會(huì)一直處于監(jiān)聽狀態(tài),只有接受數(shù)據(jù)才會(huì)執(zhí)行任務(wù)、輸出統(tǒng)計(jì)結(jié)果。
(4)從hadoop102發(fā)送數(shù)據(jù)
(5)觀察idea控制臺(tái)
說明:Flink還具有一個(gè)類型提前系統(tǒng),可以分析函數(shù)的輸入和返回類型,自動(dòng)獲取類型信息,從而獲得對(duì)應(yīng)的序列化器和反序列化器。但是,由于java中泛型擦除的存在,在某些特殊情況下(比如Lambda表達(dá)式中),自動(dòng)提取的信息是不夠精細(xì)的–只告訴Flink當(dāng)前的元素由“船頭、船身、船尾”構(gòu)成,根本無法重建出“大船”的模樣;這時(shí)就需要顯示地提供類型信息,才能使得應(yīng)用程序正常工作或提高其性能。
因?yàn)閷?duì)于flatMap里傳入的Lambda表達(dá)式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無法得到Tuple<String,Long>。只有顯示地告訴系統(tǒng)當(dāng)前的返回類型,才能正確的解析出完整數(shù)據(jù)。
2.2.3 執(zhí)行模式
從Flink 1.12開始,官方推薦的做法是直接使用DataStream API,在提交任務(wù)時(shí)通過將執(zhí)行模式設(shè)為BATCH來進(jìn)行批處理。不建議使用DataSet API。
// 流處理環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamAPI執(zhí)行模式包括:流執(zhí)行模式、批執(zhí)行模式和自動(dòng)模式。
- 流執(zhí)行模式(Streaming)
這是DataStreamAPI最經(jīng)典的模式,一邊用于需要持續(xù)實(shí)時(shí)處理的無界數(shù)據(jù)流。默認(rèn)情況下,程序使用的就是Streaming執(zhí)行模式。 - 批執(zhí)行模式(Batch)
專門用于批處理的執(zhí)行模式 - 自動(dòng)模式
在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界來自動(dòng)選擇執(zhí)行模式。
批執(zhí)行模式的使用:主要有兩種方式:
(1)通過命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作業(yè)時(shí),增加execution.runtime-mode參數(shù),指定值為BATCH。
(2)通過代碼設(shè)置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代碼中,直接基于執(zhí)行環(huán)境調(diào)用setRuntimeMode方法,傳入BATCH模式。
實(shí)際應(yīng)用中一般不會(huì)在代碼中配置,而是使用命令行,這樣更加靈活。
2.2.4 本地WebUI
在Idea本地運(yùn)行程序時(shí),可以通過添加本地WebUI依賴,使用WebUI界面查看Job的運(yùn)行情況。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
添加后,在代碼中可以指定綁定的端口:文章來源:http://www.zghlxwxcb.cn/news/detail-707160.html
Configuration conf = new Configuration();
conf.setInteger("rest.port", 3333);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
之后,在程序啟動(dòng)后,打開本地瀏覽器,訪問localhost:3333即可查看job的運(yùn)行情況。文章來源地址http://www.zghlxwxcb.cn/news/detail-707160.html
到了這里,關(guān)于Flink---1、概述、快速上手的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!