国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【中間件】通過(guò) docker-compose 快速部署 Kafka 保姆級(jí)教程

這篇具有很好參考價(jià)值的文章主要介紹了【中間件】通過(guò) docker-compose 快速部署 Kafka 保姆級(jí)教程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、概述

Kafka是由Apache基金會(huì)開(kāi)發(fā)的分布式流處理平臺(tái),采用發(fā)布-訂閱模式,支持高吞吐量、低延遲的數(shù)據(jù)傳輸。主要用于處理實(shí)時(shí)數(shù)據(jù)管道、數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)分析等大數(shù)據(jù)應(yīng)用場(chǎng)景。Kafka采用高效的數(shù)據(jù)壓縮算法,可以在集群中存儲(chǔ)大量的數(shù)據(jù),并通過(guò)分區(qū)機(jī)制來(lái)實(shí)現(xiàn)數(shù)據(jù)的高可靠性和可擴(kuò)展性。Kafka常用于以下場(chǎng)景:

  • 數(shù)據(jù)管道:在數(shù)據(jù)采集和分發(fā)過(guò)程中構(gòu)建可擴(kuò)展的流式數(shù)據(jù)管道,用于實(shí)時(shí)數(shù)據(jù)處理和分析。例如,數(shù)據(jù)收集、日志聚合、網(wǎng)絡(luò)追蹤、用戶(hù)活動(dòng)跟蹤等。

  • 數(shù)據(jù)存儲(chǔ):將Kafka作為持久化存儲(chǔ)來(lái)存儲(chǔ)大量的數(shù)據(jù),以便用于后續(xù)的批量處理和離線(xiàn)分析,例如數(shù)據(jù)挖掘、機(jī)器學(xué)習(xí)等應(yīng)用場(chǎng)景。

  • 實(shí)時(shí)流處理:通過(guò)將Kafka與追求低延遲的流式處理平臺(tái),例如Apache Storm、Apache Samza和Apache Flink等相結(jié)合,可以實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)處理和分析。這是許多實(shí)時(shí)數(shù)據(jù)分析和日志處理需求的主要場(chǎng)景。

  • 系統(tǒng)日志跟蹤:通過(guò)Kafka將來(lái)自不同系統(tǒng)的日志數(shù)據(jù)統(tǒng)一收集和存儲(chǔ),便于進(jìn)行統(tǒng)一的日志分析和事件跟蹤,在軟件開(kāi)發(fā)過(guò)程中可以快速定位和解決問(wèn)題。

總之,Kafka是高性能、可靠、可擴(kuò)展的分布式流處理平臺(tái),可用于實(shí)時(shí)數(shù)據(jù)管道、數(shù)據(jù)存儲(chǔ)、實(shí)時(shí)流處理和日志跟蹤等多個(gè)領(lǐng)域。它已被廣泛應(yīng)用于各種大數(shù)據(jù)場(chǎng)景,并成為了大數(shù)據(jù)架構(gòu)中的一個(gè)重要組成部分。

docker-compose 安裝kafka,中間件,kafka,docker,kafka,中間件,docker
這里只是講解kafka容器快速部署,用于測(cè)試和學(xué)習(xí)作用,生成不建議使用容器部署,想了解更多的kafka知識(shí)點(diǎn)可參考我這篇文章:Kafka原理介紹+安裝+基本操作

二、前期準(zhǔn)備

1)部署 docker

# 安裝yum-config-manager配置工具
yum -y install yum-utils

# 建議使用阿里云yum源:(推薦)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

# 安裝docker-ce版本
yum install -y docker-ce
# 啟動(dòng)并開(kāi)機(jī)啟動(dòng)
systemctl enable --now docker
docker --version

2)部署 docker-compose

curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose

chmod +x /usr/local/bin/docker-compose
docker-compose --version

三、創(chuàng)建網(wǎng)絡(luò)

# 創(chuàng)建,注意不能使用hadoop_network,要不然啟動(dòng)hs2服務(wù)的時(shí)候會(huì)有問(wèn)題?。?!
docker network create hadoop-network

# 查看
docker network ls

四、安裝 Zookeeper

Zookeeper在Kafka中扮演重要的角色,主要用于管理Kafka集群的元數(shù)據(jù)和實(shí)現(xiàn)Kafka集群的協(xié)調(diào)和管理。在Kafka集群中,Zookeeper主要有以下作用:

  • 配置管理:Kafka集群的配置信息存儲(chǔ)在ZK節(jié)點(diǎn)中,包括Kafka Broker的配置信息、Topic的分區(qū)信息、消費(fèi)者和生產(chǎn)者的相關(guān)配置等。Kafka可以通過(guò)ZK感知集群狀態(tài)的變化,并自動(dòng)重新分配Topic的分區(qū)和對(duì)應(yīng)的Broker。

  • Broker控制:Kafka集群中的所有Broker都連接到ZK中。ZK維護(hù)了所有活動(dòng)Broker的列表和狀態(tài)信息,包括Leader、Follower等信息。如果某個(gè)Broker出現(xiàn)故障,ZK可以自動(dòng)感知它的下線(xiàn),并通知集群中的其他Broker重新分配Leader。

  • 分布式鎖:Zookeeper提供群眾同步的機(jī)制,使得多個(gè)Kafka Broker的協(xié)調(diào)和管理變得可行。Kafka中的一些操作需要集群中的所有Broker都達(dá)成一致意見(jiàn),因此需要使用ZK協(xié)調(diào)器的分布式鎖機(jī)制來(lái)維護(hù)這些操作的一致性,并防止數(shù)據(jù)的意外損壞。

Zookeeper快速部署教程可參考我上一篇文章:【中間件】通過(guò) docker-compose 快速部署 Zookeeper 保姆級(jí)教程

總之,Zookeeper在Kafka集群中發(fā)揮了重要的角色,它管理著Kafka的發(fā)布/訂閱機(jī)制、Broker狀態(tài)信息、Topic的元數(shù)據(jù)信息等,使得Kafka集群的分布式協(xié)同和協(xié)調(diào)變得可能。在Kafka寫(xiě)操作(生產(chǎn)者或管理員在Kafka生產(chǎn)或維護(hù)上修改了配置)上,ZK用于協(xié)作鎖定。在Kafka讀操作(消費(fèi)者將訂閱的主題分區(qū)元數(shù)據(jù)讀取到kafka消費(fèi)者中)上,ZK用于協(xié)作。

五、Kafka 編排部署

1)下載 Kafka

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz --no-check-certificate

注意還需要java環(huán)境,可以去官網(wǎng)下載,也可以在我下面提供的地址下載:

鏈接: https://pan.baidu.com/s/1o_z3t16v0eASYWN4VcjYeg?pwd=kuac 提取碼: kuac

2)配置

  • config/kafka-node1/server.properties
# 常見(jiàn)配置掛載目錄
mkdir config/{kafka-node1,kafka-node2,kafka-node3} -p

# 配置
cat >config/kafka-node1/server.properties<<EOF
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=1

#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán)IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲(chǔ)位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分區(qū)數(shù)量,這里設(shè)置為3。 默認(rèn)只有一個(gè)分區(qū),設(shè)置多分區(qū)可以支持并發(fā)讀寫(xiě)和負(fù)載均衡
num.partitions=3
#副本,默認(rèn)只有一個(gè)副本,不會(huì)進(jìn)行數(shù)據(jù)備份和冗余
replication.factor=3
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=60000
EOF
  • config/kafka-node2/server.properties
cat >config/kafka-node2/server.properties<<EOF
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=2

#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán)IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲(chǔ)位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分區(qū)數(shù)量,這里設(shè)置為3。 默認(rèn)只有一個(gè)分區(qū),設(shè)置多分區(qū)可以支持并發(fā)讀寫(xiě)和負(fù)載均衡
num.partitions=3
#副本,默認(rèn)只有一個(gè)副本,不會(huì)進(jìn)行數(shù)據(jù)備份和冗余
replication.factor=3
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=60000
EOF
  • config/kafka-node3/server.properties
cat >config/kafka-node3/server.properties<<EOF
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=3

#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線(xiàn)程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤(pán)IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka數(shù)據(jù)的存儲(chǔ)位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分區(qū)數(shù)量,這里設(shè)置為3。 默認(rèn)只有一個(gè)分區(qū),設(shè)置多分區(qū)可以支持并發(fā)讀寫(xiě)和負(fù)載均衡
num.partitions=3
#副本,默認(rèn)只有一個(gè)副本,不會(huì)進(jìn)行數(shù)據(jù)備份和冗余
replication.factor=3
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線(xiàn)程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper連接超時(shí)時(shí)間
zookeeper.connection.timeout.ms=60000
EOF

3)啟動(dòng)腳本 bootstrap.sh

#!/usr/bin/env sh

${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

4)構(gòu)建鏡像 Dockerfile

FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos:7.7.1908

RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN export LANG=zh_CN.UTF-8

# 創(chuàng)建用戶(hù)和用戶(hù)組,跟yaml編排里的user: 10000:10000
RUN groupadd --system --gid=10000 hadoop && useradd --system --home-dir /home/hadoop --uid=10000 --gid=hadoop hadoop -m

# 安裝sudo
RUN yum -y install sudo ; chmod 640 /etc/sudoers

# 給hadoop添加sudo權(quán)限
RUN echo "hadoop ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers

RUN yum -y install install net-tools telnet wget nc less

RUN mkdir /opt/apache/

# 添加配置 JDK
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# 添加配置 kafka server
ENV KAFKA_VERSION 2.12-3.4.0
ADD kafka_${KAFKA_VERSION}.tgz /opt/apache/
ENV KAFKA_HOME /opt/apache/kafka
RUN ln -s /opt/apache/kafka_${KAFKA_VERSION}-bin $KAFKA_HOME

# 創(chuàng)建數(shù)據(jù)存儲(chǔ)目錄
RUN mkdir -p ${KAFKA_HOME}/data/logs

# copy bootstrap.sh
COPY bootstrap.sh /opt/apache/
RUN chmod +x /opt/apache/bootstrap.sh

RUN chown -R hadoop:hadoop /opt/apache

WORKDIR $KAFKA_HOME

開(kāi)始構(gòu)建鏡像

# 需要查看構(gòu)建鏡像詳細(xì)過(guò)程則需要加上 --progress=plain 選項(xiàng)
docker build -t registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0 . --no-cache --progress=plain

# 為了方便小伙伴下載即可使用,我這里將鏡像文件推送到阿里云的鏡像倉(cāng)庫(kù)
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0

### 參數(shù)解釋
# -t:指定鏡像名稱(chēng)
# . :當(dāng)前目錄Dockerfile
# -f:指定Dockerfile路徑
#  --no-cache:不緩存

5)編排 docker-compose.yaml

version: '3'
services:
  kafka-node1:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
    user: "hadoop:hadoop"
    container_name: kafka-node1
    hostname: kafka-node1
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node1/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE1_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  kafka-node2:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
    user: "hadoop:hadoop"
    container_name: kafka-node2
    hostname: kafka-node2
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node2/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE2_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  kafka-node3:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
    user: "hadoop:hadoop"
    container_name: kafka-node3
    hostname: kafka-node3
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node3/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE3_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5

# 連接外部網(wǎng)絡(luò)
networks:
  hadoop-network:
    external: true

.env 環(huán)境變量文件內(nèi)容如下:

# 對(duì)外暴露的端口
cat << EOF > .env
KAFKA_HOME=/opt/apache/kafka
KAFKA_NODE1_SERVER_PORT=39092
KAFKA_NODE2_SERVER_PORT=39093
KAFKA_NODE3_SERVER_PORT=39094
EOF

6)開(kāi)始部署

docker-compose -f docker-compose.yaml up -d

# 查看
docker-compose -f docker-compose.yaml ps

六、簡(jiǎn)單測(cè)試驗(yàn)證

# 登錄zookeeper,在zookeeper查看brokers
${ZOOKEEPER_HOME}/bin/zkCli.sh ls /brokers/ids

${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/1
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/2
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/3

七、常用的 Kafka 客戶(hù)端命令

1)添加topic

# 隨便登錄
docker exec -it kafka-node1 bash

# 創(chuàng)建topic,1副本,1分區(qū),設(shè)置數(shù)據(jù)過(guò)期時(shí)間72小時(shí)(-1表示不過(guò)期),單位ms,72*3600*1000=259200000
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092  --partitions 1 --replication-factor 1 --config retention.ms=259200000

2)查看topic

# 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list

# 查看topic列表詳情
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe

# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002

# 查看消費(fèi)者組
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe  --group test002

3)修改topic

# 修改分區(qū),擴(kuò)分區(qū),不能減少分區(qū)
${KAFKA_HOME}/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partitions 2

# 修改過(guò)期時(shí)間,下面兩行都可以
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --topic test002 --add-config retention.ms=86400000

${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000

# 修改副本數(shù),將副本數(shù)修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002

4)擴(kuò)容分區(qū)

#把test002 topic擴(kuò)容為6個(gè)分區(qū)。
#注意:目前不支持減少分區(qū),擴(kuò)容前必須存在這個(gè)主題。
${KAFKA_HOME}/bin/kafka-topics.sh -alter --partitions 6 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002

${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe

docker-compose 安裝kafka,中間件,kafka,docker,kafka,中間件,docker

5)刪除topic

${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092

6)生成者和消費(fèi)者

生產(chǎn)者

${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

消費(fèi)者

# 從頭開(kāi)始消費(fèi)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --from-beginning

# 指定從分區(qū)的某個(gè)位置開(kāi)始消費(fèi),這里只指定了一個(gè)分區(qū),可以多寫(xiě)幾行或者遍歷對(duì)應(yīng)的所有分區(qū)
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partition 0 --offset 100

7)消費(fèi)組

在 Kafka 中,消費(fèi)組(Consumer Group)是一組獨(dú)立消費(fèi)者的集合,它們共同消費(fèi)一個(gè)或多個(gè) Topic 中的數(shù)據(jù)。消費(fèi)組內(nèi)的消費(fèi)者協(xié)同工作,通過(guò)分?jǐn)傇?Topic 中的所有分區(qū),以實(shí)現(xiàn)消息的消費(fèi)和處理。

消費(fèi)組在 Kafka 消息隊(duì)列中起到了至關(guān)重要的作用。它可以提供如下功能:

  • 并發(fā)消費(fèi):消費(fèi)組內(nèi)的每個(gè)消費(fèi)者都可以獨(dú)立地消費(fèi)消息,可以實(shí)現(xiàn)高并發(fā)處理。

  • 自動(dòng)負(fù)載均衡:消費(fèi)組內(nèi)的消費(fèi)者會(huì)自動(dòng)協(xié)作,將消費(fèi)任務(wù)均分到所有消費(fèi)者上,使得每個(gè)消費(fèi)者都能處理相同數(shù)量的消息。

  • 提高可用性:當(dāng)消費(fèi)組內(nèi)的一個(gè)或多個(gè)消費(fèi)者故障退出時(shí),消息會(huì)自動(dòng)分配到其他消費(fèi)者上,保證消費(fèi)任務(wù)的不間斷執(zhí)行。

  • 支持多租戶(hù):可以通過(guò) Consumer Group 來(lái)對(duì)不同的租戶(hù)進(jìn)行消息隔離,不同的 Consumer Group 可以讀取同一個(gè) Topic 的不同副本,或者讀取不同 Topic 的不同分區(qū),實(shí)現(xiàn)多個(gè)實(shí)例共享同一 Topic 或分散處理不同 Topic。

示例如下:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --group test002

通常情況下,消費(fèi)組中的消費(fèi)者都運(yùn)行在不同的機(jī)器上,這樣就可以實(shí)現(xiàn)分布式消費(fèi),以提高消息處理性能和可用性。Kafka 對(duì)消費(fèi)組的實(shí)現(xiàn)也非常簡(jiǎn)單,通過(guò)在消費(fèi)者在訂閱 Topic 時(shí),接受一個(gè) Group ID 參數(shù),就可以自動(dòng)加入到一個(gè)消費(fèi)組中。Kafka 會(huì)將Group ID 相同的消費(fèi)者映射到同一個(gè) Consumer Group 中,以實(shí)現(xiàn)協(xié)同消費(fèi)和分?jǐn)傁M(fèi)任務(wù)的目的。

8)查看數(shù)據(jù)積壓

${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002

9)kafka 數(shù)據(jù)積壓處理方法

在 Kafka 中,由于消息的生產(chǎn)和消費(fèi)速度可能不一致,導(dǎo)致消息會(huì)積壓在 Kafka 的分區(qū)中,如果這些積壓的消息處理不及時(shí),會(huì)導(dǎo)致 Kafka 系統(tǒng)的性能下降和可用性降低等問(wèn)題。因此,需要采取一些處理方法來(lái)解決數(shù)據(jù)積壓?jiǎn)栴}:

  • 增加消費(fèi)者:增加消費(fèi)者可以使消費(fèi)任務(wù)并行執(zhí)行,加快消息的處理速度。可以通過(guò)增加消費(fèi)者的方式將積壓的消息消費(fèi)掉,提高系統(tǒng)處理速度和效率。

  • 調(diào)整消費(fèi)者組:當(dāng)一個(gè)消費(fèi)組中的消費(fèi)者無(wú)法處理所有的消息時(shí),可以考慮調(diào)整消費(fèi)者組??梢栽黾酉M(fèi)者的數(shù)量或者更換消費(fèi)者組,以適應(yīng)消息處理的速度和大小。

  • 調(diào)整消息分區(qū):Kafka 中Topic 的分區(qū)數(shù)也會(huì)影響數(shù)據(jù)積壓的情況。可以調(diào)整分區(qū)數(shù)以改善數(shù)據(jù)讀取和分發(fā)的情況,或者對(duì)熱點(diǎn) Topic 進(jìn)行分區(qū)處理,以實(shí)現(xiàn)更好的性能和可用性。

  • 調(diào)整消費(fèi) offset:若積壓的消息都已經(jīng)被處理過(guò)了,卻還在 Kafka 中存在,可能是消費(fèi)者消費(fèi) offset 設(shè)置錯(cuò)誤導(dǎo)致的??梢酝ㄟ^(guò) Kafka 的 offset 操作,重置消費(fèi) offset,跳過(guò)已經(jīng)處理過(guò)的消息,減少數(shù)據(jù)積壓的問(wèn)題。

  • 執(zhí)行消息清洗:在消費(fèi) Kafka 消息時(shí),可以額外執(zhí)行一些消息清洗處理操作,將無(wú)用的數(shù)據(jù)過(guò)濾出去,或者將數(shù)據(jù)進(jìn)行清理和格式化處理,減少中間處理環(huán)節(jié),提高數(shù)據(jù)消費(fèi)的效率和可用性。

以上是一些解決 Kafka 數(shù)據(jù)積壓?jiǎn)栴}的常用方法,需要視具體情況而定,選擇合適的方法來(lái)解決。


通過(guò) docker-compose 快速部署 Kafka 教程就先到這里了,有任何疑問(wèn)歡迎給我留言或私信,可關(guān)注我公眾號(hào)【大數(shù)據(jù)與云原生技術(shù)分享】加群交流或私信溝通~文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-658424.html

到了這里,關(guān)于【中間件】通過(guò) docker-compose 快速部署 Kafka 保姆級(jí)教程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Solr】中間件-solr快速使用

    pom中添加依賴(lài): 提供一個(gè)demo:

    2024年02月11日
    瀏覽(18)
  • 快速掌握MQ消息中間件rabbitmq

    Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 需求: 1.video A https://www.bilibili.com/video/BV1cb4y1o7zz?p=12vd_source=533ee415c42b820b0f4105acb4932a02 參考資料 官方文檔 開(kāi)源社區(qū) 博客文

    2024年02月11日
    瀏覽(27)
  • 【消息中間件】1小時(shí)快速上手RabbitMQ

    【消息中間件】1小時(shí)快速上手RabbitMQ

    前 言 ?? 作者簡(jiǎn)介:半舊518,長(zhǎng)跑型選手,立志堅(jiān)持寫(xiě)10年博客,專(zhuān)注于java后端 ?專(zhuān)欄簡(jiǎn)介:深入、全面、系統(tǒng)的介紹消息中間件 ?? 文章簡(jiǎn)介:本文將介紹RabbitMQ,一小時(shí)快速上手RabbitMQ 下面是MQ的優(yōu)缺點(diǎn) 1.2.1 優(yōu)點(diǎn)1:應(yīng)用解耦 考慮由訂單系統(tǒng)與庫(kù)存系統(tǒng)、支付系統(tǒng)、物流

    2024年02月02日
    瀏覽(42)
  • 阿里云順利通過(guò)云原生中間件成熟度評(píng)估

    阿里云順利通過(guò)云原生中間件成熟度評(píng)估

    前言: 2023 年 6 月 6 日,由中國(guó)信息通信研究院(以下簡(jiǎn)稱(chēng)“中國(guó)信通院”)承辦的“ICT中國(guó)·2023 高層論壇-云原生產(chǎn)業(yè)發(fā)展論壇”在北京召開(kāi),會(huì)上正式發(fā)布了一系列云原生領(lǐng)域評(píng)估結(jié)果。 阿里云計(jì)算有限公司(以下簡(jiǎn)稱(chēng)“阿里云”)消息隊(duì)列 RocketMQ 通過(guò)了“云原生中間件

    2024年02月12日
    瀏覽(292)
  • Docker安裝消息中間件

    docker pull rocketmqinc/rocketmq mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876 -v /docker/rocketmq/data/namesrv/logs:/root/logs -v /docker/rocketmq/data/namesrv/store:/root/store -e “MAX_POSSIBLE_HEAP=100000000” rocketmqinc/rocketmq sh mqname

    2024年02月10日
    瀏覽(21)
  • 【中間件】docker數(shù)據(jù)卷

    【中間件】docker數(shù)據(jù)卷

    ?? ???????個(gè)人主頁(yè): 五敷有你 ? ? ?? ???系列專(zhuān)欄: 中間件 ??穩(wěn)中求進(jìn),曬太陽(yáng) 修改nginx的html頁(yè)面時(shí),需要進(jìn)入nginx內(nèi)部。并且因?yàn)閮?nèi)部沒(méi)有編輯器,修改文件也很麻煩。 這就是因?yàn)槿萜髋c數(shù)據(jù)(容器內(nèi)文件)耦合帶來(lái)的后果。要解決這個(gè)問(wèn)題,必須將數(shù)據(jù)與容

    2024年03月27日
    瀏覽(16)
  • Windows安裝Docker運(yùn)行中間件(詳細(xì))

    Windows安裝Docker運(yùn)行中間件(詳細(xì))

    Docker是一個(gè)開(kāi)源的應(yīng)用容器引擎,讓開(kāi)發(fā)者可以打包他們的應(yīng)用以及依賴(lài)包到一個(gè)可移植的鏡像中,然后發(fā)布到任何流行的 Linux或Windows操作系統(tǒng)的機(jī)器上,也可以實(shí)現(xiàn)虛擬化。容器是完全使用沙箱機(jī)制,相互之間不會(huì)有任何接口。 Docker容器是一個(gè)輕量級(jí)的沙箱環(huán)境,每個(gè)容

    2024年01月20日
    瀏覽(20)
  • Docker的安裝及其常見(jiàn)中間件的部署

    Docker的安裝及其常見(jiàn)中間件的部署

    基于centos7安裝docker(Docker要求CentOS系統(tǒng)的內(nèi)核版本高于3.10 uname -r 查看內(nèi)核版本) 最好安裝7.5以上版本支持k8s (1) 如果之前下載過(guò)需要運(yùn)行命令卸載 (2)安裝 Docker-CE 基本環(huán)境 (3)設(shè)置 docker repo 的 yum 位置 (4)安裝 docker,以及 docker-cli (5)啟動(dòng)docker (6)停止docker (7)重啟docker (8)查看

    2024年02月19日
    瀏覽(40)
  • docker 離線(xiàn)安裝中間件應(yīng)用--nacos

    docker 離線(xiàn)安裝中間件應(yīng)用--nacos

    由于很多項(xiàng)目的部署環(huán)境是內(nèi)網(wǎng)環(huán)境,中間件安裝部署起來(lái)比較麻煩,故采用docker 進(jìn)行離線(xiàn)部署。本文以docker離線(xiàn)安裝部署nacos 為例,其他的中間件也是相同的部署步驟。 1、離線(xiàn)安裝docker 和 docker-compose 具體請(qǐng)參考一下鏈接進(jìn)行安裝 docker docker-compose離線(xiàn)部署 2、下載nacos鏡像

    2024年02月11日
    瀏覽(23)
  • 【中間件-Openjob】高性能任務(wù)調(diào)度框架Openjob簡(jiǎn)介及快速搭建

    【中間件-Openjob】高性能任務(wù)調(diào)度框架Openjob簡(jiǎn)介及快速搭建

    一款分布式高性能任務(wù)調(diào)度框架,支持多種定時(shí)任務(wù)、延時(shí)任務(wù)、工作流設(shè)計(jì)、輕量級(jí)分布式計(jì)算、無(wú)限水平擴(kuò)容,并具有較高的可伸縮性和容錯(cuò)性,以及完善權(quán)限管理、強(qiáng)大的告警監(jiān)控、原生支持多語(yǔ)言。 基礎(chǔ)信息 中文官網(wǎng) :https://openjob.io/zh-Hans/ 開(kāi)源地址 :https://githu

    2024年02月12日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包