国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

這篇具有很好參考價值的文章主要介紹了ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1.ApacheStreamPark是什么?

ApacheStreamPark是流處理極速開發(fā)框架,流批一體 & 湖倉一體的云原生平臺,一站式流處理計算平臺。

2.介紹

2.1 特性

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??特性中的簡單易用和文檔詳盡這兩點我也是深有體會的,部署一點都不簡單,照著官方文檔都不一定能搞出來,下面部署環(huán)節(jié)慢慢來吐槽吧。

2.2 架構(gòu)

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

2.3 Zeppelin和StreamPark的對比

??之前我們寫 Flink SQL 基本上都是使用 Java 包裝 SQL,打 jar 包,提交到 S3 平臺上。通過命令行方式提交代碼,但這種方式始終不友好,流程繁瑣,開發(fā)和運維成本太大。我們希望能夠進一步簡化流程,將 Flink TableEnvironment 抽象出來,有平臺負責初始化、打包運行 Flink 任務(wù),實現(xiàn) Flink 應(yīng)用程序的構(gòu)建、測試和部署自動化。

??這是個開源興起的時代,我們自然而然的將目光投向開源領(lǐng)域中:在一眾開源項目中,經(jīng)過對比各個項目綜合評估發(fā)現(xiàn) Zeppelin 和 StreamPark 這兩個項目對 Flink 的支持較為完善,都宣稱支持 Flink on K8s ,最終進入到我們的目標選擇范圍中,以下是兩者在 K8s 相關(guān)支持的簡單比較。

功能 Zeppelin StreamPark
任務(wù)狀態(tài)監(jiān)控 稍低 ,不能作為任務(wù)狀態(tài)監(jiān)控工具 較高
任務(wù)資源管理 有 ,但目前版本還不是很健全
本地化部署 稍低 ,on K8s 模式只能將 Zeppelin 部署在 K8s 中,否則就需要打通 Pod 和外部網(wǎng)絡(luò),但是這在生產(chǎn)環(huán)境中很少這樣做的 可以本地化部署
多語言支持 較高 ,支持 Python/Scala/Java 多語言 一般 ,目前 K8s 模式和 YARN 模式同時支持 FlinkSQL,并可以根據(jù)自身需求,使用 Java/Scala 開發(fā) DataStream
Flink WebUI 代理 目前還支持的不是很完整 ,主開發(fā)大佬目前是考慮整合 Ingress 較好 ,目前支持 ClusterIp/NodePort/LoadBalance 模式
學習成本 成本較低 ,需要增加額外的參數(shù)學習,這個和原生的 FlinkSQL 在參數(shù)上有點區(qū)別 無成本 ,K8s 模式下 FlinkSQL 為原生支持的 SQL 格式;同時支持 Custome-Code(用戶編寫代碼開發(fā)Datastream/FlinkSQL 任務(wù))
Flink 多版本支持 支持 支持
Flink 原生鏡像侵入 有侵入 ,需要在 Flink 鏡像中提前部署 jar 包,會同 JobManager 啟動在同一個 Pod 中,和 zeppelin-server 通信 無侵入 ,但是會產(chǎn)生較多鏡像,需要定時清理
代碼多版本管理 支持 支持

3.相關(guān)連接

ApacheStreamPark官方文檔

https://streampark.apache.org/zh-CN/

flink1.14.4官網(wǎng)

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh

streampark2.1.0的gitHub地址

https://github.com/apache/incubator-streampark/tree/release-2.1.0

本地調(diào)試啟動、編譯指南

https://z87p7jn1yv.feishu.cn/docx/X4UfdZ8cdoeK8ExQ7sUc1UHknps

多業(yè)務(wù)聚合查詢設(shè)計思路與實踐

https://mp.weixin.qq.com/s/N1TqaLaqGCDRH9jnmhvlzg

4.部署

??官方提供的在源碼文件的docker-compose.yam里面的鏡像是apache/streampark:latest,但是這個鏡像根本用不了,之前用這個和官方提供的那幾個鏡像2.1.0和2.1.0,這兩個鏡像版本可以在dockerHub的官網(wǎng)上搜索到,為啥用不了呢?因為我在部署的時候用的最新的鏡像,然后將源碼包中的腳本文件拉下來在本地數(shù)據(jù)庫里面streampark庫里面執(zhí)行了,然后使用官網(wǎng)給的鏡像部署yaml后,發(fā)現(xiàn)容器一直在重啟,然后我就看了下容器的日志,發(fā)現(xiàn)有關(guān)于數(shù)據(jù)庫的表字段確實的報錯,然后我就很是好奇和納悶,就將確實的子段在表里面補全了,然后重啟后可以啟動起來,但是還是用不了,然后我就聯(lián)系到官方,才得知他們的最新的鏡像apache/streampark:latest里里面的jar包使用的是開發(fā)分支的開發(fā)版本,所以才會有用不了的問題,官方在源碼版本、鏡像版本和sql版本這方面做的對應(yīng)關(guān)系上還是做的不夠的,這個也是讓使用者很頭疼的一個問題,明明是按照官網(wǎng)的文檔來搞的,為啥都搞不通?所以說上面的特性中的易用性和文檔詳盡可以說是值得讓人吐槽了。

??那如何解決呢?

??給官方反饋了這個問題,但是官方建議使用源碼構(gòu)建部署,然后我突發(fā)奇想,我自己構(gòu)建一個二進制的源碼包,然后在構(gòu)建一個鏡像試一下看看給的行,于是乎就就進行了漫長的嘗試之路。

4.1 二進制包編譯構(gòu)建

??編譯構(gòu)建二進制可執(zhí)行包,使用自己構(gòu)建的二進制包構(gòu)建Docker鏡像,需要準備一臺Linux的服務(wù)或者是虛擬機,可以正常上網(wǎng)即可,在該臺機子上需要事先安裝Git(拉取源碼文件),Maven和java環(huán)境(JDK1.8),我采用的是是上傳的源碼包:incubator-streampark-2.1.0.tar.gz,然后解壓源碼包:

tar -zxvf incubator-streampark-2.1.0.tar.gz 

??解壓到服務(wù)器上,然后進入到解壓路徑里面:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??執(zhí)行:

./build.sh

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??編譯構(gòu)建會去下載很多的pom依賴,所以需要經(jīng)過漫長的等待,如果你的網(wǎng)絡(luò)速度夠快的話,估計也挺快的,然后編譯構(gòu)建完成后會在當前目錄下看到一個dist的目錄,里面就生成了一個二進制的可執(zhí)行部署的源碼包了:apache-streampark_2.12-2.1.0-incubating-bin.tar.gz,這里源碼編譯構(gòu)建就構(gòu)建好了,下面構(gòu)建鏡像需要用到這個包。

4.2 鏡像構(gòu)建

??需要將Dockerfile文件和apache-streampark_2.12-2.1.0-incubating-bin.tar.gz放在同一個路徑下(目錄下)然后執(zhí)行構(gòu)建命令

??Dockerfile文件

FROM alpine:3.16 as deps-stage
COPY . /
WORKDIR /
RUN tar zxvf apache-streampark_2.12-2.1.0-incubating-bin.tar.gz \
&& mv apache-streampark_2.12-2.1.0-incubating-bin streampark

FROM docker:dind
WORKDIR /streampark
COPY --from=deps-stage /streampark /streampark

ENV NODE_VERSION=16.1.0
ENV NPM_VERSION=7.11.2

RUN apk add openjdk8 ; \ # 這里會報錯,在windows環(huán)境用;在linux上使用&&
    apk add maven ; \
    apk add wget ; \
    apk add vim ; \
    apk add bash; \
    apk add curl

ENV JAVA_HOME=/usr/lib/jvm/java-1.8-openjdk
ENV MAVEN_HOME=/usr/share/java/maven-3
ENV PATH $JAVA_HOME/bin:$PATH
ENV PATH $MAVEN_HOME/bin:$PATH

RUN wget "https://nodejs.org/dist/v$NODE_VERSION/node-v$NODE_VERSION-linux-x64.tar.gz" \
    && tar zxvf "node-v$NODE_VERSION-linux-x64.tar.gz" -C /usr/local --strip-components=1 \
    && rm "node-v$NODE_VERSION-linux-x64.tar.gz" \
    && ln -s /usr/local/bin/node /usr/local/bin/nodejs \
    && curl -LO https://dl.k8s.io/release/v1.23.0/bin/linux/amd64/kubectl \
    && install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

RUN mkdir -p ~/.kube

EXPOSE 10000

??構(gòu)建命令:

docker build -f Dockerfile -t my_streampark:2.1.0 .
#推送阿里云鏡像倉庫(略)

??這里給大家提供了我自己構(gòu)建的鏡像如下:

registry.cn-hangzhou.aliyuncs.com/bigfei/zlf:streampark2.1.0

4.3 初始化sql

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??執(zhí)行的過程會碰到兩個錯誤:

-- 1.Unknown column !launch' in 't flink_app'
alter table "t flink_app'
-- drop index“inx state": 2.注釋這個一行
-- 這個是在2.1.0的版本里面的flink_app這個表里面缺少的字段和索引,可以或略,或者是在表里加上launch字段,不影響我我們下面部署2.1.0來使用這個庫里的sql數(shù)據(jù)的

??streampark庫如下:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??可以使用資料里面的:streampark.sql,是我執(zhí)行了官方的那個sql后將streampark庫導(dǎo)出來的一個腳本,用我給的這個也是沒有問題的。

4.4 部署

4.4.1 Docker-compose.yaml部署腳本

version: '2.1'
services:
  streampark-console:
    image: my_streampark:2.1.0
    command: ${RUN_COMMAND}
    ports:
      - 10000:10000
    env_file: .env
    volumes:
      - flink:/streampark/flink/${FLINK}
      - /var/run/docker.sock:/var/run/docker.sock
      - /etc/hosts:/etc/hosts:ro
      - ~/.kube:/root/.kube:ro
    privileged: true
    restart: unless-stopped
    
  jobmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    command: "jobmanager.sh start-foreground"
    ports:
      - 8081:8081
    volumes:
      - ./conf:/opt/flink/conf
      - /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    depends_on:
      - jobmanager
    command: "taskmanager.sh start-foreground"
    volumes:
      - ./conf:/opt/flink/conf
      - /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

volumes:
  flink:

??這個文件是我把flink的部署和streampark的部署合并修改了下,注意不要使用streampark官網(wǎng)的那種方式,搞了一個橋接的網(wǎng)絡(luò),否則有可能導(dǎo)致容器間的網(wǎng)絡(luò)不通。

4.4.2 配置文件準備

??deplay文件夾下:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??conf文件夾如下:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??需要修改.env和conf里面的application.yaml文件里面streampark數(shù)據(jù)庫相關(guān)的連接信息,這個application可以自己搞個目錄掛載到容器的如下路徑:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??把官方的那個拿出來改一改然后掛載,我這個好像是沒有生效的,

??相關(guān)資料會在文末分享的。

4.4.3 flink啟動配置

flink官網(wǎng)內(nèi)存配置

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/memory/mem_setup_tm/

4.4.4 streampark啟動配置

??flink-conf.yaml文件配置

jobmanager.rpc.address: jobmanager
blob.server.port: 6124
query.server.port: 6125

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory

heartbeat.interval: 1000
heartbeat.timeout: 5000

rest.flamegraph.enabled: true
web.backpressure.refresh-interval: 10000

classloader.resolve-order: parent-first

taskmanager.memory.managed.fraction: 0.1 
taskmanager.memory.process.size: 2048m 

jobmanager.memory.process.size: 7072m

4.4.5 遇到的問題

??由于我之前搞的flink部署有點問題,使用了橋接網(wǎng)絡(luò),導(dǎo)致直接使用flink的sql-client.sh執(zhí)行之前的cdc失敗了,報了如下的錯誤:

java.net,UnknownHostException: jobmanager: Temporary failure in name resolution

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??然后我就把部署文件改成上面那種方式,后面把之前啟動的容器全部刪除,重新部署后就可以正常執(zhí)行了。

??之前還遇到一個錯誤就是在cdc實踐的時候會遇到的問題,streampark提交啟動了cdc任務(wù),但是flink的jobs里面這個任務(wù)執(zhí)行失敗了:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

java.util.concurrent.CompletionException: java.util.concurrent.Completiotion: org.apache.flink.runtime.jobmanager.schedulerloResourceAvailableException: Could not acquire the minimurrequired resources.

??這個問題是之前flink采用橋接網(wǎng)絡(luò)搭建的有問題,導(dǎo)致jobmanager啟動不起來,使用上面正確的啟動方式和flink-conf.yaml里面的配置,對taskmanager和jobmanager的資源配置和內(nèi)存配置如下:

taskmanager.memory.managed.fraction: 0.1 
taskmanager.memory.process.size: 2048m 
jobmanager.memory.process.size: 7072m

??請根據(jù)官網(wǎng)先關(guān)flink的內(nèi)存參數(shù)來設(shè)置,資源盡量給大點,然后把之前有問題的容器刪除重新啟動后,三個容器都正常啟動了。

5 cdc實踐

5.1 確定flink是否正常

??flink首頁正常啟動在沒有任務(wù)執(zhí)行的時候可以看到slot的數(shù)據(jù)量:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??正常啟動taskManagers里面可以看到task的信息:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??job-manager的信息:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

5.2 streampark管理端配置

??streampark的默認的用戶名和密碼是:admin/streampark

5.2.1 flink-home配置

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

5.2.2 flink-cluster配置

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

5.2.3 新增cdc-sql和上傳jar或添加依賴

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??flink的job-manager節(jié)點和task-manager節(jié)點的/opt/flink/lib節(jié)點下我都傳了上面那幾個jar包了,然后用這個streampark來管理你只要把你任務(wù)用到的jar的上或者是把jar的maven依賴填上去,然后任務(wù)在大包的時候會將這個這些依賴全部打包到任務(wù)的jar包中,最后提交給flink去執(zhí)行,這種是不是更加的方便快捷高效的管理任務(wù)了呢。

5.3 cdc執(zhí)行成功實例

??cdc相關(guān)的請看

??多業(yè)務(wù)聚合查詢設(shè)計思路與實踐

https://mp.weixin.qq.com/s/N1TqaLaqGCDRH9jnmhvlzg

??streampark端:

??streampark點擊開始啟任務(wù)的時候不選擇savepoint了,不然flink那邊會報錯的

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??flink端:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??需要容器一直運行中,如果重啟后之前的savepoint和chackpoint就沒了,這個感覺是flink的savepoint和checkpoint的配置沒有生效,還得重新研究下,如果重啟了,沒有之前的任務(wù)了,需要在streampark啟動下flink這邊就又有了。

??發(fā)現(xiàn)一個問題就是:剛才我重新提交了,但是flink的jobmanager的時候報了這個savepoin持久化到/tmp/flink-checkpoints-directory/文件中失敗了,這個有點離譜了嘛:

2023-06-14 15:48:58 2023-06-14 07:48:58,551 WARN  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while processing AcknowledgeCheckpoint message
2023-06-14 15:48:58 org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
2023-06-14 15:48:58     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
2023-06-14 15:48:58     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
2023-06-14 15:48:58     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
2023-06-14 15:48:58 Caused by: java.io.IOException: Mkdirs failed to create file:/tmp/flink-checkpoints-directory/acb95418d91e34f6cce478337154dd4f/chk-3
2023-06-14 15:48:58     at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:262) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1210) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:48:58     ... 6 more
2023-06-14 15:49:01 2023-06-14 07:49:01,533 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 4 (type=CHECKPOINT) @ 1686728941531 for job acb95418d91e34f6cce478337154dd4f.
2023-06-14 15:49:01 2023-06-14 07:49:01,557 WARN  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while processing AcknowledgeCheckpoint message
2023-06-14 15:49:01 org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 4. Failure reason: Failure to finalize checkpoint.
2023-06-14 15:49:01     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1100) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
2023-06-14 15:49:01     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_322]
2023-06-14 15:49:01     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_322]
2023-06-14 15:49:01     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
2023-06-14 15:49:01 Caused by: java.io.IOException: Mkdirs failed to create file:/tmp/flink-checkpoints-directory/acb95418d91e34f6cce478337154dd4f/chk-4

??然后我將我wsl的/tmp路徑下的flink-checkpoints-directory、flink-savepoints-directory的權(quán)限重新修改下:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??后面我又使用如下命令給兩個文件夾下所有文件授權(quán):

[root@DESKTOP-QF29H8K tmp]# chmod -R 777 flink-savepoints-directory/
[root@DESKTOP-QF29H8K tmp]# chmod -R 777 flink-checkpoints-directory/

??上面兩種授權(quán)都試了下,但是還是報錯了,這個不曉得是不是一個bug,還是我的checkpoints、savepoints有配置的有問題,這個問題我已經(jīng)反饋給官方了,估計在Linux上就沒有這個問題了,在windows上確實是奇葩的問題太多了。

??這個問題我知道是啥問題了,是掛載的問題,如果是linux系統(tǒng)是沒有這個問題的,但是在windows上可以使用絕對路徑和相當路徑來掛載,那就跟wsl里面的文件路徑?jīng)]有關(guān)系了哈,然后修改部署文件docker-compose-windows.yaml 如下:

version: '2.1'
services:
  streampark-console:
    image: my_streampark:2.1.0
    command: ${RUN_COMMAND}
    ports:
      - 10000:10000
    env_file: .env
    volumes:
      - flink:/streampark/flink/${FLINK}
      - /var/run/docker.sock:/var/run/docker.sock
      - /etc/hosts:/etc/hosts:ro
      - ~/.kube:/root/.kube:ro
    privileged: true
    restart: unless-stopped
    
  jobmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    command: "jobmanager.sh start-foreground"
    ports:
      - 8081:8081
    volumes:
      - ./conf:/opt/flink/conf
      - ./tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - ./tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    depends_on:
      - jobmanager
    command: "taskmanager.sh start-foreground"
    volumes:
      - ./conf:/opt/flink/conf
      - ./tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - ./tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

volumes:
  flink:

??重新在當前部署路徑下執(zhí)行部署命令:

docker-compose -f docker-compose-windows.yaml up -d

??docker-compose 掛載目錄

https://blog.csdn.net/SMILY12138/article/details/130305102

??可以看出在當前的deplay先會自動創(chuàng)建一個tmp文件夾,里面會自動創(chuàng)建flink-checkpoints-directory、flink-savepoints-directory

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??然后上面那個錯誤就沒有報了,就可以正常的創(chuàng)建寫入文件到這個兩個掛載的目錄中了:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??這個掛載文集解決了之后,重新啟動任務(wù)就會自動提示選擇checkpoint了

??任務(wù)第一次啟動的時候不設(shè)置savepoint,第一次就指定會找不到_meatedata報錯,當停止任務(wù)的時候給一個savepoint的如下,然后重新啟動就可以自動選擇savepoint了:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

# savepoint的寫法是
file:/tmp/flink-savepoints-directory

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??停止執(zhí)行savepoint的位置:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??重啟選擇last-savepoint啟動:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??由于Linux的/tmp下重啟文件會被刪除,所以我重新修改了docker-compose-windows.yaml 如下,這一版本也是最終的部署版本,windows環(huán)境下可以直接使用,Linux上稍微改下也是可以使用的:

version: '2.1'
services:
  streampark-console:
    image: my_streampark:2.1.0
    command: ${RUN_COMMAND}
    ports:
      - 10000:10000
    env_file: .env
    volumes:
      - flink:/streampark/flink/${FLINK}
      - /var/run/docker.sock:/var/run/docker.sock
      - /etc/hosts:/etc/hosts:ro
      - ~/.kube:/root/.kube:ro
    privileged: true
    restart: unless-stopped
    
  jobmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    command: "jobmanager.sh start-foreground"
    ports:
      - 8081:8081
    volumes:
      - ./webUpDir:/usr/local/flink/upload
      - ./webTepDir:/usr/local/flink/tmpdir
      - ./conf:/opt/flink/conf
      - ./tmp/flink-checkpoints-directory:/usr/local/flink/flink-checkpoints-directory
      - ./tmp/flink-savepoints-directory:/usr/local/flink/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    depends_on:
      - jobmanager
    command: "taskmanager.sh start-foreground"
    volumes:
      - ./webUpDir:/usr/local/flink/upload
      - ./webTepDir:/usr/local/flink/tmpdir
      - ./conf:/opt/flink/conf
      - ./tmp/flink-checkpoints-directory:/usr/local/flink/flink-checkpoints-directory
      - ./tmp/flink-savepoints-directory:/usr/local/flink/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

volumes:
  flink:

??flink-conf.yaml新增兩個配置:

jobmanager.rpc.address: jobmanager
blob.server.port: 6124
query.server.port: 6125

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
state.savepoints.dir: file:///tmp/flink-savepoints-directory

heartbeat.interval: 1000
heartbeat.timeout: 5000

rest.flamegraph.enabled: true
web.backpressure.refresh-interval: 10000

classloader.resolve-order: parent-first

taskmanager.memory.managed.fraction: 0.1 
taskmanager.memory.process.size: 2048m 

jobmanager.memory.process.size: 7072m
# 新增兩個配置
web.upload.dir: /usr/local/flink/upload
web.tmpdir: /usr/local/flink/tmpdir

??這兩個配置用于配置flink的webui端上傳或者臨時文件做一個持久化(或者通過http的方式)提交任務(wù)的jar,streampark提交的cdc的任務(wù)會構(gòu)架一個jar包然后調(diào)用flink的接口給flink上傳一個jar包來執(zhí)行這個任務(wù),所以這個任務(wù)的包需要做一個持久化:

??兩參數(shù)的官方位置

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??Flink standalone集群問題記錄

https://blog.csdn.net/LeoGanlin/article/details/124692129

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??webTepDir:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??webUpDir:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??解決了savepoint和checkpoint的掛載問題和重啟后flink的jar任務(wù)丟失,然后我們先停止三個容器,然后重新啟動后,看flink里面的jar包任務(wù)還在的,streampark的界面的任務(wù)也是正常執(zhí)行的,然后去驗證cdc,去mysql客戶端新增、修改和刪除關(guān)聯(lián)數(shù)據(jù),在es中也是可以實時同步的;savepoint和checkpoint持久化可以使用fliesystem掛載到本機目錄,或者是使用hdfs、oss、S3等等,官方都有文檔說明的。

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

6.資料

鏈接:https://pan.baidu.com/s/1ajAAcjsMOxYR9-uQW0jzmw 
提取碼:c3nv

??資料包內(nèi)容:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??部署文件夾:

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

7.streampark官方提供的最新的二進制試用包

ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐

??試用版streampark二進制安裝包:

apache-streampark 2.11: 
鏈接:https://pan.baidu.com/s/1O_YSE-7Jqb4O2A3H9lHT3A 
提取碼:7cm6

apache-streampark 2.12: 
鏈接:https://pan.baidu.com/s/1pRqMXP1PbZcgSJ5Dt1g68A 
提取碼:ce00

??官方雖然給我們重新搞了兩個二進制試用包,不推薦使用最新的包,因為有想不到的bug和踩不完的坑,嘗鮮使用下也是可以的。

8.總結(jié)

??到此我的分享就結(jié)束了,在實踐的過程中也遇到了很多的問題,同時在解決問題的過程中也有很多的收獲,也結(jié)識了一些大佬,在和大佬交流的過程中也得到了一些啟發(fā)和學到了一些東西,希望我的分享能給你帶來幫助,請一鍵三連,么么噠!文章來源地址http://www.zghlxwxcb.cn/news/detail-510126.html

到了這里,關(guān)于ApacheStreamPark2.1.0部署及執(zhí)行flink-cdc任務(wù)同步mysql表的數(shù)據(jù)到es的實踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • flink-cdc之讀取mysql變化數(shù)據(jù)

    flink-cdc之讀取mysql變化數(shù)據(jù)

    pom 代碼 注意開啟checkpoint 和不開啟是有區(qū)別的(savepoint也可以 啟動的flink指定時候 -s savepath) 不開啟,如果項目重啟了,會重新讀取所有的數(shù)據(jù) 開啟了,項目重啟了額,會根據(jù)保留的信息去讀取變化的數(shù)據(jù) ?mysql ? 數(shù)據(jù)庫表 ?增加一條數(shù)據(jù) 打印日志 op:c 是create ==FlinkCDC==

    2024年02月16日
    瀏覽(29)
  • 【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    【FLINK】Kafka數(shù)據(jù)源通過Flink-cdc進行實時數(shù)據(jù)同步

    CDC是Change Data Capture的縮寫,中文意思是 變更數(shù)據(jù)獲取 ,flink-cdc的作用是,通過flink捕獲數(shù)據(jù)源的事務(wù)變動操作記錄,包括數(shù)據(jù)的增刪改操作等,根據(jù)這些記錄可作用于對目標端進行實時數(shù)據(jù)同步。 下圖是flink-cdc最新支持的數(shù)據(jù)源類型: kafka的數(shù)據(jù)源要通過flink-cdc進行實時數(shù)

    2024年02月12日
    瀏覽(36)
  • Flinkx/Datax/Flink-CDC 優(yōu)劣勢對比

    Flinkx/Datax/Flink-CDC 優(yōu)劣勢對比

    Flinkx/Datax/Flink-CDC 優(yōu)劣勢對比_HiBoyljw的博客-CSDN博客 ? ? ? ?FlinkX是一款基于Flink的分布式離線/實時數(shù)據(jù)同步插件,可實現(xiàn)多種異構(gòu)數(shù)據(jù)源高效的數(shù)據(jù)同步,其由袋鼠云于2016年初步研發(fā)完成,目前有穩(wěn)定的研發(fā)團隊持續(xù)維護,已在Github上開源(開源地址詳見文章末尾),并維

    2024年02月07日
    瀏覽(21)
  • flink-cdc同步mysql數(shù)據(jù)到elasticsearch

    flink-cdc同步mysql數(shù)據(jù)到elasticsearch

    CDC是(Change Data Capture 變更數(shù)據(jù)獲?。┑暮喎Q。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù) 或 數(shù)據(jù)表的插入INSERT、更新UPDATE、刪除DELETE等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進行訂閱及消費。 cdc項目地址:https://github.com/ver

    2024年02月13日
    瀏覽(22)
  • flinkcdc 3.0 源碼學習之任務(wù)提交腳本flink-cdc.sh

    flinkcdc 3.0 源碼學習之任務(wù)提交腳本flink-cdc.sh

    大道至簡,用簡單的話來描述復(fù)雜的事,我是Antgeek,歡迎閱讀. 在flink 3.0版本中,我們僅通過一個簡單yaml文件就可以配置出一個復(fù)雜的數(shù)據(jù)同步任務(wù), 然后再來一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以將任務(wù)提交, 本文就是來探索一下這個shell腳本,主要是研究如何通過一個shell命

    2024年02月19日
    瀏覽(22)
  • 【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    【開發(fā)問題】flink-cdc不用數(shù)據(jù)庫之間的,不同類型的轉(zhuǎn)化

    我一開始是flink-cdc,oracle2Mysql,sql 我一開始直接用的oracle【date】類型,mysql【date】類型,sql的校驗通過了,但是真正操作數(shù)據(jù)的時候報錯,告訴我oracle的數(shù)據(jù)格式的日期數(shù)據(jù),不可以直接插入到mysql格式的日期數(shù)據(jù),說白了就是數(shù)據(jù)格式不一致導(dǎo)致的 我想的是既然格式不對

    2024年02月12日
    瀏覽(25)
  • SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    最近做的一個項目,使用的是pg數(shù)據(jù)庫,公司沒有成熟的DCD組件,為了實現(xiàn)數(shù)據(jù)變更消息發(fā)布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka。 監(jiān)聽數(shù)據(jù)變化,進行異步通知,做系統(tǒng)內(nèi)異步任務(wù)。 架構(gòu)方案(懶得寫了,看圖吧): -- 創(chuàng)建pg 高線數(shù)據(jù)同步用

    2024年02月02日
    瀏覽(31)
  • Flink-CDC Cannot instantiate the coordinator for operator Source

    在使用flink1.14.6版本cdc時出現(xiàn)報錯: Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.11-1.14.6.jar:1.14.6] at java.util.concurrent.CompletableFuture.uniWhenComp

    2024年02月12日
    瀏覽(22)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、達夢等數(shù)據(jù)庫開啟日志方法

    目錄 1. 前言 2. 數(shù)據(jù)源安裝與配置 2.1 MySQL 2.1.1 安裝 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安裝 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安裝 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安裝 2.4.2 CDC 配置 2.5達夢 2.4.1安裝 2.4.2CDC配置 3. 驗證 3.1 Flink版本與CDC版本的對應(yīng)關(guān)系 3.2 下載相關(guān)包 3.3 添加cdc jar 至lib目錄 3.4 驗

    2024年02月05日
    瀏覽(122)
  • 206.Flink(一):flink概述,flink集群搭建,flink中執(zhí)行任務(wù),單節(jié)點、yarn運行模式,三種部署模式的具體實現(xiàn)

    206.Flink(一):flink概述,flink集群搭建,flink中執(zhí)行任務(wù),單節(jié)點、yarn運行模式,三種部署模式的具體實現(xiàn)

    Flink官網(wǎng)地址:Apache Flink? — Stateful Computations over Data Streams | Apache Flink Flink是一個 框架 和 分布式處理引擎 ,用于對 無界 和 有界 數(shù)據(jù)流進行 有狀態(tài)計算 。 無界流(流): 有定義流的開始,沒有定義結(jié)束。會無休止產(chǎn)生數(shù)據(jù) 無界流數(shù)據(jù)必須持續(xù)處理 有界流(批): 有定

    2024年02月11日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包