K8s相關(guān)知識(shí)可以閱讀博主以下幾篇技術(shù)博客:
- K8s系列之:搭建高可用K8s v1.23.5集群詳細(xì)步驟,3個(gè)master節(jié)點(diǎn),3個(gè)Node節(jié)點(diǎn)
- K8s系列之:Pod的基本用法
- k8s系列之:kubectl子命令詳解一
- k8s系列之:kubectl子命令詳解二
- 更多K8s知識(shí)點(diǎn)詳見(jiàn)博主K8s系列文章
更多Debezium內(nèi)容請(qǐng)閱讀博主Debezium專欄,博主會(huì)持續(xù)更新Debezium專欄:
- Debezium專欄
一、概述
Debezium 可以輕松部署在開源容器管理平臺(tái) Kubernetes 上。該部署利用了 Strimzi 項(xiàng)目,該項(xiàng)目旨在通過(guò)自定義資源簡(jiǎn)化 Kubernetes 上 Apache Kafka 的部署。
為了測(cè)試您的部署,您可以使用 minikube,它會(huì)在本地計(jì)算機(jī)上啟動(dòng) Kubernetes 集群。如果您想在 minikube 上完全測(cè)試本文檔中描述的 Debezium 部署,則需要在 minikube 上設(shè)置不安全的容器映像注冊(cè)表。為此,您需要使用 --insecure-registry 標(biāo)志啟動(dòng) minikube:
$ minikube start --insecure-registry "10.0.0.0/24"
10.0.0.1 是默認(rèn)的服務(wù)集群 IP,因此此設(shè)置允許在整個(gè)集群內(nèi)拉取鏡像。您還需要啟用注冊(cè)表 minikube 插件:
minikube addons enable registry
二、先決條件
為了使容器與集群上的其他工作負(fù)載分開,請(qǐng)為 Debezium 創(chuàng)建專用命名空間。在本文檔的其余部分中,將使用 debezium-example 命名空間:
kubectl create ns debezium-example
部署 Strimzi Operator
如上所述,對(duì)于 Debezium 部署,我們將使用 Strimzi,它管理 Kubernetes 上的 Kafka 部署。
部署minikube詳細(xì)步驟可以參考博主下面這篇技術(shù)博客:
- minikube從入門到精通系列之一:部署minikube詳細(xì)步驟
安裝 Strimzi 最簡(jiǎn)單的方法是通過(guò) Operator Lifecycle Manager (OLM)。如果您的集群上尚未安裝 OLM,可以通過(guò)運(yùn)行以下命令來(lái)安裝它:
curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0
現(xiàn)在,安裝 Strimzi operator本身:
kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml
三、為數(shù)據(jù)庫(kù)創(chuàng)建Secrets
稍后,在部署 Debezium Kafka 連接器時(shí),我們需要提供連接器的用戶名和密碼才能連接到數(shù)據(jù)庫(kù)。出于安全原因,最好不要直接提供憑據(jù),而是將它們保存在單獨(dú)的安全位置。 Kubernetes 為此提供了 Secret 對(duì)象。除了創(chuàng)建 Secret 對(duì)象本身之外,我們還必須創(chuàng)建一個(gè)角色和角色綁定,以便 Kafka 可以訪問(wèn)憑證。
我們先創(chuàng)建 Secret 對(duì)象:
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Secret
metadata:
name: debezium-secret
namespace: debezium-example
type: Opaque
data:
username: ZGViZXppdW0=
password: ZGJ6
EOF
用戶名和密碼包含用于連接 MySQL 數(shù)據(jù)庫(kù)的 Base64 編碼憑據(jù) (debezium/dbz),我們稍后將部署該數(shù)據(jù)庫(kù)。
現(xiàn)在,我們可以創(chuàng)建一個(gè)角色,它引用上一步中創(chuàng)建的秘密:
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: connector-configuration-role
namespace: debezium-example
rules:
- apiGroups: [""]
resources: ["secrets"]
resourceNames: ["debezium-secret"]
verbs: ["get"]
EOF
我們還必須將此角色綁定到 Kafka Connect 集群服務(wù)帳戶,以便 Kafka Connect 可以訪問(wèn)密鑰:
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: connector-configuration-role-binding
namespace: debezium-example
subjects:
- kind: ServiceAccount
name: debezium-connect-cluster-connect
namespace: debezium-example
roleRef:
kind: Role
name: connector-configuration-role
apiGroup: rbac.authorization.k8s.io
EOF
一旦我們部署 Kafka Connect,服務(wù)帳戶將由 Strimzi 創(chuàng)建。服務(wù)帳戶的名稱采用 $KafkaConnectName-connect 形式。稍后,我們將創(chuàng)建名為 debezium-connect-cluster 的 Kafka Connect 集群,因此我們?cè)谶@里使用 debezium-connect-cluster-connect 作為 subject.name。
四、部署Apache Kafka
接下來(lái),部署一個(gè)(單節(jié)點(diǎn))Kafka 集群:
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: debezium-cluster
spec:
kafka:
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: nodeport
tls: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
EOF
等待它準(zhǔn)備好:
$ kubectl wait kafka/debezium-cluster --for=condition=Ready --timeout=300s -n debezium-example
五、部署數(shù)據(jù)源
下面將使用MySQL作為數(shù)據(jù)源。除了使用 MySQL 運(yùn)行 pod 之外,還需要一個(gè)適當(dāng)?shù)姆?wù)來(lái)指向帶有 DB 本身的 pod。它可以被創(chuàng)建,例如如下:
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
name: mysql
spec:
ports:
- port: 3306
selector:
app: mysql
clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
- image: quay.io/debezium/example-mysql:2.3
name: mysql
env:
- name: MYSQL_ROOT_PASSWORD
value: debezium
- name: MYSQL_USER
value: mysqluser
- name: MYSQL_PASSWORD
value: mysqlpw
ports:
- containerPort: 3306
name: mysql
EOF
六、部署 Debezium 連接器
要部署 Debezium 連接器,您需要在實(shí)例化實(shí)際連接器本身之前部署具有所需連接器插件的 Kafka Connect 集群。第一步,必須創(chuàng)建帶有插件的 Kafka Connect 容器映像。如果您已經(jīng)構(gòu)建了容器鏡像并在注冊(cè)表中可用,則可以跳過(guò)此步驟。本文檔以MySQL連接器為例。
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.1.0
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
build:
output:
type: docker
image: 10.110.154.103/debezium-connect-mysql:latest
plugins:
- name: debezium-mysql-connector
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF
您必須將registry的IP地址10.110.154.103替換為可以推送鏡像的registry。如果您使用注冊(cè)表插件在 minikube 上運(yùn)行它,您可以將映像推送到內(nèi)部 minikube 注冊(cè)表中。注冊(cè)表的 IP 地址可以通過(guò)以下方式獲得:通過(guò)運(yùn)行
kubectl -n kube-system get svc registry -o jsonpath='{.spec.clusterIP}'
為簡(jiǎn)單起見(jiàn),我們跳過(guò)了下載工件的校驗(yàn)和驗(yàn)證。如果您想確保工件已正確下載,請(qǐng)通過(guò) sha512sum 屬性指定其校驗(yàn)和。
如果您在本地或遠(yuǎn)程注冊(cè)表(例如 quay.io 或 DockerHub)中已經(jīng)有合適的容器映像,則可以使用此簡(jiǎn)化版本:
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.1.0
image: 10.110.154.103/debezium-connect-mysql:latest
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
EOF
另請(qǐng)注意,我們已經(jīng)配置了 Strimzi 秘密提供程序。這個(gè)秘密提供者將為這個(gè) Kafka Connect 集群創(chuàng)建一個(gè)服務(wù)帳戶(我們已經(jīng)將其綁定到適當(dāng)?shù)慕巧?,并允許 Kafka Connect 訪問(wèn)我們的 Secret 對(duì)象。
七、創(chuàng)建 Debezium 連接器
要?jiǎng)?chuàng)建 Debezium 連接器,您只需創(chuàng)建具有適當(dāng)配置的 KafkaConnector,在本例中為 MySQL:
$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: debezium-connector-mysql
labels:
strimzi.io/cluster: debezium-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: ${secrets:debezium-example/debezium-secret:username}
database.password: ${secrets:debezium-example/debezium-secret:password}
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
schema.history.internal.kafka.topic: schema-changes.inventory
EOF
您可以注意到,我們?cè)谶B接器配置中沒(méi)有使用純文本用戶名和密碼,而是引用我們之前創(chuàng)建的 Secret 對(duì)象。
八、驗(yàn)證部署
要驗(yàn)證一切正常,您可以例如開始觀察mysql.inventory.customers Kafka Topic:
kubectl run -n debezium-example -it --rm
--image=quay.io/debezium/tooling:1.2
--restart=Never watcher
-- kcat -b debezium-cluster-kafka-bootstrap:9092
-C -o beginning -t mysql.inventory.customers
連接MySQL數(shù)據(jù)庫(kù):
kubectl run -n debezium-example -it --rm
--image=mysql:8.0 --restart=Never
--env MYSQL_ROOT_PASSWORD=debezium mysqlterm
-- mysql -hmysql -P3306 -uroot -pdebezium
在客戶表中做一些更改:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-511799.html
sql> update customers set first_name="Sally Marie" where id=1001;
您現(xiàn)在應(yīng)該能夠觀察 Kafka 主題上的更改事件:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-511799.html
{
...
"payload": {
"before": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"after": {
"id": 1001,
"first_name": "Sally Marie",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"source": {
"version": "{debezium-version}",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1646300467000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 401,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1646300467746,
"transaction": null
}
}
到了這里,關(guān)于Debezium系列之:在 Kubernetes 上部署 Debezium的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!