国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka(生產(chǎn)者)

這篇具有很好參考價(jià)值的文章主要介紹了Kafka(生產(chǎn)者)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1.概述

1.1 消息隊(duì)列

目 前 企 業(yè) 中 比 較 常 見(jiàn) 的 消 息 隊(duì) 列 產(chǎn) 品 主 要 有

  • Kafka(在大數(shù)據(jù)場(chǎng)景主要采用 Kafka 作為消息隊(duì)列。)
  • ActiveMQ
  • RabbitMQ
  • RocketMQ

1.1.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景

傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場(chǎng)景包括:緩存/消峰、解耦異步通信。

  • 緩沖/消峰:
    有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
    kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq
  • 解耦:
    允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束。
    kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq
  • 異步通信:
    允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它,然后在需要的時(shí)候再去處理它們。
    kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

1.1.2 消息隊(duì)列的兩種模式

  1. 點(diǎn)對(duì)點(diǎn)模式
  2. 發(fā)布/訂閱模式

一、點(diǎn)對(duì)點(diǎn)模式
消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除消息
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

二、發(fā)布/訂閱模式

  • 可以有多個(gè)topic主題(瀏覽、點(diǎn)贊、收藏、評(píng)論等)
  • 消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)
  • 每個(gè)消費(fèi)者相互獨(dú)立,都可以消費(fèi)到數(shù)據(jù)

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


1.2 kafka基礎(chǔ)結(jié)構(gòu)

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

  • Producer:消息生產(chǎn)者,就是向 Kafka broker 發(fā)消息的客戶端。
  • Consumer:消息消費(fèi)者,向 Kafka broker 取消息的客戶端。
  • Consumer Group(CG):消費(fèi)者組,由多個(gè) consumer 組成。
    消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
  • Broker:一臺(tái) Kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè)broker 可以容納多個(gè) topic。
  • Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic。
  • Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。
  • Replica:副本。一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè)Follower。
  • Leader:每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 Leader。
  • Follower:每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 會(huì)成為新的 Leader。

2.kafka的快速入門(mén)

2.1 集群部署

2.1.1 安裝java

一、上傳并解壓安裝包
1.將資料中的jdk-8u361-linux-x64.tar.gz上傳到/usr/local/java目錄下

mkdir /usr/local/java/
tar -zxvf jdk-8u361-linux-x64.tar.gz

二、配置環(huán)境變量

export JAVA_HOME=/usr/local/java/jdk1.8.0_361
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

三、使環(huán)境變量生效

source /etc/profile

四、檢查java版本

java -version

2.1.2 部署zookeeper集群

一、上傳并解壓安裝包
1.將資料中的apache-zookeeper-3.7.1-bin.tar.gz上傳到/usr/local目錄下

cd /usr/local

2.解壓安裝包到本地文件夾

tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz

3.修改文件名稱

mv apache-zookeeper-3.7.1-bin/ zookeeper

二、修改配置
1.進(jìn)入zookeeper目錄下的conf目錄,將目錄中的zoo_sample.cfg改成zoo.cfg

cd zookeeper/conf/
mv zoo_sample.cfg zoo.cfg

2.在zookeeper目錄下新建一個(gè)zkData文件夾, 作為數(shù)據(jù)文件存儲(chǔ)目錄

cd /usr/local/zookeeper/
mkdir zkData

3.回到zoo.cfg, 將dataDir的路徑換成我們剛剛新建的zkData的路徑

dataDir=/usr/local/zookeeper/zkData

三、啟動(dòng)和關(guān)閉
進(jìn)入bin目錄下,啟動(dòng)服務(wù)端

./zkServer.sh start

進(jìn)入bin目錄下,關(guān)閉服務(wù)端

./zkServer.sh stop

四、集群部署_生成myid
1.在/zkData下新建myid文件, 并寫(xiě)入對(duì)應(yīng)的server.num中的num數(shù)字

echo 1 > myid

2.依次給其他兩個(gè)服務(wù)器上的zookeeper設(shè)置num,分別是2和3

五、集群部署_配置集群中的各個(gè)節(jié)點(diǎn)
1.回到zoo.cfg, 將其他幾個(gè)節(jié)點(diǎn)配置進(jìn)來(lái)

server.1=192.168.202.128:2888:3888
server.2=192.168.202.130:2888:3888
server.3=192.168.202.131:2888:3888

六、分別啟動(dòng)三臺(tái)zookeeper服務(wù)
進(jìn)入 /bin 路徑下,執(zhí)行如下命令,啟動(dòng) zookeeper 服務(wù):

./zkServer.sh start

七、查看zk集群狀態(tài)
可以看到哪個(gè)zk是leader,哪些是follow

./bin/zkServer.sh status ./conf/zoo.cfg

八、驗(yàn)證是否搭建成功
在命令行中輸入:./zkCli.sh -server 127.0.0.1:2181,即可連接到本機(jī) ZooKeeper 服務(wù)器。其他自動(dòng)實(shí)現(xiàn)同步,客戶端只需要和一臺(tái)保持連接即可。出現(xiàn)如下語(yǔ)句表示鏈接成功:

注意:關(guān)閉其他linux的防火墻

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0] 

2.1.3 部署kafka集群

一、集群規(guī)劃
我們?cè)谌齻€(gè)服務(wù)器上分別部署zk和kafka

linux_1 linux_2 linux_3
zk zk zk
kafka kafka kafka

二、上傳并解壓安裝包
1.將資料中的kafka_2.12-3.0.0.tgz上傳到/usr/local目錄下

cd /usr/local

2.解壓安裝包到本地文件夾

tar -zxvf kafka_2.12-3.0.0.tgz

3.修改文件名稱

mv kafka_2.12-3.0.0/ kafka

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

三、修改kafka配置文件
1.進(jìn)入到kafka 目錄, 修改配置文件

cd config/
vim server.properties

2.修改以下內(nèi)容:

#broker 的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。
broker.id=0

#kafka 運(yùn)行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動(dòng)幫你創(chuàng)建,可以
配置多個(gè)磁盤(pán)路徑,路徑與路徑之間可以用","分隔
log.dirs=/usr/local/kafka/datas

# topic 在當(dāng)前broker上的分片個(gè)數(shù),與broker保持一致
num.partitions=3

#配置連接 Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=192.168.202.128:2181,192.168.202.130:2181,192.168.202.131:2181

3.依次修改其他節(jié)點(diǎn)的配置, 需要保證broker.id唯一
分別修改另兩個(gè)節(jié)點(diǎn)的broker.id=1、broker.id=2

四、啟動(dòng)集群
1.啟動(dòng)Zookeeper 集群,然后啟動(dòng) Kafka。
2.依次在這三個(gè)節(jié)點(diǎn)上啟動(dòng) Kafka。(在kafka目錄下執(zhí)行)

bin/kafka-server-start.sh -daemon config/server.properties

五、關(guān)閉集群

bin/kafka-server-stop.sh

注意:停止 Kafka 集群時(shí),一定要等 Kafka 所有節(jié)點(diǎn)進(jìn)程全部停止后再停止 Zookeeper集群。因?yàn)?Zookeeper 集群當(dāng)中記錄著 Kafka 集群相關(guān)信息,Zookeeper 集群一旦先停止,Kafka 集群就沒(méi)有辦法再獲取停止進(jìn)程的信息,只能手動(dòng)殺死 Kafka 進(jìn)程了。

Kafka啟動(dòng)報(bào)錯(cuò):The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
處理方式:

  1. log.dirs配置的路徑要正確
  2. 刪除這里面的meta.properties文件

參考文章:Linux 搭建Kafka集群,最新教程,細(xì)到極致


2.2 Kafka命令行操作

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

2.2.1 主題命令行操作

操作主題命令行參數(shù)

bin/kafka-topics.sh
參數(shù) 描述
--bootstrap-server <String: server toconnect to> 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)。
--topic <String: topic> 操作的 topic 名稱。
--create 創(chuàng)建主題。
--delete 刪除主題。
--alter 修改主題。
--list 查看所有主題。
--describe 查看主題詳細(xì)描述。
--partitions <Integer: # of partitions> 設(shè)置分區(qū)數(shù)。
--replication-factor<Integer: replication factor> 設(shè)置分區(qū)副本。
--config <String: name=value> 更新系統(tǒng)默認(rèn)的配置。

一、查看當(dāng)前服務(wù)器中所有topic

# 連接多個(gè)節(jié)點(diǎn)(集群)
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092,192.168.202.130:9092,192.168.202.131:9092 --list

二、創(chuàng)建first topic

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 1 --replication-factor 3 --topic first
  • --topic first:代表創(chuàng)建名稱為first的topic
  • --partitions 1:代表這個(gè)topic只有一個(gè)分區(qū)
  • --replication-factor 3:代表這個(gè)topic有三個(gè)副本

三、查看first主題的詳情

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --describe --topic first

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

  • Replicas: 2,0,1 代表副本存儲(chǔ)在哪些節(jié)點(diǎn)中, first這個(gè)topic我們?cè)O(shè)置了3個(gè)副本,此處的2,0,1對(duì)應(yīng)的就是這三個(gè)副本存儲(chǔ)的位置(broker.id)
  • Isr: 2,0,1 Isr代表同步副本, 此處2,0,1都屬于同步副本(主副本也屬于同步副本)
  • Leader: 2 代表主副本是borker.id=2的節(jié)點(diǎn)

四、修改分區(qū)數(shù)
注意:分區(qū)數(shù)只能增加,不能減少

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --alter --topic first --partitions 3

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq
無(wú)法通過(guò)命令行的方式修改副本

五、刪除topic

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --delete --topic first

2.2.2 生產(chǎn)者命令行操作

操作生產(chǎn)者命令參數(shù)

bin/kafka-console-producer.sh
參數(shù) 描述
-`-bootstrap-server <String: server toconnect to> 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)。
--topic <String: topic> 操作的 topic 名稱。

一、發(fā)送消息

bin/kafka-console-producer.sh --bootstrap-server 192.168.202.128:9092 --topic first
>hello world
>atguigu atguigu

2.2.3 消費(fèi)者命令行操作

操作消費(fèi)者命令參數(shù)

bin/kafka-console-consumer.sh
參數(shù) 描述
-`-bootstrap-server <String: server toconnect to> 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)。
--topic <String: topic> 操作的 topic 名稱。
--from-beginning 從頭開(kāi)始消費(fèi)。
--group <String: consumer group id> 指定消費(fèi)者組名稱。

一、消費(fèi)first主題的數(shù)據(jù)

bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --topic first

二、把主題中所有的數(shù)據(jù)都讀取出來(lái)(包括歷史數(shù)據(jù))。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --from-beginning --topic first

3 Kafka 生產(chǎn)者

3.1 生產(chǎn)者消息發(fā)送流程

3.1.1 發(fā)送原理

  1. KafkaProducer將消息封裝成ProducerRecord, 然后通過(guò)攔截鏈, 根據(jù)指定序列化方式進(jìn)行序列化, 其次在分區(qū)器中根據(jù)設(shè)置的分區(qū)策略進(jìn)行數(shù)據(jù)分區(qū),封裝成TopicPartition, TopicPartition中就包含了目標(biāo)partition的信息
  2. 其后分區(qū)器將消息寫(xiě)入RecordAccumulator進(jìn)行緩沖, RecordAccumulator是一個(gè)雙端隊(duì)列, RecordAccumulator中維護(hù)了一個(gè)ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 類型的集合, 其中的Key是TopicPartition,它用來(lái)標(biāo)識(shí)目標(biāo)partition(消息的最終存儲(chǔ)位置), Value是Deque<ProducerBatch> 隊(duì)列,用來(lái)緩沖發(fā)往目標(biāo) partition 的消息。
  3. 當(dāng)Deque達(dá)到一定閾值后,就會(huì)喚醒sender線程將消息發(fā)送到kafka集群

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.1.2 生產(chǎn)者重要參數(shù)列表

參數(shù)名稱 描述
bootstrap.servers 生產(chǎn)者連接集群所需的broker地址清單. 例如hadoop102:9092,hadoop103:9092, 可以設(shè)置1個(gè)或者多個(gè), 中間用逗號(hào)隔開(kāi)。注意這里并非需要所有的 broker 地址,因?yàn)樯a(chǎn)者從給定的 broker里查找到其他 broker 信息
key.serializer和value.serializer 指定發(fā)送消息的 key 和 value 的序列化類型。一定要寫(xiě)全類名。
buffer.memory RecordAccumulator 緩沖區(qū)總大小,默認(rèn) 32m。
batch.size 緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16k。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加。
linger.ms 如果數(shù)據(jù)遲遲未達(dá)到 batch.size,sender 等待 linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms,默認(rèn)值是 0ms,表示沒(méi)有延遲。生產(chǎn)環(huán)境建議該值大小為 5-100ms 之間。
acks 0:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)落盤(pán)應(yīng)答。1:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader 收到數(shù)據(jù)后應(yīng)答。-1(all):生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader+和 isr 隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1 和all 是等價(jià)的。
max.in.flight.requests.per.connection 允許最多沒(méi)有返回 ack 的次數(shù),默認(rèn)為 5,開(kāi)啟冪等性要保證該值是 1-5 的數(shù)字。
retries 當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試次數(shù)。默認(rèn)是 int 最大值,2147483647。如果設(shè)置了重試,還想保證消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否則在重試此失敗消息的時(shí)候,其他的消息可能發(fā)送成功了。
retry.backoff.ms 兩次重試之間的時(shí)間間隔,默認(rèn)是 100ms。
enable.idempotence 是否開(kāi)啟冪等性,默認(rèn) true,開(kāi)啟冪等性。
compression.type 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。

3.2 異步發(fā)送API

3.2.1 普通異步發(fā)送

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

一、在broker2中開(kāi)啟kafka消費(fèi)者

 bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.130:9092 --topic first

二、生產(chǎn)者生產(chǎn)消息

public class CustomProducer {

    public static void main(String[] args) {
        // 1.創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2.給kafka配置對(duì)象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");

        // 3.key,value 序列化(必須)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 4.創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 5.調(diào)用send方法, 發(fā)送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
        }

        // 6.關(guān)閉資源
        kafkaProducer.close();
    }
}

觀察broker2中控制臺(tái)消息
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.2.2 帶回調(diào)函數(shù)的異步發(fā)送

回調(diào)函數(shù)會(huì)在 producer 收到 ack 時(shí)調(diào)用,為異步調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說(shuō)明消息發(fā)送成功,如果 Exception 不為 null,說(shuō)明消息發(fā)送失敗。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

callback只有在收到ack時(shí)調(diào)用, 它可以幫我們返回:主題、分區(qū)等消息, 例如我們想看看消息最終存儲(chǔ)到哪個(gè)topic的哪個(gè)分區(qū)中

public class CustomProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        // 1.創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2.給kafka配置對(duì)象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");

        // 3.key,value序列化(必須):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 4.創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 5.調(diào)用send方法,發(fā)送消息
        for (int i = 0; i < 5; i++) {
            // 添加回調(diào)
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception == null) {
                        // 沒(méi)有異常, 輸出消息到控制臺(tái)
                        System.out.println("主題:" + recordMetadata.topic() + "->" + "分區(qū):" + recordMetadata.partition());
                    } else {
                        // 出現(xiàn)異常打印
                        exception.printStackTrace();
                    }
                }
            });

            // 延遲一會(huì)會(huì)看到數(shù)據(jù)發(fā)往不同分區(qū)
            Thread.sleep(2);
        }

        // 6.關(guān)閉資源
        kafkaProducer.close();
    }
}

觀察控制臺(tái)
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.3 同步發(fā)送API

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get()方法即可。

public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2.給kafka配置對(duì)象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");

        // 3.key,value 序列化(必須)
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 4.創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 5.調(diào)用send方法, 發(fā)送消息
        for (int i = 0; i < 10; i++) {
            // 異步發(fā)送
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
            // 同步發(fā)送
            kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();
        }

        // 6.關(guān)閉資源
        kafkaProducer.close();
    }
}

3.4 生產(chǎn)者分區(qū)

3.4.1 分區(qū)好處

  1. 便于合理使用存儲(chǔ)資源
  2. 提供并行速度

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.4.2 生產(chǎn)者發(fā)送消息的分區(qū)策略

默認(rèn)的分區(qū)器DefaultPartitioner
在在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

案例一:將數(shù)據(jù)發(fā)往指定 partition

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq
觀察kafka控制臺(tái)中是否接收到消息。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

在 IDEA 控制臺(tái)觀察回調(diào)信息。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

案例二:沒(méi)有指明 partition 值但有 key 的情況下
將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

1.key="a"時(shí),在控制臺(tái)查看結(jié)果。

主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1

2.key="b"時(shí),在控制臺(tái)查看結(jié)果。

主題:first->分區(qū):2
主題:first->分區(qū):2
主題:first->分區(qū):2
主題:first->分區(qū):2
主題:first->分區(qū):2

3.key="f"時(shí),在控制臺(tái)查看結(jié)果。

主題:first->分區(qū):0
主題:first->分區(qū):0
主題:first->分區(qū):0
主題:first->分區(qū):0
主題:first->分區(qū):0

3.4.3 自定義分區(qū)器

如果研發(fā)人員可以根據(jù)企業(yè)需求,自己重新實(shí)現(xiàn)分區(qū)器。

例如我們實(shí)現(xiàn)一個(gè)分區(qū)器實(shí)現(xiàn),發(fā)送過(guò)來(lái)的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號(hào)分區(qū),
不包含 atguigu,就發(fā)往 1 號(hào)分區(qū)。

一、定義自定義分區(qū)器

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 獲取消息
        String msgValue = value.toString();

        // 創(chuàng)建 partition
        int partition;

        // 判斷消息是否包含atguigu
        if (msgValue.contains("atguigu")) {
            partition = 0;
        } else {
            partition = 1;
        }

        // 返回分區(qū)號(hào)
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

二、使用自定義分區(qū)器

// 添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");

3.5 生產(chǎn)經(jīng)驗(yàn)—生產(chǎn)者如何提高吞吐量

Deque達(dá)到一定閾值后,就會(huì)喚醒sender線程將消息發(fā)送到kafka集群, 這個(gè)閾值受兩個(gè)參數(shù)影響

  1. batch.size:批次大小, 默認(rèn)16K
    當(dāng)Deque中的積壓的消息達(dá)到16K后, 就會(huì)喚醒sender線程將消息打包發(fā)送到同一個(gè)分區(qū)
  2. linger.ms:等待時(shí)間,默認(rèn)為0
    如果Deque中的消息一直沒(méi)有達(dá)到16K, 此時(shí)會(huì)根據(jù)linger.ms設(shè)置的時(shí)間,比如設(shè)置了1秒, 那么到了這個(gè)時(shí)間(上一個(gè)批次的消息發(fā)送完成后開(kāi)始計(jì)時(shí)),即使數(shù)據(jù)沒(méi)有達(dá)到16K, 也會(huì)喚醒sender線程發(fā)送消息

一、linger.ms=0產(chǎn)生的問(wèn)題
因?yàn)閘inger.ms默認(rèn)為0, 所以來(lái)一個(gè)消息就會(huì)喚醒sender來(lái)發(fā)送消息, 這樣的效率并不高(會(huì)頻繁開(kāi)啟線程發(fā)送消息), 為了提高拉取速度的能力, 我們希望一次能發(fā)送很多消息

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq
所以在生產(chǎn)環(huán)境中, 我們一般會(huì)修改linger.ms的值, 改為5~100ms, 而batch.size使用默認(rèn)值即可
注意點(diǎn):不能將batch.size和linger.ms設(shè)置的很大, 這樣每批次消息的發(fā)送時(shí)間間隔就會(huì)很大(延遲過(guò)大)

// batch.size:批次大小,默認(rèn) 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待時(shí)間,默認(rèn) 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

// RecordAccumulator:緩沖區(qū)大小,默認(rèn) 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

// compression.type:壓縮,默認(rèn) none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

3.6 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)可靠性

為了保證producer發(fā)送的數(shù)據(jù), 能可靠的發(fā)送到指定的partition, kafka為producer提供了消息確認(rèn)機(jī)制(ack)

3.6.1 消息確認(rèn)機(jī)制(ack)

  • ack = 0
  • ack = 1
  • ack = -1或者all

一、ack=0
生產(chǎn)者發(fā)送消息后, 不需要等待任何來(lái)自服務(wù)器的響應(yīng)

  • 優(yōu)點(diǎn):
    • 1.不需要等待服務(wù)器的響應(yīng), 所以可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息, 從而達(dá)到很高的吞吐量
    • 2.producer不管發(fā)送成不成功,只發(fā)送一次就不再發(fā)送了, 至少保證消息不會(huì)被重復(fù)消費(fèi)
  • 缺點(diǎn):
    • 如果當(dāng)中出現(xiàn)問(wèn)題,導(dǎo)致服務(wù)器沒(méi)有收到消息, 沒(méi)有落盤(pán)到partition,生產(chǎn)者無(wú)從得知,會(huì)造成消息丟失

二、ack=1
生產(chǎn)者發(fā)送消息后, 等待分區(qū)的leader收到數(shù)據(jù)后應(yīng)答

  • leader節(jié)點(diǎn)收到了消息, 生產(chǎn)者就會(huì)收到服務(wù)器的成功響應(yīng).(代表消息發(fā)送成功)
  • leader節(jié)點(diǎn)沒(méi)有收到消息, 生產(chǎn)者就會(huì)收到服務(wù)器的錯(cuò)誤響應(yīng),為了避免數(shù)據(jù)丟失, 生產(chǎn)者會(huì)重發(fā)消息

存在的問(wèn)題:如果leader落盤(pán)成功了, 向producer也收到了成功響應(yīng), 但是還沒(méi)來(lái)得及將消息同步副本(follower), 此時(shí)leader掛了, 此時(shí)服務(wù)器會(huì)從follower中推選新的leader, 新的leader并沒(méi)有同步消息, 而producer也不會(huì)再發(fā)了, 此時(shí)消息就丟失了

三、ack=-1或者all
生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù), leader和ISR隊(duì)列里面的所有節(jié)點(diǎn)都落盤(pán)成功后, 進(jìn)行應(yīng)答
producer只有收到分區(qū)中所有副本的成功寫(xiě)入通知來(lái)認(rèn)為推送消息成功了

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.6.2 ISR機(jī)制

思考:如果leader在同步數(shù)據(jù)時(shí), 有一個(gè)follow掛了, 遲遲不能與leader進(jìn)行同步, 這個(gè)問(wèn)題怎么解決?

一、ISR的概念
leader維護(hù)了一個(gè)動(dòng)態(tài)的in-sync replica set(ISR), 意指跟leader保持同步的follower + leader集合, 例如我們之前看到的(leader:0, isr:0,1,2)

二、ISR的剔除
如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時(shí)間閾值由replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn)30s。例如2超時(shí)后,會(huì)顯示(leader:0, isr:0,1)。

這樣就不用等長(zhǎng)期聯(lián)系不上或者已經(jīng)故障的節(jié)點(diǎn)


3.6.3 數(shù)據(jù)完全可靠條件

如果分區(qū)的副本設(shè)置為1(即只有l(wèi)eader沒(méi)有follower), 或者ISR中應(yīng)答的最小副本數(shù)量(min.insync.replicas 默認(rèn)為1)設(shè)置為1, 這種情況下就和ack=1時(shí)效果是一樣的, 存在數(shù)據(jù)丟失問(wèn)題(leader:0, isr:0)

數(shù)據(jù)完全可靠條件

  • 分區(qū)副本大于等于2(除了leader以外, 存在至少一個(gè)follow副本)
  • ACK級(jí)別設(shè)置為-1(保證ISR中所有節(jié)點(diǎn)都存入消息)
  • min.insync.replicas >=2
    ISR里應(yīng)答的最小副本數(shù)量大于等于2(ISR中的數(shù)量至少有兩個(gè), 否則broker不處理這條消息, 并直接給生產(chǎn)者報(bào)錯(cuò))

min.insync.replicas = n,代表的語(yǔ)義是,如果生產(chǎn)者acks=all,而在發(fā)送消息時(shí),Broker的ISR數(shù)量沒(méi)有達(dá)到n,Broker不能處理這條消息,需要直接給生產(chǎn)者報(bào)錯(cuò)。


3.6.4 可靠性總結(jié)

  • acks=0,生產(chǎn)者發(fā)送過(guò)來(lái)數(shù)據(jù)就不管了,可靠性差,效率高;
  • acks=1,生產(chǎn)者發(fā)送過(guò)來(lái)數(shù)據(jù)Leader應(yīng)答,可靠性中等,效率中等;
  • acks=-1,生產(chǎn)者發(fā)送過(guò)來(lái)數(shù)據(jù)Leader和ISR隊(duì)列里面所有Follwer應(yīng)答,可靠性高,效率低;

生產(chǎn)環(huán)境的使用場(chǎng)景:

  • acks=0很少使用;
  • acks=1,一般用于傳輸普通日志,允許丟個(gè)別數(shù)據(jù)
  • acks=-1或者all,一般用于傳輸和錢(qián)相關(guān)的數(shù)據(jù),對(duì)可靠性要求比較高的場(chǎng)景。

3.6.5 生產(chǎn)者重復(fù)發(fā)送消息

消息重復(fù)存在幾個(gè)場(chǎng)景

  • 生產(chǎn)端:服務(wù)器響應(yīng)失敗后, 基本的解決措施就是重發(fā)消息
  • 消費(fèi)端: poll 一批數(shù)據(jù),處理完畢還沒(méi)提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì) poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

一、生產(chǎn)端重復(fù)發(fā)送消息的一種情況
Leader收到數(shù)據(jù)后, 將數(shù)據(jù)落盤(pán), 并將數(shù)據(jù)同步到follower, 此時(shí)在給Producer應(yīng)答時(shí)Leader宕機(jī)了, 此時(shí)Producer就會(huì)收到服務(wù)器傳來(lái)的響應(yīng)失敗, 重新發(fā)送消息, 服務(wù)器會(huì)重新挑選一個(gè)follower成為leader, 而這個(gè)新的leader其實(shí)已經(jīng)落盤(pán)了消息

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.7 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)去重

3.7.1 數(shù)據(jù)傳遞語(yǔ)義

  • 至少一次(At Least Once)
    • 什么是至少一次
      : 生產(chǎn)者發(fā)送到kafka集群, 至少kafka集群能收到一次數(shù)據(jù)
    • 如何保證至少一次
      : ACK級(jí)別設(shè)置為-1或者all + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2
    • 至少一次會(huì)產(chǎn)生的問(wèn)題
      : kafka集群重復(fù)收到數(shù)據(jù)的問(wèn)題, 即可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)
  • 最多一次(At Most Once)
    • 什么是最多一次
      : 生產(chǎn)者發(fā)送到kafka集群, 不論成功與否, 只會(huì)發(fā)送一次
    • 如何保證最多一次
      : ACK級(jí)別設(shè)置為0
    • 最多一次會(huì)產(chǎn)生的問(wèn)題
      : 無(wú)法保證數(shù)據(jù)是否落盤(pán), 即可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失
  • 精確一次(Exactly Once)
    • 什么是精確一次
      : 數(shù)據(jù)既不會(huì)丟失, 也不會(huì)重復(fù)發(fā)送
    • 如何保證精確一次
      : Kafka 0.11版本以后,引入了一項(xiàng)重大特性:冪等性事務(wù), 通過(guò)這兩點(diǎn)來(lái)保證嚴(yán)格一次

3.7.2 冪等性

冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會(huì)持久化一條,保證了不重復(fù)。

一、冪等性原理
精確一次(Exactly Once) = 冪等性 + 至少一次(ack=-1或者all + 分區(qū)副本數(shù)>=2 + ISR最小副本數(shù)量>=2)

二、重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn)
一個(gè)消息會(huì)被封裝成TopicPartition, TopicPartition中記錄了以下幾個(gè)信息:PID、Partition、SeqNumber; 重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有PID, Partition, SeqNumber相同主鍵的消息提交時(shí),Broker只會(huì)持久化一條。

  • PID是每個(gè)Producer在初始化時(shí)分配的一個(gè)唯一ID, 對(duì)于一個(gè)PID來(lái)說(shuō), Sequence Number是從0開(kāi)始自增
  • Partition 表示分區(qū)的標(biāo)識(shí)
  • Sequence Number是Producer在發(fā)送消息時(shí), 會(huì)給每一條消息標(biāo)識(shí)Sequence Number, 同一條消息被重復(fù)發(fā)送時(shí), Sequence Number是不會(huì)遞增

三、冪等性的條件

  • 只能保證Producer在單個(gè)會(huì)話內(nèi)不丟不重, 如果producer出現(xiàn)意外掛掉了再重啟是無(wú)法保證冪等性, 因?yàn)镻ID已經(jīng)改變了(單會(huì)話)
  • 冪等性無(wú)法跨域多個(gè)topic-partition, 只能保證單個(gè)partition內(nèi)的冪等性(單分區(qū))

所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

四、如何使用冪等性
enable.idempotence被設(shè)置成true后, Producer自動(dòng)升級(jí)成冪等性Producer,其他所有的代碼邏輯都不需要改變。(enable.idempotence默認(rèn)為true, 不需要手動(dòng)開(kāi)啟)

properties.put(“enable.idempotence”, ture)
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

3.7.3 生產(chǎn)者事務(wù)

開(kāi)啟事務(wù), 必須開(kāi)啟冪等性(即enable.idempotence設(shè)置為true), 不需要保證精確一次(冪等性 + 至少一次)

一、生產(chǎn)者事務(wù)的概念
Kafka事務(wù)是2017年Kafka 0.11.0.0引入的新特性。類似于數(shù)據(jù)庫(kù)的事務(wù)。Kafka事務(wù)指的是消費(fèi)者提交以及生產(chǎn)者生產(chǎn)消息offset的操作可以在一個(gè)原子操作中,要么都成功,要么都失敗。

二、事務(wù)操作的API

// 1 初始化事務(wù)
void initTransactions();

// 2 開(kāi)啟事務(wù)
void beginTransaction() throws ProducerFencedException;

// 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;

// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;

三、事務(wù)編程流程

  • 1.設(shè)置事務(wù)id
  • 2.初始化事務(wù)
  • 3.開(kāi)啟事務(wù)
  • 4.運(yùn)行結(jié)束 > 提交事務(wù)
  • 5.運(yùn)行失敗 > 回滾事務(wù)

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


3.8 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)有序

kafka只能保證單分區(qū)數(shù)據(jù)有序, 多分區(qū)時(shí), 分區(qū)與分區(qū)間無(wú)序


3.9 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)亂序

  • kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下:
    max.in.flight.requests.per.connection=1(不需要考慮是否開(kāi)啟冪等性)。
  • kafka在1.x及以后版本保證數(shù)據(jù)單分區(qū)有序,條件如下:
    • 未開(kāi)啟冪等性
      max.in.flight.requests.per.connection需要設(shè)置為1。
    • 開(kāi)啟冪等性
      max.in.flight.requests.per.connection需要設(shè)置小于等于5。
      原因說(shuō)明:因?yàn)樵趉afka1.x以后,啟用冪等后,kafka服務(wù)端會(huì)緩存producer發(fā)來(lái)的最近5個(gè)request的元數(shù)據(jù),故無(wú)論如何,都可以保證最近5個(gè)request的數(shù)據(jù)都是有序的。(原理是使用了冪等性的Sequence Number, 連續(xù)5個(gè)消息會(huì)自動(dòng)根據(jù)Sequence Number排序)

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


4.Kafka Broker

4.1 Kafka Broker工作流程

4.1.1 Zookeeper存儲(chǔ)的 Kafka 信息

一、啟動(dòng)zookeeper客戶端

bin/zkCli.sh

二、通過(guò) ls 命令可以查看 kafka 相關(guān)信息。

ls /kafka

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


4.1.2 Kafka Broker總體工作流程

一、controller的概念
在Kafka集群中,某個(gè)Broker將被選舉出來(lái)?yè)?dān)任一種特殊的角色,其用于管理和協(xié)調(diào)Kafka集群,即管理集群中的所有分區(qū)的狀態(tài)并執(zhí)行相應(yīng)的管理操作。每個(gè)Kafka集群任意時(shí)刻都只能有一個(gè)Controller。當(dāng)集群?jiǎn)?dòng)時(shí),所有Broker都參與Controller的競(jìng)選,最終有一個(gè)勝出,一旦Controller在某個(gè)時(shí)刻崩潰,集群中的其他的Broker會(huì)收到通知,然后開(kāi)啟新一輪的Controller選舉,新選舉出來(lái)的Controller將承擔(dān)起之前Controller的所有工作。

二、controller的作用

  • 維護(hù)每臺(tái)Broker上的分區(qū)副本信息
  • 維護(hù)每個(gè)分區(qū)的Leader副本信息

三、controller為每個(gè)分區(qū)選舉leader
選舉規(guī)則:在isr中存活為前提, 按照AR中排在前面的優(yōu)先, 例如AR[1, 0, 2], ISR[1, 2], 那么leader會(huì)按照1,2的順序輪巡

對(duì)于topicA的partition0這個(gè)分區(qū),它選舉出broker1作為leader, 而broker0、broker2作為follower, controller會(huì)把這個(gè)信息告訴zookeeper(將節(jié)點(diǎn)信息上傳到zookeeper),這是為了防止controller掛了后, 新的controller不知道主副本信息
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


4.1.2 Kafka Broker上線與下線

模擬 Kafka 上下線,Zookeeper 中數(shù)據(jù)變化

一、查看/kafka/brokers/ids 路徑上的節(jié)點(diǎn)。

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[0, 1, 2]

二、查看/kafka/controller 路徑上的數(shù)據(jù)。

[zk: localhost:2181(CONNECTED) 15] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

三、查看/kafka/brokers/topics/first/partitions/0/state 路徑上的數(shù)據(jù)。
顯示first這個(gè)topic中id為0的partitions的情況

[zk:localhost:2181(CONNECTED)16] get/kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}

4.1.3 Broker 重要參數(shù)

參數(shù)名稱 描述
replica.lag.time.max.ms ISR中,如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時(shí)間閾值,默認(rèn)30s。
auto.leader.rebalance.enable 默認(rèn)是 true。 自動(dòng)Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默認(rèn)是10%。每個(gè)broker允許的不平衡的leader的比率。如果每個(gè)broker超過(guò)了這個(gè)值,控制器會(huì)觸發(fā)leader的平衡。
leader.imbalance.check.interval.seconds 默認(rèn)值 300 秒。檢查 leader 負(fù)載是否平衡的間隔時(shí)間。
log.segment.bytes Kafka 中 log 日志是分成一塊塊存儲(chǔ)的,此配置是指 log 日志劃分 成塊的大小,默認(rèn)值 1G
log.index.interval.bytes 默認(rèn) 4kb,kafka 里面每當(dāng)寫(xiě)入了 4kb 大小的日志(.log),然后就往 index 文件里面記錄一個(gè)索引。
log.retention.hours Kafka 中數(shù)據(jù)保存的時(shí)間,默認(rèn) 7 天。
log.retention.minutes Kafka 中數(shù)據(jù)保存的時(shí)間,分鐘級(jí)別,默認(rèn)關(guān)閉。
log.retention.ms Kafka 中數(shù)據(jù)保存的時(shí)間,毫秒級(jí)別,默認(rèn)關(guān)閉。
log.retention.check.interval.ms 檢查數(shù)據(jù)是否保存超時(shí)的間隔,默認(rèn)是 5 分鐘。
log.retention.bytes 默認(rèn)等于-1,表示無(wú)窮大。超過(guò)設(shè)置的所有日志總大小,刪除最早的 segment。
log.cleanup.policy 默認(rèn)是 delete,表示所有數(shù)據(jù)啟用刪除策略;如果設(shè)置值為 compact,表示所有數(shù)據(jù)啟用壓縮策略。
num.io.threads 默認(rèn)是 8。負(fù)責(zé)寫(xiě)磁盤(pán)的線程數(shù)。整個(gè)參數(shù)值要占總核數(shù)的 50%。
num.replica.fetchers 副本拉取線程數(shù),這個(gè)參數(shù)占總核數(shù)的 50%的 1/3
num.network.threads 默認(rèn)是 3。數(shù)據(jù)傳輸線程數(shù),這個(gè)參數(shù)占總核數(shù)的50%的 2/3
log.flush.interval.messages 強(qiáng)制頁(yè)緩存刷寫(xiě)到磁盤(pán)的條數(shù),默認(rèn)是 long 的最大值,9223372036854775807。一般不建議修改,交給系統(tǒng)自己管理。
log.flush.interval.ms 每隔多久,刷數(shù)據(jù)到磁盤(pán),默認(rèn)是 null。一般不建議修改,交給系統(tǒng)自己管理。

4.2 生產(chǎn)經(jīng)驗(yàn)—節(jié)點(diǎn)服役和退役

4.2.1 服役新節(jié)點(diǎn)(todo)

4.2.2 退役舊節(jié)點(diǎn)(todo)


4.3 Kafka副本

4.3.1 副本基本信息

一、kafka副本的作用

  • kafka副本的作用:提高數(shù)據(jù)的可靠性
  • kafka默認(rèn)副本1個(gè), 生產(chǎn)環(huán)境一般配置2個(gè), 保證數(shù)據(jù)可靠性; 太多副本會(huì)增加磁盤(pán)存儲(chǔ)空間, 增加網(wǎng)絡(luò)上傳數(shù)據(jù)傳輸, 降低效率
  • kafka中副本分為:Leader和Follower, kafka生產(chǎn)者只會(huì)把數(shù)據(jù)發(fā)往Leader, 然后follower自己找leader進(jìn)行數(shù)據(jù)同步
  • kafka分區(qū)中所有的副本統(tǒng)稱為AR(Assigned Repllicas), AR = ISR + OSR
    • ISR: 表示和 Leader 保持同步的 Follower 集合。
      如果 Follower 長(zhǎng)時(shí)間未向 Leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR。該時(shí)間閾值由 replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn) 30s。Leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的 Leader。
    • OSR: 表示 Follower 與 Leader 副本同步時(shí),延遲過(guò)多的副本。

4.3.2 Leader選舉流程

一、Controller Leader
Kafka 集群中有一個(gè)broker的Controller會(huì)被選舉為Controller Leader,負(fù)責(zé)管理集群broker 的上下線,所有topic的分區(qū)副本分配Leader選舉等工作。(Controller的信息同步工作是依賴于Zookeeper的)
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

1.創(chuàng)建一個(gè)新的topic, 設(shè)置4個(gè)分區(qū), 4個(gè)副本

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4

Created topic atguigu1.

2.查看這4個(gè)分區(qū)的leander分布情況

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --describe --topic atguigu1

Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
	Configs: segment.bytes=1073741824
	Topic: atguigu1Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
	Topic: atguigu1Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
	Topic: atguigu1Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
	Topic: atguigu1Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3

3.停止其中一個(gè)kafka的進(jìn)程, 查看Leader分區(qū)的狀況
其中我們停到了Broker.id=3的節(jié)點(diǎn), 我們發(fā)現(xiàn)分區(qū)0的leader原本是節(jié)點(diǎn)3. 但是因?yàn)楣?jié)點(diǎn)3掛了, 所以ISR中重新找到節(jié)點(diǎn)0作為leader

Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0

4.停止Broker.id=2的節(jié)點(diǎn), 并查看 Leader 分區(qū)情況

Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0

5.重新啟動(dòng)關(guān)停的kafka節(jié)點(diǎn), 查看leader分區(qū)的狀況
我們發(fā)現(xiàn)leader已經(jīng)選舉成功后, 并不會(huì)因?yàn)楣?jié)點(diǎn)的重啟再去選舉一次

Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2

4.3.3 Leader和Follower故障處理細(xì)節(jié)(todo)

4.3.4 分區(qū)副本分配

如果kafka服務(wù)器只有4個(gè)節(jié)點(diǎn), 那么設(shè)置kafka的分區(qū)數(shù) > 服務(wù)器臺(tái)數(shù), 在kafka底層是如何分配存儲(chǔ)副本的呢?

1.創(chuàng)建16個(gè)分區(qū), 3個(gè)副本
創(chuàng)建一個(gè)新的 topic,名稱為 second。

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 16 --replication-factor 3 --topic second

2.查看分區(qū)和副本情況

Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq


4.3.5 手動(dòng)調(diào)整分區(qū)副本存儲(chǔ)

在生產(chǎn)環(huán)境中,每臺(tái)服務(wù)器的配置和性能不一致,但是Kafka只會(huì)根據(jù)自己的代碼規(guī)則創(chuàng)建對(duì)應(yīng)的分區(qū)副本,就會(huì)導(dǎo)致個(gè)別服務(wù)器存儲(chǔ)壓力較大。所有需要手動(dòng)調(diào)整分區(qū)副本的存儲(chǔ)。

需求:創(chuàng)建一個(gè)新的topic,4個(gè)分區(qū),兩個(gè)副本,名稱為three。將該topic的所有副本都存儲(chǔ)到broker0和broker1兩臺(tái)服務(wù)器上。
kafka生產(chǎn)數(shù)據(jù)命令,kafka,java,java-rabbitmq

1.創(chuàng)建一個(gè)新的 topic,名稱為 three。

bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 4 --replication-factor 2 --topic three

2.創(chuàng)建副本存儲(chǔ)計(jì)劃
所有副本都指定存儲(chǔ)在 broker0、broker1 中

vim increase-replication-factor.json

輸入如下內(nèi)容:

{
	"version":1,
	"partitions":[
		{"topic":"three","partition":0,"replicas":[0,1]},
		{"topic":"three","partition":1,"replicas":[0,1]},
		{"topic":"three","partition":2,"replicas":[1,0]},
		{"topic":"three","partition":3,"replicas":[1,0]}
	]
}

3.執(zhí)行副本存儲(chǔ)計(jì)劃

bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.202.128:9092 --reassignment-json-file increase-replication-factor.json --execute

4.3.6 生產(chǎn)經(jīng)驗(yàn)—Leader Partition 負(fù)載平衡(todo)

正常情況下,Kafka本身會(huì)自動(dòng)把Leader Partition均勻分散在各個(gè)機(jī)器上,來(lái)保證每臺(tái)機(jī)器的讀寫(xiě)吞吐量都是均勻的。但是如果某些broker宕機(jī),會(huì)導(dǎo)致Leader Partition過(guò)于集中在其他少部分幾臺(tái)broker上,這會(huì)導(dǎo)致少數(shù)幾臺(tái)broker的讀寫(xiě)請(qǐng)求壓力過(guò)高,其他宕機(jī)的broker重啟之后都是follower partition,讀寫(xiě)請(qǐng)求很低,造成集群負(fù)載不均衡文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-676707.html

4.3.7 生產(chǎn)經(jīng)驗(yàn)—增加副本因子(todo)


4.4 文件存儲(chǔ)

4.4.1 文件存儲(chǔ)機(jī)制(todo)

4.4.2 文件清洗策略(todo)

4.5 高效讀寫(xiě)數(shù)據(jù)

到了這里,關(guān)于Kafka(生產(chǎn)者)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 大數(shù)據(jù)開(kāi)發(fā)之Kafka(概述、快速入門(mén)、生產(chǎn)者)

    大數(shù)據(jù)開(kāi)發(fā)之Kafka(概述、快速入門(mén)、生產(chǎn)者)

    Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。 發(fā)布/訂閱:消息的發(fā)布者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接收感興趣的消息。 目前企業(yè)中比較常見(jiàn)的消息隊(duì)列產(chǎn)品主要有Kafka、ActiveM

    2024年01月19日
    瀏覽(20)
  • Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    1.1、至少一次 至少一次(At Least Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,kafka集群至少接收到一次數(shù)據(jù)。 至少一次的條件: ACK級(jí)別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2 1.2、最多一次 最多一次(At Most Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,

    2024年02月01日
    瀏覽(18)
  • 第3、4章 Kafka 生產(chǎn)者 和 消費(fèi)者 ——向 Kafka 寫(xiě)入數(shù)據(jù) 和讀取數(shù)據(jù)

    第3、4章 Kafka 生產(chǎn)者 和 消費(fèi)者 ——向 Kafka 寫(xiě)入數(shù)據(jù) 和讀取數(shù)據(jù)

    重要的特性: 消息通過(guò) 隊(duì)列來(lái)進(jìn)行交換 每條消息僅會(huì)傳遞給一個(gè)消費(fèi)者 消息傳遞有先后順序,消息被消費(fèi)后從隊(duì)列刪除(除非使用了消息優(yōu)先級(jí)) 生產(chǎn)者或者消費(fèi)者可以動(dòng)態(tài)加入 傳送模型: 異步即發(fā)即棄:生產(chǎn)者發(fā)送一條消息,不會(huì)等待收到一個(gè)響應(yīng) 異步請(qǐng)求、應(yīng)答:

    2024年02月20日
    瀏覽(21)
  • Kafka3.0.0版本——生產(chǎn)者數(shù)據(jù)有序與亂序

    Kafka3.0.0版本——生產(chǎn)者數(shù)據(jù)有序與亂序

    單分區(qū)內(nèi),數(shù)據(jù)有序。如下圖partion0、partion1、partion2分區(qū)內(nèi),各自分區(qū)內(nèi)的數(shù)據(jù)有序。 2.1、kafka1.x版本之前保證數(shù)據(jù)單分區(qū)有序的條件 kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下: 2.2、kafka1.x版本及以后保證數(shù)據(jù)單分區(qū)有序的條件 未開(kāi)啟冪等性 開(kāi)啟冪等性 2.3、kafka1

    2023年04月27日
    瀏覽(29)
  • SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

    SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

    目錄 一、Spark Streaming概述 二、添加依賴 三、配置log4j 1.依賴下載好后打開(kāi)IDEA最左側(cè)的外部庫(kù) 2.找到spark-core 3.找到apache.spark目錄 4.找到log4j-defaults.properties文件 5.將該文件放在資源目錄下,并修改文件名 6.修改log4j.properties第19行的內(nèi)容 四、Spark Streaming讀取Socket數(shù)據(jù)流 1.代碼編

    2023年04月27日
    瀏覽(19)
  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_(dá)原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認(rèn)配置,以及配置的參數(shù),開(kāi)啟網(wǎng)絡(luò)線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進(jìn)行消息key, value序列化 ???? 4. 進(jìn)行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • 三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生產(chǎn)者的 大體過(guò)程 初始化過(guò)程中會(huì)新建很多對(duì)象,本文暫先分享部分對(duì)象 1.分區(qū)器---Partitioner partitioner 2.重試時(shí)間---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.攔截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    瀏覽(32)
  • kafka-保證數(shù)據(jù)不重復(fù)-生產(chǎn)者開(kāi)啟冪等性和事務(wù)的作用?

    kafka-保證數(shù)據(jù)不重復(fù)-生產(chǎn)者開(kāi)啟冪等性和事務(wù)的作用?

    適用于消息在寫(xiě)入到服務(wù)器日志后,由于網(wǎng)絡(luò)故障,生產(chǎn)者沒(méi)有及時(shí)收到服務(wù)端的 ACK 消息,生產(chǎn)者誤以為消息沒(méi)有持久化到服務(wù)端,導(dǎo)致生產(chǎn)者重復(fù)發(fā)送該消息,造成了消息的重復(fù)現(xiàn)象,而冪等性就是為了解決該問(wèn)題。 通過(guò)3個(gè)值的唯一性去重: PID:生產(chǎn)者ID 分區(qū)號(hào) seq:?jiǎn)?/p>

    2024年02月14日
    瀏覽(17)
  • Apache Kafka - 重識(shí)Kafka生產(chǎn)者

    Apache Kafka - 重識(shí)Kafka生產(chǎn)者

    Kafka 生產(chǎn)者是 Apache Kafka 中的一個(gè)重要組件,它負(fù)責(zé)將數(shù)據(jù)發(fā)送到 Kafka 集群中。在實(shí)時(shí)數(shù)據(jù)處理和流式處理應(yīng)用程序中,Kafka 生產(chǎn)者扮演著非常重要的角色。 這里我們將介紹 Kafka 生產(chǎn)者的概念、工作原理以及如何使用 Kafka 生產(chǎn)者。 Kafka 生產(chǎn)者是一種用于將數(shù)據(jù)發(fā)送到 Kafk

    2024年02月05日
    瀏覽(22)
  • [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問(wèn)題

    [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問(wèn)題 Kafka是一個(gè)高度可擴(kuò)展的分布式流平臺(tái),用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流處理應(yīng)用程序。作為一個(gè)廣泛使用的消息代理系統(tǒng),Kafka在數(shù)據(jù)傳輸方面表現(xiàn)出色,但是在極端情況下,它可能會(huì)出現(xiàn)生產(chǎn)者阻塞的問(wèn)題。這可能會(huì)導(dǎo)致

    2024年02月11日
    瀏覽(20)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包