Go操作各大消息隊列教程
1 RabbitMQ
1.1 概念
①基本名詞
當(dāng)前市面上mq的產(chǎn)品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐獻給Apache的RocketMQ。甚至連redis這種NoSQL都支持MQ的功能。
- Broker:表示消息隊列服務(wù)實體
- Virtual Host:虛擬主機。標(biāo)識一批交換機、消息隊列和相關(guān)對象。vhost是AMQP概念的基礎(chǔ),必須在鏈接時指定,RabbitMQ默認(rèn)的vhost是 /。
- AMQP(Advanced Message Queuing Protocol)高級消息隊列協(xié)議
- Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列。
- Queue:消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
②常見模式
1. simple簡單模式
消息的消費者(consumer) 監(jiān)聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經(jīng)從隊列中消失了,造成消息的丟失)
2. worker工作模式
多個消費者從一個隊列中爭搶消息
- (隱患,高并發(fā)情況下,默認(rèn)會產(chǎn)生某一個消息被多個消費者共同使用,可以設(shè)置一個開關(guān)(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個消費者使用)
- 應(yīng)用場景:紅包;大項目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到消息隊列中,空閑的系統(tǒng)自動爭搶)
3. publish/subscribe發(fā)布訂閱(共享資源)
消費者訂閱消息,然后從訂閱的隊列中獲取消息進行消費。
- X代表交換機rabbitMQ內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機,交換機發(fā)布訂閱把消息發(fā)送到所有消息隊列中,對應(yīng)消息隊列的消費者拿到消息進行消費
- 相關(guān)場景:郵件群發(fā),群聊天,廣播(廣告)
4. routing路由模式
- 交換機根據(jù)路由規(guī)則,將消息路由到不同的隊列中
- 消息生產(chǎn)者將消息發(fā)送給交換機按照路由判斷,路由是字符串(info) 當(dāng)前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機根據(jù)路由的key,只能匹配上路由key對應(yīng)的消息隊列,對應(yīng)的消費者才能消費消息;
5. topic主題模式(路由模式的一種)
- 星號井號代表通配符
- 星號代表多個單詞,井號代表一個單詞
- 路由功能添加模糊匹配
- 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機
- 交換機根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊列,由隊列的監(jiān)聽消費者接收消息消費
1.2 搭建(docker方式)
①拉取鏡像
# 拉取鏡像
docker pull rabbitmq:3.7-management
②創(chuàng)建并啟動容器
# 創(chuàng)建并運行容器
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7-management
#5672是項目中連接rabbitmq的端口(我這里映射的是5672),15672是rabbitmq的web管理界面端口(我映射為15672)
# 輸入網(wǎng)址http://ip:15672即可進入rabbitmq的web管理頁面,賬戶密碼:guest / guest
③web界面創(chuàng)建用戶和virtual host
下面為了我們后續(xù)的操作,首先我們新建一個Virtual Host并且給他分配一個用戶名,用來隔離數(shù)據(jù),根據(jù)自己需要自行創(chuàng)建
- 新增virtual host
- 新增用戶
- 點擊新建好的用戶,設(shè)置其host
- 最終效果
1.3 代碼操作
①RabbitMQ struct:包含創(chuàng)建、消費、生產(chǎn)消息
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//amqp:// 賬號 密碼@地址:端口號/vhost
const MQURL = "amqp://ziyi:ziyi@10.253.50.145:5672/ziyi"
type RabbitMQ struct {
//連接
conn *amqp.Connection
//管道
channel *amqp.Channel
//隊列名稱
QueueName string
//交換機
Exchange string
//key Simple模式 幾乎用不到
Key string
//連接信息
Mqurl string
}
//創(chuàng)建RabbitMQ結(jié)構(gòu)體實例
func NewRabbitMQ(queuename string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName: queuename, Exchange: exchange, Key: key, Mqurl: MQURL}
var err error
//創(chuàng)建rabbitmq連接
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "創(chuàng)建連接錯誤!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "獲取channel失敗")
return rabbitmq
}
//斷開channel和connection
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//錯誤處理函數(shù)
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
//簡單模式step:1。創(chuàng)建簡單模式下RabbitMQ實例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
//訂閱模式創(chuàng)建rabbitmq實例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
//創(chuàng)建rabbitmq實例
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
//獲取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connecct rabbitmq!")
//獲取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel!")
return rabbitmq
}
//訂閱模式生成
func (r *RabbitMQ) PublishPub(message string) {
//嘗試創(chuàng)建交換機,不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機名稱
r.Exchange,
//交換機類型 廣播類型
"fanout",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2 發(fā)送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
//類型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//訂閱模式消費端代碼
func (r *RabbitMQ) RecieveSub() {
//嘗試創(chuàng)建交換機,不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機名稱
r.Exchange,
//交換機類型 廣播類型
"fanout",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2試探性創(chuàng)建隊列,創(chuàng)建隊列
q, err := r.channel.QueueDeclare(
"", //隨機生產(chǎn)隊列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//綁定隊列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
"",
r.Exchange,
false,
nil,
)
//消費消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出請按 Ctrl+C")
<-forever
}
//話題模式 創(chuàng)建RabbitMQ實例
func NewRabbitMQTopic(exchagne string, routingKey string) *RabbitMQ {
//創(chuàng)建rabbitmq實例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//話題模式發(fā)送信息
func (r *RabbitMQ) PublishTopic(message string) {
//嘗試創(chuàng)建交換機,不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機名稱
r.Exchange,
//交換機類型 話題模式
"topic",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "topic failed to declare an excha"+"nge")
//2發(fā)送信息
err = r.channel.Publish(
r.Exchange,
//要設(shè)置
r.Key,
false,
false,
amqp.Publishing{
//類型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//話題模式接收信息
//要注意key
//其中* 用于匹配一個單詞,#用于匹配多個單詞(可以是零個)
//匹配 表示匹配imooc.* 表示匹配imooc.hello,但是imooc.hello.one需要用imooc.#才能匹配到
func (r *RabbitMQ) RecieveTopic() {
//嘗試創(chuàng)建交換機,不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機名稱
r.Exchange,
//交換機類型 話題模式
"topic",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an exchange")
//2試探性創(chuàng)建隊列,創(chuàng)建隊列
q, err := r.channel.QueueDeclare(
"", //隨機生產(chǎn)隊列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//綁定隊列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
r.Key,
r.Exchange,
false,
nil,
)
//消費消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出請按 Ctrl+C")
<-forever
}
//路由模式 創(chuàng)建RabbitMQ實例
func NewRabbitMQRouting(exchagne string, routingKey string) *RabbitMQ {
//創(chuàng)建rabbitmq實例
rabbitmq := NewRabbitMQ("", exchagne, routingKey)
var err error
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabbingmq!")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
//路由模式發(fā)送信息
func (r *RabbitMQ) PublishRouting(message string) {
//嘗試創(chuàng)建交換機,不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機名稱
r.Exchange,
//交換機類型 廣播類型
"direct",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//發(fā)送信息
err = r.channel.Publish(
r.Exchange,
//要設(shè)置
r.Key,
false,
false,
amqp.Publishing{
//類型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
//路由模式接收信息
func (r *RabbitMQ) RecieveRouting() {
//嘗試創(chuàng)建交換機,不存在創(chuàng)建
err := r.channel.ExchangeDeclare(
//交換機名稱
r.Exchange,
//交換機類型 廣播類型
"direct",
//是否持久化
true,
//是否字段刪除
false,
//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
nil,
)
r.failOnErr(err, "failed to declare an excha"+"nge")
//2試探性創(chuàng)建隊列,創(chuàng)建隊列
q, err := r.channel.QueueDeclare(
"", //隨機生產(chǎn)隊列名稱
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "Failed to declare a queue")
//綁定隊列到exchange中
err = r.channel.QueueBind(
q.Name,
//在pub/sub模式下,這里的key要為空
r.Key,
r.Exchange,
false,
nil,
)
//消費消息
message, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message:%s,", d.Body)
}
}()
fmt.Println("退出請按 Ctrl+C")
<-forever
}
//簡單模式Step:2、簡單模式下生產(chǎn)代碼
func (r *RabbitMQ) PublishSimple(message string) {
//1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建
//優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中
_, err := r.channel.QueueDeclare(
//隊列名稱
r.QueueName,
//是否持久化
false,
//是否為自動刪除 當(dāng)最后一個消費者斷開連接之后,是否把消息從隊列中刪除
false,
//是否具有排他性 true表示自己可見 其他用戶不能訪問
false,
//是否阻塞 true表示要等待服務(wù)器的響應(yīng)
false,
//額外數(shù)據(jù)
nil,
)
if err != nil {
fmt.Println(err)
}
//2.發(fā)送消息到隊列中
r.channel.Publish(
//默認(rèn)的Exchange交換機是default,類型是direct直接類型
r.Exchange,
//要賦值的隊列名稱
r.QueueName,
//如果為true,根據(jù)exchange類型和routkey規(guī)則,如果無法找到符合條件的隊列那么會把發(fā)送的消息返回給發(fā)送者
false,
//如果為true,當(dāng)exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有綁定消費者,則會把消息還給發(fā)送者
false,
//消息
amqp.Publishing{
//類型
ContentType: "text/plain",
//消息
Body: []byte(message),
})
}
func (r *RabbitMQ) ConsumeSimple() {
//1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建
//優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中
_, err := r.channel.QueueDeclare(
//隊列名稱
r.QueueName,
//是否持久化
false,
//是否為自動刪除 當(dāng)最后一個消費者斷開連接之后,是否把消息從隊列中刪除
false,
//是否具有排他性
false,
//是否阻塞
false,
//額外數(shù)據(jù)
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用來區(qū)分多個消費者
"",
//是否自動應(yīng)答
true,
//是否具有排他性
false,
//如果設(shè)置為true,表示不能同一個connection中發(fā)送的消息傳遞給這個connection中的消費者
false,
//隊列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//啟用協(xié)程處理
go func() {
for d := range msgs {
//實現(xiàn)我們要處理的邏輯函數(shù)
log.Printf("Received a message:%s", d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
func (r *RabbitMQ) ConsumeWorker(consumerName string) {
//1、申請隊列,如果隊列存在就跳過,不存在創(chuàng)建
//優(yōu)點:保證隊列存在,消息能發(fā)送到隊列中
_, err := r.channel.QueueDeclare(
//隊列名稱
r.QueueName,
//是否持久化
false,
//是否為自動刪除 當(dāng)最后一個消費者斷開連接之后,是否把消息從隊列中刪除
false,
//是否具有排他性
false,
//是否阻塞
false,
//額外數(shù)據(jù)
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
r.QueueName,
//用來區(qū)分多個消費者
consumerName,
//是否自動應(yīng)答
true,
//是否具有排他性
false,
//如果設(shè)置為true,表示不能同一個connection中發(fā)送的消息傳遞給這個connection中的消費者
false,
//隊列是否阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//啟用協(xié)程處理
go func() {
for d := range msgs {
//實現(xiàn)我們要處理的邏輯函數(shù)
log.Printf("%s Received a message:%s", consumerName, d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("【*】warting for messages, To exit press CCTRAL+C")
<-forever
}
②測試代碼
1. simple簡單模式
consumer.go
func main() {
//消費者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
rabbitmq.ConsumeSimple()
}
producer.go
func main() {
//Simple模式 生產(chǎn)者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiSimple")
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 2)
rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
}
}
2. worker模式
consumer.go
func main() {
/*
worker模式無非就是多個消費者去同一個隊列中消費消息
*/
//消費者1
rabbitmq1 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
go rabbitmq1.ConsumeWorker("consumer1")
//消費者2
rabbitmq2 := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
rabbitmq2.ConsumeWorker("consumer2")
}
producer.go
func main() {
//Worker模式 生產(chǎn)者
rabbitmq := RabbitMQ.NewRabbitMQSimple("ziyiWorker")
for i := 0; i < 100; i++ {
//time.Sleep(time.Second * 2)
rabbitmq.PublishSimple(fmt.Sprintf("%s %d", "hello", i))
}
}
3. publish/subscribe模式
consumer.go:
func main() {
//消費者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
rabbitmq.RecieveSub()
}
producer.go
func main() {
//訂閱模式發(fā)送者
rabbitmq := RabbitMQ.NewRabbitMQPubSub("" + "newProduct")
for i := 0; i <= 20; i++ {
rabbitmq.PublishPub("訂閱模式生產(chǎn)第" + strconv.Itoa(i) + "條數(shù)據(jù)")
fmt.Println(i)
time.Sleep(1 * time.Second)
}
}
4. router模式
consumer.go
func main() {
//消費者
rabbitmq := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
rabbitmq.RecieveRouting()
}
producer.go
func main() {
//路由模式生產(chǎn)者
imoocOne := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_one")
imoocTwo := RabbitMQ.NewRabbitMQRouting("exZi", "imooc_two")
for i := 0; i <= 10; i++ {
imoocOne.PublishRouting("hello imooc one!" + strconv.Itoa(i))
imoocTwo.PublishRouting("hello imooc two!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
5. topic模式
consumer.go
func main() {
/*
星號井號代表通配符
星號代表多個單詞,井號代表一個單詞
路由功能添加模糊匹配
消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機
交換機根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊列,由隊列的監(jiān)聽消費者接收消息消費
*/
//Topic消費者
//rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "#") //匹配所有的key:topic88和topic99
rabbitmq := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three") //只匹配topic88的
rabbitmq.RecieveTopic()
}
producer.go
func main() {
//Topic模式生產(chǎn)者
imoocOne := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic88.three")
imoocTwo := RabbitMQ.NewRabbitMQTopic("exImoocTopic", "imooc.topic99.four")
for i := 0; i <= 10; i++ {
imoocOne.PublishTopic("hello imooc topic three!" + strconv.Itoa(i))
imoocTwo.PublishTopic("hello imooc topic four!" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Println(i)
}
}
2 Kafka
2.1 基本概念
Kafka是分布式的,其所有的構(gòu)件borker(server服務(wù)端集群)、producer(消息生產(chǎn))、consumer(消息消費者)都可以是分布式的。
producer給broker發(fā)送數(shù)據(jù),這些消息會存到kafka server里,然后consumer再向kafka server發(fā)起請求去消費這些數(shù)據(jù)。
kafka server在這個過程中像是一個幫你保管數(shù)據(jù)的中間商。所以kafka服務(wù)器也可以叫做broker(broker直接翻譯可以是中間人或者經(jīng)紀(jì)人的意思)。
在消息的生產(chǎn)時可以使用一個標(biāo)識topic來區(qū)分,且可以進行分區(qū);每一個分區(qū)都是一個順序的、不可變的消息隊列, 并且可以持續(xù)的添加。
同時為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態(tài)是在consumer端維護,而不是由server端維護。當(dāng)失敗時能自動平衡
參考:https://blog.csdn.net/lingfy1234/article/details/122900348
- 應(yīng)用場景
- 監(jiān)控
- 消息隊列
- 流處理
- 日志聚合
- 持久性日志
- 基礎(chǔ)概念
- topic:話題
- broker:kafka服務(wù)集群,已發(fā)布的消息保存在一組服務(wù)器中,稱之為kafka集群。集群中的每一個服務(wù)器都是一個代理(broker)
- partition:分區(qū),topic物理上的分組
- message:消息,每個producer可以向一個topic主題發(fā)布一些消息
1.?產(chǎn)者從Kafka集群獲取分區(qū)leader信息
2.?產(chǎn)者將消息發(fā)送給leader
3.leader將消息寫入本地磁盤
4.follower從leader拉取消息數(shù)據(jù)
5.follower將消息寫入本地磁盤后向leader發(fā)送ACK
6.leader收到所有的follower的ACK之后向生產(chǎn)者發(fā)送ACK
2.2 常見模式
①點對點模式:火車站出租車搶客
發(fā)送者將消息發(fā)送到消息隊列中,消費者去消費,如果消費者有多個,他們會競爭地消費,也就是說對于某一條消息,只有一個消費者能“搶“到它。類似于火車站門口的出租車搶客的場景。
②發(fā)布訂閱模式:組間無競爭,組內(nèi)有競爭
消費者訂閱對應(yīng)的topic(主題),只有訂閱了對應(yīng)topic消費者的才會接收到消息。
例如:
- 牛奶有很多種,光明牛奶,希望牛奶等,只有你訂閱了光明牛奶,送奶工才會把光明牛奶送到對應(yīng)位置,你也才會有機會消費這個牛奶
注意
:為了提高消費者的消費能力,kafka中引入了消費者組的概念。相當(dāng)于是:不同消費者組之間因為訂閱的topic不同,不會有競爭關(guān)系。但是消費者組內(nèi)是有競爭關(guān)系。
例如:
- 成都、廈門的出租車司機分別組成各自的消費者組。
- 成都的出租車司機只拉成都的人,廈門的只拉廈門的人。(因此他們兩個消費者組不是競爭關(guān)系)
- 成都市內(nèi)的出租車司機之間是競爭關(guān)系。(消費者組內(nèi)是競爭關(guān)系)
2.3 docker-compose部署
vim docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#KAFKA_ADVERTISED_LISTENERS后面改為自己本地宿主機的ip,例如我本地mac的ip為192.168.0.101
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
# 進入到docker-compose.yml所在目錄,執(zhí)行下面命令
docker-compose up -d
# 查看部署結(jié)果,狀態(tài)為up表明部署成功
docker-compose ps
2.4 代碼操作
# 1. 創(chuàng)建對應(yīng)topic
docker-compose exec kafka kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092
# 2. 查看topic列表
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
文章來源:http://www.zghlxwxcb.cn/news/detail-681151.html
①producer.go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// 基于sarama第三方庫開發(fā)的kafka client
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follow都確認(rèn)
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
// 構(gòu)造一個消息
msg := &sarama.ProducerMessage{}
msg.Topic = "web_log"
msg.Value = sarama.StringEncoder("this is a test log")
// 連接kafka
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 發(fā)送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
②consumer.go
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("web_log") // 根據(jù)topic取到所有的分區(qū)
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList { // 遍歷所有的分區(qū)
// 針對每個分區(qū)創(chuàng)建一個對應(yīng)的分區(qū)消費者
pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
// 異步從每個分區(qū)消費信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
}
}(pc)
}
//演示時使用
select {}
}
③運行效果
文章來源地址http://www.zghlxwxcb.cn/news/detail-681151.html
到了這里,關(guān)于Go操作各大消息隊列教程(RabbitMQ、Kafka)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!