国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flinkcdc 3.0 源碼學習之任務提交腳本flink-cdc.sh

這篇具有很好參考價值的文章主要介紹了flinkcdc 3.0 源碼學習之任務提交腳本flink-cdc.sh。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

大道至簡,用簡單的話來描述復雜的事,我是Antgeek,歡迎閱讀.
在flink 3.0版本中,我們僅通過一個簡單yaml文件就可以配置出一個復雜的數(shù)據(jù)同步任務,
然后再來一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以將任務提交,
本文就是來探索一下這個shell腳本,主要是研究如何通過一個shell命令+yaml文件將任務提交,其他的功能會在之后的文章中解讀
大數(shù)據(jù)小菜雞在努力學習中,文中內容有誤多多指點.

目錄

概述
流程圖
flink-cdc.sh解讀
完整代碼
逐行解讀
參考

概述

首先需要思考一下,如果是自己來實現(xiàn)這一效果,那么應該如何設計,用什么技術?

我們知道flinkcdc的同步任務實際上也是一個flink任務,最終的提交的還是一個flink任務,而flink任務實際上就是個java任務,用jps命令都是可以查到的.

我們在編寫flink streaming程序的時候,實際上主要的流程都是在一個main方法中,而main方法是可以接收參數(shù)的,所以這塊設計起來其實很簡單就是在shell腳本中獲取到FLINK_HOME路徑,然后將yaml文件通過命令行的方式傳遞到main方法中,然后再設計一個類來解析這個yaml文件形成一個任務實體類,然后根據(jù)這個實體類來生成一個flink任務,這就是一個大概的思路,里面肯定還有很多的細節(jié),接下來就通過這個flink-cdc.sh腳本的解讀來進一步看看大佬們是如何來實現(xiàn)這一功能的.

流程圖

這里使用一個流程圖來描述整個的流程,看完這個就知道這一腳本的大概內容了,如果有興趣可以繼續(xù)往下閱讀,后面都是將腳本的一行一行的解讀并配有中文注釋.
com.ververica.cdc.cli.clifrontend,flinkcdc,flink,flinkcdc,源碼

flink-cdc.sh解讀

源碼路徑 : flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh

完整代碼

#!/usr/bin/env bash
################################################################################
#  Copyright 2023 Ververica Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Setup FLINK_HOME
args=("$@")
# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
for ((i=0; i < ${#args[@]}; i++)); do
    case "${args[i]}" in
        --flink-home)
            if [[ -n "${args[i+1]}" ]]; then
                FLINK_HOME="${args[i+1]}"
                break
            fi
            ;;
    esac
done
if [[ -z $FLINK_HOME ]]; then
  echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."
  exit 1
fi

# Setup Flink related configurations
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations
. $FLINK_HOME/bin/config.sh

# Define Flink CDC directories
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log

# Build Java classpath
CLASSPATH=""
# Add Flink libraries to the classpath
for jar in "$FLINK_HOME"/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$jar
done
# Add Flink CDC libraries to classpath
for jar in "$FLINK_CDC_LIB"/*.jar; do
  CLASSPATH=$CLASSPATH:$jar
done
# Add Hadoop classpath, which is defined in config.sh
CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
# Trim classpath
CLASSPATH=${CLASSPATH#:}

# Setup logging
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)

# JAVA_RUN should have been setup in config.sh
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

逐行解析

參數(shù)傳入

#!/usr/bin/env bash
# Setup FLINK_HOME
# 獲取這個腳本的所有參數(shù),然后存儲到args變量中
# ${#args[@]} 獲取數(shù)組長度
# ${args[i]} 獲取數(shù)組第i個值
args=("$@")

設置FLINK_HOME這個變量

# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
# 遍歷傳入的參數(shù)檢查是否FLINK_HOME這個環(huán)境變量是通過命令行參數(shù) --flink-home傳遞進來的
# shell中case的語法
# case 值 in
#      模式1) # 這里的模式指的是shell中的通配符模式不是正則表達式,例如 a*,就是a開頭的任意字符串
#           代碼塊
#           ;;
#      模式2)
#           代碼塊
#           ;;
#      *)
#           默認代碼塊
#           ;;
# esac
for ((i=0; i < ${#args[@]}; i++)); do
    case "${args[i]}" in
        --flink-home)
            # 如果匹配到到了就取他的下一個值給FLINK_HOME賦值,取值之前要判斷一下是否存在
            # -n 就是檢查字符串長度是否大于0,大于0返回true,否則false
            if [[ -n "${args[i+1]}" ]]; then
                FLINK_HOME="${args[i+1]}"
                break
            fi
            ;;
    esac
done

校驗FLINK_HOME這個變量是否設置成功

# 如果經(jīng)過上面的循環(huán)還是沒有給FLINK_HOME賦值就退出程序
# 提示 [錯誤] 不能夠在命令行參數(shù)--flink-home 或者 環(huán)境變量FLINK_HOME 找到 FLINK_HOME的值 
if [[ -z $FLINK_HOME ]]; then
  echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."
  exit 1
fi

獲取Flink的一些相關配置

# Setup Flink related configurations
# 設置flink相關的配置
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
# 為了避免config.sh(這個文件在$FLINK_HOME/bin/config.sh)覆蓋掉FLINK_HOME這個變量,所以這里將它置位1
# 為什么置為1呢,這里可以看一下config.sh中的相關代碼,如下
# 可以看到如果變量_FLINK_HOME_DETERMINED為空那么就會把FLINK_HOME的值替換掉,所以這里將它的值賦值為1就是為了避免這個
# 具體FLINK_HOME會被替換成什么值呢
# dirname 就是要獲取文件路徑的路徑,例如dirname /home/user/a.txt 返回 /home/user/
# $SYMLINK_RESOLVED_BIN 是什么值呢
# 是切換到$bin路徑下,的絕對路徑(pwd -P的意思就是獲取實際文件系統(tǒng)路徑,pwd是獲取鏈接路徑)
# $bin是target的路徑
# target="$0" # $0就是當前腳本的名稱
# -L 判斷是否是一個鏈接符號,判斷target是否是一個鏈接符號
# 如果是一個鏈接符號,那么就執(zhí)行循環(huán)的代碼塊
# 跳出的條件是target變量不是一個鏈接符號或者循環(huán)了100次跳出循環(huán),-gt是大于 -ge是大于等于
# ls 就是列出目錄信息
# -ld 有兩個參數(shù) -l和-d,-l是長格式進行顯示,包括文件的屬性和權限信息,相當于ll
# -d是只顯示目錄自身的信息,而不列出目錄中的文件,無論是文件還是目錄,都不會進入它,僅是顯示它自身的信息
# -- 是一個特殊的選項,	用于分隔選項與參數(shù).它的作用是確保$target被視作參數(shù),即使$target是 - 開頭的,避免將其解析成選項
# 解釋一下 target=`expr "$ls" : '.* -> \(.*\)$'`
# 這行大概意思就是通過expr命令和正則表達式提取$ls變量中符號鏈接的目標路徑或者目錄,然后賦值給target
# expr 是一個執(zhí)行表達式的命令
# "$ls" 是作為參數(shù)傳遞給expr
# : '.* -> \(.*\)$' 這是一個正則表達式,用于匹配符號鏈接中的目標文件或目錄.通過使用圓括號 ( ) 捕獲模式,可以將匹配到的部分提取出來


# target="$0"
# # For the case, the executable has been directly symlinked, figure out
# # the correct bin path by following its symlink up to an upper bound.
# # Note: we can't use the readlink utility here if we want to be POSIX
# # compatible.
# iteration=0
# while [ -L "$target" ]; do
#     if [ "$iteration" -gt 100 ]; then
#         echo "Cannot resolve path: You have a cyclic symlink in $target."
#         break
#     fi
#     ls=`ls -ld -- "$target"`
#     target=`expr "$ls" : '.* -> \(.*\)$'`
#     iteration=$((iteration + 1))
# done
# Convert relative path to absolute path and resolve directory symlinks
# bin=`dirname "$target"`
# SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
# if [ -z "$_FLINK_HOME_DETERMINED" ]; then
#     FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
# fi

_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
# config.sh 需要 FLINK_CONF_DIR 配置
FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations
# 使用config.sh來配置 Flink相關的配置
. $FLINK_HOME/bin/config.sh

定義Flink cdc 的一些路徑

# Define Flink CDC directories
# 定義Flink cdc 的路徑
# SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# 這行的大概意思就是要獲取腳本的絕對路徑
# ${BASH_SOURCE[0]} bash的特殊變量,獲取當前運行腳本的名稱
# $(dirname -- ${BASH_SOURCE[0]}) 獲取當前運行腳本的路徑(不能直接用這個,因為可能會因為軟連接或者其他情況導致路徑獲取不準確,最穩(wěn)妥的方法就是cd 到這個路徑然后pwd獲取絕對路徑),這里的 -- 就是防止后面的變量被識別成選項例如-開頭
# cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" 切換到這個路徑下
# &> /dev/null 就是將一些標準輸出和錯誤輸出都重定向到/dev/null,這樣可以使輸出更清晰
# && 當前一個命令執(zhí)行成功后執(zhí)行后面的命令
# pwd 獲取當前路徑

# FLINK_CDC_HOME="$SCRIPT_DIR"/..
# SCRIPT_DIR 的上級路徑就是FLINK_CDC_HOME的值,就是切換到了bin目錄的根目錄
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log

構建任務啟動需要的classpath

# Build Java classpath
# 構建 Java的calsspath
CLASSPATH=""
# Add Flink libraries to the classpath
# 將flink路徑下lib的jar包都添加到classpath中
for jar in "$FLINK_HOME"/lib/*.jar; do
  CLASSPATH=$CLASSPATH:$jar
done
# Add Flink CDC libraries to classpath
# 將cdc下lib的jar包都添加到classpath
for jar in "$FLINK_CDC_LIB"/*.jar; do
  CLASSPATH=$CLASSPATH:$jar
done
# Add Hadoop classpath, which is defined in config.sh
# 添加hadoop 的classpath
CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
# Trim classpath
# 去掉字符串開頭的冒號 ,如果要去掉結尾的冒號 ${CLASSPATH%:}
CLASSPATH=${CLASSPATH#:}

設置日志相關的配置

# Setup logging
# 配置日志
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
# 啟動命令中將日志的配置參數(shù)拼接,指定日志文件以及日志配置文件
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)

啟動任務

# JAVA_RUN should have been setup in config.sh
# exec 是一個用于替換當前進程的命令,一般用在腳本中,會將當前腳本的執(zhí)行進程執(zhí)行的內容替換成exec后面命令
# 有什么作用呢?
# 1.減少系統(tǒng)資源 : 不用創(chuàng)建一個新的進程
# 2.重定向標準輸入/輸出 : 通過使用 exec 命令執(zhí)行新的命令.可以將標準輸入,輸出和錯誤重定向到新命令所指定的位置.
# 3.執(zhí)行后續(xù)操作:在腳本中,使用 exec 命令可以執(zhí)行一些命令或操作后,將控制權交給新的命令.這可以用于在腳本中完成某些初始化操作后,將腳本完全替換為另一個命令或程序.
# $JAVA_RUN 在config.sh就定義了,一般是java 或者 /bin/java
# -classpath 指定classpath路徑
# "${LOG_SETTINGS[@]}" 日志的一些配置信息
# com.ververica.cdc.cli.CliFrontend 入口類
# "$@" 所有的命令行參數(shù)傳到入口類中,通過String args[] 來接收
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

參考

[1] : https://github.com/apache/flink

[2] : https://github.com/ververica/flink-cdc-connectors

[3] : https://blog.csdn.net/wang2leee/article/details/132521566文章來源地址http://www.zghlxwxcb.cn/news/detail-827559.html

到了這里,關于flinkcdc 3.0 源碼學習之任務提交腳本flink-cdc.sh的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • flink的常見的任務提交方式

    flink的常見的任務提交方式

    此方式使用起來相對比較簡單,但是無法滿足需要設置savepoint暫存點的流式任務需求。 使用此方式需要先創(chuàng)建Flink遠方的執(zhí)行環(huán)境,然后按序執(zhí)行FlinkSql,流程如下: java示例如下: 此方式主要通過用java編寫一個任務,然后打成jar的形式上傳到flink集群。此方式比較靈活,可

    2024年04月26日
    瀏覽(21)
  • flink客戶端提交任務報錯

    { “errors”: [ “org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.ntat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest KaTeX parse error: Undefined control sequence: n at position 26: …ndler.java:110)?n?tat java.util.… UniHandle.tryFire(CompletableFuture.java:797)ntat j

    2024年02月15日
    瀏覽(94)
  • 使用Java代碼遠程提交flink任務

    導入依賴 參數(shù)格式參考: { ????\\\"jarPath\\\":\\\"C:\\\\flink-1.13.5\\\\examples\\\\streaming\\\\WordCount.jar\\\", ????\\\"parallelism\\\":1, ????\\\"entryPointClassName\\\":\\\"org.apache.flink.streaming.examples.wordcount.WordCount\\\" }

    2024年02月11日
    瀏覽(22)
  • flinkcdc 3.0 嘗鮮

    flinkcdc 3.0 嘗鮮

    本文會將從環(huán)境搭建到demo來全流程體驗flinkcdc 3.0 包含了如下內容 flink1.18 standalone搭建 doris 1fe1be 搭建 整庫數(shù)據(jù)同步 測試各同步場景 從檢查點重啟同步任務 下載flink 1.18.0 鏈接 : https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz 解壓 : 修改checkpoint 時間間隔 為3秒

    2024年01月22日
    瀏覽(18)
  • 采用seatunnel提交Flink和Spark任務

    seatunnel 是一個非常易用,高性能、支持實時流式和離線批處理的海量數(shù)據(jù)處理產品,架構于Apache Spark 和 Apache Flink之上。 seatunnel 讓Spark和Flink的使用更簡單,更高效。 注:當前版本用的是2.1.3版本? 如果在github下載自己編譯有問題 可在此地址下載編譯好的文件seatunnel-2.1.3-b

    2024年02月15日
    瀏覽(21)
  • flinkcdc 3.0 架構設計學習

    flinkcdc 3.0 架構設計學習

    本文將會了解到flinkcdc3.0版本的架構設計,從一個宏觀層面來學習flinkcdc3.0帶來的新特性 這也是作者目前覺得學習一項技術的思路和方法,就是首先先把demo跑起來體驗一下,然后整體了解一下架構設計,應用場景等,之后再去學習技術細節(jié)和源碼,由淺入深的學習. 文中內容有誤請多

    2024年02月22日
    瀏覽(17)
  • 關于flink重新提交任務,重復消費kafka的坑

    關于flink重新提交任務,重復消費kafka的坑

    按照以下方式設置backend目錄和checkpoint目錄,fsbackend目錄有數(shù)據(jù),checkpoint目錄沒數(shù)據(jù) 我以為checkpoint和fsbackend要同時設置,其實,1.14.3版本,setCheckpointStorage和stateBackend改成了分著設置 我上邊代碼這樣設置,相當于首先指定了以下checkpoint按照默認的backend存儲,然后又指定了按

    2024年02月03日
    瀏覽(23)
  • 基于PBS向超算服務器隊列提交任務的腳本模板與常用命令

    基于PBS向超算服務器隊列提交任務的腳本模板與常用命令

    ??本文介紹在 Linux 服務器中,通過 PBS (Portable Batch System)作業(yè)管理系統(tǒng)腳本的方式,提交任務到 服務器 隊列,并執(zhí)行任務的方法。 ??最近,需要在學校公用的超算中執(zhí)行代碼任務;而和多數(shù)超算設備一樣,其也是需要通過作業(yè)隊列的方式,來提交、管理、排序不同用

    2024年04月12日
    瀏覽(19)
  • Flink1.14提交任務報錯classloader.check-leaked-classloader問題解決

    我的hadoop版本是3.1.3,F(xiàn)link版本是1.14。不知道是hadoop版本的原因還是Flink版本更新的原因。當我運行一個簡單的Flink測試時,雖然結果出來了但是后面還跟著一段報錯信息。 測試命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 報錯信息: Trying to acce

    2024年02月11日
    瀏覽(26)
  • 20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務到集群上

    20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務到集群上

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包