大家應(yīng)該對 Kubernetes Events 并不陌生,特別是當你使用 kubectl describe 命令或 Event API 資源來了解集群中的故障時。
?
$ kubectl get events
15m Warning FailedCreate replicaset/ml-pipeline-visualizationserver-865c7865bc
Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found
?
盡管這些信息十分有用,但它只是臨時的,保留時間最長為30天。如果出于審計或是故障診斷等目的,你可能想要把這些信息保留得更久,比如保存在像 Kafka 這樣更持久、高效的存儲中。然后你可以借助其他工具(如 Argo Events)或自己的應(yīng)用程序訂閱 Kafka 主題來對某些事件做出響應(yīng)。
?
構(gòu)建K8s事件處理鏈路
我們將構(gòu)建一整套 Kubernetes 事件處理鏈路,其主要構(gòu)成為:
- Eventrouter,開源的 Kubernetes event 處理器,它可以將所有集群事件整合匯總到某個 Kafka 主題中。
- Strimzi Operator,在 Kubernetes 中輕松管理 Kafka broker。
- 自定義 Go 二進制文件以將事件分發(fā)到相應(yīng)的 Kafka 主題中。
?
為什么要把事件分發(fā)到不同的主題中?比方說,在集群的每個命名空間中存在與特定客戶相關(guān)的 Kubernetes 資產(chǎn),那么在使用這些資產(chǎn)之前你當然希望將相關(guān)事件隔離開。
?
本示例中所有的配置、源代碼和詳細設(shè)置指示都已經(jīng)放在以下代碼倉庫中:
https://github.com/esys/kube-events-kafka
?
?
創(chuàng)建 Kafka broker 和主題
我選擇使用 Strimzi(strimzi.io/) 將 Kafka 部署到 Kubernetes 中。簡而言之,它是用于創(chuàng)建和更新 Kafka broker 和主題的。你可以在官方文檔中找到如何安裝該 Operator 的詳細說明:
https://strimzi.io/docs/operators/latest/overview.html
?
首先,創(chuàng)建一個新的 Kafka 集群:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: kube-events
spec:
entityOperator:
topicOperator: {}
userOperator: {}
kafka:
config:
default.replication.factor: 3
log.message.format.version: "2.6"
offsets.topic.replication.factor: 3
transaction.state.log.min.isr: 2
transaction.state.log.replication.factor: 3
listeners:
- name: plain
port: 9092
tls: false
type: internal
- name: tls
port: 9093
tls: true
type: internal
replicas: 3
storage:
type: jbod
volumes:
- deleteClaim: false
id: 0
size: 10Gi
type: persistent-claim
version: 2.6.0
zookeeper:
replicas: 3
storage:
deleteClaim: false
size: 10Gi
type: persistent-claim
?
然后創(chuàng)建 Kafka 主題來接收我們的事件:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: cluster-events
spec:
config:
retention.ms: 7200000
segment.bytes: 1073741824
partitions: 1
replicas: 1
?
設(shè)置 EventRouter
在本教程中使用 kubectl apply 命令即可,我們需要編輯 router 的配置,以指明我們的 Kafka 端點和要使用的主題:
apiVersion: v1
data:
config.json: |-
{
"sink": "kafka",
"kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",
"kafkaTopic": "cluster-events"
}
kind: ConfigMap
metadata:
name: eventrouter-cm
?
驗證設(shè)置是否正常工作
我們的 cluster-events Kafka 的主題現(xiàn)在應(yīng)該收到所有的事件。最簡單的方法是在主題上運行一個 consumer 來檢驗是否如此。為了方便期間,我們使用我們的一個 Kafka broker pods,它已經(jīng)有了所有必要的工具,你可以看到事件流:
kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \
--bootstrap-server kube-events-kafka-bootstrap:9092 \
--topic kube-events \
--from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...
?
編寫 Golang 消費者
現(xiàn)在我們想將我們的 Kubernetes 事件依據(jù)其所在的命名空間分發(fā)到多個主題中。我們將編寫一個 Golang 消費者和生產(chǎn)者來實現(xiàn)這一邏輯:
- 消費者部分在 cluster-events 主題上監(jiān)聽傳入的集群事件
- 生產(chǎn)者部分寫入與事件的命名空間相匹配的 Kafka 主題中
?
如果為Kafka配置了適當?shù)倪x項(默認情況),就不需要特地創(chuàng)建新的主題,因為 Kafka 會默認為你創(chuàng)建主題。這是 Kafka 客戶端 API 的一個非??岬墓δ?。
p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {
sugar.Fatal("cannot create producer")
}
defer p.Close()
c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {
sugar.Fatal("cannot create consumer")
}
defer c.Close()
run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
sugar.Infof("signal %s received, terminating", sig)
run = false
}()
var wg sync.WaitGroup
go func() {
wg.Add(1)
for run {
data, err := c.Read()
if err != nil {
sugar.Errorf("read event error: %v", err)
time.Sleep(5 * time.Second)
continue
}
if data == nil {
continue
}
msg, err := event.CreateDestinationMessage(data)
if err != nil {
sugar.Errorf("cannot create destination event: %v", err)
}
p.Write(msg.Topic, msg.Message)
}
sugar.Info("worker thread done")
wg.Done()
}()
wg.Wait()
?
完整代碼在此處:
https://github.com/esys/kube-events-kafka/blob/master/events-fanout/cmd/main.go
?
當然還有更高性能的選擇,這取決于預(yù)計的事件量和扇出(fanout)邏輯的復(fù)雜性。對于一個更強大的實現(xiàn),使用 Spark Structured Streaming 的消費者將是一個很好的選擇。
?
部署消費者
構(gòu)建并將二進制文件推送到 Docker 鏡像之后,我們將它封裝為 Kubernetes deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: events-fanout
name: events-fanout
spec:
replicas: 1
selector:
matchLabels:
app: events-fanout
template:
metadata:
labels:
app: events-fanout
spec:
containers:
- image: emmsys/events-fanout:latest
name: events-fanout
command: [ "./events-fanout"]
args:
- -logLevel=info
env:
- name: ENDPOINT
value: kube-events-kafka-bootstrap:9092
- name: TOPIC
value: cluster-events
?
檢查目標主題是否創(chuàng)建
現(xiàn)在,新的主題已經(jīng)創(chuàng)建完成:
kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name
kafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events
?
你會發(fā)現(xiàn)你的事件根據(jù)其命名空間整齊地存儲在這些主題中。
?
總結(jié)
訪問 Kubernetes 歷史事件日志可以使你對 Kubernetes 系統(tǒng)的狀態(tài)有了更好的了解,但這單靠 kubectl 比較難做到。更重要的是,它可以通過對事件做出反應(yīng)來實現(xiàn)集群或應(yīng)用運維自動化,并以此來構(gòu)建可靠、反應(yīng)靈敏的軟件。
?文章來源:http://www.zghlxwxcb.cn/news/detail-454329.html
原文鏈接:
https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0文章來源地址http://www.zghlxwxcb.cn/news/detail-454329.html
到了這里,關(guān)于如何借助Kafka持久化存儲K8S事件數(shù)據(jù)?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!