国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

51.Go操作kafka示例(kafka-go庫)

這篇具有很好參考價(jià)值的文章主要介紹了51.Go操作kafka示例(kafka-go庫)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

代碼地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go

一、簡(jiǎn)介

之前已經(jīng)介紹過一個(gè)操作kafka的go庫了,28.windows安裝kafka,Go操作kafka示例(sarama庫) ,但是這個(gè)庫比較老了,當(dāng)前比較流行的庫是github.com/segmentio/kafka-go,所以本次我們就使用一下它。

我們?cè)?code>GitHub直接輸入kafka并帶上language標(biāo)簽為Go時(shí),可以可以看到當(dāng)前get github.com/segmentio/kafka-go庫是最流行的。
go kafka庫,go,golang,kafka,開發(fā)語言

首先啟動(dòng)kafka的服務(wù)器,然后在項(xiàng)目中go get github.com/segmentio/kafka-go

接著我們就可以創(chuàng)建生產(chǎn)者和消費(fèi)者了,注意:在實(shí)際工作中,一般是一個(gè)服務(wù)為生產(chǎn)者,另一個(gè)服務(wù)作為消費(fèi)者,但是本案例中不涉及微服務(wù),就是演示一下生成和消費(fèi)的示例代碼,因此寫到了一個(gè)服務(wù)當(dāng)中。代碼文件組織如下:
go kafka庫,go,golang,kafka,開發(fā)語言
user.go :用于測(cè)試發(fā)送和消費(fèi)結(jié)構(gòu)體字符串消息

package model

type User struct {
	Id       int64  `json:"id"`
	UserName string `json:"user_name"`
	Age      int64  `json:"age"`
}

二、生產(chǎn)者

啟動(dòng)zookeeperkafka,并創(chuàng)建名為testtopic,步驟可以參考:28.windows安裝kafka,Go操作kafka示例(sarama庫)

producer.go

package producer

import (
	"context"
	"encoding/json"
	"fmt"
	"golang-trick/31-kafka-go/model"
	"time"

	"github.com/segmentio/kafka-go"
)

var (
	topic    = "user"
	Producer *kafka.Writer
)

func init() {
	Producer = &kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"), //TCP函數(shù)參數(shù)為不定長(zhǎng)參數(shù),可以傳多個(gè)地址組成集群
		Topic:                  topic,
		Balancer:               &kafka.Hash{}, // 用于對(duì)key進(jìn)行hash,決定消息發(fā)送到哪個(gè)分區(qū)
		MaxAttempts:            0,
		WriteBackoffMin:        0,
		WriteBackoffMax:        0,
		BatchSize:              0,
		BatchBytes:             0,
		BatchTimeout:           0,
		ReadTimeout:            0,
		WriteTimeout:           time.Second,       // kafka有時(shí)候可能負(fù)載很高,寫不進(jìn)去,那么超時(shí)后可以放棄寫入,用于可以丟消息的場(chǎng)景
		RequiredAcks:           kafka.RequireNone, // 不需要任何節(jié)點(diǎn)確認(rèn)就返回
		Async:                  false,
		Completion:             nil,
		Compression:            0,
		Logger:                 nil,
		ErrorLogger:            nil,
		Transport:              nil,
		AllowAutoTopicCreation: false, // 第一次發(fā)消息的時(shí)候,如果topic不存在,就自動(dòng)創(chuàng)建topic,工作中禁止使用
	}
}

// 生產(chǎn)消息,發(fā)送user信息
func SendMessage(ctx context.Context, user *model.User) {
	msgContent, err := json.Marshal(user)
	if err != nil {
		fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))
	}
	msg := kafka.Message{
		Topic:         "",
		Partition:     0,
		Offset:        0,
		HighWaterMark: 0,
		Key:           []byte(fmt.Sprintf("%d", user.Id)),
		Value:         msgContent,
		Headers:       nil,
		WriterData:    nil,
		Time:          time.Time{},
	}

	err = Producer.WriteMessages(ctx, msg)
	if err != nil {
		fmt.Println(fmt.Sprintf("寫入kafka失敗,user:%v,err:%v", user, err))
	}
}

main.go: 測(cè)試消息發(fā)送

package main

import (
	"context"
	"fmt"
	"golang-trick/31-kafka-go/model"
	"golang-trick/31-kafka-go/producer"
)

func main() {
	ctx := context.Background()
	for i := 0; i < 5; i++ {
		user := &model.User{
			Id:       int64(i + 1),
			UserName: fmt.Sprintf("lym:%d", i),
			Age:      18,
		}
		producer.SendMessage(ctx, user)
	}
	producer.Producer.Close() // 消息發(fā)送完畢后,關(guān)閉生產(chǎn)者
}

可以看到五條消息都發(fā)送成功
go kafka庫,go,golang,kafka,開發(fā)語言

三、消費(fèi)者

consumer.go

package consumer

import (
	"context"
	"encoding/json"
	"fmt"
	"golang-trick/24-gin-learning/class08/model"
	"time"

	"github.com/segmentio/kafka-go"
)

var (
	topic    = "user"
	Consumer *kafka.Reader
)

func init() {
	Consumer = kafka.NewReader(kafka.ReaderConfig{
		Brokers:                []string{"localhost:9092"}, // broker地址 數(shù)組
		GroupID:                "test",                     // 消費(fèi)者組id,每個(gè)消費(fèi)者組可以消費(fèi)kafka的完整數(shù)據(jù),但是同一個(gè)消費(fèi)者組中的消費(fèi)者根據(jù)設(shè)置的分區(qū)消費(fèi)策略共同消費(fèi)kafka中的數(shù)據(jù)
		GroupTopics:            nil,
		Topic:                  topic, // 消費(fèi)哪個(gè)topic
		Partition:              0,
		Dialer:                 nil,
		QueueCapacity:          0,
		MinBytes:               0,
		MaxBytes:               0,
		MaxWait:                0,
		ReadBatchTimeout:       0,
		ReadLagInterval:        0,
		GroupBalancers:         nil,
		HeartbeatInterval:      0,
		CommitInterval:         time.Second, // offset 上報(bào)間隔
		PartitionWatchInterval: 0,
		WatchPartitionChanges:  false,
		SessionTimeout:         0,
		RebalanceTimeout:       0,
		JoinGroupBackoff:       0,
		RetentionTime:          0,
		StartOffset:            kafka.FirstOffset, // 僅對(duì)新創(chuàng)建的消費(fèi)者組生效,從頭開始消費(fèi),工作中可能更常用從最新的開始消費(fèi)kafka.LastOffset
		ReadBackoffMin:         0,
		ReadBackoffMax:         0,
		Logger:                 nil,
		ErrorLogger:            nil,
		IsolationLevel:         0,
		MaxAttempts:            0,
		OffsetOutOfRangeError:  false,
	})
}

// 消費(fèi)消息
func ReadMessage(ctx context.Context) {
	// 消費(fèi)者應(yīng)該通過協(xié)程一直開著,一直消費(fèi)
	for {
		if msg, err := Consumer.ReadMessage(ctx); err != nil {
			fmt.Println(fmt.Sprintf("讀kafka失敗,err:%v", err))
			break // 當(dāng)前消息讀取失敗時(shí),并不退出for終止所有后續(xù)消費(fèi),而是跳過該消息即可
		} else {
			user := &model.User{}
			err := json.Unmarshal(msg.Value, user)
			if err != nil {
				fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))
				break // 當(dāng)前消息處理失敗時(shí),并不退出for終止所有后續(xù)消費(fèi),而是跳過該消息即可
			}

			fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))
		}
	}
}

main.go: 測(cè)試接收消息

package main

import (
	"context"
	"fmt"
	"golang-trick/31-kafka-go/consumer"
	"os"
	"os/signal"
	"syscall"
)

// 需要監(jiān)聽信息2和15,在程序退出時(shí),關(guān)閉Consumer
func listenSignal() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	sig := <-c
	fmt.Printf("收到信號(hào) %s ", sig.String())
	if consumer.Consumer != nil {
		consumer.Consumer.Close()
	}
	os.Exit(0)
}

func main() {
	ctx := context.Background()
	//for i := 0; i < 5; i++ {
	//	user := &model.User{
	//		Id:       int64(i + 1),
	//		UserName: fmt.Sprintf("lym:%d", i),
	//		Age:      18,
	//	}
	//	producer.SendMessage(ctx, user)
	//}
	//producer.Producer.Close()

	go consumer.ReadMessage(ctx)
	listenSignal()
}

啟動(dòng)后,因?yàn)槲覀冊(cè)O(shè)置的從頭開始消費(fèi),所以原有的五條消息消費(fèi)成功,然后在等待著隊(duì)列中有消息時(shí)繼續(xù)消費(fèi)
go kafka庫,go,golang,kafka,開發(fā)語言
我們可以通過kafka客戶端發(fā)兩條消息,看看我們的消費(fèi)者程序是否能消費(fèi)到

go kafka庫,go,golang,kafka,開發(fā)語言
最后關(guān)閉服務(wù)停止消費(fèi)
go kafka庫,go,golang,kafka,開發(fā)語言文章來源地址http://www.zghlxwxcb.cn/news/detail-840758.html

到了這里,關(guān)于51.Go操作kafka示例(kafka-go庫)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Golang:Go語言結(jié)構(gòu)

    在我們開始學(xué)習(xí) Go 編程語言的基礎(chǔ)構(gòu)建模塊前,讓我們先來了解 Go 語言最簡(jiǎn)單程序的結(jié)構(gòu)。 Go 語言的基礎(chǔ)組成有以下幾個(gè)部分: 包聲明 引入包 函數(shù) 變量 語句 表達(dá)式 注釋 接下來讓我們來看下簡(jiǎn)單的代碼,該代碼輸出了\\\"Hello World!\\\": 讓我們來看下以上程序的各個(gè)部分: 第一

    2024年02月10日
    瀏覽(20)
  • 【Golang】三分鐘讓你快速了解Go語言&為什么我們需要Go語言?

    【Golang】三分鐘讓你快速了解Go語言&為什么我們需要Go語言?

    博主簡(jiǎn)介: 努力學(xué)習(xí)的大一在校計(jì)算機(jī)專業(yè)學(xué)生,熱愛學(xué)習(xí)和創(chuàng)作。目前在學(xué)習(xí)和分享:數(shù)據(jù)結(jié)構(gòu)、Go,Java等相關(guān)知識(shí)。 博主主頁: @是瑤瑤子啦 所屬專欄: Go語言核心編程 近期目標(biāo): 寫好專欄的每一篇文章 Go 語言從 2009 年 9 月 21 日開始作為谷歌公司 20% 兼職項(xiàng)目,即相關(guān)

    2023年04月21日
    瀏覽(29)
  • Go語言(Golang)數(shù)據(jù)庫編程

    要想連接到 SQL 數(shù)據(jù)庫,首先需要加載目標(biāo)數(shù)據(jù)庫的驅(qū)動(dòng),驅(qū)動(dòng)里面包含著于該數(shù)據(jù)庫交互的邏輯。 sql.Open() 數(shù)據(jù)庫驅(qū)動(dòng)的名稱 數(shù)據(jù)源名稱 得到一個(gè)指向 sql.DB 這個(gè) struct 的指針 sql.DB 是用來操作數(shù)據(jù)庫的,它代表了0個(gè)或者多個(gè)底層連接的池,這些連接由sql 包來維護(hù),sql 包會(huì)

    2024年02月03日
    瀏覽(93)
  • 【Golang】VScode配置Go語言環(huán)境

    【Golang】VScode配置Go語言環(huán)境

    安裝VScode請(qǐng)參考我的上一篇博客:VScode安裝_?548的博客-CSDN博客 接下來我們直接進(jìn)入正題: Go語言(又稱Golang)是一種開源的編程語言,由Google開發(fā)并于2009年首次發(fā)布。Go語言具有簡(jiǎn)潔、高效、可靠和易于閱讀的特點(diǎn),被設(shè)計(jì)用于解決大型項(xiàng)目的開發(fā)需求。它結(jié)合了靜態(tài)類型

    2024年02月03日
    瀏覽(24)
  • 【GoLang】MAC安裝Go語言環(huán)境

    【GoLang】MAC安裝Go語言環(huán)境

    小試牛刀 首先安裝VScode軟件 或者pycharm mac安裝brew軟件? brew install go 報(bào)了一個(gè)錯(cuò)誤 不提供這個(gè)支持? 重新brew install go 之后又重新brew reinstall go 使用go version 可以看到go 的版本 使用go env ?可以看到go安裝后的配置 配置一個(gè)環(huán)境變量 vim ~/.zshrc, ?

    2024年02月15日
    瀏覽(28)
  • Golang區(qū)塊鏈錢包_go語言錢包

    Golang區(qū)塊鏈錢包_go語言錢包

    Golang區(qū)塊鏈錢包的特點(diǎn) Golang區(qū)塊鏈錢包具有以下幾個(gè)特點(diǎn): 1. 高性能 Golang是一種編譯型語言,具有快速的執(zhí)行速度和較低的內(nèi)存消耗。這使得Golang區(qū)塊鏈錢包在處理大規(guī)模交易數(shù)據(jù)時(shí)表現(xiàn)出色,能夠滿足高性能的需求。 2. 并發(fā)支持 Golang內(nèi)置了輕量級(jí)線程——goroutine,以及

    2024年04月15日
    瀏覽(28)
  • Golang(Go語言)IP地址轉(zhuǎn)換函數(shù)

    Golang(Go語言)IP地址轉(zhuǎn)換函數(shù)

    String形式的IP地址和Int類型互轉(zhuǎn)函數(shù) 代碼 輸出如下: ?

    2024年02月05日
    瀏覽(21)
  • 【GoLang】哪些大公司正在使用Go語言

    【GoLang】哪些大公司正在使用Go語言

    前言: 隨著計(jì)算機(jī)科學(xué)和軟件開發(fā)的快速發(fā)展,編程語言的選擇變得愈加關(guān)鍵。 在這個(gè)多元化的編程語境中,Go語言(簡(jiǎn)稱Golang)以其簡(jiǎn)潔、高效、并發(fā)處理能力等特性逐漸受到業(yè)界關(guān)注。 越來越多的大型科技公司紛紛采用Go語言作為其軟件開發(fā)的首選語言,這種趨勢(shì)反映了

    2024年02月04日
    瀏覽(17)
  • 【Go語言】Golang保姆級(jí)入門教程 Go初學(xué)者chapter3

    【Go語言】Golang保姆級(jí)入門教程 Go初學(xué)者chapter3

    下劃線“_”本身在Go中一個(gè)特殊的標(biāo)識(shí)符,成為空標(biāo)識(shí)符??梢源砣魏纹渌臉?biāo)識(shí)符,但是他對(duì)應(yīng)的值就會(huì)被忽略 僅僅被作為站維度使用, 不能作為標(biāo)識(shí)符使用 因?yàn)镚o語言中沒有private public 所以標(biāo)記變量首字母大寫代表其他包可以使用 小寫就是不可使用的 注意:Go語言中

    2024年02月13日
    瀏覽(92)
  • 【Go語言】Golang保姆級(jí)入門教程 Go初學(xué)者chapter2

    【Go語言】Golang保姆級(jí)入門教程 Go初學(xué)者chapter2

    setting的首選項(xiàng) 一個(gè)程序就是一個(gè)世界 變量是程序的基本組成單位 變量的使用步驟 [外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-zuxG8imp-1691479164956)(https://cdn.staticaly.com/gh/hudiework/img@main/image-20230726152905139.png)] 變量表示內(nèi)存中的一個(gè)存儲(chǔ)區(qū) 注意:

    2024年02月14日
    瀏覽(577)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包