国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

深入理解Kafka3.6.0的核心概念,搭建與使用

這篇具有很好參考價(jià)值的文章主要介紹了深入理解Kafka3.6.0的核心概念,搭建與使用。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Kafka是最初由Linkedin公司開發(fā),是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實(shí)時的處理大量數(shù)據(jù)以滿足各種需求場景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時系統(tǒng)、Storm/Spark流式處理引擎,web/nginx日志、訪問日志,消息服務(wù)等等,用scala語言編寫,kafka部署包“kafka_2.13-3.6.0”前面的2.13就是scala的版本

1、Kafka的使用場景


日志收集:一個公司可以用Kafka收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等。
用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網(wǎng)頁、搜索、點(diǎn)擊等活動,這些活動信息被各個服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實(shí)時的監(jiān)控分析,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘。
運(yùn)營指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
?

2、Kafka基本概念


kafka是一個分布式的,分區(qū)的消息(官方稱之為commit log)服務(wù)。它提供一個消息系統(tǒng)應(yīng)該具備的功能,但是確有著獨(dú)特的設(shè)計(jì)。可以這樣來說,Kafka借鑒了JMS規(guī)范的思想,但是并沒有完全遵循JMS規(guī)范。

首先,讓我們來看一下基礎(chǔ)的消息(Message)相關(guān)術(shù)語:

名稱?? ?解釋
Broker?? ?消息中間件處理節(jié)點(diǎn),一個Kafka節(jié)點(diǎn)就是一個broker,一個或者多個Broker可以組成一個Kafka集群
Topic?? ?Kafka根據(jù)topic對消息進(jìn)行歸類,發(fā)布到Kafka集群的每條消息都需要指定一個topic
Producer?? ?消息生產(chǎn)者,向Broker發(fā)送消息的客戶端
Consumer?? ?消息消費(fèi)者,從Broker讀取消息的客戶端
ConsumerGroup?? ?每個Consumer屬于一個特定的Consumer Group,一條消息可以被多個不同的Consumer Group消費(fèi),但是一個Consumer Group中只能有一個Consumer能夠消費(fèi)該消息
Partition?? ?物理上的概念,一個topic可以分為多個partition,每個partition內(nèi)部消息是有序的
Replica(副本)?? ?一個 topic 的每個分區(qū)都有若干個副本,一個 Leader 和若干個 Follower
Leader?? ?每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象都是 Leader
Follower?? ?每個分區(qū)多個副本中的“從”,實(shí)時從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時,某個 Follower 會成為新的 Leader。

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK
服務(wù)端(brokers)和客戶端(producer、consumer)之間通信通過TCP協(xié)議來完成。

3、Topic與Partition


在Kafka中,Topic就是一個主題,生產(chǎn)者往topic里面發(fā)送消息,消費(fèi)者從topic里面撈數(shù)據(jù)進(jìn)行消費(fèi)。

假設(shè)現(xiàn)在有一個場景,如果我們現(xiàn)在有100T的數(shù)據(jù)需要進(jìn)行消費(fèi),但是現(xiàn)在我們一臺主機(jī)上面并不能存儲這么多數(shù)據(jù)該怎么辦呢?

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

其實(shí)做法很簡單,就是將海量的數(shù)據(jù)進(jìn)行切割,并且在Topic中添加分區(qū)的概念,每一個分區(qū)都對應(yīng)一臺主機(jī),并且存儲切分到的數(shù)據(jù)

當(dāng)然為了實(shí)現(xiàn)高可用,其實(shí)分區(qū)可以實(shí)現(xiàn)主從架構(gòu),這個后面再了解

這樣做的好處是:

分區(qū)存儲,可以解決一個topic中文件過大無法存儲的問題
提高了讀寫的吞吐量,讀寫可以在多個分區(qū)中同時進(jìn)行
?

4、搭建部署

首先部署java,不再贅述,很簡單,然后去官網(wǎng)下載兩個安裝包

kafka_2.13-3.6.0

apache-zookeeper-3.9.1-bin.tar

(3.0之后kafka自帶zookeeper,也可以省略)

解壓后進(jìn)入conf文件夾

cp zoo_sample.cfg zoo1.cfg#復(fù)制一份zk的配置文件
修改配置文件內(nèi)容

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zkdata #啟動之前需要建好
datalogDir=/data/zklog #啟動之前需要建好
clientPort=2181
autopurge.purgeInterval=24
autopurge.snapRetainCount=3
server.1=192.168.1.1:2888:3888

啟動

bin/zkServer.sh start 
然后
bin/zkServer.sh status
查看狀態(tài)。一定要查看。啟動的時候不論如何都會輸出成功

?解壓kafka文件夾。進(jìn)入config文件夾

修改 server.properties


auto.create.topics.enable=true #配置自動創(chuàng)建topic
delete.topic.enable=true #是否允許刪除主題
broker.id=0 #集群中唯一
listeners=PLAINTEXT://192.168.1.1:9092 #不要填localhost:9092 ,localhost表示只能通過本機(jī)連接,可以設(shè)置為0.0.0.0或本地局域網(wǎng)地址,server接受客戶端連接的端口
num.network.threads=4 #broker處理消息的最大線程數(shù),一般情況下數(shù)量為cpu核數(shù)
num.io.threads=8 #broker處理磁盤IO的線程數(shù),數(shù)值為cpu核數(shù)2倍
socket.send.buffer.bytes=1024000  #socket的發(fā)送緩沖區(qū) 不要太小 避免頻繁操作
socket.receive.buffer.bytes=1024000  # 接收緩沖區(qū) 不要太小
socket.request.max.bytes=1048576000 #socket請求的最大數(shù)值 message.max.bytes必然要小于socket.request.max.bytes,會被topic創(chuàng)建時的指定參數(shù)覆蓋
log.dirs=/data/kafkalog #kafka存放數(shù)據(jù)的路徑。這個路徑并不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當(dāng)創(chuàng)建新partition時,都會選擇在包含最少partitions的路徑下進(jìn)行。
num.partitions=2  #為1的時候不能自動創(chuàng)建topic,創(chuàng)建topic的默認(rèn)分區(qū)數(shù)
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000  #和下面的一起,一些日志存儲的配置,默認(rèn)不清理日志
log.flush.interval.ms=1000
log.retention.hours=24 #每個日志文件刪除之前保存的時間。默認(rèn)數(shù)據(jù)保存時間對所有topic都一樣
log.roll.hours=12
log.cleanup.policy=delete
log.retention.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.1:2181#zookeeper如果是集群,連接方式為 hostname1:port1, hostname2:port2, hostname3:port3
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

初次啟動可以不后臺

bin/kafka-server-start.sh -daemon config/server.properties
沒問題就放后臺
bin/kafka-server-start.sh -daemon config/server.properties

?5、Kafka核心概念之Topic

Kafka中,Topic是一個非常重要的概念,topic可以實(shí)現(xiàn)消息的分類,不同消費(fèi)者訂閱不同的topic

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

partition(分區(qū))是kafka的一個核心概念,kafka將1個topic分成了一個或多個分區(qū),每個分區(qū)在物理上對應(yīng)一個目錄
分區(qū)目錄下存儲的是該分區(qū)的日志段(segment),包括日志的數(shù)據(jù)文件和兩個索引文件

執(zhí)行以下命令創(chuàng)建名為test的topic,這個topic只有一個partition,并且備份因子也設(shè)置為1:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1

查看當(dāng)前kafka內(nèi)有哪些topic

./kafka-topics.sh --bootstrap-server localhost:9092 --list 

kafka自帶了一個producer命令客戶端,可以從本地文件中讀取內(nèi)容,或者我們也可以以命令行中直接輸入內(nèi)容,并將這些內(nèi)容以消息的形式發(fā)送到kafka集群中。

在默認(rèn)情況下,每一個行會被當(dāng)做成一個獨(dú)立的消息。使用kafka的發(fā)送消息的客戶端,指定發(fā)送到的kafka服務(wù)器地址和topic

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

對于consumer,kafka同樣也攜帶了一個命令行客戶端,會將獲取到內(nèi)容在命令中進(jìn)行輸出,默認(rèn)是消費(fèi)最新的消息。使用kafka的消費(fèi)者消息的客戶端,從指定kafka服務(wù)器的指定topic中消費(fèi)消息

方式一:從最后一條消息的偏移量+1開始消費(fèi)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test


方式二:從頭開始消費(fèi)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

幾個注意點(diǎn):

  • 消息會被存儲
  • 消息是順序存儲
  • 消息是有偏移量的
  • 消費(fèi)時可以指明偏移量進(jìn)行消費(fèi)

在上面我們展示了兩種不同的消費(fèi)方式,根據(jù)偏移量消費(fèi)和從頭開始消費(fèi),其實(shí)這個偏移量可以我們自己進(jìn)行維護(hù)

我們進(jìn)入我們在server.properties里面配置的日志文件地址/data/kafkalog

我們可以看到默認(rèn)一共有五十個偏移量地址,里面就記錄了當(dāng)前消費(fèi)的偏移量。
我們先關(guān)注test-0這個文件
kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

?我們進(jìn)入這個kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK文件,可以看到其中有個log文件,里面就保存了Topic發(fā)送的數(shù)據(jù)

生產(chǎn)者將消息發(fā)送給broker,broker會將消息保存在本地的日志文件中

/data/kafkalog/主題-分區(qū)/00000000.log
  • 消息的保存是有序的,通過offset偏移量來描述消息的有序性

  • 消費(fèi)者消費(fèi)消息時也是通過offset來描述當(dāng)前要消費(fèi)的那條消息的位置

?kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

?6、?單播消息


我們現(xiàn)在假設(shè)有一個場景,有一個生產(chǎn)者,兩個消費(fèi)者,問:生產(chǎn)者發(fā)送消息,是否會同時被兩個消費(fèi)者消費(fèi)?

創(chuàng)建一個topic

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test2 --partitions 1


創(chuàng)建一個生產(chǎn)者

./kafka-console-producer.sh --broker-list localhost:9092 --topic test2


分別在兩個終端上面創(chuàng)建兩個消費(fèi)者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK


這里就要引申出一個概念:消費(fèi)組,當(dāng)我們配置多個消費(fèi)者在一個消費(fèi)組里面的時候,其實(shí)只會有一個消費(fèi)者進(jìn)行消費(fèi)

這樣其實(shí)才符合常理,畢竟一條消息被消費(fèi)一次就夠了

我們可以通過命令--consumer-property group.id=testGroup在設(shè)置消費(fèi)者時將其劃分到一個消費(fèi)組里面

./kafka-console-consumer.sh --bootstrap-server localhost:9092 ?--consumer-property group.id=testGroup --topic test2

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK
這個時候,如果消費(fèi)組里面有一個消費(fèi)者掛掉了,就會由其他消費(fèi)者來進(jìn)行消費(fèi)

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

小結(jié)一下:兩個消費(fèi)者在同一個組,只有一個能接到消息,兩個在不同組或者未指定組則都能收到

7、多播消息


當(dāng)多個消費(fèi)組同時訂閱一個Topic時,那么不同的消費(fèi)組中只有一個消費(fèi)者能收到消息。實(shí)際上也是多個消費(fèi)組中的多個消費(fèi)者收到了同一個消息

// 消費(fèi)組1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 ?--consumer-property group.id=testGroup1 --topic test2
// 消費(fèi)組2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 ?--consumer-property group.id=testGroup2 --topic test2

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

8、查看消費(fèi)組的詳細(xì)信息


通過以下命令可以查看到消費(fèi)組的詳細(xì)信息:

# 查看當(dāng)前所有的消費(fèi)組
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看指定消費(fèi)組具體信息,比如當(dāng)前偏移量、最后一條消息的偏移量、堆積的消息數(shù)量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

9、創(chuàng)建分區(qū)


我們在上面已經(jīng)了解了Topic與Partition的概念,現(xiàn)在我們可以通過以下命令給一個topic創(chuàng)建多個分區(qū)

# 創(chuàng)建兩個分區(qū)的主題
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test3 --partitions 2
# 查看下創(chuàng)建的topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list?


現(xiàn)在我們再進(jìn)到日志文件中看一眼,可以看到日志是以分區(qū)來命名的

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK
我們知道分區(qū)文件中

00000.log: 這個文件中保存的就是消息

__consumer_offsets-49:

kafka內(nèi)部自己創(chuàng)建了__consumer_offsets主題包含了50個分區(qū)。這個主題用來存放消費(fèi)者消費(fèi)某個主題的偏移量。因?yàn)槊總€消費(fèi)者都會自己維護(hù)著消費(fèi)的主題的偏移量,也就是說每個消費(fèi)者會把消費(fèi)的主題的偏移量自主上報(bào)給kafka中的默認(rèn)主題:consumer_offsets。
因此kafka為了提升這個主題的并發(fā)性,默認(rèn)設(shè)置了50個分區(qū)。

提交到哪個分區(qū):通過hash函數(shù):hash(consumerGroupId) % __consumer_offsets主題的分區(qū)數(shù)

提交到該主題中的內(nèi)容是:key是consumerGroupId + topic + 分區(qū)號,value就是當(dāng)前offset的值

文件中保存的消息,根據(jù)log.retention.hour這個參數(shù)確定。到期后消息會被刪除。

10、副本的概念

在創(chuàng)建主題時,除了指明了主題的分區(qū)數(shù)以外,還指明了副本數(shù),那么副本是一個什么概念呢?

我們現(xiàn)在創(chuàng)建一個主題、兩個分區(qū)、三個副本的topic(注意:副本只有在集群下才有意義)

./kafka-topics.sh \
--bootstrap-server localhost:9092 \ # 指定啟動的機(jī)器
--create --topic my-replicated-topic \ # 創(chuàng)建一個topic
--partitions 2 \ # 設(shè)置分區(qū)數(shù)為2
--replication-factor 3  # 設(shè)置副本數(shù)為3 
# 查看topic情況
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK
?

leader

kafka的寫和讀的操作,都發(fā)生在leader上。leader負(fù)責(zé)把數(shù)據(jù)同步給follower。當(dāng)leader掛了,經(jīng)過主從選舉,從多個follower中選舉產(chǎn)生一個新的leader

follower

接收leader的同步的數(shù)據(jù)

isr

可以同步和已同步的節(jié)點(diǎn)會被存入到isr集合中。這里有一個細(xì)節(jié):如果isr中的節(jié)點(diǎn)性能較差,會被提出isr集合。

此時,broker、主題、分區(qū)、副本 這些概念就全部展現(xiàn)了

  • 集群中有多個broker
  • 創(chuàng)建主題時可以指明主題有多個分區(qū)(把消息拆分到不同的分區(qū)中存儲)
  • 可以為分區(qū)創(chuàng)建多個副本,不同的副本存放在不同的broker里

11、集群消費(fèi)

向集群發(fā)送消息
./kafka-console-producer.sh --broker-list node1:9092,node1:9093,node1:9094 --topic my-replicated-topic

從集群中消費(fèi)消息
./kafka-console-consumer.sh --bootstrap-server liang:9092,dd1:9092,dd2:9092 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

指定消費(fèi)組來消費(fèi)消息
./kafka-console-consumer.sh --bootstrap-server node1:9092,node1:9093,node1:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic


這里有一個細(xì)節(jié),結(jié)合上面的單播消息我們很容易可以想到下面的這種情況,因?yàn)橐粋€Partition只能被一個consumer Group里面的一個consumer,所有很容易就可以形成組內(nèi)單播的現(xiàn)象,即:

多Partition與多consumer一一對應(yīng)
這樣的好處是:

分區(qū)存儲,可以解決一個topic中文件過大無法存儲的問題
提高了讀寫的吞吐量,讀寫可以在多個分區(qū)中同時進(jìn)行

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

Kafka這種通過分區(qū)與分組進(jìn)行并行消費(fèi)的方式,讓kafka擁有極大的吞吐量

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

小結(jié)一下:

一個partition只能被一個消費(fèi)組中的一個消費(fèi)者消費(fèi),目的是為了保證消費(fèi)的順序性,但是多個partion的多個消費(fèi)者消費(fèi)的總的順序性是得不到保證的,那怎么做到消費(fèi)的總順序性呢?這個后面揭曉答案

partition的數(shù)量決定了消費(fèi)組中消費(fèi)者的數(shù)量,建議同一個消費(fèi)組中消費(fèi)者的數(shù)量不要超過partition的數(shù)量,否則多的消費(fèi)者消費(fèi)不到消息

如果消費(fèi)者掛了,那么會觸發(fā)rebalance機(jī)制(后面介紹),會讓其他消費(fèi)者來消費(fèi)該分區(qū)

kafka通過partition 可以保證每條消息的原子性,但是不會保證每條消息的順序性
?

12、生產(chǎn)者核心概念


在消息發(fā)送的過程中,涉及到了兩個線程

main 線程
Sender 線程
在 main 線程中創(chuàng)建了一個雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator, Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker

在main線程中,消息的生產(chǎn),要經(jīng)歷攔截器、序列化器和分區(qū)器,其中一個分區(qū)就會創(chuàng)建一個隊(duì)列,這樣方便數(shù)據(jù)的管理

其中隊(duì)列默認(rèn)是32M,而存放到隊(duì)列里面的數(shù)據(jù)也會經(jīng)過壓縮為16k再發(fā)往send線程進(jìn)行發(fā)送,但是這樣也會有問題,就是如果只有一條消息,難道就不發(fā)送了嗎?其實(shí)還有一個參數(shù)linger.ms,用來表示一條消息如果超過這個時間就會直接發(fā)送,不用管大小,其實(shí)可以類比坐車的場景,人滿或者時間到了 都發(fā)車

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

send線程發(fā)送給kafka集群的時候,我們需要聯(lián)系到上面的Topic與Partition已經(jīng)消費(fèi)組,形成一個Partition對應(yīng)consumer Group里面的一個consumer這種組內(nèi)單播的效果,進(jìn)行并發(fā)讀寫
?

13、ack的概念

在同步發(fā)送的前提下,生產(chǎn)者在獲得集群返回的ack之前會一直阻塞。那么集群什么時候返回ack呢?此時ack有3個配置:

ack = 0 kafka-cluster不需要任何的broker收到消息,就立即返回ack給生產(chǎn)者,最容易丟消息的,效率是最高的

ack=1(默認(rèn)): 多副本之間的leader已經(jīng)收到消息,并把消息寫入到本地的log中,才會返回ack給生產(chǎn)者,性能和安全性是最均衡的s

ack=-1/all。里面有默認(rèn)的配置min.insync.replicas=2(默認(rèn)為1,推薦配置大于等于2),此時就需要leader和一個follower同步完后,才會返回ack給生產(chǎn)者(此時集群中有2個broker已完成數(shù)據(jù)的接收),這種方式最安全,但性能最差。
?

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

  • kafka默認(rèn)會創(chuàng)建一個消息緩沖區(qū),用來存放要發(fā)送的消息,緩沖區(qū)是32m
  • kafka本地線程會去緩沖區(qū)中一次拉16k的數(shù)據(jù),發(fā)送到broker
  • 如果線程拉不到16k的數(shù)據(jù),間隔10ms也會將已拉到的數(shù)據(jù)發(fā)到broker

13、kafka集群中的controller、rebalance、HW

什么是controller呢?其實(shí)就是集群中的一個broker,當(dāng)集群中的leader掛掉時需要controller來組織進(jìn)行選舉

那么集群中誰來充當(dāng)controller呢?

每個broker啟動時會向zk創(chuàng)建一個臨時序號節(jié)點(diǎn),獲得的序號最小的那個broker將會作為集群中的controller,負(fù)責(zé)這么幾件事:

當(dāng)集群中有一個副本的leader掛掉,需要在集群中選舉出一個新的leader,選舉的規(guī)則是從isr集合中最左邊獲得
當(dāng)集群中有broker新增或減少,controller會同步信息給其他broker
當(dāng)集群中有分區(qū)新增或減少,controller會同步信息給其他broker


rebalance機(jī)制
前提:消費(fèi)組中的消費(fèi)者沒有指明分區(qū)來消費(fèi)

觸發(fā)的條件:當(dāng)消費(fèi)組中的消費(fèi)者和分區(qū)的關(guān)系發(fā)生變化的時候

分區(qū)分配的策略:在rebalance之前,分區(qū)怎么分配會有這么三種策略

range:根據(jù)公式計(jì)算得到每個消費(fèi)者消費(fèi)哪幾個分區(qū):第一個消費(fèi)者是(分區(qū)總數(shù) / 消費(fèi)者數(shù)量 )+1,之后的消費(fèi)者是分區(qū)總數(shù)/消費(fèi)者數(shù)量(假設(shè) n=分區(qū)數(shù)/消費(fèi)者數(shù)量 = 2, m=分區(qū)數(shù)%消費(fèi)者數(shù)量 = 1,那么前 m 個消費(fèi)者每個分配 n+1 個分區(qū),后面的(消費(fèi)者數(shù)量-m )個消費(fèi)者每個分配 n 個分區(qū))
輪詢:大家輪著來
sticky:粘合策略,如果需要rebalance,會在之前已分配的基礎(chǔ)上調(diào)整,不會改變之前的分配情況。如果這個策略沒有開,那么就要進(jìn)行全部的重新分配。建議開啟

HW和LEO
LEO是某個副本最后消息的消息位置(log-end-offset)

HW是已完成同步的位置。消息在寫入broker時,且每個broker完成這條消息的同步后,hw才會變化。在這之前消費(fèi)者是消費(fèi)不到這條消息的。在同步完成之后,HW更新之后,消費(fèi)者才能消費(fèi)到這條消息,這樣的目的是防止消息的丟失。

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

14、Kafka中的優(yōu)化問題

如何防止消息丟失


生產(chǎn)者:
1)使用同步發(fā)送
2)把a(bǔ)ck設(shè)成1或者all,并且設(shè)置同步的分區(qū)數(shù)>=2
消費(fèi)者:把自動提交改成手動提交

如何防止重復(fù)消費(fèi)


在防止消息丟失的方案中,如果生產(chǎn)者發(fā)送完消息后,因?yàn)榫W(wǎng)絡(luò)抖動,沒有收到ack,但實(shí)際上broker已經(jīng)收到了。

此時生產(chǎn)者會進(jìn)行重試,于是broker就會收到多條相同的消息,而造成消費(fèi)者的重復(fù)消費(fèi)。

怎么解決:

生產(chǎn)者關(guān)閉重試:會造成丟消息(不建議)

消費(fèi)者解決非冪等性消費(fèi)問題:

所謂的冪等性:多次訪問的結(jié)果是一樣的。對于rest的請求(get(冪等)、post(非冪等)、put(冪等)、delete(冪等))

解決方案:

在數(shù)據(jù)庫中創(chuàng)建聯(lián)合主鍵,防止相同的主鍵 創(chuàng)建出多條記錄
使用分布式鎖,以業(yè)務(wù)id為鎖。保證只有一條記錄能夠創(chuàng)建成功

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK

如何做到消息的順序消費(fèi)

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK
其實(shí)我們知道在發(fā)送消息的時候我們可以通過設(shè)置key來指定發(fā)送的分區(qū),所以首先我們一定要指定key然后發(fā)到同一個分區(qū)

生產(chǎn)者:使用同步的發(fā)送,并且通過設(shè)置key指定路由策略,只發(fā)送到一個分區(qū)中;ack設(shè)置成非0的值。
消費(fèi)者:主題只能設(shè)置一個分區(qū),消費(fèi)組中只能有一個消費(fèi)者;不要設(shè)置異步線程防止異步導(dǎo)致的亂序,或者設(shè)置一個阻塞隊(duì)列進(jìn)行異步消費(fèi)
kafka的順序消費(fèi)使用場景不多,因?yàn)闋奚袅诵阅?,但是比如rocketmq在這一塊有專門的功能已設(shè)計(jì)好。

如何解決消息積壓問題

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK


1)消息積壓問題的出現(xiàn)
消息的消費(fèi)者的消費(fèi)速度遠(yuǎn)趕不上生產(chǎn)者的生產(chǎn)消息的速度,導(dǎo)致kafka中有大量的數(shù)據(jù)沒有被消費(fèi)。隨著沒有被消費(fèi)的數(shù)據(jù)堆積越多,消費(fèi)者尋址的性能會越來越差,最后導(dǎo)致整個kafka對外提供的服務(wù)的性能很差,從而造成其他服務(wù)也訪問速度變慢,造成服務(wù)雪崩。

2)消息積壓的解決方案
在這個消費(fèi)者中,使用多線程,充分利用機(jī)器的性能進(jìn)行消費(fèi)消息。
通過業(yè)務(wù)的架構(gòu)設(shè)計(jì),提升業(yè)務(wù)層面消費(fèi)的性能。
創(chuàng)建多個消費(fèi)組,多個消費(fèi)者,部署到其他機(jī)器上,一起消費(fèi),提高消費(fèi)者的消費(fèi)速度
創(chuàng)建一個消費(fèi)者,該消費(fèi)者在kafka另建一個主題,配上多個分區(qū),多個分區(qū)再配上多個消費(fèi)者。該消費(fèi)者將poll下來的消息,不進(jìn)行消費(fèi),直接轉(zhuǎn)發(fā)到新建的主題上。此時,新的主題的多個分區(qū)的多個消費(fèi)者就開始一起消費(fèi)了?!怀S?/p>

實(shí)現(xiàn)延時隊(duì)列的效果


1)應(yīng)用場景
訂單創(chuàng)建后,超過30分鐘沒有支付,則需要取消訂單,這種場景可以通過延時隊(duì)列來實(shí)現(xiàn)

2)具體方案

kafka 3.6,ELK,hadoop,kafka,hadoop,kafka,ELK
kafka中創(chuàng)建創(chuàng)建相應(yīng)的主題
消費(fèi)者消費(fèi)該主題的消息(輪詢)
消費(fèi)者消費(fèi)消息時判斷消息的創(chuàng)建時間和當(dāng)前時間是否超過30分鐘(前提是訂單沒支付)
如果是:去數(shù)據(jù)庫中修改訂單狀態(tài)為已取消
如果否:記錄當(dāng)前消息的offset,并不再繼續(xù)消費(fèi)之后的消息。等待1分鐘后,再次向kafka拉取該offset及之后的消息,繼續(xù)進(jìn)行判斷,以此反復(fù)。文章來源地址http://www.zghlxwxcb.cn/news/detail-761879.html

到了這里,關(guān)于深入理解Kafka3.6.0的核心概念,搭建與使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包