kafka
Apache Kafka 是一個分布式的流處理平臺。它具有以下特點:
- 支持消息的發(fā)布和訂閱,類似于 RabbtMQ、ActiveMQ 等消息隊列
- 支持?jǐn)?shù)據(jù)實時處理
- 能保證消息的可靠性投遞
- 支持消息的持久化存儲,并通過多副本分布式的存儲方案來保證消息的容錯
- 高吞吐率,單 Broker 可以輕松處理數(shù)千個分區(qū)以及每秒百萬級的消息量
架構(gòu)簡介
Messages and Batches
kafka基本數(shù)據(jù)單元為消息,為了提高網(wǎng)絡(luò)使用效率,采用批寫入方式
Topics and Partitions
topic為kafka消費(fèi)主題,每個主題下有若干分區(qū)(partitions),Kafka 通過分區(qū)來實現(xiàn)數(shù)據(jù)的冗余和伸縮性,分區(qū)可以分布在不同的服務(wù)器上。由于多個partition的特性,kafka無法保證topic范圍內(nèi)的消息順序,但是可以保證單個分區(qū)內(nèi)消息的順序
broker
broker 對應(yīng)著一個 kafka 的進(jìn)程;一個 kafka 集群會包含多個 broker;同時需要在這些 broker中選舉出一個controller,選舉是通過 zk 來實現(xiàn);controller 負(fù)責(zé)協(xié)調(diào)管理集群狀態(tài),同時也負(fù)責(zé) partition 的 leader 選舉;
Producers And Consumers
- 消息的生產(chǎn)者,負(fù)責(zé)將消息發(fā)送到不同的 partition 中;消息的生產(chǎn)需要考慮冪等性、正確性以及安全性;kafka 引入了 ack 機(jī)制;ack 為 0,則不需要 kafka 回復(fù),此時可能造成數(shù)據(jù)丟失;ack為 1, 則需要等待 leader 回復(fù),此時其他 replica 可能還沒同步 leader 掛掉,數(shù)據(jù)安全性沒法得到保證;ack 為 -1,則需要等待其他 replica 同步完成后,才回復(fù),此時數(shù)據(jù)最健壯,但是效率最低;
- 消息的消費(fèi)者,負(fù)責(zé)消費(fèi)消息;一個 partition 對應(yīng)一個consumer, 而一個 consumer 可以對應(yīng)多個 partition;消費(fèi)同一類消息的高吞吐量,可以設(shè)置 consumer group;
副本同步策略
每個分區(qū)里有多個副本,這些副本有一個leader。只有副本全部同步完成才發(fā)送ack。這里指同步策略,是全量同步,而不是半數(shù)以上同步了就認(rèn)為該數(shù)據(jù)已經(jīng)commit。不過也可以設(shè)置最少同步副本數(shù)提高性能(min.insync.replicas)
ISR
Leader 維護(hù)了一個動態(tài)的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合。當(dāng) ISR 中的 follower 完成數(shù)據(jù)的同步之后,leader 就會給 producer 發(fā)送 ack。如果 follower 長時間未向 leader 同步數(shù)據(jù),則該 follower 將被踢出 ISR,該時間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定。Leader 發(fā)生故障之后,就會從 ISR 中選舉新的 leader。
數(shù)據(jù)可見性
需要注意的是,并不是所有保存在分區(qū)首領(lǐng)上的數(shù)據(jù)都可以被客戶端讀取到,為了保證數(shù)據(jù)一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的數(shù)據(jù)才能被客戶端讀取到。
kafka讀寫機(jī)制
producer寫流程
producer寫入消息流程如下:
-
連接ZK集群,從ZK中拿到對應(yīng)topic的partition信息和partition的Leader的相關(guān)信息
-
連接到對應(yīng)Leader對應(yīng)的broker
-
將消息按批次發(fā)送到partition的Leader上
-
其他Follower從Leader上復(fù)制數(shù)據(jù)
-
依次返回ACK
-
直到所有ISR中的數(shù)據(jù)寫完成,才完成提交,整個寫過程結(jié)束
consumer 讀流程
-
連接ZK集群,從ZK中拿到對應(yīng)topic的partition信息和partition的Leader的相關(guān)信息
-
連接到對應(yīng)Leader對應(yīng)的broker
-
consumer將自己保存的offset發(fā)送給Leader
-
Leader根據(jù)offset等信息定位到segment(索引文件和日志文件)
-
根據(jù)索引文件中的內(nèi)容,定位到日志文件中該偏移量對應(yīng)的開始位置讀取相應(yīng)長度的數(shù)據(jù)并返回給consumer
kafka集群選舉
副本leader選舉
只有完全追上Leader數(shù)據(jù)的follower才能進(jìn)行選舉,Leader發(fā)生故障之后,會從ISR中選出一個新的Leader
controller選舉
這部分由ZK完成,不過高本版kafka引入kratf,就可以完成去ZK化了。 ratf是一種簡單易理解并且嚴(yán)格復(fù)合數(shù)學(xué)歸納的共識算法。
自測環(huán)境搭建
zk
docker pull wurstmeister/zookeeper
docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper
kafka
docker pull wurstmeister/kafka
docker run -itd --name kafka -p 9092:9092 -e HOST_IP=10.74.18.61 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=10.74.18.61 --link zookeeper:zookeeper wurstmeister/kafka
go鏈接kafka生產(chǎn)消費(fèi)
go版本:1.21
生產(chǎn)者文章來源:http://www.zghlxwxcb.cn/news/detail-766945.html
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// 等待服務(wù)器所有副本都保存成功后的響應(yīng),對應(yīng)ack=-1
config.Producer.RequiredAcks = sarama.WaitForAll
// 隨機(jī)的分區(qū)類型:返回一個分區(qū)器,該分區(qū)器每次選擇一個隨機(jī)分區(qū)
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失敗后的響應(yīng)
config.Producer.Return.Successes = true
// 使用給定代理地址和配置創(chuàng)建一個同步生產(chǎn)者
producer, err := sarama.NewSyncProducer([]string{"10.74.18.61:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
//構(gòu)建發(fā)送的消息,
msg := &sarama.ProducerMessage{
//Topic: "test",//包含了消息的主題
Partition: int32(10), //
Key: sarama.StringEncoder("key"), //
}
var value string
var msgType string
for {
_, err := fmt.Scanf("%s", &value)
if err != nil {
break
}
fmt.Scanf("%s", &msgType)
fmt.Println("msgType = ", msgType, ",value = ", value)
msg.Topic = msgType
//將字符串轉(zhuǎn)換為字節(jié)數(shù)組
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//SendMessage:該方法是生產(chǎn)者生產(chǎn)給定的消息
//生產(chǎn)成功的時候返回該消息的分區(qū)和所在的偏移量
//生產(chǎn)失敗的時候返回error
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail", err)
}
fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
}
}
消費(fèi)者文章來源地址http://www.zghlxwxcb.cn/news/detail-766945.html
package main
import (
"fmt"
"sync"
"github.com/IBM/sarama"
)
var (
wg sync.WaitGroup
)
func main() {
// 根據(jù)給定的代理地址和配置創(chuàng)建一個消費(fèi)者
consumer, err := sarama.NewConsumer([]string{"10.74.18.61:9092"}, nil)
if err != nil {
panic(err)
}
//Partitions(topic):該方法返回了該topic的所有分區(qū)id
partitionList, err := consumer.Partitions("test")
if err != nil {
panic(err)
}
for partition := range partitionList {
//ConsumePartition方法根據(jù)主題,分區(qū)和給定的偏移量創(chuàng)建創(chuàng)建了相應(yīng)的分區(qū)消費(fèi)者
//如果該分區(qū)消費(fèi)者已經(jīng)消費(fèi)了該信息將會返回error
//sarama.OffsetNewest:表明了為最新消息
pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
//Messages()該方法返回一個消費(fèi)消息類型的只讀通道,由代理產(chǎn)生
for msg := range pc.Messages() {
fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
wg.Wait()
consumer.Close()
}
到了這里,關(guān)于golang—kafka架構(gòu)原理快速入門以及自測環(huán)境搭建(docker單節(jié)點部署)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!