一、概述
Kafka是由Apache基金會(huì)開(kāi)發(fā)的分布式流處理平臺(tái),采用發(fā)布-訂閱模式,支持高吞吐量、低延遲的數(shù)據(jù)傳輸。主要用于處理實(shí)時(shí)數(shù)據(jù)管道、數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)分析等大數(shù)據(jù)應(yīng)用場(chǎng)景。Kafka采用高效的數(shù)據(jù)壓縮算法,可以在集群中存儲(chǔ)大量的數(shù)據(jù),并通過(guò)分區(qū)機(jī)制來(lái)實(shí)現(xiàn)數(shù)據(jù)的高可靠性和可擴(kuò)展性。Kafka常用于以下場(chǎng)景:
-
數(shù)據(jù)管道:在數(shù)據(jù)采集和分發(fā)過(guò)程中構(gòu)建可擴(kuò)展的流式數(shù)據(jù)管道,用于實(shí)時(shí)數(shù)據(jù)處理和分析。例如,數(shù)據(jù)收集、日志聚合、網(wǎng)絡(luò)追蹤、用戶(hù)活動(dòng)跟蹤等。
-
數(shù)據(jù)存儲(chǔ):將Kafka作為持久化存儲(chǔ)來(lái)存儲(chǔ)大量的數(shù)據(jù),以便用于后續(xù)的批量處理和離線(xiàn)分析,例如數(shù)據(jù)挖掘、機(jī)器學(xué)習(xí)等應(yīng)用場(chǎng)景。
-
實(shí)時(shí)流處理:通過(guò)將Kafka與追求低延遲的流式處理平臺(tái),例如Apache Storm、Apache Samza和Apache Flink等相結(jié)合,可以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)處理和分析。這是許多實(shí)時(shí)數(shù)據(jù)分析和日志處理需求的主要場(chǎng)景。
-
系統(tǒng)日志跟蹤:通過(guò)Kafka將來(lái)自不同系統(tǒng)的日志數(shù)據(jù)統(tǒng)一收集和存儲(chǔ),便于進(jìn)行統(tǒng)一的日志分析和事件跟蹤,在軟件開(kāi)發(fā)過(guò)程中可以快速定位和解決問(wèn)題。
總之,Kafka是高性能、可靠、可擴(kuò)展的分布式流處理平臺(tái),可用于實(shí)時(shí)數(shù)據(jù)管道、數(shù)據(jù)存儲(chǔ)、實(shí)時(shí)流處理和日志跟蹤等多個(gè)領(lǐng)域。它已被廣泛應(yīng)用于各種大數(shù)據(jù)場(chǎng)景,并成為了大數(shù)據(jù)架構(gòu)中的一個(gè)重要組成部分。
這里只是講解kafka容器快速部署,用于測(cè)試和學(xué)習(xí)作用,生成不建議使用容器部署,想了解更多的kafka知識(shí)點(diǎn)可參考我這篇文章:Kafka原理介紹+安裝+基本操作
二、前期準(zhǔn)備
1)部署 docker
# 安裝yum-config-manager配置工具
yum -y install yum-utils
# 建議使用阿里云yum源:(推薦)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安裝docker-ce版本
yum install -y docker-ce
# 啟動(dòng)并開(kāi)機(jī)啟動(dòng)
systemctl enable --now docker
docker --version
2)部署 docker-compose
curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version
三、創(chuàng)建網(wǎng)絡(luò)
# 創(chuàng)建,注意不能使用hadoop_network,要不然啟動(dòng)hs2服務(wù)的時(shí)候會(huì)有問(wèn)題?。?!
docker network create hadoop-network
# 查看
docker network ls
四、安裝 Zookeeper
Zookeeper在Kafka中扮演重要的角色,主要用于管理Kafka集群的元數(shù)據(jù)和實(shí)現(xiàn)Kafka集群的協(xié)調(diào)和管理。在Kafka集群中,Zookeeper主要有以下作用:
-
配置管理:Kafka集群的配置信息存儲(chǔ)在ZK節(jié)點(diǎn)中,包括Kafka Broker的配置信息、Topic的分區(qū)信息、消費(fèi)者和生產(chǎn)者的相關(guān)配置等。Kafka可以通過(guò)ZK感知集群狀態(tài)的變化,并自動(dòng)重新分配Topic的分區(qū)和對(duì)應(yīng)的Broker。
-
Broker控制:Kafka集群中的所有Broker都連接到ZK中。ZK維護(hù)了所有活動(dòng)Broker的列表和狀態(tài)信息,包括Leader、Follower等信息。如果某個(gè)Broker出現(xiàn)故障,ZK可以自動(dòng)感知它的下線(xiàn),并通知集群中的其他Broker重新分配Leader。
-
分布式鎖:Zookeeper提供群眾同步的機(jī)制,使得多個(gè)Kafka Broker的協(xié)調(diào)和管理變得可行。Kafka中的一些操作需要集群中的所有Broker都達(dá)成一致意見(jiàn),因此需要使用ZK協(xié)調(diào)器的分布式鎖機(jī)制來(lái)維護(hù)這些操作的一致性,并防止數(shù)據(jù)的意外損壞。
Zookeeper快速部署教程可參考我上一篇文章:【中間件】通過(guò) docker-compose 快速部署 Zookeeper 保姆級(jí)教程
總之,Zookeeper在Kafka集群中發(fā)揮了重要的角色,它管理著Kafka的發(fā)布/訂閱機(jī)制、Broker狀態(tài)信息、Topic的元數(shù)據(jù)信息等,使得Kafka集群的分布式協(xié)同和協(xié)調(diào)變得可能。在Kafka寫(xiě)操作(生產(chǎn)者或管理員在Kafka生產(chǎn)或維護(hù)上修改了配置)上,ZK用于協(xié)作鎖定。在Kafka讀操作(消費(fèi)者將訂閱的主題分區(qū)元數(shù)據(jù)讀取到kafka消費(fèi)者中)上,ZK用于協(xié)作。
五、Kafka 編排部署
1)下載 Kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz --no-check-certificate
注意還需要java環(huán)境,可以去官網(wǎng)下載,也可以在我下面提供的地址下載:
鏈接: https://pan.baidu.com/s/1o_z3t16v0eASYWN4VcjYeg?pwd=kuac 提取碼:
kuac
2)配置
config/kafka-node1/server.properties
# 常見(jiàn)配置掛載目錄
mkdir config/{kafka-node1,kafka-node2,kafka-node3} -p
# 配置
cat >config/kafka-node1/server.properties<<EOF
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=1
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán)IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲(chǔ)位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分區(qū)數(shù)量,這里設(shè)置為3。 默認(rèn)只有一個(gè)分區(qū),設(shè)置多分區(qū)可以支持并發(fā)讀寫(xiě)和負(fù)載均衡
num.partitions=3
#副本,默認(rèn)只有一個(gè)副本,不會(huì)進(jìn)行數(shù)據(jù)備份和冗余
replication.factor=3
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=60000
EOF
config/kafka-node2/server.properties
cat >config/kafka-node2/server.properties<<EOF
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=2
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán)IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲(chǔ)位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分區(qū)數(shù)量,這里設(shè)置為3。 默認(rèn)只有一個(gè)分區(qū),設(shè)置多分區(qū)可以支持并發(fā)讀寫(xiě)和負(fù)載均衡
num.partitions=3
#副本,默認(rèn)只有一個(gè)副本,不會(huì)進(jìn)行數(shù)據(jù)備份和冗余
replication.factor=3
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=60000
EOF
config/kafka-node3/server.properties
cat >config/kafka-node3/server.properties<<EOF
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=3
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán)IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲(chǔ)位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分區(qū)數(shù)量,這里設(shè)置為3。 默認(rèn)只有一個(gè)分區(qū),設(shè)置多分區(qū)可以支持并發(fā)讀寫(xiě)和負(fù)載均衡
num.partitions=3
#副本,默認(rèn)只有一個(gè)副本,不會(huì)進(jìn)行數(shù)據(jù)備份和冗余
replication.factor=3
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=60000
EOF
3)啟動(dòng)腳本 bootstrap.sh
#!/usr/bin/env sh
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
4)構(gòu)建鏡像 Dockerfile
FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos:7.7.1908
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
# 創(chuàng)建用戶(hù)和用戶(hù)組,跟yaml編排里的user: 10000:10000
RUN groupadd --system --gid=10000 hadoop && useradd --system --home-dir /home/hadoop --uid=10000 --gid=hadoop hadoop -m
# 安裝sudo
RUN yum -y install sudo ; chmod 640 /etc/sudoers
# 給hadoop添加sudo權(quán)限
RUN echo "hadoop ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
RUN yum -y install install net-tools telnet wget nc less
RUN mkdir /opt/apache/
# 添加配置 JDK
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH
# 添加配置 kafka server
ENV KAFKA_VERSION 2.12-3.4.0
ADD kafka_${KAFKA_VERSION}.tgz /opt/apache/
ENV KAFKA_HOME /opt/apache/kafka
RUN ln -s /opt/apache/kafka_${KAFKA_VERSION}-bin $KAFKA_HOME
# 創(chuàng)建數(shù)據(jù)存儲(chǔ)目錄
RUN mkdir -p ${KAFKA_HOME}/data/logs
# copy bootstrap.sh
COPY bootstrap.sh /opt/apache/
RUN chmod +x /opt/apache/bootstrap.sh
RUN chown -R hadoop:hadoop /opt/apache
WORKDIR $KAFKA_HOME
開(kāi)始構(gòu)建鏡像
# 需要查看構(gòu)建鏡像詳細(xì)過(guò)程則需要加上 --progress=plain 選項(xiàng)
docker build -t registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0 . --no-cache --progress=plain
# 為了方便小伙伴下載即可使用,我這里將鏡像文件推送到阿里云的鏡像倉(cāng)庫(kù)
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
### 參數(shù)解釋
# -t:指定鏡像名稱(chēng)
# . :當(dāng)前目錄Dockerfile
# -f:指定Dockerfile路徑
# --no-cache:不緩存
5)編排 docker-compose.yaml
version: '3'
services:
kafka-node1:
image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
user: "hadoop:hadoop"
container_name: kafka-node1
hostname: kafka-node1
restart: always
privileged: true
env_file:
- .env
volumes:
- ./config/kafka-node1/server.properties:${KAFKA_HOME}/config/server.properties
ports:
- "${KAFKA_NODE1_SERVER_PORT}:9092"
expose:
- 2888
- 3888
command: ["sh","-c","/opt/apache/bootstrap.sh"]
networks:
- hadoop-network
healthcheck:
test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
interval: 10s
timeout: 10s
retries: 5
kafka-node2:
image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
user: "hadoop:hadoop"
container_name: kafka-node2
hostname: kafka-node2
restart: always
privileged: true
env_file:
- .env
volumes:
- ./config/kafka-node2/server.properties:${KAFKA_HOME}/config/server.properties
ports:
- "${KAFKA_NODE2_SERVER_PORT}:9092"
expose:
- 2888
- 3888
command: ["sh","-c","/opt/apache/bootstrap.sh"]
networks:
- hadoop-network
healthcheck:
test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
interval: 10s
timeout: 10s
retries: 5
kafka-node3:
image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
user: "hadoop:hadoop"
container_name: kafka-node3
hostname: kafka-node3
restart: always
privileged: true
env_file:
- .env
volumes:
- ./config/kafka-node3/server.properties:${KAFKA_HOME}/config/server.properties
ports:
- "${KAFKA_NODE3_SERVER_PORT}:9092"
expose:
- 2888
- 3888
command: ["sh","-c","/opt/apache/bootstrap.sh"]
networks:
- hadoop-network
healthcheck:
test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
interval: 10s
timeout: 10s
retries: 5
# 連接外部網(wǎng)絡(luò)
networks:
hadoop-network:
external: true
.env
環(huán)境變量文件內(nèi)容如下:
# 對(duì)外暴露的端口
cat << EOF > .env
KAFKA_HOME=/opt/apache/kafka
KAFKA_NODE1_SERVER_PORT=39092
KAFKA_NODE2_SERVER_PORT=39093
KAFKA_NODE3_SERVER_PORT=39094
EOF
6)開(kāi)始部署
docker-compose -f docker-compose.yaml up -d
# 查看
docker-compose -f docker-compose.yaml ps
六、簡(jiǎn)單測(cè)試驗(yàn)證
# 登錄zookeeper,在zookeeper查看brokers
${ZOOKEEPER_HOME}/bin/zkCli.sh ls /brokers/ids
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/1
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/2
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/3
七、常用的 Kafka 客戶(hù)端命令
1)添加topic
# 隨便登錄
docker exec -it kafka-node1 bash
# 創(chuàng)建topic,1副本,1分區(qū),設(shè)置數(shù)據(jù)過(guò)期時(shí)間72小時(shí)(-1表示不過(guò)期),單位ms,72*3600*1000=259200000
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
2)查看topic
# 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
# 查看topic列表詳情
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe
# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002
# 查看消費(fèi)者組
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002
3)修改topic
# 修改分區(qū),擴(kuò)分區(qū),不能減少分區(qū)
${KAFKA_HOME}/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partitions 2
# 修改過(guò)期時(shí)間,下面兩行都可以
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --topic test002 --add-config retention.ms=86400000
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000
# 修改副本數(shù),將副本數(shù)修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002
4)擴(kuò)容分區(qū)
#把test002 topic擴(kuò)容為6個(gè)分區(qū)。
#注意:目前不支持減少分區(qū),擴(kuò)容前必須存在這個(gè)主題。
${KAFKA_HOME}/bin/kafka-topics.sh -alter --partitions 6 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe
5)刪除topic
${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
6)生成者和消費(fèi)者
生產(chǎn)者
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}
消費(fèi)者
# 從頭開(kāi)始消費(fèi)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --from-beginning
# 指定從分區(qū)的某個(gè)位置開(kāi)始消費(fèi),這里只指定了一個(gè)分區(qū),可以多寫(xiě)幾行或者遍歷對(duì)應(yīng)的所有分區(qū)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partition 0 --offset 100
7)消費(fèi)組
在 Kafka 中,消費(fèi)組(Consumer Group)是一組獨(dú)立消費(fèi)者的集合,它們共同消費(fèi)一個(gè)或多個(gè) Topic 中的數(shù)據(jù)。消費(fèi)組內(nèi)的消費(fèi)者協(xié)同工作,通過(guò)分?jǐn)傇?Topic 中的所有分區(qū),以實(shí)現(xiàn)消息的消費(fèi)和處理。
消費(fèi)組在 Kafka 消息隊(duì)列中起到了至關(guān)重要的作用。它可以提供如下功能:
-
并發(fā)消費(fèi):消費(fèi)組內(nèi)的每個(gè)消費(fèi)者都可以獨(dú)立地消費(fèi)消息,可以實(shí)現(xiàn)高并發(fā)處理。
-
自動(dòng)負(fù)載均衡:消費(fèi)組內(nèi)的消費(fèi)者會(huì)自動(dòng)協(xié)作,將消費(fèi)任務(wù)均分到所有消費(fèi)者上,使得每個(gè)消費(fèi)者都能處理相同數(shù)量的消息。
-
提高可用性:當(dāng)消費(fèi)組內(nèi)的一個(gè)或多個(gè)消費(fèi)者故障退出時(shí),消息會(huì)自動(dòng)分配到其他消費(fèi)者上,保證消費(fèi)任務(wù)的不間斷執(zhí)行。
-
支持多租戶(hù):可以通過(guò) Consumer Group 來(lái)對(duì)不同的租戶(hù)進(jìn)行消息隔離,不同的 Consumer Group 可以讀取同一個(gè) Topic 的不同副本,或者讀取不同 Topic 的不同分區(qū),實(shí)現(xiàn)多個(gè)實(shí)例共享同一 Topic 或分散處理不同 Topic。
示例如下:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --group test002
通常情況下,消費(fèi)組中的消費(fèi)者都運(yùn)行在不同的機(jī)器上,這樣就可以實(shí)現(xiàn)分布式消費(fèi),以提高消息處理性能和可用性。Kafka 對(duì)消費(fèi)組的實(shí)現(xiàn)也非常簡(jiǎn)單,通過(guò)在消費(fèi)者在訂閱 Topic 時(shí),接受一個(gè) Group ID 參數(shù),就可以自動(dòng)加入到一個(gè)消費(fèi)組中。Kafka 會(huì)將Group ID 相同的消費(fèi)者映射到同一個(gè) Consumer Group 中,以實(shí)現(xiàn)協(xié)同消費(fèi)和分?jǐn)傁M(fèi)任務(wù)的目的。
8)查看數(shù)據(jù)積壓
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002
9)kafka 數(shù)據(jù)積壓處理方法
在 Kafka 中,由于消息的生產(chǎn)和消費(fèi)速度可能不一致,導(dǎo)致消息會(huì)積壓在 Kafka 的分區(qū)中,如果這些積壓的消息處理不及時(shí),會(huì)導(dǎo)致 Kafka 系統(tǒng)的性能下降和可用性降低等問(wèn)題。因此,需要采取一些處理方法來(lái)解決數(shù)據(jù)積壓?jiǎn)栴}:
-
增加消費(fèi)者:增加消費(fèi)者可以使消費(fèi)任務(wù)并行執(zhí)行,加快消息的處理速度。可以通過(guò)增加消費(fèi)者的方式將積壓的消息消費(fèi)掉,提高系統(tǒng)處理速度和效率。
-
調(diào)整消費(fèi)者組:當(dāng)一個(gè)消費(fèi)組中的消費(fèi)者無(wú)法處理所有的消息時(shí),可以考慮調(diào)整消費(fèi)者組??梢栽黾酉M(fèi)者的數(shù)量或者更換消費(fèi)者組,以適應(yīng)消息處理的速度和大小。
-
調(diào)整消息分區(qū):Kafka 中Topic 的分區(qū)數(shù)也會(huì)影響數(shù)據(jù)積壓的情況。可以調(diào)整分區(qū)數(shù)以改善數(shù)據(jù)讀取和分發(fā)的情況,或者對(duì)熱點(diǎn) Topic 進(jìn)行分區(qū)處理,以實(shí)現(xiàn)更好的性能和可用性。
-
調(diào)整消費(fèi) offset:若積壓的消息都已經(jīng)被處理過(guò)了,卻還在 Kafka 中存在,可能是消費(fèi)者消費(fèi) offset 設(shè)置錯(cuò)誤導(dǎo)致的??梢酝ㄟ^(guò) Kafka 的 offset 操作,重置消費(fèi) offset,跳過(guò)已經(jīng)處理過(guò)的消息,減少數(shù)據(jù)積壓的問(wèn)題。
-
執(zhí)行消息清洗:在消費(fèi) Kafka 消息時(shí),可以額外執(zhí)行一些消息清洗處理操作,將無(wú)用的數(shù)據(jù)過(guò)濾出去,或者將數(shù)據(jù)進(jìn)行清理和格式化處理,減少中間處理環(huán)節(jié),提高數(shù)據(jù)消費(fèi)的效率和可用性。
以上是一些解決 Kafka 數(shù)據(jù)積壓?jiǎn)栴}的常用方法,需要視具體情況而定,選擇合適的方法來(lái)解決。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-658424.html
通過(guò) docker-compose 快速部署 Kafka 教程就先到這里了,有任何疑問(wèn)歡迎給我留言或私信,可關(guān)注我公眾號(hào)【大數(shù)據(jù)與云原生技術(shù)分享】加群交流或私信溝通~文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-658424.html
到了這里,關(guān)于【中間件】通過(guò) docker-compose 快速部署 Kafka 保姆級(jí)教程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!