提示:文章寫完后,目錄可以自動(dòng)生成,如何生成可參考右邊的幫助文檔
前言
創(chuàng)建一個(gè)本地運(yùn)行環(huán)境是提高開發(fā)效率和便捷進(jìn)行代碼調(diào)試的關(guān)鍵。我們將一起構(gòu)建一個(gè)本地環(huán)境,專門用于執(zhí)行Flink任務(wù)。
此環(huán)境以Flink 1.17.2版本為基礎(chǔ),采用Docker技術(shù)搭建而成。通過這種方式,我們能夠模擬出一個(gè)接近實(shí)際生產(chǎn)環(huán)境的運(yùn)行條件,幫助我們更好地測試和優(yōu)化我們的Flink應(yīng)用。
代碼鏈接: flink_study_notes
一、環(huán)境搭建
選擇flink版本
選擇自己實(shí)際應(yīng)用的flink版本,本次我選擇1.17.2flink版本作為演示版本。
鏈接: Docker-Hub-flink-1.17.2
鏡像說明::
- flink 1.13.0
- flink 內(nèi)置 :scala 版本 2.12,Java 版本 8
配置文件準(zhǔn)備
獲取配置文件
為了輕松獲得完整的配置設(shè)置,建議首先啟動(dòng)鏡像,然后將內(nèi)置的配置文件復(fù)制到外部。這一步驟將簡化未來對配置的修改以及環(huán)境的快速部署。通過預(yù)先準(zhǔn)備好配置文件,你可以避免從零開始的重復(fù)工作,并能夠快速地調(diào)整和應(yīng)用新的配置參數(shù)。
生成配置文件
為環(huán)境創(chuàng)建一個(gè)獨(dú)立的網(wǎng)絡(luò)1
使用Docker網(wǎng)絡(luò)可以讓你將多個(gè)容器連接在一起,允許容器間相互通信并組成一個(gè)隔離的網(wǎng)絡(luò)環(huán)境,類似于在宿主機(jī)上創(chuàng)建一個(gè)虛擬子網(wǎng)。
docker network create flink-network
創(chuàng)建JobManager
docker run \
-itd \
--name=jobmanager \
--publish 8081:8081 \
--network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:1.17.2-scala_2.12-java8 jobmanager
創(chuàng)建 TaskManager
docker run \
-itd \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:1.17.2-scala_2.12-java8 taskmanager
運(yùn)行結(jié)果
命令與參數(shù)解析
這條命令在flink-network網(wǎng)絡(luò)下以后臺模式運(yùn)行一個(gè)新的Flink
TaskManager容器,容器名稱為taskmanager,并將作業(yè)管理器的RPC地址設(shè)置為同網(wǎng)絡(luò)中的jobmanager容器。
- –network flink-network: 將容器連接到預(yù)先創(chuàng)建的網(wǎng)絡(luò)flink-network,容器將使用該網(wǎng)絡(luò)的配置與其他容器通信。
-
–env FLINK_PROPERTIES=“jobmanager.rpc.address: jobmanager”: 設(shè)置環(huán)境變量FLINK_PROPERTIES。該變量定義了Apache
Flink的配置,jobmanager.rpc.address是指明連接到的JobManager的地址,在這里被設(shè)置為容器名稱jobmanager,表示TaskManager將會(huì)連接到同一Docker網(wǎng)絡(luò)內(nèi)名為jobmanager的容器。
文件拷貝至本地
#本地創(chuàng)建目錄
mkdir -p ~/app/flink/
#進(jìn)入目錄
cd ~/app/flink/
# jobmanager 容器
docker cp jobmanager:/opt/flink/conf ./JobManager/
# taskmanager 容器
docker cp taskmanager:/opt/flink/conf ./TaskManager/
命令說明:
該命令會(huì)將名為jobmanager|taskmanager的容器的/opt/flink/conf目錄中的內(nèi)容復(fù)制到當(dāng)前工作目錄下的JobManager|TaskManager文件夾中。這樣做的目的是為了方便修改Flink的配置文件。
刪除容器
docker rm -f taskmanager
docker rm -f jobmanager
修改配置
#修改 JobManager/flink-conf.yaml web 端口號為 18081
rest.port: 18081
#修改 TaskManager/flink-conf.yaml 容器任務(wù)槽為 5
taskmanager.numberOfTaskSlots: 5
重新掛載并創(chuàng)建容器
啟動(dòng) jobmanager
docker run \
-itd \
-v ~/app/flink/JobManager/:/opt/flink/conf/ \
--name=jobmanager \
--publish 18081:18081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
--network flink-network flink:1.17.2-scala_2.12-java8 jobmanager
啟動(dòng) taskmanager
docker run \
-itd \
-v ~/app/flink/TaskManager/:/opt/flink/conf/ \
--name=taskmanager --network flink-network \
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
flink:1.17.2-scala_2.12-java8 taskmanager
容器啟動(dòng)成功
鏈接: 訪問地址
二、Flink example
官網(wǎng)地址: 項(xiàng)目配置
創(chuàng)建項(xiàng)目腳手架
maven命令:根據(jù)自己實(shí)際情況修改
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.17.2 \
-DgroupId=cn.demo \
-DartifactId=flink_study_notes \
-Dversion=0.1 \
-Dpackage=cn.demo \
-DinteractiveMode=false
命令解釋:
這個(gè)命令的功能是創(chuàng)建一個(gè)新的Maven項(xiàng)目,項(xiàng)目類型是Apache Flink的Java快速開始項(xiàng)目,項(xiàng)目的groupId是cn.demo,artifactId是flink_study_notes,版本號是0.1,最終項(xiàng)目的包路徑也是cn.demo。并且在指定這些參數(shù)后,禁用了交互,所以該命令可以自動(dòng)完成所有操作無需任何用戶交互。
打開工程
provided: 項(xiàng)目中provided作為默認(rèn)參數(shù),可以讓你更加精確地管理你的依賴關(guān)系,在你的項(xiàng)目環(huán)境中提供所需的資源,而在實(shí)際運(yùn)行環(huán)境中,則由運(yùn)行平臺或者容器來提供。但是在本地ide中運(yùn)行時(shí)會(huì)有如下錯(cuò)誤:
為了解決這個(gè)問題,我們需要在ide appliation運(yùn)行配置中進(jìn)行如下設(shè)置:
批處理示例
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataBatchJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.fromElements("world count", "hello world", "hello flink", "flink", "hello",
"hello world", "hello flink", "flink", "hello", "world");
SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split(" ");
for (String s : split) {
out.collect(Tuple2.of(s, 1));
}
}
});
streamOperator.keyBy(value -> value.f0).sum(1).print();
env.execute("count the number of times a word appears");
}
}
流處理示例
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class DataStreamJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStreamSource<String> socketDS = env.socketTextStream("127.0.0.1", 7777);
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
)
.setParallelism(2)
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0)
.sum(1);
sum.print();
env.execute();
}
}
運(yùn)行結(jié)果:
jar包上傳flink集群運(yùn)行
方式一:界面提交
方式二: 命令提交
#提交任務(wù)
flink run -m 127.0.0.1:18081 -c cn.demo.DataBatchJob -p 2 flink_study_notes-0.1.jar
#取消任務(wù)
flink cancle <JobID>
界面效果
文章來源:http://www.zghlxwxcb.cn/news/detail-827962.html
-
當(dāng)你創(chuàng)建了一個(gè)如
flink-network
的自定義網(wǎng)絡(luò)后,你可以在啟動(dòng)Docker容器時(shí)使用--network
標(biāo)志將容器附加到這個(gè)網(wǎng)絡(luò)上,例如docker run --network flink-network your-image
。這使得容器能夠以更細(xì)粒度的網(wǎng)絡(luò)設(shè)置進(jìn)行通信,并且比默認(rèn)的橋接網(wǎng)絡(luò)提供更好的安全性和靈活性。 ??文章來源地址http://www.zghlxwxcb.cn/news/detail-827962.html
到了這里,關(guān)于【Flink】基于Docker下的Flink運(yùn)行環(huán)境搭建(Mac)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!