Prerequisites
- A k8s cluster managed by Rancher, with Alibaba Cloud NAS for storage
- A physical machine (to mount the PVCs for dags, plugins, and logs)
- A MySQL database and Redis
- A base image containing Airflow and its dependencies
This deployment runs Airflow's CeleryExecutor on k8s; it does not use the KubernetesExecutor.
Building the base image
Dockerfile
The image is based on the official Airflow v2.6.0 image for Python 3.10:
FROM apache/airflow:slim-2.6.0-python3.10
USER root
EXPOSE 8080 5555 8793
COPY config/airflow.cfg /opt/airflow/airflow.cfg
RUN set -ex \
    && buildDeps=' \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
        python3-dev \
        gcc \
        sasl2-bin \
        libsasl2-2 \
        libsasl2-modules \
    ' \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        default-libmysqlclient-dev \
        apt-utils \
        curl \
        rsync \
        netcat \
        locales \
        procps \
        telnet \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
RUN pip install --no-cache-dir \
        celery \
        flower \
        pymysql \
        mysqlclient \
        redis \
        livy==0.6.0 \
        apache-airflow-providers-mysql \
        apache-airflow-providers-apache-hive
RUN airflow db init
# Keep the base image safe: remove the config file once the database has been initialized
RUN rm -f /opt/airflow/airflow.cfg
Build the base image and push it to the registry:
- Building the airflow base image also initializes the corresponding metadata database.
Deployment code repository: https://github.com/itnoobzzy/EasyAirflow.git
After cloning, enter the EasyAirflow project, create the logs directory, and run sudo chmod -R 777 logs:
Because the metadata database is initialized while the base image is built, the config file has to be prepared first. Rename the template with mv config/default_airflow.cfg config/airflow.cfg, then edit airflow.cfg in four places:
- Change executor to CeleryExecutor
- Point sql_alchemy_conn at the existing MySQL database; note that the connection must use the mysql+pymysql driver
- Set broker_url to Redis, which the Celery workers use for messaging
- Set result_backend to MySQL for result storage; note that it must use the db+mysql driver prefix
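Put together, the four changes correspond to these sections of config/airflow.cfg (hosts and credentials below are placeholders; in Airflow 2.x, sql_alchemy_conn lives in the [database] section):

```ini
[core]
executor = CeleryExecutor

[database]
; must use the pymysql driver
sql_alchemy_conn = mysql+pymysql://airflow:airflow@mysql-host:3306/airflow

[celery]
broker_url = redis://redis-host:6379/0
; note the db+ prefix for the Celery result backend
result_backend = db+mysql://airflow:airflow@mysql-host:3306/airflow
```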
- Run the build and push the image to the registry:
docker build -t airflow:2.6.0 .
docker tag airflow:2.6.0 itnoobzzy/airflow:v2.6.0-python3.10
docker push itnoobzzy/airflow:v2.6.0-python3.10
Deployment steps
- Create the namespace: airflow-v2
- Create the PVCs
volumes.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/volumes.yaml
Importing it into Rancher creates three PVCs, storing dags, logs, and plugins respectively:
Mount the PVCs on the physical machine to make dags, logs, and plugins easy to manage. Check each PVC's details and run the corresponding mount command; below is an example of mounting airflow-dags-pv on the machine:
After mounting, run df -h to verify that dags, logs, and plugins are all mounted correctly:
- Create the ConfigMap
configmap.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/configmap.yaml
Import the yaml file into Rancher as follows:
- Create the Secret (optional)
secret.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/secret.yaml
Import the yaml file into Rancher as follows; note that the database credentials in secret.yaml must be base64-encoded:
The database credentials can be stored in a k8s Secret and then injected into the Deployment yaml files as environment variables.
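The base64 values for secret.yaml can be produced as follows; printf is used instead of echo so that no trailing newline ends up in the encoded value (the connection strings are placeholders):

```shell
# Encode the connection strings for secret.yaml.
# All hosts and credentials below are placeholders -- substitute your own.
printf '%s' 'mysql+pymysql://airflow:airflow@mysql-host:3306/airflow' | base64 -w0; echo
printf '%s' 'redis://redis-host:6379/0' | base64 -w0; echo
printf '%s' 'db+mysql://airflow:airflow@mysql-host:3306/airflow' | base64 -w0; echo
```

Paste each output into the corresponding key of secret.yaml (base64 -w0 is GNU coreutils; on macOS, plain base64 already avoids line wrapping).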
- Create the scheduler Deployment
scheduler-dp.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/scheduler-dp.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-scheduler
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: scheduler
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "scheduler"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
- Create the webserver Deployment and Service
webserver.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/webserver.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-webserver
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: webserver
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: webserver
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: webserver
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "webserver"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
spec:
  type: ClusterIP
  ports:
    - name: airflow-webserver
      port: 8080
      targetPort: 8080
      protocol: TCP
  selector:
    tier: airflow
    component: webserver
    release: v2.6.0
- Create the flower Deployment and Service
flower.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/flower.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-flower
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: flower
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: flower
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: flower
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "celery", "flower"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-flower-svc
spec:
  type: ClusterIP
  ports:
    - name: airflow-flower
      port: 5555
      targetPort: 5555
      protocol: TCP
  selector:
    tier: airflow
    component: flower
    release: v2.6.0
- Create the worker Deployment
worker.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/worker.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-worker
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: worker
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: worker
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: worker
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "celery", "worker"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
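The worker Deployment starts with replicas: 1; the number of workers can be changed at any time with kubectl scale. Below is a minimal sketch of a hypothetical helper script (not part of the repo) that prints the scale command and only executes it when --apply is passed:

```shell
# scale-workers.sh (hypothetical helper, not in the repo):
# print the kubectl command that resizes the Celery worker Deployment,
# and execute it only when --apply is given.
REPLICAS="${1:-1}"
CMD="kubectl -n airflow-v2 scale deployment airflow-worker --replicas=${REPLICAS}"
echo "$CMD"
if [ "$2" = "--apply" ]; then
  $CMD
fi
```

Running sh scale-workers.sh 3 --apply against the cluster would resize the worker Deployment to three Pods.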
- Create the Ingress for the webserver and flower
ingress.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/ingress.yaml
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: airflow-ingress
spec:
  rules:
    - host: airflow-webserver.akulaku.com
      http:
        paths:
          - path: /
            backend:
              serviceName: airflow-webserver-svc
              servicePort: 8080
    - host: airflow-flower.akulaku.com
      http:
        paths:
          - path: /
            backend:
              serviceName: airflow-flower-svc
              servicePort: 5555
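Note that the extensions/v1beta1 Ingress API was removed in Kubernetes 1.22. On newer clusters, an equivalent manifest (a sketch, assuming the Services live in the airflow-v2 namespace) would look like:

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow-ingress
  namespace: airflow-v2
spec:
  rules:
    - host: airflow-webserver.akulaku.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: airflow-webserver-svc
                port:
                  number: 8080
    - host: airflow-flower.akulaku.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: airflow-flower-svc
                port:
                  number: 5555
```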
Verification
After deployment, open http://airflow-webserver.akulaku.com/ in a browser to reach the webserver UI (make sure the domain is resolved in /etc/hosts). The webserver's initial admin username and password are both admin:
Open http://airflow-flower.akulaku.com/ to reach the flower UI:
The worker name shown in flower is the name of the worker Pod:
Trigger a DAG run and check the corresponding logs on the machine where the PVCs are mounted:
The directory /data/app/k8s/EasyAirflow/logs on the machine is where the contents of the k8s PVCs were mounted earlier:
(base) [admin@data-landsat-test03 logs]$ view /data/app/k8s/EasyAirflow/logs/dag_id\=tutorial/run_id\=manual__2023-05-15T09\:27\:44.253944+00\:00/task_id\=sleep/attempt\=1.log
[2023-05-15T09:27:47.187+0000] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.253944+00:00 [queued]>
[2023-05-15T09:27:47.195+0000] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.253944+00:00 [queued]>
[2023-05-15T09:27:47.195+0000] {taskinstance.py:1331} INFO - Starting attempt 1 of 4
[2023-05-15T09:27:47.206+0000] {taskinstance.py:1350} INFO - Executing <Task(BashOperator): sleep> on 2023-05-15 09:27:44.253944+00:00
[2023-05-15T09:27:47.209+0000] {standard_task_runner.py:57} INFO - Started process 71 to run task
[2023-05-15T09:27:47.213+0000] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'tutorial', 'sleep', 'manual__2023-05-15T09:27:44.253944+00:00', '--job-id', '75', '--raw', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpy65q1a3h']
[2023-05-15T09:27:47.213+0000] {standard_task_runner.py:85} INFO - Job 75: Subtask sleep
[2023-05-15T09:27:47.260+0000] {task_command.py:410} INFO - Running <TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.253944+00:00 [running]> on host airflow-worker-6f9ffb7fb8-t6j9p
[2023-05-15T09:27:47.330+0000] {taskinstance.py:1568} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='airflow@example.com' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='sleep' AIRFLOW_CTX_EXECUTION_DATE='2023-05-15T09:27:44.253944+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-05-15T09:27:44.253944+00:00'
[2023-05-15T09:27:47.331+0000] {subprocess.py:63} INFO - Tmp dir root location:
/tmp
[2023-05-15T09:27:47.331+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'sleep 5']
[2023-05-15T09:27:47.355+0000] {subprocess.py:86} INFO - Output:
[2023-05-15T09:27:52.360+0000] {subprocess.py:97} INFO - Command exited with return code 0
[2023-05-15T09:27:52.383+0000] {taskinstance.py:1368} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20230515T092744, start_date=20230515T092747, end_date=20230515T092752
[2023-05-15T09:27:52.426+0000] {local_task_job_runner.py:232} INFO - Task exited with return code 0
[2023-05-15T09:27:52.440+0000] {taskinstance.py:2674} INFO - 0 downstream tasks scheduled from follow-on schedule check
Summary
This deployment runs Airflow on k8s without the KubernetesExecutor, so it cannot stop Pods automatically once tasks finish to cut costs.
However, Airflow mostly runs batch jobs concentrated in a single time window. In our company's case, Airflow runs a large number of offline jobs at night, so some worker Pods can be stopped during the day and added back in the evening according to actual load.
Starting and stopping worker Pods raises a couple of concerns, though:
- Can the start/stop be automated, so that worker Pods are stopped at a fixed time during the day and started again at night?
- The stop must be graceful: before shutting down, a worker should wait for its running tasks to finish (or kill the task processes after some maximum wait), and no new tasks should be routed to a Pod that is about to stop.
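The first concern could be handled inside the cluster itself, for example with a pair of Kubernetes CronJobs. Below is a hedged sketch of the scale-down half (the ServiceAccount airflow-scaler and its RBAC binding are assumptions, not part of the repo); a mirror-image CronJob with an evening schedule and --replicas=N would scale back up:

```yaml
# Sketch: scale workers down every morning at 08:00.
# airflow-scaler is a hypothetical ServiceAccount that must be bound to a
# Role allowing "patch" on deployments/scale in airflow-v2.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: airflow-worker-scale-down
  namespace: airflow-v2
spec:
  schedule: "0 8 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: airflow-scaler
          restartPolicy: OnFailure
          containers:
            - name: kubectl
              image: bitnami/kubectl:latest
              command:
                - kubectl
                - -n
                - airflow-v2
                - scale
                - deployment/airflow-worker
                - --replicas=0
```

As for the second concern, the Celery worker performs a warm shutdown on SIGTERM and waits for its running tasks, but only within the Pod's terminationGracePeriodSeconds (10 seconds in the Deployments above), so that value would need to be raised to cover the longest-running task.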
后邊針對(duì)上邊所說(shuō)的問題進(jìn)行研究,一旦發(fā)現(xiàn)好的解決方法和步驟,將與大家一起分享~文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-491342.html
到了這里,關(guān)于airflow v2.6.0 k8s 部署(Rancher)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!