1.ApacheStreamPark是什么?
ApacheStreamPark是流處理極速開發(fā)框架,流批一體 & 湖倉一體的云原生平臺,一站式流處理計算平臺。
2.介紹
2.1 特性
??特性中的簡單易用和文檔詳盡這兩點我也是深有體會的,部署一點都不簡單,照著官方文檔都不一定能搞出來,下面部署環(huán)節(jié)慢慢來吐槽吧。
2.2 架構(gòu)
2.3 Zeppelin和StreamPark的對比
??之前我們寫 Flink SQL 基本上都是使用 Java 包裝 SQL,打 jar 包,提交到 S3 平臺上。通過命令行方式提交代碼,但這種方式始終不友好,流程繁瑣,開發(fā)和運維成本太大。我們希望能夠進一步簡化流程,將 Flink TableEnvironment 抽象出來,有平臺負責初始化、打包運行 Flink 任務(wù),實現(xiàn) Flink 應(yīng)用程序的構(gòu)建、測試和部署自動化。
??這是個開源興起的時代,我們自然而然的將目光投向開源領(lǐng)域中:在一眾開源項目中,經(jīng)過對比各個項目綜合評估發(fā)現(xiàn) Zeppelin 和 StreamPark 這兩個項目對 Flink 的支持較為完善,都宣稱支持 Flink on K8s ,最終進入到我們的目標選擇范圍中,以下是兩者在 K8s 相關(guān)支持的簡單比較。
功能 | Zeppelin | StreamPark |
---|---|---|
任務(wù)狀態(tài)監(jiān)控 | 稍低 ,不能作為任務(wù)狀態(tài)監(jiān)控工具 | 較高 |
任務(wù)資源管理 | 無 | 有 ,但目前版本還不是很健全 |
本地化部署 | 稍低 ,on K8s 模式只能將 Zeppelin 部署在 K8s 中,否則就需要打通 Pod 和外部網(wǎng)絡(luò),但是這在生產(chǎn)環(huán)境中很少這樣做的 | 可以本地化部署 |
多語言支持 | 較高 ,支持 Python/Scala/Java 多語言 | 一般 ,目前 K8s 模式和 YARN 模式同時支持 FlinkSQL,并可以根據(jù)自身需求,使用 Java/Scala 開發(fā) DataStream |
Flink WebUI 代理 | 目前還支持的不是很完整 ,主開發(fā)大佬目前是考慮整合 Ingress | 較好 ,目前支持 ClusterIp/NodePort/LoadBalance 模式 |
學習成本 | 成本較低 ,需要增加額外的參數(shù)學習,這個和原生的 FlinkSQL 在參數(shù)上有點區(qū)別 | 無成本 ,K8s 模式下 FlinkSQL 為原生支持的 SQL 格式;同時支持 Custome-Code(用戶編寫代碼開發(fā)Datastream/FlinkSQL 任務(wù)) |
Flink 多版本支持 | 支持 | 支持 |
Flink 原生鏡像侵入 | 有侵入 ,需要在 Flink 鏡像中提前部署 jar 包,會同 JobManager 啟動在同一個 Pod 中,和 zeppelin-server 通信 | 無侵入 ,但是會產(chǎn)生較多鏡像,需要定時清理 |
代碼多版本管理 | 支持 | 支持 |
3.相關(guān)連接
ApacheStreamPark官方文檔
https://streampark.apache.org/zh-CN/
flink1.14.4官網(wǎng)
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh
streampark2.1.0的gitHub地址
https://github.com/apache/incubator-streampark/tree/release-2.1.0
本地調(diào)試啟動、編譯指南
https://z87p7jn1yv.feishu.cn/docx/X4UfdZ8cdoeK8ExQ7sUc1UHknps
多業(yè)務(wù)聚合查詢設(shè)計思路與實踐
https://mp.weixin.qq.com/s/N1TqaLaqGCDRH9jnmhvlzg
4.部署
??官方提供的在源碼文件的docker-compose.yam里面的鏡像是apache/streampark:latest,但是這個鏡像根本用不了,之前用這個和官方提供的那幾個鏡像2.1.0和2.1.0,這兩個鏡像版本可以在dockerHub的官網(wǎng)上搜索到,為啥用不了呢?因為我在部署的時候用的最新的鏡像,然后將源碼包中的腳本文件拉下來在本地數(shù)據(jù)庫里面streampark庫里面執(zhí)行了,然后使用官網(wǎng)給的鏡像部署yaml后,發(fā)現(xiàn)容器一直在重啟,然后我就看了下容器的日志,發(fā)現(xiàn)有關(guān)于數(shù)據(jù)庫的表字段確實的報錯,然后我就很是好奇和納悶,就將確實的子段在表里面補全了,然后重啟后可以啟動起來,但是還是用不了,然后我就聯(lián)系到官方,才得知他們的最新的鏡像apache/streampark:latest里里面的jar包使用的是開發(fā)分支的開發(fā)版本,所以才會有用不了的問題,官方在源碼版本、鏡像版本和sql版本這方面做的對應(yīng)關(guān)系上還是做的不夠的,這個也是讓使用者很頭疼的一個問題,明明是按照官網(wǎng)的文檔來搞的,為啥都搞不通?所以說上面的特性中的易用性和文檔詳盡可以說是值得讓人吐槽了。
??那如何解決呢?
??給官方反饋了這個問題,但是官方建議使用源碼構(gòu)建部署,然后我突發(fā)奇想,我自己構(gòu)建一個二進制的源碼包,然后在構(gòu)建一個鏡像試一下看看給的行,于是乎就就進行了漫長的嘗試之路。
4.1 二進制包編譯構(gòu)建
??編譯構(gòu)建二進制可執(zhí)行包,使用自己構(gòu)建的二進制包構(gòu)建Docker鏡像,需要準備一臺Linux的服務(wù)或者是虛擬機,可以正常上網(wǎng)即可,在該臺機子上需要事先安裝Git(拉取源碼文件),Maven和java環(huán)境(JDK1.8),我采用的是是上傳的源碼包:incubator-streampark-2.1.0.tar.gz,然后解壓源碼包:
tar -zxvf incubator-streampark-2.1.0.tar.gz
??解壓到服務(wù)器上,然后進入到解壓路徑里面:
??執(zhí)行:
./build.sh
??編譯構(gòu)建會去下載很多的pom依賴,所以需要經(jīng)過漫長的等待,如果你的網(wǎng)絡(luò)速度夠快的話,估計也挺快的,然后編譯構(gòu)建完成后會在當前目錄下看到一個dist的目錄,里面就生成了一個二進制的可執(zhí)行部署的源碼包了:apache-streampark_2.12-2.1.0-incubating-bin.tar.gz,這里源碼編譯構(gòu)建就構(gòu)建好了,下面構(gòu)建鏡像需要用到這個包。
4.2 鏡像構(gòu)建
??需要將Dockerfile文件和apache-streampark_2.12-2.1.0-incubating-bin.tar.gz放在同一個路徑下(目錄下)然后執(zhí)行構(gòu)建命令
??Dockerfile文件
FROM alpine:3.16 as deps-stage
COPY . /
WORKDIR /
RUN tar zxvf apache-streampark_2.12-2.1.0-incubating-bin.tar.gz \
&& mv apache-streampark_2.12-2.1.0-incubating-bin streampark
FROM docker:dind
WORKDIR /streampark
COPY --from=deps-stage /streampark /streampark
ENV NODE_VERSION=16.1.0
ENV NPM_VERSION=7.11.2
RUN apk add openjdk8 ; \ # 這里會報錯,在windows環(huán)境用;在linux上使用&&
apk add maven ; \
apk add wget ; \
apk add vim ; \
apk add bash; \
apk add curl
ENV JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk
ENV MAVEN_HOME=/usr/share/java/maven-3
ENV PATH $JAVA_HOME/bin:$PATH
ENV PATH $MAVEN_HOME/bin:$PATH
RUN wget "https://nodejs.org/dist/v$NODE_VERSION/node-v$NODE_VERSION-linux-x64.tar.gz" \
&& tar zxvf "node-v$NODE_VERSION-linux-x64.tar.gz" -C /usr/local --strip-components=1 \
&& rm "node-v$NODE_VERSION-linux-x64.tar.gz" \
&& ln -s /usr/local/bin/node /usr/local/bin/nodejs \
&& curl -LO https://dl.k8s.io/release/v1.23.0/bin/linux/amd64/kubectl \
&& install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
RUN mkdir -p ~/.kube
EXPOSE 10000
??構(gòu)建命令:
docker build -f Dockerfile -t my_streampark:2.1.0 .
#推送阿里云鏡像倉庫(略)
??這里給大家提供了我自己構(gòu)建的鏡像如下:
registry.cn-hangzhou.aliyuncs.com/bigfei/zlf:streampark2.1.0
4.3 初始化sql
??執(zhí)行的過程會碰到兩個錯誤:
-- 1.Unknown column !launch' in 't flink_app'
alter table "t flink_app'
-- drop index“inx state": 2.注釋這個一行
-- 這個是在2.1.0的版本里面的flink_app這個表里面缺少的字段和索引,可以或略,或者是在表里加上launch字段,不影響我我們下面部署2.1.0來使用這個庫里的sql數(shù)據(jù)的
??streampark庫如下:
??可以使用資料里面的:streampark.sql,是我執(zhí)行了官方的那個sql后將streampark庫導(dǎo)出來的一個腳本,用我給的這個也是沒有問題的。
4.4 部署
4.4.1 Docker-compose.yaml部署腳本
version: '2.1'
services:
streampark-console:
image: my_streampark:2.1.0
command: ${RUN_COMMAND}
ports:
- 10000:10000
env_file: .env
volumes:
- flink:/streampark/flink/${FLINK}
- /var/run/docker.sock:/var/run/docker.sock
- /etc/hosts:/etc/hosts:ro
- ~/.kube:/root/.kube:ro
privileged: true
restart: unless-stopped
jobmanager:
image: apache/flink:1.14.4-scala_2.12-java8
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
volumes:
- ./conf:/opt/flink/conf
- /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
- /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: apache/flink:1.14.4-scala_2.12-java8
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"
volumes:
- ./conf:/opt/flink/conf
- /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
- /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
flink:
??這個文件是我把flink的部署和streampark的部署合并修改了下,注意不要使用streampark官網(wǎng)的那種方式,搞了一個橋接的網(wǎng)絡(luò),否則有可能導(dǎo)致容器間的網(wǎng)絡(luò)不通。
4.4.2 配置文件準備
??deplay文件夾下:
??conf文件夾如下:
??需要修改.env和conf里面的application.yaml文件里面streampark數(shù)據(jù)庫相關(guān)的連接信息,這個application可以自己搞個目錄掛載到容器的如下路徑:
??把官方的那個拿出來改一改然后掛載,我這個好像是沒有生效的,
??相關(guān)資料會在文末分享的。
4.4.3 flink啟動配置
flink官網(wǎng)內(nèi)存配置
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/memory/mem_setup_tm/
4.4.4 streampark啟動配置
??flink-conf.yaml文件配置
jobmanager.rpc.address: jobmanager
blob.server.port: 6124
query.server.port: 6125
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory
heartbeat.interval: 1000
heartbeat.timeout: 5000
rest.flamegraph.enabled: true
web.backpressure.refresh-interval: 10000
classloader.resolve-order: parent-first
taskmanager.memory.managed.fraction: 0.1
taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 7072m
4.4.5 遇到的問題
??由于我之前搞的flink部署有點問題,使用了橋接網(wǎng)絡(luò),導(dǎo)致直接使用flink的sql-client.sh執(zhí)行之前的cdc失敗了,報了如下的錯誤:
java.net,UnknownHostException: jobmanager: Temporary failure in name resolution
??然后我就把部署文件改成上面那種方式,后面把之前啟動的容器全部刪除,重新部署后就可以正常執(zhí)行了。
??之前還遇到一個錯誤就是在cdc實踐的時候會遇到的問題,streampark提交啟動了cdc任務(wù),但是flink的jobs里面這個任務(wù)執(zhí)行失敗了:
java.util.concurrent.CompletionException: java.util.concurrent.Completiotion: org.apache.flink.runtime.jobmanager.schedulerloResourceAvailableException: Could not acquire the minimurrequired resources.
??這個問題是之前flink采用橋接網(wǎng)絡(luò)搭建的有問題,導(dǎo)致jobmanager啟動不起來,使用上面正確的啟動方式和flink-conf.yaml里面的配置,對taskmanager和jobmanager的資源配置和內(nèi)存配置如下:
taskmanager.memory.managed.fraction: 0.1
taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 7072m
??請根據(jù)官網(wǎng)先關(guān)flink的內(nèi)存參數(shù)來設(shè)置,資源盡量給大點,然后把之前有問題的容器刪除重新啟動后,三個容器都正常啟動了。
5 cdc實踐
5.1 確定flink是否正常
??flink首頁正常啟動在沒有任務(wù)執(zhí)行的時候可以看到slot的數(shù)據(jù)量:
??正常啟動taskManagers里面可以看到task的信息:
??job-manager的信息:
5.2 streampark管理端配置
??streampark的默認的用戶名和密碼是:admin/streampark
5.2.1 flink-home配置
5.2.2 flink-cluster配置
5.2.3 新增cdc-sql和上傳jar或添加依賴
??flink的job-manager節(jié)點和task-manager節(jié)點的/opt/flink/lib節(jié)點下我都傳了上面那幾個jar包了,然后用這個streampark來管理你只要把你任務(wù)用到的jar的上或者是把jar的maven依賴填上去,然后任務(wù)在大包的時候會將這個這些依賴全部打包到任務(wù)的jar包中,最后提交給flink去執(zhí)行,這種是不是更加的方便快捷高效的管理任務(wù)了呢。
5.3 cdc執(zhí)行成功實例
??cdc相關(guān)的請看
??多業(yè)務(wù)聚合查詢設(shè)計思路與實踐
https://mp.weixin.qq.com/s/N1TqaLaqGCDRH9jnmhvlzg
??streampark端:
??streampark點擊開始啟任務(wù)的時候不選擇savepoint了,不然flink那邊會報錯的
??flink端:
??需要容器一直運行中,如果重啟后之前的savepoint和chackpoint就沒了,這個感覺是flink的savepoint和checkpoint的配置沒有生效,還得重新研究下,如果重啟了,沒有之前的任務(wù)了,需要在streampark啟動下flink這邊就又有了。
??發(fā)現(xiàn)一個問題就是:剛才我重新提交了,但是flink的jobmanager的時候報了這個savepoin持久化到/tmp/flink-checkpoints-directory/文件中失敗了,這個有點離譜了嘛:
2023-06-14 15:48:58 2023-06-14 07:48:58,551 WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Error while processing AcknowledgeCheckpoint message
2023-06-14 15:48:58 org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
2023-06-14 15:48:58 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
2023-06-14 15:48:58 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
2023-06-14 15:48:58 at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
2023-06-14 15:48:58 Caused by: java.io.IOException: Mkdirs failed to create file:/tmp/flink-checkpoints-directory/acb95418d91e34f6cce478337154dd4f/chk-3
2023-06-14 15:48:58 at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1210) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58 ... 6 more
2023-06-14 15:49:01 2023-06-14 07:49:01,533 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 4 (type=CHECKPOINT) @ 1686728941531 for job acb95418d91e34f6cce478337154dd4f.
2023-06-14 15:49:01 2023-06-14 07:49:01,557 WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Error while processing AcknowledgeCheckpoint message
2023-06-14 15:49:01 org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 4. Failure reason: Failure to finalize checkpoint.
2023-06-14 15:49:01 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01 at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
2023-06-14 15:49:01 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
2023-06-14 15:49:01 at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
2023-06-14 15:49:01 Caused by: java.io.IOException: Mkdirs failed to create file:/tmp/flink-checkpoints-directory/acb95418d91e34f6cce478337154dd4f/chk-4
??然后我將我wsl的/tmp路徑下的flink-checkpoints-directory、flink-savepoints-directory的權(quán)限重新修改下:
??后面我又使用如下命令給兩個文件夾下所有文件授權(quán):
[root@DESKTOP-QF29H8K tmp]# chmod -R 777 flink-savepoints-directory/
[root@DESKTOP-QF29H8K tmp]# chmod -R 777 flink-checkpoints-directory/
??上面兩種授權(quán)都試了下,但是還是報錯了,這個不曉得是不是一個bug,還是我的checkpoints、savepoints有配置的有問題,這個問題我已經(jīng)反饋給官方了,估計在Linux上就沒有這個問題了,在windows上確實是奇葩的問題太多了。
??這個問題我知道是啥問題了,是掛載的問題,如果是linux系統(tǒng)是沒有這個問題的,但是在windows上可以使用絕對路徑和相當路徑來掛載,那就跟wsl里面的文件路徑?jīng)]有關(guān)系了哈,然后修改部署文件docker-compose-windows.yaml 如下:
version: '2.1'
services:
streampark-console:
image: my_streampark:2.1.0
command: ${RUN_COMMAND}
ports:
- 10000:10000
env_file: .env
volumes:
- flink:/streampark/flink/${FLINK}
- /var/run/docker.sock:/var/run/docker.sock
- /etc/hosts:/etc/hosts:ro
- ~/.kube:/root/.kube:ro
privileged: true
restart: unless-stopped
jobmanager:
image: apache/flink:1.14.4-scala_2.12-java8
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
volumes:
- ./conf:/opt/flink/conf
- ./tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
- ./tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: apache/flink:1.14.4-scala_2.12-java8
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"
volumes:
- ./conf:/opt/flink/conf
- ./tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
- ./tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
flink:
??重新在當前部署路徑下執(zhí)行部署命令:
docker-compose -f docker-compose-windows.yaml up -d
??docker-compose 掛載目錄
https://blog.csdn.net/SMILY12138/article/details/130305102
??可以看出在當前的deplay先會自動創(chuàng)建一個tmp文件夾,里面會自動創(chuàng)建flink-checkpoints-directory、flink-savepoints-directory
??然后上面那個錯誤就沒有報了,就可以正常的創(chuàng)建寫入文件到這個兩個掛載的目錄中了:
??這個掛載文集解決了之后,重新啟動任務(wù)就會自動提示選擇checkpoint了
??任務(wù)第一次啟動的時候不設(shè)置savepoint,第一次就指定會找不到_meatedata報錯,當停止任務(wù)的時候給一個savepoint的如下,然后重新啟動就可以自動選擇savepoint了:
# savepoint的寫法是
file:/tmp/flink-savepoints-directory
??停止執(zhí)行savepoint的位置:
??重啟選擇last-savepoint啟動:
??由于Linux的/tmp下重啟文件會被刪除,所以我重新修改了docker-compose-windows.yaml 如下,這一版本也是最終的部署版本,windows環(huán)境下可以直接使用,Linux上稍微改下也是可以使用的:
version: '2.1'
services:
streampark-console:
image: my_streampark:2.1.0
command: ${RUN_COMMAND}
ports:
- 10000:10000
env_file: .env
volumes:
- flink:/streampark/flink/${FLINK}
- /var/run/docker.sock:/var/run/docker.sock
- /etc/hosts:/etc/hosts:ro
- ~/.kube:/root/.kube:ro
privileged: true
restart: unless-stopped
jobmanager:
image: apache/flink:1.14.4-scala_2.12-java8
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
volumes:
- ./webUpDir:/usr/local/flink/upload
- ./webTepDir:/usr/local/flink/tmpdir
- ./conf:/opt/flink/conf
- ./tmp/flink-checkpoints-directory:/usr/local/flink/flink-checkpoints-directory
- ./tmp/flink-savepoints-directory:/usr/local/flink/flink-savepoints-directory
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: apache/flink:1.14.4-scala_2.12-java8
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"
volumes:
- ./webUpDir:/usr/local/flink/upload
- ./webTepDir:/usr/local/flink/tmpdir
- ./conf:/opt/flink/conf
- ./tmp/flink-checkpoints-directory:/usr/local/flink/flink-checkpoints-directory
- ./tmp/flink-savepoints-directory:/usr/local/flink/flink-savepoints-directory
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
flink:
??flink-conf.yaml新增兩個配置:
jobmanager.rpc.address: jobmanager
blob.server.port: 6124
query.server.port: 6125
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory
heartbeat.interval: 1000
heartbeat.timeout: 5000
rest.flamegraph.enabled: true
web.backpressure.refresh-interval: 10000
classloader.resolve-order: parent-first
taskmanager.memory.managed.fraction: 0.1
taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 7072m
# 新增兩個配置
web.upload.dir: /usr/local/flink/upload
web.tmpdir: /usr/local/flink/tmpdir
??這兩個配置用于配置flink的webui端上傳或者臨時文件做一個持久化(或者通過http的方式)提交任務(wù)的jar,streampark提交的cdc的任務(wù)會構(gòu)架一個jar包然后調(diào)用flink的接口給flink上傳一個jar包來執(zhí)行這個任務(wù),所以這個任務(wù)的包需要做一個持久化:
??兩參數(shù)的官方位置
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/
??Flink standalone集群問題記錄
https://blog.csdn.net/LeoGanlin/article/details/124692129
??webTepDir:
??webUpDir:
??解決了savepoint和checkpoint的掛載問題和重啟后flink的jar任務(wù)丟失,然后我們先停止三個容器,然后重新啟動后,看flink里面的jar包任務(wù)還在的,streampark的界面的任務(wù)也是正常執(zhí)行的,然后去驗證cdc,去mysql客戶端新增、修改和刪除關(guān)聯(lián)數(shù)據(jù),在es中也是可以實時同步的;savepoint和checkpoint持久化可以使用fliesystem掛載到本機目錄,或者是使用hdfs、oss、S3等等,官方都有文檔說明的。
6.資料
鏈接:https://pan.baidu.com/s/1ajAAcjsMOxYR9-uQW0jzmw
提取碼:c3nv
??資料包內(nèi)容:
??部署文件夾:
7.streampark官方提供的最新的二進制試用包
??試用版streampark二進制安裝包:
apache-streampark 2.11:
鏈接:https://pan.baidu.com/s/1O_YSE-7Jqb4O2A3H9lHT3A
提取碼:7cm6
apache-streampark 2.12:
鏈接:https://pan.baidu.com/s/1pRqMXP1PbZcgSJ5Dt1g68A
提取碼:ce00
??官方雖然給我們重新搞了兩個二進制試用包,不推薦使用最新的包,因為有想不到的bug和踩不完的坑,嘗鮮使用下也是可以的。文章來源:http://www.zghlxwxcb.cn/news/detail-510126.html
8.總結(jié)
??到此我的分享就結(jié)束了,在實踐的過程中也遇到了很多的問題,同時在解決問題的過程中也有很多的收獲,也結(jié)識了一些大佬,在和大佬交流的過程中也得到了一些啟發(fā)和學到了一些東西,希望我的分享能給你帶來幫助,請一鍵三連,么么噠!文章來源地址http://www.zghlxwxcb.cn/news/detail-510126.html
到了這里,關(guān)于ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!