国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka原理&架構(gòu)深入

這篇具有很好參考價值的文章主要介紹了kafka原理&架構(gòu)深入。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

1. 下載安裝

https://www.cnblogs.com/zhangzhonghui/articles/12444070.html
kafka配置詳解
若kafka運(yùn)行在內(nèi)網(wǎng)服務(wù)器允許外網(wǎng)訪問,例如內(nèi)網(wǎng)ip: 172.10.22.134,外網(wǎng)ip: 9.70.168.130
進(jìn)行如下配置:

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://9.70.168.130:9092

外網(wǎng)訪問時使用9.70.168.130:9092訪問即可

2. 命令行命令

cd kafka安裝目錄

cd ~/kafka
  1. 后臺啟動
# zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# kafka
bin/kafka-server-start.sh -daemon config/server.properties
  1. 停止
bin/kafka-server-stop.sh
  1. topic
#創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic your_topic
# 查看topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 查看特定topic
bin/kafka-topics.sh --zookeeper localhost:2181 --topic your_topic --describe
  1. producer
# 發(fā)送數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic your_topic
  1. 查看 consumer-groups
# 新版
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
# 舊版
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

# 新版特定consumer-groups
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9292 --group YOUR_GROUP_ID --describe

# 舊版特定consumer-groups
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group YOUR_GROUP_ID --describe

  1. 開啟consumer消費(fèi)某個topic(從頭消費(fèi))
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC --from-beginning

3. 概述

3.1 定義

Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列,主要應(yīng)用于大數(shù)據(jù)實(shí)時處理領(lǐng)域。Kafka對消息保存時根據(jù)Topic進(jìn)行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實(shí)例組成,每個實(shí)例(server)成為broker。

3.2 基本架構(gòu)

kafka原理&架構(gòu)深入
kafka原理&架構(gòu)深入
1)Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
2)Consumer :消息消費(fèi)者,向kafka broker取消息的客戶端
3)Topic :可以理解為一個隊列。
4) Consumer Group (CG):消費(fèi)者組,由多個consumer組成。消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個消費(fèi)者組,即消費(fèi)者組是邏輯上的一個訂閱者。
5)Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
6)Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
7)Replica:副本,為保證集群中的某個節(jié)點(diǎn)發(fā)生故障時,該節(jié)點(diǎn)上的partition數(shù)據(jù)不丟失,且kafka仍然能夠繼續(xù)工作,kafka提供了副本機(jī)制,一個topic的每個分區(qū)都有若干個副本,一個leader和若干個follower。
8)leader:每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象都是leader。
9)follower:每個分區(qū)多個副本中的“從”,實(shí)時從leader中同步數(shù)據(jù),保持和leader數(shù)據(jù)的同步。leader發(fā)生故障時,某個follower會成為新的follower。
10)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka

4. 架構(gòu)深入

4.1 生產(chǎn)者

4.1.1 分區(qū)

1)分區(qū)的原因

  1. 方便在集群中擴(kuò)展,每個Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
  2. 可以提高并發(fā),因為可以以Partition為單位讀寫了。

2)分區(qū)的原則
我們需要將producer發(fā)送的數(shù)據(jù)封裝成一個ProducerRecord對象。
kafka原理&架構(gòu)深入

  1. 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
  2. 沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值;
  3. 既沒有 partition 值又沒有 key 值的情況下,第一次調(diào)用時隨機(jī)生成一個整數(shù)(后面每次調(diào)用在這個整數(shù)上自增),將這個值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說的 round-robin 算法。

4.1.2 數(shù)據(jù)可靠性保證

為保證producer發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的topic,topic的每個partition收到producer發(fā)送的數(shù)據(jù)后,都需要向producer發(fā)送ack(acknowledgement確認(rèn)收到),如果producer收到ack,就會進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。
kafka原理&架構(gòu)深入
1)副本數(shù)據(jù)同步策略

方案 優(yōu)點(diǎn) 缺點(diǎn)
半數(shù)以上完成同步,就發(fā)送ack 延遲低 選舉新的leader時,為了容忍n臺節(jié)點(diǎn)的故障,需要2n+1個副本
全部完成同步,才發(fā)送ack 選舉新的leader時,容忍n臺節(jié)點(diǎn)的故障,只需要n+1個副本 延遲高

Kafka選擇了第二種方案,原因如下:

  1. 同樣為了容忍n臺節(jié)點(diǎn)的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分區(qū)都有大量的數(shù)據(jù),第一種方案會造成大量數(shù)據(jù)的冗余。
  2. 雖然第二種方案的網(wǎng)絡(luò)延遲會比較高,但網(wǎng)絡(luò)延遲對Kafka的影響較小。

2)ISR

采用第二種方案之后,設(shè)想以下情景:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),但有一個follower,因為某種故障,遲遲不能與leader進(jìn)行同步,那leader就要一直等下去,直到它完成同步,才能發(fā)送ack。這個問題怎么解決呢?

Leader維護(hù)了一個動態(tài)的in-sync replica set (ISR),即leader保持同步的follower集合。當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會給follower發(fā)送ack。如果follower長時間未向leader同步數(shù)據(jù),則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定。Leader發(fā)生故障之后,就會從ISR中選舉新的leader。之前舊版本還有保留 replica.lag.max.messages,但后續(xù)被刪除了, 因為假如某個時刻數(shù)據(jù)量很大,follower還沒反應(yīng)過來就被踢了,但follower本身是健康的,這顯然是不合理的。

3)ack應(yīng)答機(jī)制

對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等ISR中的follower全部接收成功。所以Kafka為用戶提供了三種可靠性級別,用戶根據(jù)對可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。
acks:

  • 0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經(jīng)返回,當(dāng)broker故障時有可能丟失數(shù)據(jù)

  • 1:producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障,那么將會丟失數(shù)據(jù)
    kafka原理&架構(gòu)深入

  • -1(all):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成數(shù)據(jù)重復(fù)
    kafka原理&架構(gòu)深入
    4)故障處理細(xì)節(jié)
    kafka原理&架構(gòu)深入
    (1)follower故障
    follower發(fā)生故障后會被臨時踢出ISR,待該follower恢復(fù)后,follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向leader進(jìn)行同步。等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后,就可以重新加入ISR了。
    (2)leader故障
    leader發(fā)生故障之后,會從ISR中選出一個新的leader,之后,為保證多個副本之間的數(shù)據(jù)一致性,其余的follower會先將各自的log文件高于HW的部分截掉,然后從新的leader同步數(shù)據(jù)。

注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。數(shù)據(jù)的不丟失或不重復(fù)由ack來決定

4.1.3 Exactly Once語義

對于某些比較重要的消息,我們需要保證exactly once語義,即保證每條消息被發(fā)送且僅被發(fā)送一次。
在0.11版本之后,Kafka引入了冪等性機(jī)制(idempotent),配合acks = -1時的at least once語義,實(shí)現(xiàn)了producer到broker的exactly once語義(消費(fèi)者的exactly once后續(xù)再說)。

使用時,只需將enable.idempotence屬性設(shè)置為true,kafka自動將acks屬性設(shè)為-1

在Kafka中冪等性指相同的多條數(shù)據(jù)只會保存一條,那如何確認(rèn)是相同的數(shù)據(jù)?kafka中對于每條數(shù)據(jù)都有個id,id由producerId+SequenceNumber組成,當(dāng)數(shù)據(jù)到達(dá)kafka后,id將會被暫時緩存起來,若此時producer沒有收到ack會重發(fā)數(shù)據(jù),發(fā)現(xiàn)數(shù)據(jù)跟緩存中的數(shù)據(jù)id是一致的則不持久化
kafka原理&架構(gòu)深入

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");

4.1.4 發(fā)送消息流程

Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送的過程中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulator。main線程將消息發(fā)送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。
kafka原理&架構(gòu)深入
相關(guān)參數(shù):

  • batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會發(fā)送數(shù)據(jù)。
  • linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.time之后就會發(fā)送數(shù)據(jù)。
  • buffer.memory:RecordAccumulator緩沖區(qū)大小
package com.atguigu.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數(shù)
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
        	// 同步方式 -- get()
        	// producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
        	// 異步:不帶回調(diào)函數(shù)
            // producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
            // 異步:帶回調(diào)函數(shù)
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                //回調(diào)函數(shù),該方法會在Producer收到ack時調(diào)用,為異步調(diào)用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

4.2 broker

4.2.1 日志結(jié)構(gòu)

Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向topic的。
topic是邏輯上的概念,而partition是物理上的概念,每個partition對應(yīng)于一個log文件,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。Producer生產(chǎn)的數(shù)據(jù)會被不斷追加到該log文件末端,且每條數(shù)據(jù)都有自己的offset。消費(fèi)者組中的每個消費(fèi)者,都會實(shí)時記錄自己消費(fèi)到了哪個offset,以便出錯恢復(fù)時,從上次的位置繼續(xù)消費(fèi)。

kafka原理&架構(gòu)深入

由于生產(chǎn)者生產(chǎn)的消息會不斷追加到log文件末尾,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,將每個partition分為多個segment。每個segment對應(yīng)兩個文件——“.index”文件和“.log”文件。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。例如,first這個topic有三個分區(qū),則其對應(yīng)的文件夾為first-0,first-1,first-2,每個文件夾下有如下文件:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址,index和log文件以當(dāng)前segment的第一條消息的offset命名
kafka原理&架構(gòu)深入

4.2.2 存儲策略

無論消息是否被消費(fèi),kafka都會保留所有消息,當(dāng)然可對消息進(jìn)行壓縮或者刪除。
有兩種策略可以刪除舊數(shù)據(jù):
1)基于時間:log.retention.hours=168
2)基于大?。簂og.retention.bytes=1073741824
需要注意的是,因為Kafka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。

4.2.3 Controller & ZooKeeper

Kafka集群中有一個broker會被選舉為Controller(先到先得),負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作,Controller的管理工作都是依賴于Zookeeper的。
以下為partition的leader選舉過程:
kafka原理&架構(gòu)深入
(1) 到zookeeper中的brokers/ids/注冊節(jié)點(diǎn)信息 [0,1,2]
(2) 從ISR中獲取選取leader 0,到/brokers/topics/first/partitions/0/state更新 topic的partition信息
(3) leader 0發(fā)生故障,通知 /brokers/ids/節(jié)點(diǎn),更新為 [1,2]
(4) Controller監(jiān)聽到節(jié)點(diǎn)發(fā)生變化,則重新獲取ISD,選舉新leader,再更新leader及ISR

4.2.4 高效讀寫數(shù)據(jù)

1)順序?qū)懘疟P
Kafka的producer生產(chǎn)數(shù)據(jù),要寫入到log文件中,寫的過程是一直追加到文件末端,為順序?qū)?。官網(wǎng)有數(shù)據(jù)表明,同樣的磁盤,順序?qū)懩艿降?00M/s,而隨機(jī)寫只有100k/s。這與磁盤的機(jī)械機(jī)構(gòu)有關(guān),順序?qū)懼钥?,是因為其省去了大量磁頭尋址的時間。

2) Page Cache
為了優(yōu)化讀寫性能,Kafka 利用了操作系統(tǒng)本身的 Page Cache。數(shù)據(jù)通過mmap內(nèi)存映射的方式直接寫入page cache,定時刷新臟頁到磁盤。消費(fèi)者拉取消息時,如果數(shù)據(jù)在page cache中,甚至能不需要去讀磁盤io。讀操作可直接在 Page Cache 內(nèi)進(jìn)行。如果消費(fèi)和生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤(直接通過 Page Cache)交換數(shù)據(jù)。

3)零拷貝技術(shù)
kafka原理&架構(gòu)深入
傳統(tǒng)路徑:File -> Page Cache -> Application Cache -> Socket Cache -> NIC
Kafka零拷貝過程:File -> Page Cache -> NIC
為啥可以進(jìn)行零拷貝?因為kafka寫進(jìn)去.log文件的數(shù)據(jù)就是經(jīng)過序列化的,讀取出來的數(shù)據(jù)就不要經(jīng)過用戶空間的處理了,減少了不必要的拷貝次數(shù)和用戶態(tài)和內(nèi)核態(tài)的切換,大大減少讀取的時延

參考:kafka的零拷貝

4) 分區(qū)分段+稀疏索引
Kafka 的 message 是按 topic分 類存儲的,topic 中的數(shù)據(jù)又是按照一個一個的 partition 即分區(qū)存儲到不同 broker 節(jié)點(diǎn)。每個 partition 對應(yīng)了操作系統(tǒng)上的一個文件夾,partition 實(shí)際上又是按照segment分段存儲的。通過這種分區(qū)分段的設(shè)計,Kafka 的 message 消息實(shí)際上是分布式存儲在一個一個小的 segment 中的,每次文件操作也是直接操作的 segment。為了進(jìn)一步的查詢優(yōu)化,Kafka 又默認(rèn)為分段后的數(shù)據(jù)文件建立了索引文件,就是文件系統(tǒng)上的.index文件。這種分區(qū)分段+索引的設(shè)計,不僅提升了數(shù)據(jù)讀取的效率,同時也提高了數(shù)據(jù)操作的并行度。

5) 批量讀寫
生產(chǎn)者可以借助累加器,批量發(fā)送消息,消費(fèi)者也可以批量拉取消費(fèi)

4.3 消費(fèi)者

4.3.1 消費(fèi)方式

push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因為消息發(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
所以consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費(fèi)者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。針對這一點(diǎn),Kafka的消費(fèi)者在消費(fèi)數(shù)據(jù)時會傳入一個時長參數(shù)timeout,如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),consumer會等待一段時間之后再返回,這段時長即為timeout。

4.3.2 分區(qū)分配策略

一個consumer group中有多個consumer,一個 topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費(fèi)。
Kafka有三種消費(fèi)者分區(qū)分配策略:
1.RoundRobin
2.Range
3.Sticky

kafka在0.11版本引入了Sticky分區(qū)分配策略,它的兩個主要目的是:

  • 分區(qū)的分配要盡可能的均勻,分配給消費(fèi)者者的主題分區(qū)數(shù)最多相差一個;
  • 分區(qū)的分配盡可能的與上次分配的保持相同。

當(dāng)兩者發(fā)生沖突時,第一個目標(biāo)優(yōu)先于第二個目標(biāo)。

參考:kafka的消費(fèi)者分區(qū)分配策略

4.3.3 offset的維護(hù)

由于consumer在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,consumer恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以consumer需要實(shí)時記錄自己消費(fèi)到了哪個offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
Kafka 0.9版本之前,consumer默認(rèn)將offset保存在Zookeeper中,從0.9版本開始,consumer默認(rèn)將offset保存在Kafka一個內(nèi)置的topic中,該topic為__consumer_offsets

4.3.4 Eactly Once語義

需要保證消費(fèi)數(shù)據(jù)和提交offset是原子性的

4.4 攔截器

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實(shí)現(xiàn)clients端的定制化控制邏輯。
對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機(jī)會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用。
(2)onSend(ProducerRecord):
該方法封裝進(jìn)KafkaProducer.send方法中,即它運(yùn)行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算。
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息從RecordAccumulator成功發(fā)送到Kafka Broker之后,或者在發(fā)送過程中失敗時調(diào)用。并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運(yùn)行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率。
(4)close:
關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運(yùn)行在多個線程中,因此在具體實(shí)現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。

案例:
在消息發(fā)送前將時間戳信息加到消息value的最前部

package com.atguigu.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

	@Override
	public void configure(Map<String, ?> configs) {

	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		// 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + "," + record.value().toString());
	}
	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

	}

	@Override
	public void close() {

	}
}

主程序

package com.atguigu.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

	public static void main(String[] args) throws Exception {
		// 1 設(shè)置配置信息
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop102:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 構(gòu)建攔截鏈
		List<String> interceptors = new ArrayList<>();
		interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		 
		String topic = "first";
		Producer<String, String> producer = new KafkaProducer<>(props);
		
		// 3 發(fā)送消息
		for (int i = 0; i < 10; i++) {
			
		    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
		    producer.send(record);
		}
		 
		// 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法
		producer.close();
	}
}

5. 面試題

  1. Kafka中的ISR、AR又代表什么?
    ISR:與leader保持同步的follower集合
    AR:分區(qū)的所有副本

  2. Kafka中的HW、LEO等分別代表什么?
    LEO:沒個副本的最后條消息的offset
    HW:一個分區(qū)中所有副本最小的offset

  3. Kafka中是怎么體現(xiàn)消息順序性的?
    每個分區(qū)內(nèi),每條消息都有一個offset,故只能保證分區(qū)內(nèi)有序。

  4. Kafka中的分區(qū)器、序列化器、攔截器是否了解?它們之間的處理順序是什么?
    攔截器 -> 序列化器 -> 分區(qū)器
    但攔截器中主要有兩個方法,onSend() 和 onAcknowledgement(),onSend()是在消息被序列化以及計算分區(qū)前調(diào)用該方法,onAcknowledgement()成功發(fā)送到Kafka Broker之后,或者在發(fā)送過程中失敗時調(diào)用

  5. Kafka生產(chǎn)者客戶端的整體結(jié)構(gòu)是什么樣子的?使用了幾個線程來處理?分別是什么?
    兩個線程:主線程和sender線程

  6. “消費(fèi)組中的消費(fèi)者個數(shù)如果超過topic的分區(qū),那么就會有消費(fèi)者消費(fèi)不到數(shù)據(jù)”這句話是否正確?
    正確

  7. 消費(fèi)者提交消費(fèi)位移時提交的是當(dāng)前消費(fèi)到的最新消息的offset還是offset+1?
    offset+1

  8. 有哪些情形會造成重復(fù)消費(fèi)?
    生產(chǎn)者:ack=-1時,leader和follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成數(shù)據(jù)重復(fù)。
    消費(fèi)者:先消費(fèi)后提交offset

  9. 那些情景會造成消息漏消費(fèi)?
    生產(chǎn)者:ack=0,broker還沒完成寫入 或 ack=-1,leader落盤返回ack,但follower還沒完成同步leader就掛了
    消費(fèi)者:先提交offset,后消費(fèi),有可能造成數(shù)據(jù)的重復(fù)

  10. 當(dāng)你使用kafka-topics.sh創(chuàng)建(刪除)了一個topic之后,Kafka背后會執(zhí)行什么邏輯?
    1)會在zookeeper中的/brokers/topics節(jié)點(diǎn)下創(chuàng)建一個新的topic節(jié)點(diǎn),如:/brokers/topics/first
    2)觸發(fā)Controller的監(jiān)聽程序
    3)kafka Controller 負(fù)責(zé)topic的創(chuàng)建工作,并更新metadata cache

  11. topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?
    可以增加
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3

  12. topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?
    不可以減少,被刪除的分區(qū)數(shù)據(jù)難以處理(該分區(qū)的數(shù)據(jù)該如何被其他分區(qū)消費(fèi)?如何分配?從哪里開始消費(fèi)?等一系列難以處理的問題,所以kafka不支持)。

  13. Kafka有內(nèi)部的topic嗎?如果有是什么?有什么所用?
    __consumer_offsets,保存消費(fèi)者offset

  14. Kafka分區(qū)分配的概念?
    一個topic多個分區(qū),一個消費(fèi)者組多個消費(fèi)者,故需要將分區(qū)分配個消費(fèi)者(roundrobin、range、Sticky)

  15. 簡述Kafka的日志目錄結(jié)構(gòu)?
    每個分區(qū)對應(yīng)一個文件夾,文件夾的命名為topic-0,topic-1,內(nèi)部為.log和.index文件

  16. 聊一聊Kafka Controller的作用?
    負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。

  17. Kafka中有那些地方需要選舉?這些地方的選舉策略又有哪些?
    partition leader(ISR),controller(先到先得)

  18. 失效副本是指什么?有那些應(yīng)對措施?
    不能及時與leader同步,暫時踢出ISR,恢復(fù)后讀取磁盤中持久化的HW,從HW開始消費(fèi),等其追上leader之后再重新加入

  19. Kafka的那些設(shè)計讓它有如此高的性能?
    分區(qū),順序?qū)懘疟P,0-copy

  20. Kafka如何保證Exactly Once語義
    生產(chǎn)者:ack=-1并且開啟冪等性,enable.idempotence屬性設(shè)置為true(該選項會自動將ack設(shè)置為-1)
    消費(fèi)者:保證消費(fèi)和提交offset是原子性的

  21. 如何提升吞吐量?

  • 提升生產(chǎn)吞吐量
    (1)buffer.memory:發(fā)送消息的緩沖區(qū)大小,默認(rèn)值是 32m,可以增加到 64m。
    (2)batch.size:默認(rèn)是 16k。如果 batch 設(shè)置太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;
    如果 batch 太大,會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時。
    (3)linger.ms,這個值默認(rèn)是 0,意思就是消息必須立即被發(fā)送。一般設(shè)置一個 5-100
    毫秒。如果 linger.ms 設(shè)置的太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;如果 linger.ms 太長,
    會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時。
    (4)compression.type:默認(rèn)是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不
    錯的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會加大 producer 端的 CPU 開銷。

  • 增加分區(qū)

  • 消費(fèi)者提高吞吐量
    (1)增大 fetch.max.bytes 大?。J(rèn)是 50m) – 批量消費(fèi)每次最多消費(fèi)多少條消息
    (2)增大 max.partition.fetch.bytes(默認(rèn)是 1m) – 一次fetch請求中,從一個partition中取得的records最大大小
    (3)增大 max.poll.records 大小(默認(rèn)是 500 條) – 一次fetch請求中從一個broker中取得records的最大大小

  • 增加下游消費(fèi)者處理能力文章來源地址http://www.zghlxwxcb.cn/news/detail-500415.html

到了這里,關(guān)于kafka原理&架構(gòu)深入的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 架構(gòu)師系列- 消息中間件(13)-kafka深入應(yīng)用

    架構(gòu)師系列- 消息中間件(13)-kafka深入應(yīng)用

    1)配置文件 ?2)啟動信息 4.2.1 發(fā)送類型 KafkaTemplate調(diào)用send時默認(rèn)采用異步發(fā)送,如果需要同步獲取發(fā)送結(jié)果,調(diào)用get方法 詳細(xì)代碼參考:AsyncProducer.java 消費(fèi)者使用:KafkaConsumer.java 1)同步發(fā)送 通過swagger發(fā)送,控制臺可以正常打印send result swagger訪問地址:http://localhost:808

    2024年04月29日
    瀏覽(25)
  • kafka架構(gòu)和原理詳解

    kafka架構(gòu)和原理詳解

    Apache Kafka 是一個分布式流數(shù)據(jù)平臺,用于高吞吐量、持久性、可擴(kuò)展的發(fā)布和訂閱消息。它具有高度的可靠性,被廣泛用于構(gòu)建實(shí)時數(shù)據(jù)流處理、日志收集和數(shù)據(jù)管道等應(yīng)用。 1. 主題(Topic): 主題是消息的邏輯分類 生產(chǎn)者將消息發(fā)布到特定的主題中,而消費(fèi)者可以訂閱一

    2024年02月10日
    瀏覽(28)
  • Kafka架構(gòu)原理(超級詳細(xì))

    Kafka架構(gòu)原理(超級詳細(xì))

    1 ) Kafka 是開源 消息系統(tǒng) 2 )最初由 LinkedIn 公司開發(fā),2011年開源,2012年10月從 Apache 畢業(yè)。 項目目標(biāo)是:為處理實(shí)時數(shù)據(jù),提供一個統(tǒng)一、高通量、低等待的平臺。 3 ) Kafka 是一個分布式消息隊列。 Kafka 對消息根據(jù) Topic 進(jìn)行歸類。發(fā)送消息 Producer,接收消息 Consumer kafka

    2024年02月08日
    瀏覽(22)
  • Kafka架構(gòu)原理(三)

    Kafka架構(gòu)原理(三)

    3.1 整體架構(gòu)圖 一個典型的kafka集群中包含若干個Producer,若干個Broker,若干個Consumer,以及一個zookeeper集群; kafka通過zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時進(jìn)行Rebalance(負(fù)載均衡);Producer使用push模式將消息發(fā)布到Broker;Consumer使用pull模式從Broker中訂

    2024年02月05日
    瀏覽(15)
  • Kafka原理、部署與實(shí)踐——深入理解Kafka的工作原理和使用場景,全面介紹Kafka在實(shí)際生產(chǎn)環(huán)境中的部署

    作者:禪與計算機(jī)程序設(shè)計藝術(shù) 隨著互聯(lián)網(wǎng)的發(fā)展,網(wǎng)站的流量呈爆炸性增長,傳統(tǒng)的基于關(guān)系型數(shù)據(jù)庫的數(shù)據(jù)處理無法快速響應(yīng)。而NoSQL技術(shù)如HBase、MongoDB等被廣泛應(yīng)用于分布式數(shù)據(jù)存儲與處理,卻沒有提供像關(guān)系型數(shù)據(jù)庫一樣的ACID特性、JOIN操作及完整性約束。因此,很

    2024年02月09日
    瀏覽(19)
  • 深入Kafka核心設(shè)計與實(shí)踐原理讀書筆記第二章

    深入Kafka核心設(shè)計與實(shí)踐原理讀書筆記第二章

    配置生產(chǎn)者客戶端參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例。 構(gòu)建待發(fā)送的消息。 發(fā)送消息 關(guān)閉實(shí)列 參數(shù)說明 bootstrap.servers :用來指定生產(chǎn)者客戶端鏈接Kafka集群搜需要的broker地址清單,具體格式 host1:port1,host2:port2,可以設(shè)置一個或多個地址中間,號分割,參數(shù)默認(rèn) 空串。 這里要注意

    2023年04月08日
    瀏覽(54)
  • 深入理解 PostgreSQL 的架構(gòu)和內(nèi)部工作原理

    深入理解 PostgreSQL 的架構(gòu)和內(nèi)部工作原理

    ???? 博主 libin9iOak帶您 Go to New World.??? ?? 個人主頁——libin9iOak的博客?? ?? 《面試題大全》 文章圖文并茂??生動形象??簡單易學(xué)!歡迎大家來踩踩~?? ?? 《IDEA開發(fā)秘籍》學(xué)會IDEA常用操作,工作效率翻倍~?? ???? 希望本文能夠給您帶來一定的幫助??文章粗淺,敬

    2024年02月16日
    瀏覽(20)
  • 深入理解 Flink(一)Flink 架構(gòu)設(shè)計原理

    深入理解 Flink(一)Flink 架構(gòu)設(shè)計原理

    深入理解 Flink 系列文章已完結(jié),總共八篇文章,直達(dá)鏈接: 深入理解 Flink (一)Flink 架構(gòu)設(shè)計原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容錯深入分析 深入理解 Flink (三)Flink 內(nèi)核基礎(chǔ)設(shè)施源碼級原理詳解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    瀏覽(19)
  • MyBatis 架構(gòu)與原理深入解析,面試隨便問!

    MyBatis 架構(gòu)與原理深入解析,面試隨便問!

    作者:七寸知架構(gòu) 鏈接:https://www.jianshu.com/p/ec40a82cae28 本文主要講解JDBC怎么演變到Mybatis的漸變過程, 重點(diǎn)講解了為什么要將JDBC封裝成Mybaits這樣一個持久層框架 。再而論述Mybatis作為一個數(shù)據(jù)持久層框架本身有待改進(jìn)之處。 我們先看看我們最熟悉也是最基礎(chǔ)的通過JDBC查詢數(shù)

    2024年02月09日
    瀏覽(21)
  • 深入探討Eureka的三級緩存架構(gòu)與緩存運(yùn)行原理

    在當(dāng)今的軟件開發(fā)領(lǐng)域,分布式系統(tǒng)已經(jīng)成為了必 不可少的一部分。而在分布式系統(tǒng)中,服務(wù)的注冊與發(fā)現(xiàn)是其中的重要組成部分之一。Netflix開源的Eureka便是一款優(yōu)秀的服務(wù)發(fā)現(xiàn)框架,它采用了三級緩存架構(gòu)來提供高效的服務(wù)發(fā)現(xiàn)與注冊功能。本文將深入探討Eureka的三級緩

    2024年02月11日
    瀏覽(39)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包