国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Go操作各大消息隊列教程(RabbitMQ、Kafka)

這篇具有很好參考價值的文章主要介紹了Go操作各大消息隊列教程(RabbitMQ、Kafka)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Go操作各大消息隊列教程

1 RabbitMQ

1.1 概念

①基本名詞

當(dāng)前市面上mq的產(chǎn)品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐獻給Apache的RocketMQ。甚至連redis這種NoSQL都支持MQ的功能。

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

  1. Broker:表示消息隊列服務(wù)實體
  2. Virtual Host:虛擬主機。標(biāo)識一批交換機、消息隊列和相關(guān)對象。vhost是AMQP概念的基礎(chǔ),必須在鏈接時指定,RabbitMQ默認(rèn)的vhost是 /。
    • AMQP(Advanced Message Queuing Protocol)高級消息隊列協(xié)議
  3. Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列。
  4. Queue:消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
②常見模式
1. simple簡單模式

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

消息的消費者(consumer) 監(jiān)聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經(jīng)從隊列中消失了,造成消息的丟失)

2. worker工作模式

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

多個消費者從一個隊列中爭搶消息

  • (隱患,高并發(fā)情況下,默認(rèn)會產(chǎn)生某一個消息被多個消費者共同使用,可以設(shè)置一個開關(guān)(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個消費者使用)
  • 應(yīng)用場景:紅包;大項目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到消息隊列中,空閑的系統(tǒng)自動爭搶)
3. publish/subscribe發(fā)布訂閱(共享資源)

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

消費者訂閱消息,然后從訂閱的隊列中獲取消息進行消費。

  • X代表交換機rabbitMQ內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機,交換機發(fā)布訂閱把消息發(fā)送到所有消息隊列中,對應(yīng)消息隊列的消費者拿到消息進行消費
  • 相關(guān)場景:郵件群發(fā),群聊天,廣播(廣告)
4. routing路由模式

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

  • 交換機根據(jù)路由規(guī)則,將消息路由到不同的隊列中
  • 消息生產(chǎn)者將消息發(fā)送給交換機按照路由判斷,路由是字符串(info) 當(dāng)前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機根據(jù)路由的key,只能匹配上路由key對應(yīng)的消息隊列,對應(yīng)的消費者才能消費消息;
5. topic主題模式(路由模式的一種)

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

  • 星號井號代表通配符
  • 星號代表多個單詞,井號代表一個單詞
  • 路由功能添加模糊匹配
  • 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機
  • 交換機根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊列,由隊列的監(jiān)聽消費者接收消息消費

1.2 搭建(docker方式)

①拉取鏡像
# 拉取鏡像
docker pull rabbitmq:3.7-management
②創(chuàng)建并啟動容器
# 創(chuàng)建并運行容器
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management
#5672是項目中連接rabbitmq的端口(我這里映射的是5672),15672是rabbitmq的web管理界面端口(我映射為15672)

# 輸入網(wǎng)址http://ip:15672即可進入rabbitmq的web管理頁面,賬戶密碼:guest / guest
③web界面創(chuàng)建用戶和virtual host

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

下面為了我們后續(xù)的操作,首先我們新建一個Virtual Host并且給他分配一個用戶名,用來隔離數(shù)據(jù),根據(jù)自己需要自行創(chuàng)建

  1. 新增virtual host
    Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ
  2. 新增用戶
    Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ
  3. 點擊新建好的用戶,設(shè)置其host
    Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ
    Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ
  4. 最終效果
    Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

1.3 代碼操作

①RabbitMQ struct:包含創(chuàng)建、消費、生產(chǎn)消息
package RabbitMQ

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

//amqp:// 賬號 密碼@地址:端口號/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"

type RabbitMQ struct {
	//連接
	conn *amqp.Connection
	//管道
	channel *amqp.Channel
	//隊列名稱
	QueueName string
	//交換機
	Exchange string
	//key Simple模式 幾乎用不到
	Key string
	//連接信息
	Mqurl string
}

//創(chuàng)建RabbitMQ結(jié)構(gòu)體實例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
	var err error
	//創(chuàng)建rabbitmq連接
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "創(chuàng)建連接錯誤!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "獲取channel失敗")
	return rabbitmq
}

//斷開channel和connection
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

//錯誤處理函數(shù)
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", message, err)
		panic(fmt.Sprintf("%s:%s", message, err))
	}
}

//簡單模式step:1。創(chuàng)建簡單模式下RabbitMQ實例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

//訂閱模式創(chuàng)建rabbitmq實例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
	//創(chuàng)建rabbitmq實例
	rabbitmq := NewRabbitMQ("", exchangeName, "")
	var err error
	//獲取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
	//獲取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel!")
	return rabbitmq
}

//訂閱模式生成
func (r *RabbitMQ) PublishPub(message string) {
	//嘗試創(chuàng)建交換機,不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機名稱
		r.Exchange,
		//交換機類型 廣播類型
		"fanout",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")

	//2 發(fā)送消息
	err = r.channel.Publish(
		r.Exchange,
		"",
		false,
		false,
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//訂閱模式消費端代碼
func (r *RabbitMQ) RecieveSub() {
	//嘗試創(chuàng)建交換機,不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機名稱
		r.Exchange,
		//交換機類型 廣播類型
		"fanout",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//2試探性創(chuàng)建隊列,創(chuàng)建隊列
	q, err := r.channel.QueueDeclare(
		"", //隨機生產(chǎn)隊列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//綁定隊列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,這里的key要為空
		"",
		r.Exchange,
		false,
		nil,
	)
	//消費消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出請按 Ctrl+C")
	<-forever
}

//話題模式 創(chuàng)建RabbitMQ實例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
	//創(chuàng)建rabbitmq實例
	rabbitmq := NewRabbitMQ("", exchagne, routingKey)
	var err error
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

//話題模式發(fā)送信息
func (r *RabbitMQ) PublishTopic(message string) {
	//嘗試創(chuàng)建交換機,不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機名稱
		r.Exchange,
		//交換機類型 話題模式
		"topic",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "topic failed to declare an excha"+"nge")
	//2發(fā)送信息
	err = r.channel.Publish(
		r.Exchange,
		//要設(shè)置
		r.Key,
		false,
		false,
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//話題模式接收信息
//要注意key
//其中* 用于匹配一個單詞,#用于匹配多個單詞(可以是零個)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
	//嘗試創(chuàng)建交換機,不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機名稱
		r.Exchange,
		//交換機類型 話題模式
		"topic",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an exchange")
	//2試探性創(chuàng)建隊列,創(chuàng)建隊列
	q, err := r.channel.QueueDeclare(
		"", //隨機生產(chǎn)隊列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//綁定隊列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,這里的key要為空
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	//消費消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出請按 Ctrl+C")
	<-forever
}

//路由模式 創(chuàng)建RabbitMQ實例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
	//創(chuàng)建rabbitmq實例
	rabbitmq := NewRabbitMQ("", exchagne, routingKey)
	var err error
	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
	rabbitmq.failOnErr(err, "failed     to connect rabbingmq!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

//路由模式發(fā)送信息
func (r *RabbitMQ) PublishRouting(message string) {
	//嘗試創(chuàng)建交換機,不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機名稱
		r.Exchange,
		//交換機類型 廣播類型
		"direct",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//發(fā)送信息
	err = r.channel.Publish(
		r.Exchange,
		//要設(shè)置
		r.Key,
		false,
		false,
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
	//嘗試創(chuàng)建交換機,不存在創(chuàng)建
	err := r.channel.ExchangeDeclare(
		//交換機名稱
		r.Exchange,
		//交換機類型 廣播類型
		"direct",
		//是否持久化
		true,
		//是否字段刪除
		false,
		//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an excha"+"nge")
	//2試探性創(chuàng)建隊列,創(chuàng)建隊列
	q, err := r.channel.QueueDeclare(
		"", //隨機生產(chǎn)隊列名稱
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare a queue")
	//綁定隊列到exchange中
	err = r.channel.QueueBind(
		q.Name,
		//在pub/sub模式下,這里的key要為空
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	//消費消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	forever := make(chan bool)
	go func() {
		for d := range message {
			log.Printf("Received a message:%s,", d.Body)
		}
	}()
	fmt.Println("退出請按 Ctrl+C")
	<-forever
}

//簡單模式Step:2、簡單模式下生產(chǎn)代碼
func (r *RabbitMQ) PublishSimple(message string) {
	//1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建
	//優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中
	_, err := r.channel.QueueDeclare(
		//隊列名稱
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除 當(dāng)最后一個消費者斷開連接之后,是否把消息從隊列中刪除
		false,
		//是否具有排他性 true表示自己可見 其他用戶不能訪問
		false,
		//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
		false,
		//額外數(shù)據(jù)
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}

	//2.發(fā)送消息到隊列中
	r.channel.Publish(
		//默認(rèn)的Exchange交換機是default,類型是direct直接類型
		r.Exchange,
		//要賦值的隊列名稱
		r.QueueName,
		//如果為true,根據(jù)exchange類型和routkey規(guī)則,如果無法找到符合條件的隊列那么會把發(fā)送的消息返回給發(fā)送者
		false,
		//如果為true,當(dāng)exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有綁定消費者,則會把消息還給發(fā)送者
		false,
		//消息
		amqp.Publishing{
			//類型
			ContentType: "text/plain",
			//消息
			Body: []byte(message),
		})
}

func (r *RabbitMQ) ConsumeSimple() {
	//1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建
	//優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中
	_, err := r.channel.QueueDeclare(
		//隊列名稱
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除 當(dāng)最后一個消費者斷開連接之后,是否把消息從隊列中刪除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//額外數(shù)據(jù)
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	//接收消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用來區(qū)分多個消費者
		"",
		//是否自動應(yīng)答
		true,
		//是否具有排他性
		false,
		//如果設(shè)置為true,表示不能同一個connection中發(fā)送的消息傳遞給這個connection中的消費者
		false,
		//隊列是否阻塞
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	forever := make(chan bool)

	//啟用協(xié)程處理
	go func() {
		for d := range msgs {
			//實現(xiàn)我們要處理的邏輯函數(shù)
			log.Printf("Received a message:%s", d.Body)
			//fmt.Println(d.Body)
		}
	}()

	log.Printf("【*】warting for messages, To exit press CCTRAL+C")
	<-forever
}

func (r *RabbitMQ) ConsumeWorker(consumerName string) {
	//1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建
	//優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中
	_, err := r.channel.QueueDeclare(
		//隊列名稱
		r.QueueName,
		//是否持久化
		false,
		//是否為自動刪除 當(dāng)最后一個消費者斷開連接之后,是否把消息從隊列中刪除
		false,
		//是否具有排他性
		false,
		//是否阻塞
		false,
		//額外數(shù)據(jù)
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	//接收消息
	msgs, err := r.channel.Consume(
		r.QueueName,
		//用來區(qū)分多個消費者
		consumerName,
		//是否自動應(yīng)答
		true,
		//是否具有排他性
		false,
		//如果設(shè)置為true,表示不能同一個connection中發(fā)送的消息傳遞給這個connection中的消費者
		false,
		//隊列是否阻塞
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err)
	}
	forever := make(chan bool)

	//啟用協(xié)程處理
	go func() {
		for d := range msgs {
			//實現(xiàn)我們要處理的邏輯函數(shù)
			log.Printf("%s Received a message:%s", consumerName, d.Body)
			//fmt.Println(d.Body)
		}
	}()

	log.Printf("【*】warting for messages, To exit press CCTRAL+C")
	<-forever
}
②測試代碼
1. simple簡單模式

consumer.go

func main() {
	//消費者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
	rabbitmq.ConsumeSimple()
}

producer.go

func main() {
	//Simple模式 生產(chǎn)者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
	for i := 0; i < 5; i++ {
		time.Sleep(time.Second * 2)
		rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
	}
}
2. worker模式

consumer.go

func main() {
	/*
		worker模式無非就是多個消費者去同一個隊列中消費消息
	*/
	//消費者1
	rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	go rabbitmq1.ConsumeWorker("consumer1")
	//消費者2
	rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	rabbitmq2.ConsumeWorker("consumer2")
}

producer.go

func main() {
	//Worker模式 生產(chǎn)者
	rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
	for i := 0; i < 100; i++ {
		//time.Sleep(time.Second * 2)
		rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
	}
}
3. publish/subscribe模式

consumer.go:

func main() {
	//消費者
	rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
	rabbitmq.RecieveSub()
}

producer.go

func main() {
	//訂閱模式發(fā)送者
	rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
	for i := 0; i <= 20; i++ {
		rabbitmq.PublishPub("訂閱模式生產(chǎn)第" + strconv.Itoa(i) + "條數(shù)據(jù)")
		fmt.Println(i)
		time.Sleep(1 * time.Second)
	}
}
4. router模式

consumer.go

func main() {
	//消費者
	rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
	rabbitmq.RecieveRouting()
}

producer.go

func main() {
	//路由模式生產(chǎn)者
	imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
	imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")

	for i := 0; i <= 10; i++ {
		imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
		imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}
5. topic模式

consumer.go

func main() {
	/*
		星號井號代表通配符
		星號代表多個單詞,井號代表一個單詞
		路由功能添加模糊匹配
		消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機
		交換機根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊列,由隊列的監(jiān)聽消費者接收消息消費
	*/
	//Topic消費者
	//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99
	rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的
	rabbitmq.RecieveTopic()
}

producer.go

func main() {
	//Topic模式生產(chǎn)者
	imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
	imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")

	for i := 0; i <= 10; i++ {
		imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
		imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Println(i)
	}
}

2 Kafka

2.1 基本概念

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ
Kafka是分布式的,其所有的構(gòu)件borker(server服務(wù)端集群)、producer(消息生產(chǎn))、consumer(消息消費者)都可以是分布式的。
producer給broker發(fā)送數(shù)據(jù),這些消息會存到kafka server里,然后consumer再向kafka server發(fā)起請求去消費這些數(shù)據(jù)。
kafka server在這個過程中像是一個幫你保管數(shù)據(jù)的中間商。所以kafka服務(wù)器也可以叫做broker(broker直接翻譯可以是中間人或者經(jīng)紀(jì)人的意思)。

在消息的生產(chǎn)時可以使用一個標(biāo)識topic來區(qū)分,且可以進行分區(qū);每一個分區(qū)都是一個順序的、不可變的消息隊列, 并且可以持續(xù)的添加。
同時為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態(tài)是在consumer端維護,而不是由server端維護。當(dāng)失敗時能自動平衡

參考:https://blog.csdn.net/lingfy1234/article/details/122900348

  • 應(yīng)用場景
    • 監(jiān)控
    • 消息隊列
    • 流處理
    • 日志聚合
    • 持久性日志
  • 基礎(chǔ)概念
    • topic:話題
    • broker:kafka服務(wù)集群,已發(fā)布的消息保存在一組服務(wù)器中,稱之為kafka集群。集群中的每一個服務(wù)器都是一個代理(broker)
    • partition:分區(qū),topic物理上的分組
    • message:消息,每個producer可以向一個topic主題發(fā)布一些消息

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ
1.?產(chǎn)者從Kafka集群獲取分區(qū)leader信息
2.?產(chǎn)者將消息發(fā)送給leader
3.leader將消息寫入本地磁盤
4.follower從leader拉取消息數(shù)據(jù)
5.follower將消息寫入本地磁盤后向leader發(fā)送ACK
6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK

2.2 常見模式

①點對點模式:火車站出租車搶客

發(fā)送者將消息發(fā)送到消息隊列中,消費者去消費,如果消費者有多個,他們會競爭地消費,也就是說對于某一條消息,只有一個消費者能“搶“到它。類似于火車站門口的出租車搶客的場景。

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

②發(fā)布訂閱模式:組間無競爭,組內(nèi)有競爭

消費者訂閱對應(yīng)的topic(主題),只有訂閱了對應(yīng)topic消費者的才會接收到消息。

例如:

  • 牛奶有很多種,光明牛奶,希望牛奶等,只有你訂閱了光明牛奶,送奶工才會把光明牛奶送到對應(yīng)位置,你也才會有機會消費這個牛奶

注意:為了提高消費者的消費能力,kafka中引入了消費者組的概念。相當(dāng)于是:不同消費者組之間因為訂閱的topic不同,不會有競爭關(guān)系。但是消費者組內(nèi)是有競爭關(guān)系。

例如:

  • 成都、廈門的出租車司機分別組成各自的消費者組。
  • 成都的出租車司機只拉成都的人,廈門的只拉廈門的人。(因此他們兩個消費者組不是競爭關(guān)系)
  • 成都市內(nèi)的出租車司機之間是競爭關(guān)系。(消費者組內(nèi)是競爭關(guān)系)

2.3 docker-compose部署

 vim docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      #KAFKA_ADVERTISED_LISTENERS后面改為自己本地宿主機的ip,例如我本地mac的ip為192.168.0.101
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
# 進入到docker-compose.yml所在目錄,執(zhí)行下面命令
docker-compose up -d
# 查看部署結(jié)果,狀態(tài)為up表明部署成功
docker-compose ps 

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

2.4 代碼操作

# 1. 創(chuàng)建對應(yīng)topic
docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092

# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ

①producer.go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// 基于sarama第三方庫開發(fā)的kafka client

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 發(fā)送完數(shù)據(jù)需要leader和follow都確認(rèn)
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
	config.Producer.Return.Successes = true                   // 成功交付的消息將在success channel返回

	// 構(gòu)造一個消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 連接kafka
	client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 發(fā)送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
②consumer.go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// kafka consumer

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū)
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // 遍歷所有的分區(qū)
		// 針對每個分區(qū)創(chuàng)建一個對應(yīng)的分區(qū)消費者
		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 異步從每個分區(qū)消費信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	//演示時使用
	select {}
}
③運行效果

Go操作各大消息隊列教程(RabbitMQ、Kafka),go,go,kafka,消息隊列,RabbitMQ文章來源地址http://www.zghlxwxcb.cn/news/detail-681151.html

到了這里,關(guān)于Go操作各大消息隊列教程(RabbitMQ、Kafka)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 怎么去選消息隊列? Kafka vs. RabbitMQ

    怎么去選消息隊列? Kafka vs. RabbitMQ

    在上周,我們討論了使用消息隊列的好處。然后我們回顧了消息隊列產(chǎn)品的發(fā)展歷史。如今,在項目中需要使用消息隊列時,Apache Kafka似乎是首選產(chǎn)品。然而,考慮到特定需求時,它并不總是最佳選擇。 基于數(shù)據(jù)庫的隊列 讓我們再次使用星巴克的例子。最重要的兩個需求是

    2024年02月11日
    瀏覽(17)
  • Go操作Kafka之kafka-go

    Go操作Kafka之kafka-go

    Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),本文介紹了如何使用kafka-go這個庫實現(xiàn)Go語言與kafka的交互。 Go社區(qū)中目前有三個比較常用的kafka客戶端庫 , 它們各有特點。 首先是IBM/sarama(這個庫已經(jīng)由Shopify轉(zhuǎn)給了IBM),之前我寫過一篇使用sarama操作Kafka的教程,相較于sar

    2024年04月26日
    瀏覽(14)
  • 【面試需了解之消息隊列】RocketMQ、kafka、RabbitMQ概述

    消息隊列說明:RocketMQ、kafka、RabbitMQ概述及關(guān)鍵概念 概述 消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量控制等問題。實現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu),是大型分布式系統(tǒng)不可缺少的中間件 作用 異構(gòu)系統(tǒng)消息傳遞:上游系統(tǒng)

    2024年02月10日
    瀏覽(62)
  • 51.Go操作kafka示例(kafka-go庫)

    51.Go操作kafka示例(kafka-go庫)

    代碼地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go 之前已經(jīng)介紹過一個操作kafka的go庫了,28.windows安裝kafka,Go操作kafka示例(sarama庫) ,但是這個庫比較老了,當(dāng)前比較流行的庫是 github.com/segmentio/kafka-go ,所以本次我們就使用一下它。 我們在 GitHub 直接輸入 kafk

    2024年03月17日
    瀏覽(23)
  • Java中如何使用消息隊列實現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)

    在 Java 中,可以使用消息隊列實現(xiàn)異步處理。下面是一個簡單的示例代碼,用于說明如何使用 ActiveMQ 實現(xiàn)消息隊列異步處理: 添加 ActiveMQ 依賴 在 pom.xml 文件中添加以下依賴: 創(chuàng)建消息隊列 創(chuàng)建一個名為 “TestQueue” 的消息隊列,并配置 ActiveMQ 連接信息: 創(chuàng)建消息消費者

    2024年02月16日
    瀏覽(33)
  • 分布式消息隊列:Kafka vs RabbitMQ vs ActiveMQ

    在現(xiàn)代分布式系統(tǒng)中,消息隊列是一種常見的異步通信模式,它可以幫助系統(tǒng)處理高并發(fā)、高可用性以及容錯等問題。在這篇文章中,我們將深入探討三種流行的分布式消息隊列:Apache Kafka、RabbitMQ和ActiveMQ。我們將討論它們的核心概念、算法原理、特點以及使用場景。 隨著

    2024年02月02日
    瀏覽(19)
  • 基于golang多消息隊列中間件的封裝nsq,rabbitmq,kafka

    場景 在創(chuàng)建個人的公共方法庫中有這樣一個需求,就是不同的項目會用到不同的消息隊列中間件,我的思路把所有的消息隊列中間件進行封裝一個消息隊列接口(MQer)有兩個方法一個生產(chǎn)一個消費,那么在實例化對象的時候根據(jù)配置文件指定當(dāng)前項目使用的那個消息隊列中

    2024年02月14日
    瀏覽(93)
  • 消息隊列黃金三劍客:RabbitMQ、RocketMQ和Kafka全面對決,誰是最佳選擇?

    消息隊列黃金三劍客:RabbitMQ、RocketMQ和Kafka全面對決,誰是最佳選擇?

    1.RabbitMQ: 適用于易用性和靈活性要求較高的場景 異步任務(wù)處理:RabbitMQ提供可靠的消息傳遞機制,適用于處理異步任務(wù),例如將耗時的任務(wù)放入消息隊列中,然后由消費者異步處理,提高系統(tǒng)的響應(yīng)速度和可伸縮性。 解耦系統(tǒng)組件:通過使用RabbitMQ作為消息中間件,不同的

    2024年02月14日
    瀏覽(18)
  • 消息隊列之六脈神劍:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar對比和如何使用

    消息隊列(Message Queue)是一種異步通信機制,它將消息發(fā)送者和接收者解耦,從而提高了應(yīng)用程序的性能、可擴展性和可靠性。在分布式系統(tǒng)中,消息隊列經(jīng)常被用于處理高并發(fā)、異步處理、應(yīng)用解耦等場景。 本篇回答將分析比較常見的六種消息隊列:RabbitMQ、Kafka、Active

    2024年02月14日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包