Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),本文介紹了如何使用kafka-go這個庫實現(xiàn)Go語言與kafka的交互。
Go社區(qū)中目前有三個比較常用的kafka客戶端庫 , 它們各有特點。
首先是IBM/sarama(這個庫已經(jīng)由Shopify轉給了IBM),之前我寫過一篇使用sarama操作Kafka的教程,相較于sarama, kafka-go 更簡單、更易用。
segmentio/kafka-go 是純Go實現(xiàn),提供了與kafka交互的低級別和高級別兩套API,同時也支持Context。
此外社區(qū)中另一個比較常用的confluentinc/confluent-kafka-go,它是一個基于cgo的librdkafka包裝,在項目中使用它會引入對C庫的依賴。
準備Kafka環(huán)境
這里推薦使用Docker Compose快速搭建一套本地開發(fā)環(huán)境。
以下docker-compose.yml文件用來搭建一套單節(jié)點zookeeper和單節(jié)點kafka環(huán)境,并且在8080端口提供kafka-ui管理界面。
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka1
environment:
DYNAMIC_CONFIG_ENABLED: "TRUE"
將上述docker-compose.yml文件在本地保存,在同一目錄下執(zhí)行以下命令啟動容器。
docker-compose up -d
容器啟動后,使用瀏覽器打開127.0.0.1:8080 即可看到如下kafka-ui界面。
點擊頁面右側的“Configure new cluster”按鈕,配置kafka服務連接信息。
填寫完信息后,點擊頁面下方的“Submit”按鈕提交即可。
安裝kafka-go
執(zhí)行以下命令下載 kafka-go依賴。
go get github.com/segmentio/kafka-go
注意:kafka-go 需要 Go 1.15或更高版本。
kafka-go使用指南
kafka-go 提供了兩套與Kafka交互的API。
- 低級別( low-level):基于與 Kafka 服務器的原始網(wǎng)絡連接實現(xiàn)。
- 高級別(high-level):對于常用讀寫操作封裝了一套更易用的API。
通常建議直接使用高級別的交互API。
Connection
Conn 類型是 kafka-go 包的核心。它代表與 Kafka broker之間的連接?;谒鼘崿F(xiàn)了一套與Kafka交互的低級別 API。
發(fā)送消息
下面是連接至Kafka之后,使用Conn發(fā)送消息的代碼示例。
// writeByConn 基于Conn發(fā)送消息
func writeByConn() {
topic := "my-topic"
partition := 0
// 連接至Kafka集群的Leader節(jié)點
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 設置發(fā)送消息的超時時間
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// 發(fā)送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
// 關閉連接
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
消費消息
// readByConn 連接至kafka后接收消息
func readByConn() {
// 指定要連接的topic和partition
topic := "my-topic"
partition := 0
// 連接至Kafka的leader節(jié)點
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 設置讀取超時時間
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
// 讀取一批消息,得到的batch是一系列消息的迭代器
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
// 遍歷讀取消息
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
// 關閉batch
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
// 關閉連接
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
使用batch.Read更高效一些,但是需要根據(jù)消息長度選擇合適的buffer(上述代碼中的b),如果傳入的buffer太?。ㄏ⒀b不下)就會返回io.ErrShortBuffer錯誤。
如果不考慮內存分配的效率問題,也可以按以下代碼使用batch.ReadMessage讀取消息。
for {
msg, err := batch.ReadMessage()
if err != nil {
break
}
fmt.Println(string(msg.Value))
}
創(chuàng)建topic
當Kafka關閉自動創(chuàng)建topic的設置時,可按如下方式創(chuàng)建topic。
// createTopicByConn 創(chuàng)建topic
func createTopicByConn() {
// 指定要創(chuàng)建的topic名稱
topic := "my-topic"
// 連接至任意kafka節(jié)點
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 獲取當前控制節(jié)點信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
// 連接至leader節(jié)點
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
// 創(chuàng)建topic
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
通過非leader節(jié)點連接leader節(jié)點
下面的示例代碼演示了如何通過已有的非leader節(jié)點的Conn,連接至 leader節(jié)點。
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 獲取當前控制節(jié)點信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer connLeader.Close()
獲取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
panic(err.Error())
}
m := map[string]struct{}{}
// 遍歷所有分區(qū)取topic
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
}
Reader
Reader是由 kafka-go 包提供的另一個概念,對于從單個主題-分區(qū)(topic-partition)消費消息這種典型場景,使用它能夠簡化代碼。Reader 還實現(xiàn)了自動重連和偏移量管理,并支持使用 Context 支持異步取消和超時的 API。
注意: 當進程退出時,必須在 Reader 上調用 Close() 。Kafka服務器需要一個優(yōu)雅的斷開連接來阻止它繼續(xù)嘗試向已連接的客戶端發(fā)送消息。如果進程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)終止,那么下面給出的示例不會調用 Close()。當同一topic上有新Reader連接時,可能導致延遲(例如,新進程啟動或新容器運行)。在這種場景下應使用signal.Notify處理程序在進程關閉時關閉Reader。
消費消息
下面的代碼演示了如何使用Reader連接至Kafka消費消息。
// readByReader 通過Reader接收消息
func readByReader() {
// 創(chuàng)建Reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "topic-A",
Partition: 0,
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42) // 設置Offset
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前關閉Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
消費者組
kafka-go支持消費者組,包括broker管理的offset。要啟用消費者組,只需在 ReaderConfig 中指定 GroupID。
使用消費者組時,ReadMessage 會自動提交偏移量。
// 創(chuàng)建一個reader,指定GroupID,從 topic-A 消費消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id", // 指定消費者組id
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前關閉Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
在使用消費者組時會有以下限制:
(*Reader).SetOffset 當設置了GroupID時會返回錯誤
(*Reader).Offset 當設置了GroupID時會永遠返回 -1
(*Reader).Lag 當設置了GroupID時會永遠返回 -1
(*Reader).ReadLag 當設置了GroupID時會返回錯誤
(*Reader).Stats 當設置了GroupID時會返回一個-1的分區(qū)
顯式提交
kafka-go 也支持顯式提交。當需要顯式提交時不要調用 ReadMessage,而是調用 FetchMessage獲取消息,然后調用 CommitMessages 顯式提交。
ctx := context.Background()
for {
// 獲取消息
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
// 處理消息
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 顯式提交
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
在消費者組中提交消息時,具有給定主題/分區(qū)的最大偏移量的消息確定該分區(qū)的提交偏移量的值。例如,如果通過調用 FetchMessage 獲取了單個分區(qū)的偏移量為 1、2 和 3 的消息,則使用偏移量為3的消息調用 CommitMessages 也將導致該分區(qū)的偏移量為 1 和 2 的消息被提交。
管理提交間隔
默認情況下,調用CommitMessages將同步向Kafka提交偏移量。為了提高性能,可以在ReaderConfig中設置CommitInterval來定期向Kafka提交偏移。
// 創(chuàng)建一個reader從 topic-A 消費消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // 每秒刷新一次提交給 Kafka
})
Writer
向Kafka發(fā)送消息,除了使用基于Conn的低級API,kafka-go包還提供了更高級別的 Writer 類型。大多數(shù)情況下使用Writer即可滿足條件,它支持以下特性。
- 對錯誤進行自動重試和重新連接。
- 在可用分區(qū)之間可配置的消息分布。
- 向Kafka同步或異步寫入消息。
- 使用Context的異步取消。
- 關閉時清除掛起的消息以支持正常關閉。
- 在發(fā)布消息之前自動創(chuàng)建不存在的topic。
發(fā)送消息
// 創(chuàng)建一個writer 向topic-A發(fā)送消息
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{}, // 指定分區(qū)的balancer模式為最小字節(jié)分布
RequiredAcks: kafka.RequireAll, // ack模式
Async: true, // 異步
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
創(chuàng)建不存在的topic
如果給Writer配置了AllowAutoTopicCreation:true,那么當發(fā)送消息至某個不存在的topic時,則會自動創(chuàng)建topic。
// 創(chuàng)建不存在的topic
// 如果給Writer配置了AllowAutoTopicCreation:true,那么當發(fā)送消息至某個不存在的topic時,則會自動創(chuàng)建topic。
func writeByWriter2() {
writer := kafka.Writer{
Addr: kafka.TCP("192.168.2.204:9092"),
Topic: "kafka-test-topic",
AllowAutoTopicCreation: true, //自動創(chuàng)建topic
}
messages := []kafka.Message{
{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
{
Key: []byte("Key-C"),
Value: []byte("Tow!"),
},
}
const retries = 3
//重試3次
for i := 0; i < retries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := writer.WriteMessages(ctx, messages...)
if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
time.Sleep(time.Millisecond * 250)
continue
}
if err != nil {
log.Fatal("unexpected error %v", err)
}
break
}
//關閉Writer
if err := writer.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
寫入多個topic
通常,WriterConfig.Topic用于初始化單個topic的Writer。通過去掉WriterConfig中的Topic配置,分別設置每條消息的message.topic,可以實現(xiàn)將消息發(fā)送至多個topic。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// 注意: 當此處不設置Topic時,后續(xù)的每條消息都需要指定Topic
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
// 注意: 每條消息都需要指定一個 Topic, 否則就會報錯
kafka.Message{
Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Topic: "topic-B",
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Topic: "topic-C",
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
注意:Writer中的Topic和Message中的Topic是互斥的,同一時刻有且只能設置一處。
其他配置
TLS
對于基本的 Conn 類型或在 Reader/Writer 配置中,可以在Dialer中設置TLS選項。如果 TLS 字段為空,則它將不啟用TLS 連接。
注意:不在Conn/Reder/Writer上配置TLS,連接到啟用TLS的Kafka集群,可能會出現(xiàn)io.ErrUnexpectedEOF錯誤。文章來源:http://www.zghlxwxcb.cn/news/detail-858831.html
Connection
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
創(chuàng)建Writer時可以按如下方式指定TLS配置。文章來源地址http://www.zghlxwxcb.cn/news/detail-858831.html
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{}, // 指定TLS配置
},
}
到了這里,關于Go操作Kafka之kafka-go的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!