代碼地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go
一、簡(jiǎn)介
之前已經(jīng)介紹過一個(gè)操作kafka的go庫了,28.windows安裝kafka,Go操作kafka示例(sarama庫) ,但是這個(gè)庫比較老了,當(dāng)前比較流行的庫是github.com/segmentio/kafka-go
,所以本次我們就使用一下它。
我們?cè)?code>GitHub直接輸入kafka
并帶上language
標(biāo)簽為Go
時(shí),可以可以看到當(dāng)前get github.com/segmentio/kafka-go
庫是最流行的。
首先啟動(dòng)kafka的服務(wù)器,然后在項(xiàng)目中go get github.com/segmentio/kafka-go
接著我們就可以創(chuàng)建生產(chǎn)者和消費(fèi)者了,注意:在實(shí)際工作中,一般是一個(gè)服務(wù)為生產(chǎn)者,另一個(gè)服務(wù)作為消費(fèi)者,但是本案例中不涉及微服務(wù),就是演示一下生成和消費(fèi)的示例代碼,因此寫到了一個(gè)服務(wù)當(dāng)中。
代碼文件組織如下:
user.go :用于測(cè)試發(fā)送和消費(fèi)結(jié)構(gòu)體字符串消息
package model
type User struct {
Id int64 `json:"id"`
UserName string `json:"user_name"`
Age int64 `json:"age"`
}
二、生產(chǎn)者
啟動(dòng)zookeeper
和kafka
,并創(chuàng)建名為test
的topic
,步驟可以參考:28.windows安裝kafka,Go操作kafka示例(sarama庫)
producer.go
package producer
import (
"context"
"encoding/json"
"fmt"
"golang-trick/31-kafka-go/model"
"time"
"github.com/segmentio/kafka-go"
)
var (
topic = "user"
Producer *kafka.Writer
)
func init() {
Producer = &kafka.Writer{
Addr: kafka.TCP("localhost:9092"), //TCP函數(shù)參數(shù)為不定長(zhǎng)參數(shù),可以傳多個(gè)地址組成集群
Topic: topic,
Balancer: &kafka.Hash{}, // 用于對(duì)key進(jìn)行hash,決定消息發(fā)送到哪個(gè)分區(qū)
MaxAttempts: 0,
WriteBackoffMin: 0,
WriteBackoffMax: 0,
BatchSize: 0,
BatchBytes: 0,
BatchTimeout: 0,
ReadTimeout: 0,
WriteTimeout: time.Second, // kafka有時(shí)候可能負(fù)載很高,寫不進(jìn)去,那么超時(shí)后可以放棄寫入,用于可以丟消息的場(chǎng)景
RequiredAcks: kafka.RequireNone, // 不需要任何節(jié)點(diǎn)確認(rèn)就返回
Async: false,
Completion: nil,
Compression: 0,
Logger: nil,
ErrorLogger: nil,
Transport: nil,
AllowAutoTopicCreation: false, // 第一次發(fā)消息的時(shí)候,如果topic不存在,就自動(dòng)創(chuàng)建topic,工作中禁止使用
}
}
// 生產(chǎn)消息,發(fā)送user信息
func SendMessage(ctx context.Context, user *model.User) {
msgContent, err := json.Marshal(user)
if err != nil {
fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))
}
msg := kafka.Message{
Topic: "",
Partition: 0,
Offset: 0,
HighWaterMark: 0,
Key: []byte(fmt.Sprintf("%d", user.Id)),
Value: msgContent,
Headers: nil,
WriterData: nil,
Time: time.Time{},
}
err = Producer.WriteMessages(ctx, msg)
if err != nil {
fmt.Println(fmt.Sprintf("寫入kafka失敗,user:%v,err:%v", user, err))
}
}
main.go
: 測(cè)試消息發(fā)送
package main
import (
"context"
"fmt"
"golang-trick/31-kafka-go/model"
"golang-trick/31-kafka-go/producer"
)
func main() {
ctx := context.Background()
for i := 0; i < 5; i++ {
user := &model.User{
Id: int64(i + 1),
UserName: fmt.Sprintf("lym:%d", i),
Age: 18,
}
producer.SendMessage(ctx, user)
}
producer.Producer.Close() // 消息發(fā)送完畢后,關(guān)閉生產(chǎn)者
}
可以看到五條消息都發(fā)送成功
三、消費(fèi)者
consumer.go
package consumer
import (
"context"
"encoding/json"
"fmt"
"golang-trick/24-gin-learning/class08/model"
"time"
"github.com/segmentio/kafka-go"
)
var (
topic = "user"
Consumer *kafka.Reader
)
func init() {
Consumer = kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"}, // broker地址 數(shù)組
GroupID: "test", // 消費(fèi)者組id,每個(gè)消費(fèi)者組可以消費(fèi)kafka的完整數(shù)據(jù),但是同一個(gè)消費(fèi)者組中的消費(fèi)者根據(jù)設(shè)置的分區(qū)消費(fèi)策略共同消費(fèi)kafka中的數(shù)據(jù)
GroupTopics: nil,
Topic: topic, // 消費(fèi)哪個(gè)topic
Partition: 0,
Dialer: nil,
QueueCapacity: 0,
MinBytes: 0,
MaxBytes: 0,
MaxWait: 0,
ReadBatchTimeout: 0,
ReadLagInterval: 0,
GroupBalancers: nil,
HeartbeatInterval: 0,
CommitInterval: time.Second, // offset 上報(bào)間隔
PartitionWatchInterval: 0,
WatchPartitionChanges: false,
SessionTimeout: 0,
RebalanceTimeout: 0,
JoinGroupBackoff: 0,
RetentionTime: 0,
StartOffset: kafka.FirstOffset, // 僅對(duì)新創(chuàng)建的消費(fèi)者組生效,從頭開始消費(fèi),工作中可能更常用從最新的開始消費(fèi)kafka.LastOffset
ReadBackoffMin: 0,
ReadBackoffMax: 0,
Logger: nil,
ErrorLogger: nil,
IsolationLevel: 0,
MaxAttempts: 0,
OffsetOutOfRangeError: false,
})
}
// 消費(fèi)消息
func ReadMessage(ctx context.Context) {
// 消費(fèi)者應(yīng)該通過協(xié)程一直開著,一直消費(fèi)
for {
if msg, err := Consumer.ReadMessage(ctx); err != nil {
fmt.Println(fmt.Sprintf("讀kafka失敗,err:%v", err))
break // 當(dāng)前消息讀取失敗時(shí),并不退出for終止所有后續(xù)消費(fèi),而是跳過該消息即可
} else {
user := &model.User{}
err := json.Unmarshal(msg.Value, user)
if err != nil {
fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))
break // 當(dāng)前消息處理失敗時(shí),并不退出for終止所有后續(xù)消費(fèi),而是跳過該消息即可
}
fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))
}
}
}
main.go: 測(cè)試接收消息
package main
import (
"context"
"fmt"
"golang-trick/31-kafka-go/consumer"
"os"
"os/signal"
"syscall"
)
// 需要監(jiān)聽信息2和15,在程序退出時(shí),關(guān)閉Consumer
func listenSignal() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
sig := <-c
fmt.Printf("收到信號(hào) %s ", sig.String())
if consumer.Consumer != nil {
consumer.Consumer.Close()
}
os.Exit(0)
}
func main() {
ctx := context.Background()
//for i := 0; i < 5; i++ {
// user := &model.User{
// Id: int64(i + 1),
// UserName: fmt.Sprintf("lym:%d", i),
// Age: 18,
// }
// producer.SendMessage(ctx, user)
//}
//producer.Producer.Close()
go consumer.ReadMessage(ctx)
listenSignal()
}
啟動(dòng)后,因?yàn)槲覀冊(cè)O(shè)置的從頭開始消費(fèi),所以原有的五條消息消費(fèi)成功,然后在等待著隊(duì)列中有消息時(shí)繼續(xù)消費(fèi)
我們可以通過kafka
客戶端發(fā)兩條消息,看看我們的消費(fèi)者程序是否能消費(fèi)到文章來源:http://www.zghlxwxcb.cn/news/detail-840758.html
最后關(guān)閉服務(wù)停止消費(fèi)文章來源地址http://www.zghlxwxcb.cn/news/detail-840758.html
到了這里,關(guān)于51.Go操作kafka示例(kafka-go庫)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!