国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

這篇具有很好參考價值的文章主要介紹了三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

概述
本文主要是分享Kafka初始化生產(chǎn)者的大體過程

初始化過程中會新建很多對象,本文暫先分享部分對象

1.分區(qū)器---Partitioner partitioner

2.重試時間---long retryBackoffMs

3.序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer

4.攔截器---List<ProducerInterceptor<K, V>> interceptorList

5.累加器---RecordAccumulator accumulator

6.元數(shù)據(jù)---ProducerMetadata metadata

7.創(chuàng)建sender線程---Sender sender

生產(chǎn)者初始化代碼示例:

后續(xù)分享這些配置會被用到什么地方,生成上述哪些對象

// 這是構(gòu)建kafka生產(chǎn)者的[示例代碼]
// 設(shè)置屬性
Properties properties = new Properties();
// 指定連接的kafka服務(wù)器的地址,配置多臺的服務(wù)  用,分割, 其中一個宕機,生產(chǎn)者 依然可以連上(集群)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[kafka server ip]:[kafka server port]");
// 1.分區(qū)器---Partitioner partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
// 2.重試時間---long retryBackoffMs
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10L);
// 3.key和value的序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 4.攔截器---List<ProducerInterceptor<K, V>> interceptorList
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyInterceptor.class);
// 構(gòu)建kafka生產(chǎn)者對象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

一、分區(qū)器

對應(yīng)初始化時設(shè)置的參數(shù):

ProducerConfig.PARTITIONER_CLASS_CONFIG

示意:

分區(qū)器是在發(fā)送消息時用來計算消息將要發(fā)送到哪個分區(qū)的,支持自定義分區(qū)器

// 這是kafka client 初始化生產(chǎn)者的[源碼]
// 如果沒有設(shè)置自定義分區(qū)器,則partitioner為null,會影響到后續(xù)初始化邏輯以及發(fā)送消息時的邏輯
this.partitioner = config.getConfiguredInstance(
                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    Partitioner.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
自定義分區(qū)器示例代碼

如需使用自定義分區(qū)器,需要考慮好分區(qū)負載問題,切勿為了解決需求盲目使用自定義分區(qū);

分區(qū)不合理可能影響broker性能,也是對低負載分區(qū)資源的浪費,嚴重情況下某一分區(qū)的消費者負載過大,或某一分區(qū)broker負載過大,可能導(dǎo)致雪崩;

// 這是自定義分區(qū)器的[示例代碼]
public class MyPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int num = partitionInfos.size();
        // 與[org.apache.kafka.clients.producer.internals.DefaultPartitioner]
        // 計算分區(qū)時一樣的算法
        int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;
        return parId;
    }

    public void close() {//do nothing}
    public void configure(Map<String, ?> configs) {//do nothing}
}
分區(qū)器使用

1.用戶可以通過實現(xiàn)該接口自定義分區(qū)器,在生產(chǎn)者調(diào)用send方法發(fā)送消息時,會使用用戶自定義的分區(qū)器計算消息要發(fā)送到哪個分區(qū),自定義分區(qū)器簡單實現(xiàn)見上面代碼塊

org.apache.kafka.clients.producer.KafkaProducer#partition

2.在生產(chǎn)者調(diào)用send方法發(fā)送消息時,如果使用用戶自定義的分區(qū)器,允許在第一次將消息放入本地緩存失敗時,進行一次嘗試:重現(xiàn)分配分區(qū)和本地緩存

// 這是kafka生產(chǎn)者調(diào)用send方法發(fā)送消息時的部分[源碼]
// check if we have an in-progress batch
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
    // After taking the lock, validate that the partition hasn't changed and retry.
    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
        continue;
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
    if (appendResult != null) {
        // 這是第一次就將消息添加到緩存后返回
        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
        boolean enableSwitch = allBatchesFull(dq);
        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
        return appendResult;
    }
}

// 這是第一次嘗試將消息放入本地緩存失敗后,判斷如果使用戶自定義的分區(qū)器,則返回一個對象
// 該對象將在append方法的調(diào)用處進行重新計算分區(qū)并重試一次
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
    // Return a result that will cause another call to append.
    return new RecordAppendResult(null, false, false, true, 0);
}

該對象還有別的使用場景暫不介紹

二、重試時間

對應(yīng)初始化時設(shè)置的參數(shù)

ProducerConfig.RETRY_BACKOFF_MS_CONFIG

示意:

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

嘗試重試對給定主題分區(qū)的失敗請求之前等待的時間。這避免了在某些失敗場景下以緊密循環(huán)的方式重復(fù)發(fā)送請求。

重試時間的使用

1.初始化累加器時作為入?yún)?,保存到累加器字段[long retryBackoffMs]中,用于sender線程發(fā)送消息時檢測重試超時

2.初始化元信息對象[org.apache.kafka.clients.Metadata]時,放入字段[long refreshBackoffMs]中,用于更新元信息前判斷等待時間

總結(jié)該字段主要用于向服務(wù)器循環(huán)發(fā)送請求時停頓等待

三、序列化器

對應(yīng)初始化時設(shè)置的參數(shù)

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG

ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG

示意:

Serializer class for key that implements the org.apache.kafka.common.serialization.Serializerinterface.
實現(xiàn)org.apache.kafka.common.serialization.Serializer接口的key序列化程序類。


Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.

實現(xiàn)org.apache.kafka.commun.serialization.Serializer接口的值的序列化程序類。

自定義序列化器
// 序列化對象
public class UserSerializer implements Serializer<User> {
    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public byte[] serialize(String topic, User data) {
        try {
            byte[] name;
            int nameSize;
            if (data == null) {
                return null;
            }
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
                //字符串的長度
                nameSize = data.getName().length();
            } else {
                name = new byte[0];
                nameSize = 0;
            }
            /*id的長度4個字節(jié),字符串的長度描述4個字節(jié),
            字符串本身的長度nameSize個字節(jié)*/
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize);
            buffer.putInt(data.getId());//4
            buffer.putInt(nameSize);//4
            buffer.put(name);//nameSize
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error serialize User:" + e);
        }
    }

    public void close() {
        //do nothing
    }
}

// 反序列化
public class UserDeserializer implements Deserializer<User> {


    public void configure(Map<String, ?> configs, boolean isKey) {
        //do nothing
    }

    public User deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                return null;
            }
            if (data.length < 8) {
                throw new SerializationException("Error data size.");
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id;
            String name;
            int nameSize;
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameByte = new byte[nameSize];
            buffer.get(nameByte);
            name = new String(nameByte, "UTF-8");
            return new User(id, name);
        } catch (Exception e) {
            throw new SerializationException("Error Deserializer DemoUser." + e);
        }

    }

    public void close() {
        //do nothing
    }
}

// 消息中的實體類
public class User {
    private int id;
    private String name;

    public User(int id) {
        this.id = id;
    }

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

使用自定義序列化器時需要在消費者消費消息時使用反序列化器將消息反序列化,一般常用的就是字符串序列化器

org.apache.kafka.common.serialization.StringSerializer

四、攔截器

對應(yīng)初始化時設(shè)置的參數(shù)

ProducerConfig.INTERCEPTOR_CLASSES_CONFIG

示意:

Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors

通過實現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor接口,您可以在生產(chǎn)者接收到的記錄發(fā)布到kafka集群之前攔截這些記錄。默認情況下,沒有攔截器

自定義序攔截器
// 這是自定義序列化器[示例代碼]
public class MyInterceptor implements ProducerInterceptor<String, String> {
    private long successCount = 0L;
    private long errorCount = 0L;

    //該方法:Producer在將消息序列化和分配分區(qū)之前會調(diào)用攔截器的這個方法來對消息進行相應(yīng)的操作
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //要把發(fā)送的value都帶上時間戳
        return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + record.value(), record.headers());
    }

    //發(fā)送消息情況統(tǒng)計

    //該方法:會在消息從RecordAccumulator成功發(fā)送到Kafka Broker之后,或者在發(fā)送過程中失敗時調(diào)用。并且通常都是在producer回調(diào)邏輯觸發(fā)之前
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCount++;
        } else {
            errorCount++;
        }
    }
    //該方法:可以關(guān)閉攔截器,主要用于執(zhí)行一些資源清理工作
    @Override
    public void close() {
        //producer發(fā)送數(shù)據(jù)結(jié)束并close后,會自動調(diào)用攔截器的close方法來輸出統(tǒng)計的成功和失敗次數(shù)
        System.out.println("成功次數(shù)=" + successCount);
        System.out.println("失敗次數(shù)=" + errorCount);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

五、累加器

這是Kafka中非常重要的內(nèi)部組件,主要用于緩存消息以便批量發(fā)送,從而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗并提升性能。

簡單概述:

對于生產(chǎn)者的作用是使用累加器,可以讓生產(chǎn)者不必每次發(fā)送消息就即刻推送到broker,可以將一個topic的同一分區(qū)消息寫入同一份緩存,等待sender線程批量獲取這批消息一次性發(fā)送到broker。減少生產(chǎn)者發(fā)起網(wǎng)絡(luò)調(diào)用的次數(shù)。

對broker而言的作用是,broker也可以一次性將接收到的這一批多個消息以順序IO的方式追加到文件中,提高了儲存效率。

示意:

This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
this behavior is explicitly disabled.

此類充當(dāng)一個隊列,將記錄累積到要發(fā)送到服務(wù)器的MemoryRecords實例中。
累加器使用一定量的內(nèi)存,當(dāng)內(nèi)存耗盡時,追加調(diào)用將被阻塞,除非此行為被明確禁用。

累加器核心作用

1.消息緩存:RecordAccumulator實際上是在客戶端開辟出的一塊內(nèi)存區(qū)域,主要用來緩存消息。這種緩存機制允許Sender線程后續(xù)批量發(fā)送消息,而不是每調(diào)用一次send方法就直接將消息發(fā)送給broker。
2.批量發(fā)送:當(dāng)觸發(fā)發(fā)送條件如MemoryRecords緩存或者Deque隊列滿了或者一個隊列等待時間達到配置時間,sender線程將一次性將這批消息發(fā)送給broker。這種方式可以減少網(wǎng)絡(luò)請求的數(shù)量,提高系統(tǒng)的吞吐量。這些觸發(fā)條件可參考下面參數(shù)配置的
buffer.memory,batch.size,linger.ms,max.block.ms
3.資源管理:如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,那么累加器中空間不足的話,就會導(dǎo)致生產(chǎn)者無法繼續(xù)發(fā)送消息。在這種情況下,生產(chǎn)者可以通過設(shè)置max.block.ms參數(shù)來控制是否阻塞或者拋出異常。如果max.block.ms參數(shù)的默認值為60000(即60秒),那么超過這個時間限制后,如果累加器仍然沒有足夠的空間,生產(chǎn)者將無法繼續(xù)發(fā)送消息。

關(guān)于累加器的實現(xiàn),涉及到內(nèi)部緩存管理,broker服務(wù)器元數(shù)據(jù)統(tǒng)計,sender線程交互等邏輯,后續(xù)再進行分享吧

如下這些參數(shù)都是有關(guān)累加器的重要配置,直接影響kafka生產(chǎn)者發(fā)送消息的性能

配置 解釋 默認值
buffer.memory Producer 用來緩沖等待被發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)。如果記錄發(fā)送的速度比發(fā)送到服務(wù)器的速度快, Producer 就會阻塞,如果阻塞的時間超過?max.block.ms?配置的時長,則會拋出一個異常。

這個配置與 Producer 的可用總內(nèi)存有一定的對應(yīng)關(guān)系,但并不是完全等價的關(guān)系,因為 Producer 的可用內(nèi)存并不是全部都用來緩存。一些額外的內(nèi)存可能會用于壓縮(如果啟用了壓縮),以及維護正在運行的請求。

33554432
batch.size 當(dāng)將多個記錄被發(fā)送到同一個分區(qū)時, Producer 將嘗試將記錄組合到更少的請求中。這有助于提升客戶端和服務(wù)器端的性能。這個配置控制一個批次的默認大?。ㄒ宰止?jié)為單位)。

當(dāng)記錄的大小超過了配置的字節(jié)數(shù), Producer 將不再嘗試往批次增加記錄。

發(fā)送到 broker 的請求會包含多個批次的數(shù)據(jù),每個批次對應(yīng)一個 partition 的可用數(shù)據(jù)

小的 batch.size 將減少批處理,并且可能會降低吞吐量(如果 batch.size = 0的話將完全禁用批處理)。 很大的 batch.size 可能造成內(nèi)存浪費,因為我們一般會在 batch.size 的基礎(chǔ)上分配一部分緩存以應(yīng)付額外的記錄。

16384
linger.ms producer 會將兩個請求發(fā)送時間間隔內(nèi)到達的記錄合并到一個單獨的批處理請求中。通常只有當(dāng)記錄到達的速度超過了發(fā)送的速度時才會出現(xiàn)這種情況。然而,在某些場景下,即使處于可接受的負載下,客戶端也希望能減少請求的數(shù)量。這個設(shè)置是通過添加少量的人為延遲來實現(xiàn)的;即,與其立即發(fā)送記錄, producer 將等待給定的延遲時間,以便將在等待過程中到達的其他記錄能合并到本批次的處理中。這可以認為是與 TCP 中的 Nagle 算法類似。這個設(shè)置為批處理的延遲提供了上限:一旦我們接受到記錄超過了分區(qū)的 batch.size ,Producer 會忽略這個參數(shù),立刻發(fā)送數(shù)據(jù)。但是如果累積的字節(jié)數(shù)少于 batch.size ,那么我們將在指定的時間內(nèi)“逗留”(linger),以等待更多的記錄出現(xiàn)。這個設(shè)置默認為0(即沒有延遲)。例如:如果設(shè)置linger.ms=5?,則發(fā)送的請求會減少并降低部分負載,但同時會增加5毫秒的延遲。 0
max.block.ms 該配置控制?KafkaProducer.send()KafkaProducer.partitionsFor()?允許被阻塞的時長。這些方法可能因為緩沖區(qū)滿了或者元數(shù)據(jù)不可用而被阻塞。用戶提供的序列化程序或分區(qū)程序的阻塞將不會被計算到這個超時。

六、初始化元數(shù)據(jù)

對應(yīng)初始化時設(shè)置的參數(shù)

ProducerConfig.METADATA_MAX_AGE_CONFIG

示意:

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

以毫秒為單位的時間段,在此之后,即使我們沒有看到任何主分區(qū)的變化,我們也會強制刷新元數(shù)據(jù),以主動發(fā)現(xiàn)任何新的代理或分區(qū)

---元數(shù)據(jù)刷新時間

ProducerConfig.METADATA_MAX_IDLE_CONFIG

示意:

Controls how long the producer will cache metadata for a topic that's idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic's metadata is forgotten and the next access to it will force a metadata fetch request.

控制生產(chǎn)者為空閑主題緩存元數(shù)據(jù)的時間。如果自上次生成主題以來經(jīng)過的時間超過了元數(shù)據(jù)空閑持續(xù)時間,則該主題的元數(shù)據(jù)將被遺忘,下一次對其的訪問將強制執(zhí)行元數(shù)據(jù)獲取請求。

---生產(chǎn)者客戶端為[某一主題]緩存元數(shù)據(jù)的時間,超過該時間后獲取該主題元數(shù)據(jù)將強制從broker獲取

初始化元數(shù)據(jù)信息分為兩部分,第一部分初始化ProducerMetadata對象,設(shè)置元數(shù)據(jù),topic信息緩存空閑時間如下源代碼:

this.metadata = new ProducerMetadata(retryBackoffMs,
        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
        logContext,
        clusterResourceListeners,
        Time.SYSTEM);

第二部分為加載broker節(jié)點信息Node,源代碼如下:

Node:org.apache.kafka.common.Node

this.metadata.bootstrap(addresses);
...
public synchronized void bootstrap(List<InetSocketAddress> addresses) {
    this.needFullUpdate = true;
    this.updateVersion += 1;
    this.cache = MetadataCache.bootstrap(addresses);
}
...
static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
    Map<Integer, Node> nodes = new HashMap<>();
    int nodeId = -1;
    for (InetSocketAddress address : addresses) {
        nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort()));
        nodeId--;
    }
    return new MetadataCache(null, nodes, Collections.emptyList(),
            Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
            null, Collections.emptyMap(), Cluster.bootstrap(addresses));
}

上面只是初始化元數(shù)據(jù)最外層的代碼,bootstrap是kafka生產(chǎn)者客戶端初始化broker信息緩存的入口,執(zhí)行這個方法后這個客戶端將緩存kakfa broker的節(jié)點信息,每個節(jié)點的topic信息,分片信息等緩存。在客戶端發(fā)送消息時,將通過這些緩存信息給broker發(fā)起請求,所以這塊緩存是生產(chǎn)者客戶端非常重要的部分,詳情請看:三、Kafka生產(chǎn)者4---核心組件[元數(shù)據(jù)]-Metadata-CSDN博客

七、創(chuàng)建sender線程

是一個無限循環(huán)運行在后臺的線程,會一直等待累加器中緩存的消息達到發(fā)送條件,把消息發(fā)送給Broker

發(fā)送的核心流程是:

1.從累加器中批量獲取消息并創(chuàng)建?ClientRequest對象
2.將ClientRequest對象交給NetworkClient客戶端發(fā)送
3.NetworkClient客戶端將請求放入KafkaChannel的緩存
4.NetworkClient執(zhí)行網(wǎng)絡(luò) I/O,完成請求的發(fā)送
5.NetworkClient收到響應(yīng),調(diào)用 ClientRequest 的回調(diào)函數(shù),觸發(fā)每個消息上注冊的回調(diào)函數(shù)

總結(jié):

本文大致介紹初始化kafka生產(chǎn)者的核心邏輯,創(chuàng)建的各類后續(xù)用于發(fā)送消息的對象,線程,配置信息等;

這些配置將直接影響生產(chǎn)者發(fā)送消息的性能和可靠性,如果需要在復(fù)雜應(yīng)用場景下追求兩者的平衡,需要對這些參數(shù)有深刻認識并調(diào)試驗證后上線!

kafka生產(chǎn)者配置參數(shù)中文網(wǎng)站:3. 配置 - 【布客】kafka 中文翻譯

kafka生產(chǎn)者配置參數(shù)英文網(wǎng)站:https://kafka.apache.org/documentation/#producerconfigs文章來源地址http://www.zghlxwxcb.cn/news/detail-839800.html

到了這里,關(guān)于三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 三、Kafka生產(chǎn)者

    三、Kafka生產(chǎn)者

    1 發(fā)送原理 在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker 【RecordAccumulator緩沖的結(jié)構(gòu): 每一個分區(qū)對應(yīng)一

    2024年02月12日
    瀏覽(21)
  • Kafka(生產(chǎn)者)

    Kafka(生產(chǎn)者)

    目 前 企 業(yè) 中 比 較 常 見 的 消 息 隊 列 產(chǎn) 品 主 要 有 Kafka(在大數(shù)據(jù)場景主要采用 Kafka 作為消息隊列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 傳統(tǒng)消息隊列的應(yīng)用場景 傳統(tǒng)的消息隊列的主要應(yīng)用場景包括: 緩存/消峰 、 解耦 和 異步通信 。 緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過

    2024年02月11日
    瀏覽(26)
  • 「Kafka」生產(chǎn)者篇

    「Kafka」生產(chǎn)者篇

    在消息發(fā)送的過程中,涉及到了 兩個線程 —— main 線程 和 Sender 線程 。 在 main 線程中創(chuàng)建了 一個 雙端隊列 RecordAccumulator 。 main線程將消息發(fā)送給RecordAccumulator,Sender線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。 main線程創(chuàng)建 Producer 對象,調(diào)用 send 函數(shù)發(fā)送消息,

    2024年01月19日
    瀏覽(23)
  • Kafka 生產(chǎn)者

    Kafka 生產(chǎn)者

    目錄 一、kafka生產(chǎn)者原理 二、kafka異步發(fā)送 配置kafka 創(chuàng)建對象,發(fā)送數(shù)據(jù) 帶回調(diào)函數(shù)的異步發(fā)送 同步發(fā)送 ? 三、kafka生產(chǎn)者分區(qū) 分區(qū)策略 指定分區(qū): ?指定key: 什么都不指定: 自定義分區(qū)器 四、生產(chǎn)者提高吞吐量 五、數(shù)據(jù)的可靠性 ACK應(yīng)答級別 數(shù)據(jù)完全可靠條件 可靠性

    2023年04月15日
    瀏覽(27)
  • Kafka生產(chǎn)者

    1.acks 如果acks=0,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。 缺點:如果當(dāng)中出現(xiàn)了問題,導(dǎo)致服務(wù)器沒有收到消息,那么生產(chǎn)者就無從得知,消息就丟失了 優(yōu)點:因為生產(chǎn)者不需要等待服務(wù)器的響應(yīng),所有他可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息,從而

    2024年01月19日
    瀏覽(36)
  • Kafka生產(chǎn)者相關(guān)概念

    Kafka生產(chǎn)者相關(guān)概念

    Kafka中消息是以topic進行分類的,Producer生產(chǎn)消息,Consumer消費消息,都是面向topic的。 Topic是邏輯上的概念,Partition是物理上的概念,每個Partition對應(yīng)著一個log文件,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。 寫入方式 producer采用推(push)模式將消息發(fā)布到broker,每條消息都

    2024年04月13日
    瀏覽(20)
  • 【Kafka】高級特性:生產(chǎn)者

    【Kafka】高級特性:生產(chǎn)者

    整個流程如下: Producer創(chuàng)建時,會創(chuàng)建一個Sender線程并設(shè)置為守護線程。 生產(chǎn)消息時,內(nèi)部其實是異步流程;生產(chǎn)的消息先經(jīng)過攔截器-序列化器-分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時創(chuàng)建)。 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達到batch.size或者

    2024年01月24日
    瀏覽(25)
  • Apache Kafka - 重識Kafka生產(chǎn)者

    Apache Kafka - 重識Kafka生產(chǎn)者

    Kafka 生產(chǎn)者是 Apache Kafka 中的一個重要組件,它負責(zé)將數(shù)據(jù)發(fā)送到 Kafka 集群中。在實時數(shù)據(jù)處理和流式處理應(yīng)用程序中,Kafka 生產(chǎn)者扮演著非常重要的角色。 這里我們將介紹 Kafka 生產(chǎn)者的概念、工作原理以及如何使用 Kafka 生產(chǎn)者。 Kafka 生產(chǎn)者是一種用于將數(shù)據(jù)發(fā)送到 Kafk

    2024年02月05日
    瀏覽(25)
  • [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題

    [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題 Kafka是一個高度可擴展的分布式流平臺,用于構(gòu)建實時數(shù)據(jù)管道和流處理應(yīng)用程序。作為一個廣泛使用的消息代理系統(tǒng),Kafka在數(shù)據(jù)傳輸方面表現(xiàn)出色,但是在極端情況下,它可能會出現(xiàn)生產(chǎn)者阻塞的問題。這可能會導(dǎo)致

    2024年02月11日
    瀏覽(21)
  • Spring Kafka生產(chǎn)者實現(xiàn)

    我們需要通過Spring Kafka庫,將消息推送給Kafka的topic中。這里假設(shè)Kafka的集群和用戶我們都有了。這里Kafka認證采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。 我這里不需要寫版本號,應(yīng)為我使用的Spring Boot。Spring Boot會自動幫我挑選spring-kafka應(yīng)該使用哪個版本合適

    2024年02月08日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包