基本概念
簡(jiǎn)介
Kafka 最初是由 LinkedIn 即領(lǐng)英公司基于 Scala 和 Java 語(yǔ)言開(kāi)發(fā)的分布式消息發(fā)布-訂閱系統(tǒng),現(xiàn)已捐獻(xiàn)給Apache 軟件基金會(huì)。其具有高吞吐、低延遲的特性,許多大數(shù)據(jù)實(shí)時(shí)流式處理系統(tǒng)比如 Storm、Spark、Flink等都能很好地與之集成。
總的來(lái)講,Kafka 通常具有 3 重角色:
- 存儲(chǔ)系統(tǒng):通常消息隊(duì)列會(huì)把消息持久化到磁盤(pán),防止消息丟失,保證消息可靠性。Kafka 的消息持久化機(jī)制和多副本機(jī)制使其能夠作為通用數(shù)據(jù)存儲(chǔ)系統(tǒng)來(lái)使用。
- 消息系統(tǒng):Kafka 和傳統(tǒng)的消息隊(duì)列比如 RabbitMQ、RocketMQ、ActiveMQ 類似,支持流量削峰、服務(wù)解耦、異步通信等核心功能。 ==》 先進(jìn)先出 ==》 只針對(duì)分區(qū),不是全局的
- 流處理平臺(tái):Kafka 不僅能夠與大多數(shù)流式計(jì)算框架完美整合,并且自身也提供了一個(gè)完整的流式處理庫(kù),即 Kafka Streaming。Kafka Streaming 提供了類似 Flink 中的窗口、聚合、變換、連接等功能。
一句話概括:Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息中間件,在業(yè)界主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)流式計(jì)算領(lǐng)域,起解耦合和削峰填谷的作用。
特點(diǎn)
- 高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒,每個(gè)topic可以分多個(gè)partition, 由多個(gè)consumer group 對(duì)partition進(jìn)行consume操作。
- 可擴(kuò)展性:kafka集群支持熱擴(kuò)展
- 持久性、可靠性:消息被持久化到本地磁盤(pán),并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
- 容錯(cuò)性:允許集群中有節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/li>
- 高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫(xiě)
Kafka在各種應(yīng)用場(chǎng)景中,起到的作用可以歸納為這么幾個(gè)術(shù)語(yǔ):削峰填谷,解耦!
在大數(shù)據(jù)流式計(jì)算領(lǐng)域中,kafka主要作為計(jì)算系統(tǒng)的前置緩存和輸出結(jié)果緩存;
安裝部署
kafka基于Zookeeper, 因此需要先安裝Zookeeper, 詳見(jiàn)https://www.cnblogs.com/paopaoT/p/17461562.html
- 上傳安裝包
- 解壓
tar -zxvf kafka_2.11-2.2.2.tgz tar -C /opt/apps/
- 修改配置文件
# 進(jìn)入配置文件目錄
cd kafka_2.12-2.3.1/config
# 編輯配置文件
vi server.properties
# 為依次增長(zhǎng)的:0、1、2、3、4,集群中唯一 id
broker.id=0
# 數(shù)據(jù)存儲(chǔ)的?錄
log.dirs=/opt/data/kafka
# 底層存儲(chǔ)的數(shù)據(jù)(日志)留存時(shí)長(zhǎng)(默認(rèn)7天)
log.retention.hours=168
# 底層存儲(chǔ)的數(shù)據(jù)(日志)留存量(默認(rèn)1G)
log.retention.bytes=1073741824
# 指定zk集群地址
zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
- 環(huán)境變量
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
- 分發(fā)安裝包
for i in {2..3}
do
scp -r kafka_2.11-2.2.2 linux0$i:$PWD
done
# 安裝包分發(fā)后,記得修改config/server.properties中的 配置參數(shù): broker.id
# 注意:還需要分發(fā)環(huán)境變量
- 啟停集群(在各個(gè)節(jié)點(diǎn)上啟動(dòng))
bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties
# 停止集群
bin/kafka-server-stop.sh
- 一鍵啟停腳本:
#!/bin/bash
case $1 in
"start"){
for i in linux01 linux02 linux03
do
echo ---------- kafka $i 啟動(dòng) ------------
ssh $i "source /etc/profile; /opt/app/kafka2.4.1/bin/kafka-server-start.sh -daemon /opt/app/kafka2.4.1/config/server.properties"
done
};;
"stop"){
for i in linux01 linux02 linux03
do
echo ---------- kafka $i 停止 ------------
ssh $i "source /etc/profile; /opt/app/kafka2.4.1/bin/kafka-server-stop.sh "
done
};;
esac
基本操作
概述
Kafka 中提供了許多命令行工具(位于$KAFKA_HOME/bin 目錄下)用于管理集群的變更。
腳本 | 作用 |
---|---|
kafka-console-producer.sh | 生產(chǎn)消息 |
kafka-topics.sh | 管理主題 |
kafka-server-stop.sh | 關(guān)閉Kafka服務(wù) |
kafka-server-start.sh | 啟動(dòng)Kafka服務(wù) |
kafka-configs.sh | 配置管理 |
kafka-consumer-perf-test.sh | 測(cè)試消費(fèi)性能 |
kafka-producer-perf-test.sh | 測(cè)試生產(chǎn)性能 |
kafka-dump-log.sh | 查看數(shù)據(jù)日志內(nèi)容 |
kafka-preferred-replica-election.sh | 優(yōu)先副本的選舉 |
kafka-reassign-partitions.sh | 分區(qū)重分配 |
管理操作:kafka-topics
創(chuàng)建topic
--bootstrap-server 和 --zookeeper一樣的效果 ,新版本建議使用 --bootstrap-server
kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --create --topic test01 --partitions 3 --replication-factor 3
參數(shù)解釋:
--replication-factor 副本數(shù)量
--partitions 分區(qū)數(shù)量
--topic topic名稱
# 本方式,副本的存儲(chǔ)位置是系統(tǒng)自動(dòng)決定的
# 手動(dòng)指定分配方案:分區(qū)數(shù),副本數(shù),存儲(chǔ)位置
kafka-topics.sh --create --topic tpc-1 --zookeeper linux01:2181 --replica-assignment 0:1:3,1:2:6
該topic,將有如下partition:(2個(gè)分區(qū) 3個(gè)副本)
partition0 ,所在節(jié)點(diǎn): broker0、broker1、broker3
partition1 ,所在節(jié)點(diǎn): broker1、broker2、broker6
# 查看topic的狀態(tài)信息
kafka-topics.sh --describe --topic tpc-1 --zookeeper linux01:2181
Topic: tpc-1 PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: tpc-1 Partition: 0 Leader: 0 Replicas: 0,1,3 Isr: 0,1
Topic: tpc-1 Partition: 1 Leader: 1 Replicas: 1,2,6 Isr: 1,2
查看topic列表
kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --list
kafka-topics.sh --list --zookeeper linux01:2181
__consumer_offsets
tpc-1
查看topic狀態(tài)信息
kafka-topics.sh --describe --zookeeper linux01:2181 --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# topic的分區(qū)數(shù)量,以及每個(gè)分區(qū)的副本數(shù)量,以及每個(gè)副本所在的broker節(jié)點(diǎn),以及每個(gè)分區(qū)的leader副本所在broker節(jié)點(diǎn),以及每個(gè)分區(qū)的ISR副本列表;
# ISR: in sync replica ,同步副同步本(當(dāng)然也包含leader自身,replica.lag.time.max.ms =30000)
# OSR:out of sync replicas 失去同步的副本(該副本上次請(qǐng)求leader同步數(shù)據(jù)距現(xiàn)在的時(shí)間間隔超出配置閾值)
# ISR同步副本列表
# ISR概念:(同步副本)。每個(gè)分區(qū)的leader會(huì)維護(hù)一個(gè)ISR列表,ISR列表里面就是follower副本的Borker編號(hào),只有跟得上Leader的 follower副本才能加入到 ISR里面
# 這個(gè)是通過(guò)replica.lag.time.max.ms =30000(默認(rèn)值)參數(shù)配置的,只有ISR里的成員才有被選為 leader 的可能。
踢出ISR和重新加入ISR的條件:
- 踢出ISR的條件: 由replica.lag.time.max.ms =30000決定,如上圖;
- 重新加入ISR的條件: OSR副本的LEO(log end offset)追上leader的LEO;
刪除topic
bin/kafka-topics.sh --zookeeper linux01:2181 --delete --topic test
# 刪除topic,server.properties中需要一個(gè)參數(shù)處于啟用狀態(tài): delete.topic.enable = true(默認(rèn)是true)
# 使用 kafka-topics .sh 腳本刪除主題的行為本質(zhì)上只是在 ZooKeeper 中的 /admin/delete_topics 路徑下建一個(gè)與待刪除主題同名的節(jié)點(diǎn),以標(biāo)記該主題為待刪除的狀態(tài)。然后由 Kafka控制器異步完成。
增加分區(qū)數(shù)
kafka-topics.sh --zookeeper linux01:2181 --alter --topic paopao --partitions 3
# Kafka只支持增加分區(qū),不支持減少分區(qū)
# 原因是:減少分區(qū),代價(jià)太大(數(shù)據(jù)的轉(zhuǎn)移,日志段拼接合并)
# 如果真的需要實(shí)現(xiàn)此功能,則完全可以重新創(chuàng)建一個(gè)分區(qū)數(shù)較小的主題,然后將現(xiàn)有主題中的消息按照既定的邏輯復(fù)制過(guò)去;
動(dòng)態(tài)配置topic參數(shù)(不常用)
# 通過(guò)管理命令,可以為已創(chuàng)建的topic增加、修改、刪除topic level參數(shù)
# 添加/修改 指定topic的配置參數(shù):
kafka-topics.sh --zookeeper linux01:2181 --alter --topic tpc2 --config compression.type=gzip
# --config compression.type=gzip 修改或添加參數(shù)配置
# --add-config compression.type=gzip 添加參數(shù)配置
# --delete-config compression.type 刪除配置參數(shù)
生產(chǎn)者:kafka-console-producer
kafka-console-producer.sh --broker-list linux01:9092 --topic test01
>a
>b
>c
>hello
>hi
>hadoop
>hive
順序輪詢(老版本)
順序分配,消息是均勻的分配給每個(gè) partition,即每個(gè)分區(qū)存儲(chǔ)一次消息,輪詢策略是 Kafka Producer 提供的默認(rèn)策略,如果你不使用指定的輪詢策略的話,Kafka 默認(rèn)會(huì)使用順序輪訓(xùn)策略的方式。
隨機(jī)分配
實(shí)現(xiàn)隨機(jī)分配的代碼只需要兩行,如下
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
消費(fèi)者:kafka-console-consumer
消費(fèi)者在消費(fèi)的時(shí)候,需要指定要訂閱的主題,還可以指定消費(fèi)的起始偏移量
起始偏移量的指定策略有3中:
- earliest 起始點(diǎn)
- latest 最新
- 指定的offset( 分區(qū)號(hào):偏移量) ==》 必須的告訴他是哪個(gè)topic 的哪個(gè)分區(qū)的哪個(gè)offset
- 從之前所記錄的偏移量開(kāi)始消費(fèi)
在命令行中,可以指定從什么地方開(kāi)始消費(fèi)
- 加上參數(shù) --from-beginning 指定從最前面開(kāi)始消費(fèi)
- 如果不加--from-beginning 就需要分情況討論了,如果之前記錄過(guò)消費(fèi)的位置,那么就從之前消費(fèi)的位置開(kāi)始消費(fèi),如果說(shuō)之前沒(méi)有記錄過(guò)之前消費(fèi)的偏移量,那么就從最新的位置開(kāi)始消費(fèi)
kafka的topic中的消息,是有序號(hào)的(序號(hào)叫消息偏移量),而且消息的偏移量是在各個(gè)partition中獨(dú)立維護(hù)的,在各個(gè)分區(qū)內(nèi),都是從0開(kāi)始遞增編號(hào)!
# 消費(fèi)消息
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test01 --from-beginning
hive
hello
hadoop
# 指定從最前面開(kāi)始消費(fèi)
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --from-beginning
hadoop
list
hello
kafka
# 不指定他消費(fèi)的位置的時(shí)候,就是從最新的地方開(kāi)始消費(fèi)
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao
# 指定要消費(fèi)的分區(qū),和要消費(fèi)的起始o(jì)ffset
# 從指定的offset(需要指定偏移量和分區(qū)號(hào))
kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --offset 2 --partition 0
yy
abc
3333
2222
消費(fèi)組
- 消費(fèi)組是kafka為了提高消費(fèi)并行度的一種機(jī)制!
- 在kafka的底層邏輯中,任何一個(gè)消費(fèi)者都有自己所屬的組(如果沒(méi)有指定,系統(tǒng)會(huì)自己給你分配一個(gè)組id)
- 組和組之間,沒(méi)有任何關(guān)系,大家都可以消費(fèi)到目標(biāo)topic的所有數(shù)據(jù)
- 但是組內(nèi)的各個(gè)消費(fèi)者,就只能讀到自己所分配到的partitions
- KAFKA中的消費(fèi)組,可以動(dòng)態(tài)增減消費(fèi)者,而且消費(fèi)組中的消費(fèi)者數(shù)量發(fā)生任意變動(dòng),都會(huì)重新分配分區(qū)消費(fèi)任務(wù)(消費(fèi)者組在均衡策略)
如何讓多個(gè)消費(fèi)者組成一個(gè)組: 就是讓這些消費(fèi)者的groupId相同即可!
消費(fèi)位移的記錄
kafka的消費(fèi)者,可以記錄自己所消費(fèi)到的消息偏移量,記錄的這個(gè)偏移量就叫(消費(fèi)位移);
記錄這個(gè)消費(fèi)到的位置,作用就在于消費(fèi)者重啟后可以接續(xù)上一次消費(fèi)到位置來(lái)繼續(xù)往后面消費(fèi);
消費(fèi)位移,是組內(nèi)共享的?。。∠M(fèi)位置記錄在一個(gè)內(nèi)置的topic中 ,默認(rèn)是5s提交一次位移更新。
參數(shù):auto.commit.interval.ms 默認(rèn)是5s記錄一次
# 可以使用特定的工具類 解析內(nèi)置記錄偏移量的topic
kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
# 通過(guò)指定formatter工具類,來(lái)對(duì)__consumer_offsets主題中的數(shù)據(jù)進(jìn)行解析;
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889851318, expireTimestamp=None)
[g01,linux01,2]::OffsetAndMetadata(offset=17, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,1]::OffsetAndMetadata(offset=13, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
[g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
# 如果需要獲取某個(gè)特定 consumer-group的消費(fèi)偏移量信息,則需要計(jì)算該消費(fèi)組的偏移量記錄所在分區(qū): Math.abs(groupID.hashCode()) % numPartitions(50)
# 根據(jù)組id的hash取值%50 確定具體是將這個(gè)組具體每個(gè)分區(qū)消費(fèi)到了哪里
# __consumer_offsets的分區(qū)數(shù)為:50
配置管理 kafka-config
kafka-configs.sh 腳本是專門(mén)用來(lái)進(jìn)行動(dòng)態(tài)參數(shù)配置操作的,這里的操作是運(yùn)行狀態(tài)修改原有的配置,如此可以達(dá)到動(dòng)態(tài)變更的目的;一般情況下不會(huì)進(jìn)行動(dòng)態(tài)修改 。
動(dòng)態(tài)配置的參數(shù),會(huì)被存儲(chǔ)在zookeeper上,因而是持久生效的
可用參數(shù)的查閱地址: https://kafka.apache.org/documentation/#configuration
# kafka-configs.sh 腳本包含:變更alter、查看describe 這兩種指令類型;
# kafka-configs. sh 支持主題、 broker 、用戶和客戶端這4個(gè)類型的配置。
# kafka-configs.sh 腳本使用 entity-type 參數(shù)來(lái)指定操作配置的類型,并且使 entity-name參數(shù)來(lái)指定操作配置的名稱。
# 比如查看topic的配置可以按如下方式執(zhí)行:
kafka-configs.sh --zookeeper linux01:2181 --describe --entity-type topics --entity-name paopao
# 查看broker的動(dòng)態(tài)配置可以按如下方式執(zhí)行:
kafka-configs.sh --describe --entity-type brokers --entity-name 0 --zookeeper linux01:2181
entity-type和entity-name的對(duì)應(yīng)關(guān)系
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-474589.html
# 示例:添加topic級(jí)別參數(shù)
kafka-configs.sh --zookeeper linux01:2181 --alter --entity-type topics --entity-name paopao --add-config cleanup.policy=compact,max.message.bytes=10000
# 示例:添加broker參數(shù)
kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server linux01:9092,linux02:9092,linux03:9092
動(dòng)態(tài)配置topic參數(shù)
通過(guò)管理命令,可以為已創(chuàng)建的topic增加、修改、刪除topic level參數(shù)
添加/修改 指定topic的配置參數(shù):文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-474589.html
kafka-topics.sh --topic paopao --alter --config compression.type=gzip --zookeeper linux01:2181
# 如果利用 kafka-configs.sh 腳本來(lái)對(duì)topic、producer、consumer、broker等進(jìn)行參數(shù)動(dòng)態(tài)
# 添加、修改配置參數(shù)
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --add-config compression.type=gzip
# 刪除配置參數(shù)
kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --delete-config compression.type
到了這里,關(guān)于kafka的安裝和基本操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!