準(zhǔn)備工作
本文簡(jiǎn)述Flink
在Linux
中安裝步驟,和示例程序的運(yùn)行。需要安裝JDK1.8
及以上版本。
下載地址:下載Flink
的二進(jìn)制包
點(diǎn)進(jìn)去后,選擇如下鏈接:
解壓flink-1.10.1-bin-scala_2.12.tgz
,我這里解壓到soft
目錄
[root@hadoop1 softpackage]# tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C ../soft/
單節(jié)點(diǎn)安裝
解壓后進(jìn)入Flink
的bin
目錄執(zhí)行如下腳本即可
[root@hadoop1 bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop1.
進(jìn)入Flink
頁(yè)面看看,如果沒(méi)有修改配置中的端口,默認(rèn)是8081
## 集群安裝
集群安裝分為以下幾步:(注意:hadoopx
都是我配置了/etc/hosts
域名的)bin
【1】將hadoop1
中解壓的Flink
分發(fā)到其他機(jī)器上,同時(shí)我也配置了免密登錄SSH
(也可以手動(dòng)復(fù)制low
)。
[root@hadoop1 soft]# xsync flink-1.10.1
執(zhí)行完后,我們就可以在hadoop2
和hadoop3
中看到flink
【2】選擇hadoop1
作為master
節(jié)點(diǎn),然后修改所有機(jī)器conf/flink-conf.yaml
(修改hadoop1
分發(fā)即可)jobmanager.rpc.address
密鑰以指向您的主節(jié)點(diǎn)。您還應(yīng)該通過(guò)設(shè)置jobmanager.heap.size和taskmanager.memory.process.size
鍵來(lái)定義允許Flink
在每個(gè)節(jié)點(diǎn)上分配的最大主內(nèi)存量。這些值以MB
為單位。如果某些工作節(jié)點(diǎn)有更多的主內(nèi)存要分配給Flink
系統(tǒng),則可以通過(guò)在這些特定節(jié)點(diǎn)上設(shè)置 taskmanager.memory.process.size或taskmanager.memory.flink.size
在conf / flink-conf.yaml
中覆蓋默認(rèn)值。
jobmanager.rpc.address = master主機(jī)名
【3】修改master
的conf/slaves
提供集群中所有節(jié)點(diǎn)的列表,這些列表將用作工作節(jié)點(diǎn)。我的是hadoop2
和hadoop3
。類似于HDFS
配置,編輯文件conf / slaves
并輸入每個(gè)輔助節(jié)點(diǎn)的IP
/主機(jī)名。每個(gè)工作節(jié)點(diǎn)稍后都將運(yùn)行TaskManager
。
hadoop2
hadoop3
以上示例說(shuō)明了具有三個(gè)節(jié)點(diǎn)(主機(jī)名hadoop1
作為master
,hadoop2
和hadoop3
作為worker
)的設(shè)置,并顯示了配置文件的內(nèi)容。Flink
目錄必須在同一路徑下的每個(gè)工作線程上都可用。您可以使用共享的NFS
(網(wǎng)絡(luò)文件系統(tǒng))目錄,也可以將整個(gè)Flink
目錄復(fù)制到每個(gè)工作節(jié)點(diǎn)。特別是:
1、每個(gè)JobManager
的可用內(nèi)存量jobmanager.heap.size
;
2、每個(gè)TaskManager
的可用內(nèi)存量(taskmanager.memory.process.size
并查看內(nèi)存設(shè)置指南);
3、每臺(tái)計(jì)算機(jī)可用的CPU
數(shù)(taskmanager.numberOfTaskSlots
);
4、集群中的CPU
總數(shù)(parallelism.default
);
5、臨時(shí)目錄(io.tmp.dirs
);
【4】在master
上啟動(dòng)集群(第一行)以及執(zhí)行結(jié)果。下面的腳本在本地節(jié)點(diǎn)上啟動(dòng)JobManager
,并通過(guò)SSH
連接到slaves
文件中列出的所有輔助節(jié)點(diǎn),以在每個(gè)節(jié)點(diǎn)上啟動(dòng)TaskManager
?,F(xiàn)在,您的 Flink系統(tǒng)已啟動(dòng)并正在運(yùn)行。現(xiàn)在,在本地節(jié)點(diǎn)上運(yùn)行的JobManager
將在配置的RPC
端口上接受作業(yè)。要停止Flink
,還有一個(gè)stop-cluster.sh
腳本。
[root@hadoop1 flink-1.10.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.
【5】Flink
界面展示 :進(jìn)入8081
端口,例如:http://hadoop1:8081/
或者通過(guò)jps
命令查看服務(wù)也可行。Standalone
集群架構(gòu)展示:client
客戶端提交任務(wù)給JobManager
,JobManager
負(fù)責(zé)Flink
集群計(jì)算資源管理,并分發(fā)任務(wù)給TaskManager
執(zhí)行,TaskManager
定期向JobManager
匯報(bào)狀態(tài)。
運(yùn)行 flink示例程序
批處理示例:提交Flink
的批處理examples
程序:也可以在頁(yè)面中進(jìn)行提交,但是作為一名NB
的程序員就使用命令
[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
執(zhí)行上面的命令后,就會(huì)顯示如下信息,這是Flink
提供的examples
下的批處理例子程序,統(tǒng)計(jì)單詞個(gè)數(shù)。
[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 99f4c579947a66884ec269ddf5f5b0ed
Program execution finished
Job with JobID 99f4c579947a66884ec269ddf5f5b0ed has finished.
Job Runtime: 795 ms
Accumulator Results:
- b70332353f355cf0464b0eba21f61075 (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
......
得到結(jié)果,這里統(tǒng)計(jì)的是默認(rèn)的數(shù)據(jù)集,可以通過(guò)--input --output
指定輸入輸出。我們可以在頁(yè)面中查看運(yùn)行的情況:流處理示例:?jiǎn)?dòng)
nc
服務(wù)器:
[root@hadoop1 flink-1.10.1]# nc -lk 9000
提交Flink
的批處理examples
程序:
[root@hadoop1 flink-1.10.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname hadoop1 --port 9000
這是Flink
提供的examples
下的流處理例子程序,接收socket
數(shù)據(jù)傳入,統(tǒng)計(jì)單詞個(gè)數(shù)。在nc
端隨意寫入單詞
[root@hadoop1 flink-1.10.1]# nc -lk 9000
g
s
進(jìn)入slave
節(jié)點(diǎn)(hadoop2
,hadoop3
),進(jìn)入Flink
安裝目錄輸入如下命令,查看實(shí)時(shí)數(shù)據(jù)變化
[root@hadoop2 flink-1.10.1]# tail -f log/flink-*-taskexecutor-*.out
s : 1
: 2
w : 1
d : 1
g : 1
d : 1
停止Flink
[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh
在Flink
的web
中查看運(yùn)行的job
將 JobManager / TaskManager 實(shí)例添加到集群(擴(kuò)展)
您可以使用bin/jobmanager.sh
和bin/taskmanager.sh
腳本將JobManager
和TaskManager
實(shí)例添加到正在運(yùn)行的集群中。添加JobManager
(確保在要啟動(dòng)/停止相應(yīng)實(shí)例的主機(jī)上調(diào)用這些腳本)
[root@hadoop1 flink-1.10.1]# bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
添加任務(wù)管理器
[root@hadoop1 flink-1.10.1]# bin/taskmanager.sh start|start-foreground|stop|stop-all
YARN模式
在企業(yè)中,經(jīng)常需要將Flink
集群部署到YARN
,因?yàn)榭梢允褂?code>YARN來(lái)管理所有計(jì)算資源。而且Spark
程序也可以部署到YARN
上。CliFrontend
是所有job
的入口類,通過(guò)解析傳遞的參數(shù)(jar
包,mainClass
等),讀取flink
的環(huán)境,配置信息等,封裝成PackagedProgram
,最終通過(guò)ClusterClient
提交給Flink
集群。Flink
運(yùn)行在YARN
上,提供了兩種方式:
第一種使用yarn-session
模式來(lái)快速提交作業(yè)到YARN
集群。如下,在Yarn
中初始化一個(gè)flink
集群,開辟指定的資源,以后提交任務(wù)都向這里提交,這個(gè)flink
集群會(huì)常駐在Yarn
集群中,除非手動(dòng)停止。共享Dispatcher
與ResourceManager
,共享資源。有大量的小作業(yè),適合使用這種方式;YarnSessionClusterEntrypoint
是Flink
在Yarn
上的線程。ApplicationMaster
是JobManager
。YarnTaskExecutorRunner
負(fù)責(zé)接收subTask
并運(yùn)行,是TaskManager
。
【1】修改Hadoop
的etc/hadoop/yarn-site.xml
,添加該配置表示內(nèi)存超過(guò)分配值,是否將任務(wù)殺掉。默認(rèn)為true
。運(yùn)行Flink
程序,很容易超過(guò)分配的內(nèi)存。
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
【2】 添加環(huán)境變量
//查看是否配置HADOOP_CONF_DIR,我這里沒(méi)有配置輸出為空
[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR
//在系統(tǒng)變量中添加 HADOOP_CONF_DIR
[root@hadoop1 hadoop-2.7.2]# vim /etc/profile
//添加如下內(nèi)容,wq保存退出
export HADOOP_CONF_DIR=$HADOOP_HOME/conf/
//刷新 /etc/profile
[root@hadoop1 hadoop-2.7.2]# source /etc/profile
//重新查看是否配置HADOOP_CONF_DIR
[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR
/opt/module/hadoop-2.7.2/conf/
【3】啟動(dòng)HDFS
、YARN
集群。通過(guò)jps
查看啟動(dòng)狀況。關(guān)閉flink
的其他集群。
[root@hadoop1 hadoop-2.7.2]# sbin/start-all.sh
[root@hadoop2 hadoop-2.7.2]# jps
10642 NodeManager
11093 Jps
10838 ResourceManager
10535 DataNode
10168 TaskManagerRunner
【4】將官方指定Pre-bundled Hadoop 2.7.5包放到flink
的lib
目錄下。使用yarn-session
模式提交作業(yè)
使用Flink
中的yarn-session
(yarn
客戶端),會(huì)啟動(dòng)兩個(gè)必要服務(wù)JobManager
和TaskManagers
;
客戶端通過(guò)yarn-session
提交作業(yè);yarn-session
會(huì)一直啟動(dòng),不停地接收客戶端提交的作用。
-n 表示申請(qǐng)2個(gè)容器
-s 表示每個(gè)容器啟動(dòng)多少個(gè)slot
-tm 表示每個(gè)TaskManager申請(qǐng)800M內(nèi)存
-nm yarn 的 appName,
-d detached表示以后臺(tái)程序方式運(yùn)行
如下表示啟動(dòng)一個(gè)yarn session
集群,每個(gè)JM
為1G
,TM
的內(nèi)存是1G
。
[root@hadoop1 flink-1.10.1]# bin/yarn-session.sh -n 2 -jm 1024m -tm 1024m -d
客戶端默認(rèn)是attach
模式,不會(huì)退出 ??梢?code>ctrl+c退出,然后再通過(guò)如下命令連上來(lái)。或者啟動(dòng)的時(shí)候用-d
則為detached
模式
./bin/yarn-session.sh -id application_1594027553009_0001(這個(gè)id來(lái)自下面hadoop集群)
Yarn
上顯示為Flink session cluster
,一致處于運(yùn)行狀態(tài)。點(diǎn)擊
ApplicationMaster
就會(huì)進(jìn)入Flink
集群啟動(dòng)命令行中也會(huì)顯示如下的
JobManager
啟動(dòng)的Web
界面
JobManager Web Interface: http://hadoop1:34431
然后我們可以通過(guò)jps
來(lái)看下當(dāng)前的進(jìn)程,其中YarnSessionClusterEntrypoint
就是我們Yarn Session
的分布式集群。
[root@hadoop1 flink-1.10.1]# jps
69923 NodeManager
81267 Jps
69394 NameNode
69531 DataNode
80571 FlinkYarnSessionCli
80765 YarnSessionClusterEntrypoint
/tmp
下生成了一個(gè)文件
將Flink
應(yīng)用部署到Flink On Yarn 之 session
方式中。
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/WordCount.jar
查看運(yùn)行結(jié)果:Flink On Yarn
之session
部署方式集群停止:關(guān)閉Yarn
就會(huì)關(guān)閉Flink
集群。。。
第二種模式:使用Per-JOBYarn
分離模式(與當(dāng)前客戶端無(wú)關(guān),當(dāng)客戶端提交完任務(wù)就結(jié)束,不用等到Flink
應(yīng)用執(zhí)行完畢)提交作業(yè):每次提交都會(huì)創(chuàng)建一個(gè)新的flink
集群,任務(wù)之間相互獨(dú)立,互不影響,方便管理。任務(wù)執(zhí)行完成之后創(chuàng)建的集群也會(huì)消失。 直接提交任務(wù)給YARN
,獨(dú)享Dispatcher
與ResourceManager
。按需要申請(qǐng)資源。適合執(zhí)行時(shí)間較長(zhǎng)的大作業(yè)。AM
啟動(dòng)類是YarnJobClusterEntrypoint
。YarnTaskExecutorRunner
負(fù)責(zé)接收subTask
,就是TaskManager
。需要打開hadoop
和yarn
分布式集群。不需要啟動(dòng)flink
分布式集群,它會(huì)自動(dòng)啟動(dòng)flink
分布式集群。
[root@hadoop1 flink-1.10.1]# bin/flink run -m yarn-cluster -d ./examples/streaming/WordCount.jar
2020-07-13 03:21:50,479 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2020-07-13 03:21:50,479 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2020-07-13 03:21:50,707 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at hadoop2/192.168.52.129:8032
2020-07-13 03:21:50,791 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-07-13 03:21:50,928 WARN org.apache.flink.yarn.YarnClusterDescriptor - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-07-13 03:21:51,001 INFO org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2020-07-13 03:21:53,906 INFO org.apache.flink.yarn.YarnClusterDescriptor
-yn
:yarncontainer
表示TaskManager
的個(gè)數(shù);-yqu
:yarnqueue
指定yarn
的隊(duì)列;-ys
:yarnslots
每一個(gè)TaskManager
對(duì)應(yīng)的slot
個(gè)數(shù);文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-753655.html
上傳成功之后,我們可以在Hadoop
的圖形化界面:http://hadoop2:8088/cluster/apps 中看到當(dāng)前任務(wù)的信息;文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-753655.html
到了這里,關(guān)于Flink 本地單機(jī)/Standalone集群/YARN模式集群搭建的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!