大道至簡,用簡單的話來描述復雜的事,我是Antgeek,歡迎閱讀.
在flink 3.0版本中,我們僅通過一個簡單yaml文件就可以配置出一個復雜的數(shù)據(jù)同步任務,
然后再來一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以將任務提交,
本文就是來探索一下這個shell腳本,主要是研究如何通過一個shell命令+yaml文件將任務提交,其他的功能會在之后的文章中解讀
大數(shù)據(jù)小菜雞在努力學習中,文中內容有誤多多指點.
目錄
概述
流程圖
flink-cdc.sh解讀
完整代碼
逐行解讀
參考
概述
首先需要思考一下,如果是自己來實現(xiàn)這一效果,那么應該如何設計,用什么技術?
我們知道flinkcdc的同步任務實際上也是一個flink任務,最終的提交的還是一個flink任務,而flink任務實際上就是個java任務,用jps命令都是可以查到的.
我們在編寫flink streaming程序的時候,實際上主要的流程都是在一個main方法中,而main方法是可以接收參數(shù)的,所以這塊設計起來其實很簡單就是在shell腳本中獲取到FLINK_HOME路徑,然后將yaml文件通過命令行的方式傳遞到main方法中,然后再設計一個類來解析這個yaml文件形成一個任務實體類,然后根據(jù)這個實體類來生成一個flink任務,這就是一個大概的思路,里面肯定還有很多的細節(jié),接下來就通過這個flink-cdc.sh腳本的解讀來進一步看看大佬們是如何來實現(xiàn)這一功能的.
流程圖
這里使用一個流程圖來描述整個的流程,看完這個就知道這一腳本的大概內容了,如果有興趣可以繼續(xù)往下閱讀,后面都是將腳本的一行一行的解讀并配有中文注釋.
flink-cdc.sh解讀
源碼路徑 : flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh
完整代碼
#!/usr/bin/env bash
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Setup FLINK_HOME
args=("$@")
# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
for ((i=0; i < ${#args[@]}; i++)); do
case "${args[i]}" in
--flink-home)
if [[ -n "${args[i+1]}" ]]; then
FLINK_HOME="${args[i+1]}"
break
fi
;;
esac
done
if [[ -z $FLINK_HOME ]]; then
echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."
exit 1
fi
# Setup Flink related configurations
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations
. $FLINK_HOME/bin/config.sh
# Define Flink CDC directories
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log
# Build Java classpath
CLASSPATH=""
# Add Flink libraries to the classpath
for jar in "$FLINK_HOME"/lib/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
# Add Flink CDC libraries to classpath
for jar in "$FLINK_CDC_LIB"/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
# Add Hadoop classpath, which is defined in config.sh
CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
# Trim classpath
CLASSPATH=${CLASSPATH#:}
# Setup logging
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)
# JAVA_RUN should have been setup in config.sh
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"
逐行解析
參數(shù)傳入
#!/usr/bin/env bash
# Setup FLINK_HOME
# 獲取這個腳本的所有參數(shù),然后存儲到args變量中
# ${#args[@]} 獲取數(shù)組長度
# ${args[i]} 獲取數(shù)組第i個值
args=("$@")
設置FLINK_HOME這個變量
# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
# 遍歷傳入的參數(shù)檢查是否FLINK_HOME這個環(huán)境變量是通過命令行參數(shù) --flink-home傳遞進來的
# shell中case的語法
# case 值 in
# 模式1) # 這里的模式指的是shell中的通配符模式不是正則表達式,例如 a*,就是a開頭的任意字符串
# 代碼塊
# ;;
# 模式2)
# 代碼塊
# ;;
# *)
# 默認代碼塊
# ;;
# esac
for ((i=0; i < ${#args[@]}; i++)); do
case "${args[i]}" in
--flink-home)
# 如果匹配到到了就取他的下一個值給FLINK_HOME賦值,取值之前要判斷一下是否存在
# -n 就是檢查字符串長度是否大于0,大于0返回true,否則false
if [[ -n "${args[i+1]}" ]]; then
FLINK_HOME="${args[i+1]}"
break
fi
;;
esac
done
校驗FLINK_HOME這個變量是否設置成功
# 如果經(jīng)過上面的循環(huán)還是沒有給FLINK_HOME賦值就退出程序
# 提示 [錯誤] 不能夠在命令行參數(shù)--flink-home 或者 環(huán)境變量FLINK_HOME 找到 FLINK_HOME的值
if [[ -z $FLINK_HOME ]]; then
echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."
exit 1
fi
獲取Flink的一些相關配置
# Setup Flink related configurations
# 設置flink相關的配置
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
# 為了避免config.sh(這個文件在$FLINK_HOME/bin/config.sh)覆蓋掉FLINK_HOME這個變量,所以這里將它置位1
# 為什么置為1呢,這里可以看一下config.sh中的相關代碼,如下
# 可以看到如果變量_FLINK_HOME_DETERMINED為空那么就會把FLINK_HOME的值替換掉,所以這里將它的值賦值為1就是為了避免這個
# 具體FLINK_HOME會被替換成什么值呢
# dirname 就是要獲取文件路徑的路徑,例如dirname /home/user/a.txt 返回 /home/user/
# $SYMLINK_RESOLVED_BIN 是什么值呢
# 是切換到$bin路徑下,的絕對路徑(pwd -P的意思就是獲取實際文件系統(tǒng)路徑,pwd是獲取鏈接路徑)
# $bin是target的路徑
# target="$0" # $0就是當前腳本的名稱
# -L 判斷是否是一個鏈接符號,判斷target是否是一個鏈接符號
# 如果是一個鏈接符號,那么就執(zhí)行循環(huán)的代碼塊
# 跳出的條件是target變量不是一個鏈接符號或者循環(huán)了100次跳出循環(huán),-gt是大于 -ge是大于等于
# ls 就是列出目錄信息
# -ld 有兩個參數(shù) -l和-d,-l是長格式進行顯示,包括文件的屬性和權限信息,相當于ll
# -d是只顯示目錄自身的信息,而不列出目錄中的文件,無論是文件還是目錄,都不會進入它,僅是顯示它自身的信息
# -- 是一個特殊的選項, 用于分隔選項與參數(shù).它的作用是確保$target被視作參數(shù),即使$target是 - 開頭的,避免將其解析成選項
# 解釋一下 target=`expr "$ls" : '.* -> \(.*\)$'`
# 這行大概意思就是通過expr命令和正則表達式提取$ls變量中符號鏈接的目標路徑或者目錄,然后賦值給target
# expr 是一個執(zhí)行表達式的命令
# "$ls" 是作為參數(shù)傳遞給expr
# : '.* -> \(.*\)$' 這是一個正則表達式,用于匹配符號鏈接中的目標文件或目錄.通過使用圓括號 ( ) 捕獲模式,可以將匹配到的部分提取出來
# target="$0"
# # For the case, the executable has been directly symlinked, figure out
# # the correct bin path by following its symlink up to an upper bound.
# # Note: we can't use the readlink utility here if we want to be POSIX
# # compatible.
# iteration=0
# while [ -L "$target" ]; do
# if [ "$iteration" -gt 100 ]; then
# echo "Cannot resolve path: You have a cyclic symlink in $target."
# break
# fi
# ls=`ls -ld -- "$target"`
# target=`expr "$ls" : '.* -> \(.*\)$'`
# iteration=$((iteration + 1))
# done
# Convert relative path to absolute path and resolve directory symlinks
# bin=`dirname "$target"`
# SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
# if [ -z "$_FLINK_HOME_DETERMINED" ]; then
# FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
# fi
_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
# config.sh 需要 FLINK_CONF_DIR 配置
FLINK_CONF_DIR=$FLINK_HOME/conf
# Use config.sh to setup Flink related configurations
# 使用config.sh來配置 Flink相關的配置
. $FLINK_HOME/bin/config.sh
定義Flink cdc 的一些路徑
# Define Flink CDC directories
# 定義Flink cdc 的路徑
# SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
# 這行的大概意思就是要獲取腳本的絕對路徑
# ${BASH_SOURCE[0]} bash的特殊變量,獲取當前運行腳本的名稱
# $(dirname -- ${BASH_SOURCE[0]}) 獲取當前運行腳本的路徑(不能直接用這個,因為可能會因為軟連接或者其他情況導致路徑獲取不準確,最穩(wěn)妥的方法就是cd 到這個路徑然后pwd獲取絕對路徑),這里的 -- 就是防止后面的變量被識別成選項例如-開頭
# cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" 切換到這個路徑下
# &> /dev/null 就是將一些標準輸出和錯誤輸出都重定向到/dev/null,這樣可以使輸出更清晰
# && 當前一個命令執(zhí)行成功后執(zhí)行后面的命令
# pwd 獲取當前路徑
# FLINK_CDC_HOME="$SCRIPT_DIR"/..
# SCRIPT_DIR 的上級路徑就是FLINK_CDC_HOME的值,就是切換到了bin目錄的根目錄
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
export FLINK_CDC_HOME=$FLINK_CDC_HOME
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib
FLINK_CDC_LOG="$FLINK_CDC_HOME"/log
構建任務啟動需要的classpath
# Build Java classpath
# 構建 Java的calsspath
CLASSPATH=""
# Add Flink libraries to the classpath
# 將flink路徑下lib的jar包都添加到classpath中
for jar in "$FLINK_HOME"/lib/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
# Add Flink CDC libraries to classpath
# 將cdc下lib的jar包都添加到classpath
for jar in "$FLINK_CDC_LIB"/*.jar; do
CLASSPATH=$CLASSPATH:$jar
done
# Add Hadoop classpath, which is defined in config.sh
# 添加hadoop 的classpath
CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
# Trim classpath
# 去掉字符串開頭的冒號 ,如果要去掉結尾的冒號 ${CLASSPATH%:}
CLASSPATH=${CLASSPATH#:}
設置日志相關的配置
# Setup logging
# 配置日志
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
# 啟動命令中將日志的配置參數(shù)拼接,指定日志文件以及日志配置文件
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)
啟動任務
# JAVA_RUN should have been setup in config.sh
# exec 是一個用于替換當前進程的命令,一般用在腳本中,會將當前腳本的執(zhí)行進程執(zhí)行的內容替換成exec后面命令
# 有什么作用呢?
# 1.減少系統(tǒng)資源 : 不用創(chuàng)建一個新的進程
# 2.重定向標準輸入/輸出 : 通過使用 exec 命令執(zhí)行新的命令.可以將標準輸入,輸出和錯誤重定向到新命令所指定的位置.
# 3.執(zhí)行后續(xù)操作:在腳本中,使用 exec 命令可以執(zhí)行一些命令或操作后,將控制權交給新的命令.這可以用于在腳本中完成某些初始化操作后,將腳本完全替換為另一個命令或程序.
# $JAVA_RUN 在config.sh就定義了,一般是java 或者 /bin/java
# -classpath 指定classpath路徑
# "${LOG_SETTINGS[@]}" 日志的一些配置信息
# com.ververica.cdc.cli.CliFrontend 入口類
# "$@" 所有的命令行參數(shù)傳到入口類中,通過String args[] 來接收
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"
參考
[1] : https://github.com/apache/flink
[2] : https://github.com/ververica/flink-cdc-connectors文章來源:http://www.zghlxwxcb.cn/news/detail-827559.html
[3] : https://blog.csdn.net/wang2leee/article/details/132521566文章來源地址http://www.zghlxwxcb.cn/news/detail-827559.html
到了這里,關于flinkcdc 3.0 源碼學習之任務提交腳本flink-cdc.sh的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!