国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Kafka】高級(jí)特性:生產(chǎn)者

這篇具有很好參考價(jià)值的文章主要介紹了【Kafka】高級(jí)特性:生產(chǎn)者。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

消息發(fā)送

消息生產(chǎn)流程

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

整個(gè)流程如下:

  1. Producer創(chuàng)建時(shí),會(huì)創(chuàng)建一個(gè)Sender線程并設(shè)置為守護(hù)線程。
  2. 生產(chǎn)消息時(shí),內(nèi)部其實(shí)是異步流程;生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時(shí)創(chuàng)建)。
  3. 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達(dá)到batch.size或者linger.ms達(dá)到上限,哪個(gè)先達(dá)到就算哪個(gè)。
  4. 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失敗原因允許重試,那么客戶端內(nèi)部會(huì)對(duì)該消息進(jìn)行重試。
  5. 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者。
  6. 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調(diào)返回。

ProducerRecord

在生產(chǎn)發(fā)送消息前,會(huì)將信息封裝成ProducerRecord對(duì)象。主要由以下幾部分組成:

private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;

其中主要有要發(fā)送的Topic名稱,要發(fā)送至那個(gè)分區(qū),以及要發(fā)送的數(shù)據(jù)和key。

其他的都比較好理解,key的作用是如果key存在的話,就會(huì)對(duì)key進(jìn)行hash,然后根據(jù)不同的結(jié)果發(fā)送至不同的分區(qū),這樣當(dāng)有相同的key時(shí),所有相同的key都會(huì)發(fā)送到同一個(gè)分區(qū),我們之前也提到,所有的新消息都會(huì)被添加到分區(qū)的尾部,進(jìn)而保證了數(shù)據(jù)的順序性。

序列化器

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

由于Kafka中的數(shù)據(jù)都是字節(jié)數(shù)組,在將消息發(fā)送到Kafka之前需要先將數(shù)據(jù)序列化為字節(jié)數(shù)組。

序列化器的作用就是用于序列化要發(fā)送的消息的。

Kafka使用org.apache.kafka.common.serialization.Serializer 接口用于定義序列化器,將泛型指定類型的數(shù)據(jù)轉(zhuǎn)換為字節(jié)數(shù)組,如下:

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

其中kafka提供了許多實(shí)現(xiàn)類。

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

除了上述提供的,還可以自定義序列化器,只要實(shí)現(xiàn)Serializer接口即可。

假如我們有如下實(shí)體類:

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

序列化類:

import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 
import java.io.UnsupportedEncodingException;
import java.nio.Buffer; 
import java.nio.ByteBuffer; 
import java.util.Map;


public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
    }

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            // 如果數(shù)據(jù)是null,則返回null 
            if (data == null) return null;
            Integer userId = data.getUserId();
            String username = data.getUsername();
            int length = 0;
            byte[] bytes = null;
            if (null != username) {
                bytes = username.getBytes("utf-8");
                length = bytes.length;
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(userId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("序列化數(shù)據(jù)異常");
        }
    }

    @Override
    public void close() {
        // do nothing
    }
}

分區(qū)器

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

分區(qū)器來計(jì)算消息該發(fā)送到哪個(gè)分區(qū)中。

默認(rèn)(DefaultPartitioner)分區(qū)計(jì)算:

  1. 如果record提供了分區(qū)號(hào),則使用record提供的分區(qū)號(hào)
  2. 如果record沒有提供分區(qū)號(hào),則使用key的序列化后的值的hash值對(duì)分區(qū)數(shù)量取模
  3. 如果record沒有提供分區(qū)號(hào),也沒有提供key,則使用輪詢的方式分配分區(qū)號(hào)。

也可以自定義分區(qū)器,需要

  1. 首先開發(fā)Partitioner接口的實(shí)現(xiàn)類
  2. 在KafkaProducer中進(jìn)行設(shè)置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)

攔截器

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

Producer攔截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于實(shí)現(xiàn)Client端的定制化控制邏輯。

對(duì)于Producer而言,Interceptor使得用戶在消息發(fā)送前以及Producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。同時(shí),Producer允許用戶指定多個(gè)Interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。

注意攔截器發(fā)生異常拋出的異常會(huì)被忽略;此外,也要注意一種情況:如果某個(gè)攔截器依賴上一個(gè)攔截器的結(jié)果,但是當(dāng)上一個(gè)攔截器異常,則該攔截器可能也不會(huì)正常工作,因?yàn)樗邮艿降氖巧弦粋€(gè)成功返回的結(jié)果,而不是期望的上一個(gè)攔截器結(jié)果。

Intercetpor的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  • onSend(ProducerRecord):該方法封裝進(jìn)KafkaProducer.send方法中,即運(yùn)行在用戶主線程中。Producer確保在消息被序列化以計(jì)算分區(qū)前調(diào)用該方法。用戶可以在該方法中對(duì)消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會(huì)影響目標(biāo)分區(qū)的計(jì)算。
  • onAcknowledgement(RecordMetadata, Exception):該方法會(huì)在消息被應(yīng)答之前或消息發(fā)送失敗時(shí)調(diào)用,并且通常都是在Producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運(yùn)行在Producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會(huì)拖慢Producer的消息發(fā)送效率。
  • close:關(guān)閉Interceptor,主要用于執(zhí)行一些資源清理工作。

如前所述,Interceptor可能被運(yùn)行在多個(gè)線程中,因此在具體實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全。另外倘若指定了多個(gè)Interceptor,則Producer將按照指定順序調(diào)用它們。

自定義攔截器步驟:

  1. 實(shí)現(xiàn)ProducerInterceptor接口
  2. 在KafkaProducer的設(shè)置中設(shè)置自定義的攔截器

案例:

消息實(shí)體類:

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

還是用上面的序列化器。

自定義攔截器1:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class InterceptorOne implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("攔截器1 -- go");


        // 消息發(fā)送的時(shí)候,經(jīng)過攔截器,調(diào)用該方法

        // 要發(fā)送的消息內(nèi)容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();


        // 攔截器攔下來之后根據(jù)原來消息創(chuàng)建的新的消息
        // 此處對(duì)原消息沒有做任何改動(dòng)
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                topic,
                partition,
                timestamp,
                key,
                value,
                headers
        );
        // 傳遞新的消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("攔截器1 -- back");
        // 消息確認(rèn)或異常的時(shí)候,調(diào)用該方法,該方法中不應(yīng)實(shí)現(xiàn)較重的任務(wù)
        // 會(huì)影響kafka生產(chǎn)者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        final Object classContent = configs.get("classContent");
        System.out.println(classContent);
    }
}

自定義攔截器2:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class InterceptorTwo implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("攔截器2 -- go");


        // 消息發(fā)送的時(shí)候,經(jīng)過攔截器,調(diào)用該方法

        // 要發(fā)送的消息內(nèi)容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();


        // 攔截器攔下來之后根據(jù)原來消息創(chuàng)建的新的消息
        // 此處對(duì)原消息沒有做任何改動(dòng)
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                topic,
                partition,
                timestamp,
                key,
                value,
                headers
        );
        // 傳遞新的消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("攔截器2 -- back");
        // 消息確認(rèn)或異常的時(shí)候,調(diào)用該方法,該方法中不應(yīng)實(shí)現(xiàn)較重的任務(wù)
        // 會(huì)影響kafka生產(chǎn)者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        final Object classContent = configs.get("classContent");
        System.out.println(classContent);
    }
}

正常那個(gè)運(yùn)行會(huì)打印:

攔截器1 -- go
攔截器2 -- go
攔截器1 -- back
攔截器2 -- back

即發(fā)送和回調(diào)都是按照鏈的順序正序來的。

生產(chǎn)者原理剖析

kafka整個(gè)消息發(fā)送流程如下圖:

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

生產(chǎn)消息的所有工作分給了2個(gè)線程協(xié)作完成:一個(gè)是主線程(負(fù)責(zé)消息的預(yù)處理),第二個(gè)是發(fā)送線程(sender線程負(fù)責(zé)將發(fā)送消息以及接受發(fā)送的結(jié)果)。

主線程

主線程:負(fù)責(zé)消息創(chuàng)建,攔截器,序列化器,分區(qū)器等操作,并將消息追加到消息收集器消息累加器(RecoderAccumulator)中。

消息累加器

參考:https://mp.weixin.qq.com/s/n8iCJWA13Xz-haZnB8zYFg

消息累加器的作用是緩存消息以便sender線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗提升性能。

消息累加器的緩存大小可以通過生產(chǎn)者參數(shù) buffer.memory 配置,默認(rèn)為32MB。

若主線程發(fā)送消息的速度超過sender線程發(fā)送消息的速度,會(huì)導(dǎo)致消息累加器被填滿,這時(shí)候再調(diào)用生產(chǎn)者客戶端的send方法會(huì)被阻塞,若阻塞超過60秒(由參數(shù)max.block.ms控制),則會(huì)拋出異常 BufferExhaustedException 。

消息收集器RecoderAccumulator每個(gè)分區(qū)都維護(hù)了一個(gè)Deque<ProducerBatch> 類型的雙端隊(duì)列。

ProducerBatch 可以理解為是 ProducerRecord 的集合,批量發(fā)送有利于提升吞吐量,降低網(wǎng)絡(luò)影響。

主線程發(fā)送的數(shù)據(jù)由這樣的結(jié)構(gòu)保存:首先按照 Topic 進(jìn)行劃分,每個(gè) Topic 會(huì)有一個(gè) MapkeyTopic,valueTopicInfo);之后,按照分區(qū)進(jìn)行劃分,TopicInfo 里也有一個(gè) Mapkey:分區(qū)號(hào),valueDeque<ProducerBatch>),每個(gè)雙端隊(duì)列會(huì)保存多個(gè)消息批次。當(dāng)有消息發(fā)送時(shí),會(huì)從對(duì)應(yīng) Topic 、對(duì)應(yīng)分區(qū)的雙端隊(duì)列的尾部取出一個(gè)批次,判斷是否可以將消息追加到后面。這種結(jié)構(gòu)的目的在于:

  1. 使用字節(jié)將更加緊湊,節(jié)約空間
  2. 多個(gè)小的消息組成一個(gè)批次一起發(fā)送,減少網(wǎng)絡(luò)請求次數(shù)提升吞吐量。因?yàn)?sender 線程發(fā)送消息的基本單位也是批次,它會(huì)從雙端隊(duì)列的頭部取數(shù)據(jù)發(fā)送。

ProducerBatch 的大小與 batch.size 參數(shù)(默認(rèn)16KB)密切相關(guān)。此外,Kafka 生產(chǎn)者使用 BufferPool 實(shí)現(xiàn) ByteBuffer 從而實(shí)現(xiàn)對(duì)內(nèi)存的復(fù)用。該緩存池只針對(duì)特定大?。?batch.size指定)的 ByteBuffer進(jìn)行管理,對(duì)于消息過大的緩存,不能做到重復(fù)利用。

消息累加器的基本結(jié)構(gòu)如下圖所示,紅色+綠色區(qū)域總大小32MB,一個(gè)池中單位 ByteBuffer 大小16KB。

【Kafka】高級(jí)特性:生產(chǎn)者,# kafka,kafka,消息中間件,分布式

假設(shè)剛啟動(dòng)新插入一條消息,對(duì)應(yīng)的 Topic 、對(duì)應(yīng)的 Deque<ProducerBatch> 為空,這時(shí)執(zhí)行allocate方法嘗試開辟空間,方法主要過程如下:

  1. 如果申請空間的大小大于最大空間(buffer.memory 默認(rèn)32MB),則直接拋出異常;
  2. 操作緩存池之前嘗試獲取可重入鎖,若獲取的空間(size)正好等于每個(gè)批次預(yù)設(shè)大小(batch.size 默認(rèn)16KB),則直接從Deque<ByteBuffer>中取出第一個(gè) ByteBuffer 返回;
  3. 若獲取的空間(size)大于批次預(yù)設(shè)大小,計(jì)算剩余的空閑空間,即池中空閑空間+池外空閑空間(nonPooledAvailableMemory)。如果剩余的空閑空間大于size,則進(jìn)行第4步;如果小于size,則進(jìn)行第5步;
  4. 直接使用池外空閑空間分配,若不夠再取池內(nèi)空閑空間,最后返回。
  5. 將當(dāng)前線程加入到等待隊(duì)列(waiters)的尾部,如果等待超時(shí)也沒有足夠的空間,則拋出異常;若中途被喚醒,則進(jìn)行下一步;
  6. 中途喚醒后有兩種情況,當(dāng)釋放的空間正好等于一個(gè)批次大小且自己沒有累計(jì)獲得空間,則獲取后返回;否則累計(jì)獲取釋放空間,滿足后才會(huì)返回。

發(fā)送線程

sender 線程從消息累加器獲取準(zhǔn)備好可以發(fā)送消息(等待時(shí)候是否超過linger.ms參數(shù)設(shè)置的時(shí)間、或批次個(gè)數(shù)大于1或第一個(gè)批次已滿)后,就可以準(zhǔn)備消費(fèi)消息了:

  • 遍歷每個(gè) Topic 下的分區(qū)批次,根據(jù)分區(qū) leader,將其處理為 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker節(jié)點(diǎn)。
  • 進(jìn)一步將<Node, List<ProducerBatch>轉(zhuǎn)化為<Node, Request>形式,此時(shí)才可以向服務(wù)端發(fā)送數(shù)據(jù)。
  • 在發(fā)送之前,Sender線程將消息以 Map<NodeId, Deque<Request>> 的形式保存到InFlightRequests 中進(jìn)行緩存,可以通過其獲取 leastLoadedNode ,即當(dāng)前Node中負(fù)載壓力最小的一個(gè),以實(shí)現(xiàn)消息的盡快發(fā)出。保存到InFlightRequests 中的目的是緩存已經(jīng)發(fā)出去但沒收到響應(yīng)的請求,NodeId 對(duì)應(yīng)一個(gè) broker 節(jié)點(diǎn) id ,也就是一個(gè)連接,每個(gè)連接最多堆積的未完成請求為5個(gè)(max.in.flight.requests.per.connection參數(shù)配置)。

生產(chǎn)者參數(shù)

除了前面說過的必要的5個(gè)參數(shù),如下:

  1. bootstrap.servers:配置生產(chǎn)者如何與broker建立連接。該參數(shù)設(shè)置的是初始化參數(shù)。如果生產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個(gè)部分broker的地址,而不是全部,當(dāng)生產(chǎn)者連接上此處指定的broker之后,在通過該連接發(fā)現(xiàn)集群中的其他節(jié)點(diǎn)。
  2. key.serializer:要發(fā)送信息的key數(shù)據(jù)的序列化類。設(shè)置的時(shí)候可以寫類名,也可以使用該類的Class對(duì)象。
  3. value.serializer:要發(fā)送消息的value數(shù)據(jù)的序列化類。設(shè)置的時(shí)候可以寫類名,也可以使用該類的Class對(duì)象。
  4. acks:默認(rèn)值:all。
    • acks=0:生產(chǎn)者不等待broker對(duì)消息的確認(rèn),只要將消息放到緩沖區(qū),就認(rèn)為消息已經(jīng)發(fā)送完成。該情形不能保證broker是否真的收到了消息,retries配置也不會(huì)生效。發(fā)送的消息的返回的消息偏移量永遠(yuǎn)是-1。
    • acks=1:表示消息只需要寫到主分區(qū)即可,然后就響應(yīng)客戶端,而不等待副本分區(qū)的確認(rèn)。在該情形下,如果主分區(qū)收到消息確認(rèn)之后就宕機(jī)了,而副本分區(qū)還沒來得及同步該消息,則該消息丟失。
    • acks=all:leader分區(qū)會(huì)等待所有的ISR副本分區(qū)確認(rèn)記錄。該處理保證了只要有一個(gè)ISR副本分區(qū)存活,消息就不會(huì)丟失。這是Kafka最強(qiáng)的可靠性保證,等效于 acks=-1。
  5. retries:retries重試次數(shù)。當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。跟客戶端收到錯(cuò)誤時(shí)重發(fā)一樣。如果設(shè)置了重試,還想保證消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否則在重試此失敗消息的時(shí)候,其他的消息可能發(fā)送成功了。

還有一些比較重要的參數(shù):

  • client.id :生產(chǎn)者發(fā)送請求的時(shí)候傳遞給broker的id字符串。 用于在broker的請求日志中追蹤什么應(yīng)用發(fā)送了什么消息。 一般該id是跟業(yè)務(wù)有關(guān)的字符串。

  • retry.backoff.ms :在向一個(gè)指定的主題分區(qū)重發(fā)消息的時(shí)候,重試之間的等待時(shí)間。 比如3次重試,每次重試之后等待該時(shí)間長度,再接著重試。在一些失敗的場 景,避免了密集循環(huán)的重新發(fā)送請求。 long型值,默認(rèn)100。

  • request.timeout.ms :客戶端等待請求響應(yīng)的最大時(shí)長。如果服務(wù)端響應(yīng)超時(shí),則會(huì)重發(fā)請求,除非達(dá)到重試次數(shù)。該設(shè)置應(yīng)該比 replica.lag.time.max.ms 要大,以免在服務(wù)器延遲時(shí)間內(nèi)重發(fā)消息。int類型值,默認(rèn): 30000。

  • buffer.memory :生產(chǎn)者可以用來緩存等待發(fā)送到服務(wù)器的記錄的總內(nèi)存字節(jié)。如果記錄的發(fā)送 速度超過了將記錄發(fā)送到服務(wù)器的速度,則生產(chǎn)者將阻塞 max.block.ms 的時(shí) 間,此后它將引發(fā)異常。此設(shè)置應(yīng)大致對(duì)應(yīng)于生產(chǎn)者將使用的總內(nèi)存,但并非 生產(chǎn)者使用的所有內(nèi)存都用于緩沖。一些額外的內(nèi)存將用于壓縮(如果啟用了 壓縮)以及維護(hù)運(yùn)行中的請求。long型數(shù)據(jù)。默認(rèn)值:33554432。

  • batch.size :當(dāng)多個(gè)消息發(fā)送到同一個(gè)分區(qū)的時(shí)候,生產(chǎn)者嘗試將多個(gè)記錄作為一個(gè)批來處理。批處理提高了客戶端和服務(wù)器的處理效率。 該配置項(xiàng)以字節(jié)為單位控制默認(rèn)批的大小。 所有的批小于等于該值。 發(fā)送給broker的請求將包含多個(gè)批次,每個(gè)分區(qū)一個(gè),并包含可發(fā)送的數(shù) 據(jù)。如果該值設(shè)置的比較小,會(huì)限制吞吐量(設(shè)置為0會(huì)完全禁用批處理)。如果 設(shè)置的很大,又有一點(diǎn)浪費(fèi)內(nèi)存,因?yàn)镵afka會(huì)永遠(yuǎn)分配這么大的內(nèi)存來參與 到消息的批整合中。

  • linger.ms:該配置設(shè)置了一個(gè)延遲,生產(chǎn)者不會(huì)立即將消息發(fā)送到broker,而是等待這么一段時(shí)間以累積消息,然 后將這段時(shí)間之內(nèi)的消息作為一個(gè)批次發(fā)送。該設(shè)置是批處理的另一個(gè)上限: 一旦批消息達(dá)到了 batch.size 指定的值,消息批會(huì)立即發(fā)送,如果積累的消 息字節(jié)數(shù)達(dá)不到 batch.size 的值,可以設(shè)置該毫秒值,等待這么長時(shí)間之 后,也會(huì)發(fā)送消息批。該屬性默認(rèn)值是0(沒有延遲)。如果設(shè)置 linger.ms=5 ,則在一個(gè)請求發(fā)送之前先等待5ms。long型值,默認(rèn):0。

  • max.request.size :單個(gè)請求的最大字節(jié)數(shù)。該設(shè)置會(huì)限制單個(gè)請求中消息批的消息個(gè)數(shù),以免單 個(gè)請求發(fā)送太多的數(shù)據(jù)。服務(wù)器有自己的限制批大小的設(shè)置,與該配置可能不 一樣。int類型值,默認(rèn)1048576。

  • interceptor.classes :指定在生產(chǎn)者接收到該消息,向Kafka集群傳輸之前,由序列化器處理之前,配置的攔截器對(duì)消息進(jìn)行處理。

  • partitioner.class :實(shí)現(xiàn)了接口 org.apache.kafka.clients.producer.Partitioner 的分區(qū) 器實(shí)現(xiàn)類,默認(rèn)為DefaultPartitioner。

  • send.buffer.bytes :TCP發(fā)送數(shù)據(jù)的時(shí)候使用的緩沖區(qū)(SO_SNDBUF)大小。如果設(shè)置為0,則使 用操作系統(tǒng)默認(rèn)的。

  • receive.buffer.bytes:TCP接收緩存(SO_RCVBUF),如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)的值。 int類型值,默認(rèn)32768。

  • security.protocol :跟broker通信的協(xié)議:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。string類型值,默認(rèn):PLAINTEXT。

  • max.block.ms:控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的 時(shí)長。當(dāng)緩存滿了或元數(shù)據(jù)不可用的時(shí)候,這些方法阻塞。在用戶提供的序列化器和分區(qū)器的阻塞時(shí)間不計(jì)入。long型值,默認(rèn):60000。

  • connections.max.idle.ms:當(dāng)連接空閑時(shí)間達(dá)到這個(gè)值,就關(guān)閉連接。long型數(shù)據(jù),默認(rèn):540000

  • max.in.flight.requests.per.connection :單個(gè)連接上未確認(rèn)請求的最大數(shù)量。達(dá)到這個(gè)數(shù)量,客戶端阻塞。如果該值大 于1,且存在失敗的請求,在重試的時(shí)候消息順序不能保證。 int類型值,默認(rèn)5。

  • reconnect.backoff.max.ms:對(duì)于每個(gè)連續(xù)的連接失敗,每臺(tái)主機(jī)的退避將成倍增加,直至達(dá)到此最大值。 在計(jì)算退避增量之后,添加20%的隨機(jī)抖動(dòng)以避免連接風(fēng)暴。 long型值,默認(rèn)1000。

  • reconnect.backoff.ms :嘗試重連指定主機(jī)的基礎(chǔ)等待時(shí)間。避免了到該主機(jī)的密集重連。該退避時(shí)間 應(yīng)用于該客戶端到broker的所有連接。 long型值,默認(rèn)50。

  • compression.type :生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是none,也就是不壓縮。 支持的值:none、gzip、snappy和lz4。 壓縮是對(duì)于整個(gè)批來講的,所以批處理的效率也會(huì)影響到壓縮的比例。文章來源地址http://www.zghlxwxcb.cn/news/detail-820370.html

到了這里,關(guān)于【Kafka】高級(jí)特性:生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 深入解析 Kafka生產(chǎn)者:關(guān)鍵特性與最佳實(shí)踐

    引言 Apache Kafka作為一個(gè)高度可擴(kuò)展且具有高效性的消息中間件,已經(jīng)成為現(xiàn)代大數(shù)據(jù)生態(tài)系統(tǒng)中的核心組件之一。在本文中,我們將專注于Kafka中的一個(gè)重要角色——生產(chǎn)者(Producer),探討其核心功能、工作原理及其關(guān)鍵配置項(xiàng),旨在幫助讀者更好地理解和優(yōu)化Kafka生產(chǎn)者的

    2024年03月17日
    瀏覽(33)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    生產(chǎn)者客戶端代碼 KafkaProducer 通過解析 producer.propeties 文件里面的屬性來構(gòu)造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、 RecordAccumulator消息累加器 、 元信息更新器 、啟動(dòng)發(fā)送請求的后臺(tái)線程 生產(chǎn)者元信息更新器 我們之前有講過. 客戶端都會(huì)保存集群的元信息,例如

    2023年04月09日
    瀏覽(31)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(28)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

    生產(chǎn)者發(fā)送消息流程參考圖1: 先從創(chuàng)建一個(gè)ProducerRecord對(duì)象開始,其中需要包含目標(biāo)主題和要發(fā)送的內(nèi)容。另外,還可以指定鍵、分區(qū)、時(shí)間戳或標(biāo)頭。在發(fā)送ProducerRecord對(duì)象時(shí),生產(chǎn)者需要先把鍵和值對(duì)象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 接下來,如果沒有顯式

    2024年02月13日
    瀏覽(28)
  • Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    生產(chǎn)者通過 producerRecord 對(duì)象封裝消息主題、消息的value(內(nèi)容)、timestamp(時(shí)間戳)等 生產(chǎn)者通過 send() 方法發(fā)送消息,send()方法會(huì)經(jīng)過如下幾步 1. 首先將消息交給 攔截器(Interceptor) 處理, 攔截器對(duì)生產(chǎn)者而言,對(duì)所有消息都是生效的,攔截器也支持鏈?zhǔn)骄幊蹋ㄘ?zé)任器鏈)的

    2024年02月16日
    瀏覽(24)
  • kafka服務(wù)端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設(shè)置為5242880,即5MB,可以根據(jù)實(shí)際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個(gè)消息最大字節(jié)數(shù),根據(jù)實(shí)際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費(fèi)者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    簡單來說,就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(46)
  • 【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞

    Kafka是常用的消息中間件。在Spring Boot項(xiàng)目中,使用KafkaTemplate作為生產(chǎn)者發(fā)送消息。有時(shí),為了不影響主業(yè)務(wù)流程,會(huì)采用 異步 發(fā)送的方式,如下所示。 本以為采用異步發(fā)送,必然不會(huì)影響到主業(yè)務(wù)流程。但實(shí)際使用時(shí)發(fā)現(xiàn),在第一次發(fā)送消息時(shí),如果Kafka Broker連接失敗,

    2023年04月13日
    瀏覽(26)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會(huì)阻塞等待服務(wù)器的響應(yīng)。如果發(fā)送失敗,生產(chǎn)者會(huì)拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯(cuò)誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進(jìn)行

    2024年02月06日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包