国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

這篇具有很好參考價(jià)值的文章主要介紹了探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

1. 多線程安全

1.1. 生產(chǎn)者是多線程安全的么?

1.1. 消費(fèi)者是多線程安全的么?

2. 消費(fèi)者規(guī)避多線程安全方案

2.1. 每個(gè)線程維護(hù)一個(gè)kafkaConsumer

2.2. [單/多]kafkaConsumer實(shí)例 + 多worker線程

2.3.方案優(yōu)缺點(diǎn)對(duì)比


1. 多線程安全

1.1. 生產(chǎn)者是多線程安全的么?

????????Kafka生產(chǎn)者是線程安全的,可以在多個(gè)線程中共享一個(gè)Kafka生產(chǎn)者實(shí)例。這是因?yàn)镵afka生產(chǎn)者實(shí)例內(nèi)部使用了一些同步機(jī)制來(lái)保證線程安全,例如使用了線程安全的隊(duì)列來(lái)緩存消息,使用了同步鎖來(lái)保護(hù)共享資源的訪問(wèn)等。

????????同時(shí),Kafka生產(chǎn)者的send()方法是非阻塞的,可以在多個(gè)線程中并發(fā)調(diào)用,不會(huì)阻塞線程。Kafka生產(chǎn)者還提供了異步發(fā)送和同步發(fā)送兩種發(fā)送方式,可以根據(jù)實(shí)際需求選擇不同的發(fā)送方式。

然而,如果多個(gè)線程共享同一個(gè)Kafka生產(chǎn)者實(shí)例,需要注意以下幾點(diǎn):

  1. 同一個(gè)線程中不要同時(shí)調(diào)用send()方法和flush()方法,可能會(huì)導(dǎo)致消息發(fā)送順序不一致。

  2. 不同線程中調(diào)用send()方法時(shí),需要注意消息的順序,可以使用Kafka的分區(qū)機(jī)制來(lái)保證消息的順序。

  3. 如果多個(gè)線程發(fā)送的消息都是針對(duì)同一個(gè)主題或分區(qū),可能會(huì)導(dǎo)致消息的重復(fù)發(fā)送或丟失。因此,建議在多線程情況下,使用Kafka的分區(qū)機(jī)制,將消息發(fā)送到不同的分區(qū)中,以避免消息的重復(fù)和丟失。

????????綜上所述,Kafka生產(chǎn)者是線程安全的,可以在多個(gè)線程中共享一個(gè)Kafka生產(chǎn)者實(shí)例,但需要注意消息的順序和分區(qū)的使用,以保證消息的可靠性和順序性。

1.1. 消費(fèi)者是多線程安全的么?

????????Kafka消費(fèi)者是非線程安全的,主要原因是因?yàn)椋篕afka消費(fèi)者使用了內(nèi)部的狀態(tài)來(lái)跟蹤消費(fèi)進(jìn)度和偏移量。這種狀態(tài)包括消費(fèi)者的位置,消費(fèi)者的偏移量,以及消費(fèi)者的訂閱主題和分區(qū)等信息。如果多個(gè)線程同時(shí)訪問(wèn)同一個(gè)Kafka消費(fèi)者實(shí)例,就會(huì)導(dǎo)致這些狀態(tài)信息的不一致,從而導(dǎo)致消費(fèi)進(jìn)度出現(xiàn)錯(cuò)誤。

2. 消費(fèi)者規(guī)避多線程安全方案

背景:Kafka消費(fèi)者中具有poll()方法,該方法是一個(gè)阻塞方法,如果在同一個(gè)線程中多次調(diào)用poll()方法,會(huì)導(dǎo)致消費(fèi)者阻塞在poll()方法中,無(wú)法及時(shí)消費(fèi)新到達(dá)的消息。因此,為了提高Kafka消費(fèi)者的并發(fā)性能,通常使用多個(gè)消費(fèi)者線程來(lái)消費(fèi)+處理消息。為了保證Kafka消費(fèi)者的線程安全性,通常采用以下2種方案。

方案的本質(zhì):一個(gè)線程只能綁定一個(gè)消費(fèi)者實(shí)例(不能多個(gè)線程共用一個(gè)消費(fèi)者實(shí)例)

2.1. 每個(gè)線程創(chuàng)建一個(gè)Consumer

探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

?1. 創(chuàng)建多個(gè)線程,去消費(fèi)topic

2. 每個(gè)線程綁定固定數(shù)量的分區(qū)(最好的情況是一個(gè)消費(fèi)者綁定一個(gè)分區(qū))

探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

????????代碼比較簡(jiǎn)單,唯一需要關(guān)注的點(diǎn):創(chuàng)建的消費(fèi)者線程和分區(qū)的數(shù)量可能不相等,此時(shí),可以采用「分區(qū)副本自動(dòng)分配策略」,該策略會(huì)將消費(fèi)者線程和分區(qū)綁定在一起。

場(chǎng)景討論:

????????按照現(xiàn)在的代碼,假設(shè)消費(fèi)者線程獲取了分區(qū)A 100個(gè)msg,然后處理,若還未處理完,此時(shí),offset未提交。因?yàn)榇藭r(shí)offset還未提交,消費(fèi)者線程還會(huì)去分區(qū)A調(diào)用poll,獲取到的msg依然會(huì)是剛才的100個(gè)msg。存在消費(fèi)重復(fù)的場(chǎng)景,因此,消費(fèi)者需要做好冪等處理。

? ? ? ? 上面的代碼,不會(huì)出現(xiàn)這個(gè)場(chǎng)景:消費(fèi)者獲取了消息1,2,3,3處理完成了,執(zhí)行了提交偏移,但是消息1還未提交的場(chǎng)景。

代碼示例:

package com.bie.kafka.kafkaThrea;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 *
 *       1、KafkaConsumer是非線程安全的,KafkaProducer是線程安全的。
 *       2、該案例是創(chuàng)建多個(gè)線程,每個(gè)線程維護(hù)一個(gè)KafkaConsumer實(shí)例
 *           用戶創(chuàng)建多個(gè)線程消費(fèi)topic數(shù)據(jù),使用分區(qū)副本自動(dòng)分配策略,將消費(fèi)者線程與分區(qū)進(jìn)行綁定
 *       3、ConsumerRunnable,消費(fèi)線程類,執(zhí)行真正的消費(fèi)任務(wù)
 */
public class ConsumerRunnable implements Runnable {

    // 每個(gè)線程維護(hù)私有的kafkaConsumer實(shí)例
    private final KafkaConsumer<String, String> consumer;

    /**
     * 默認(rèn)每個(gè)消費(fèi)者的配置參數(shù)初始化
     *
     * @param brokerList
     * @param groupId
     * @param topic
     */
    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        // 帶參數(shù)的構(gòu)造方法
        Properties props = new Properties();
        // kafka的列表
        props.put("bootstrap.servers", brokerList);
        // 消費(fèi)者組編號(hào)
        props.put("group.id", groupId);
        // 自動(dòng)提交
        props.put("enable.auto.commit", true);
        // 提交提交每個(gè)一秒鐘
        props.put("auto.commit.interval.ms", "1000");
        // 反序列化key
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化value
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 將配置信息進(jìn)行初始化操作
        this.consumer = new KafkaConsumer<>(props);
        // 定義響應(yīng)的主題信息topic:使用分區(qū)副本自動(dòng)分配策略
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        // 消費(fèi)者保持一直消費(fèi)的狀態(tài)
        while (true) {
            // 將獲取到消費(fèi)的信息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
            // 遍歷出每個(gè)消費(fèi)的消息
            for (ConsumerRecord<String, String> record : records) {
                // 處理消息:deal msg
            }
            // 提交offset
        }
    }
}
package com.bie.kafka.kafkaThrea;

import java.util.ArrayList;
import java.util.List;

/**
 *       1、消費(fèi)線程管理類,創(chuàng)建多個(gè)線程類執(zhí)行消費(fèi)任務(wù)
 */
public class ConsumerGroup {

    // 消費(fèi)者群組,多消費(fèi)者。
    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        // 初始化消費(fèi)者組
        consumers = new ArrayList<>(consumerNum);
        // 初始化消費(fèi)者,創(chuàng)建多少個(gè)消費(fèi)者
        for (int i = 0; i < consumerNum; i++) {
            // 根據(jù)消費(fèi)者構(gòu)造方法,創(chuàng)建消費(fèi)者實(shí)例
            ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
            // 將創(chuàng)建的消費(fèi)者實(shí)例添加到消費(fèi)者組中
            consumers.add(consumerRunnable);
        }
    }

    public void execute() {
        // 將消費(fèi)者組里面的消費(fèi)者遍歷出來(lái)
        for (ConsumerRunnable task : consumers) {
            // 創(chuàng)建一個(gè)消費(fèi)者線程,并且啟動(dòng)該線程
            new Thread(task).start();
        }
    }
}
package com.bie.kafka.kafkaThrea;


public class ConsumerMain {

    public static void main(String[] args) {
        // kafka即broker列表
        String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
        // group組名稱
        String groupId = "group1";
        // topic主題名稱
        String topic = "topic1";
        // 消費(fèi)者的數(shù)量
        int consumerNum = 3;
        // 通過(guò)構(gòu)造器創(chuàng)建出一個(gè)對(duì)象
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        // 執(zhí)行execute的方法,創(chuàng)建出ConsumerRunnable消費(fèi)者實(shí)例。多線程多消費(fèi)者實(shí)例
        consumerGroup.execute();
    }
}

上面代碼,在實(shí)際工程應(yīng)用中存在問(wèn)題,描述見(jiàn)下:

  1. 每個(gè)消費(fèi)者線程,先執(zhí)行poll拉取一批batchsize命令后
  2. 批量處理這批消息
  3. 提交offset

????????步驟2,處理batchsize的消息,可能中間的某一個(gè)失敗了,但是步驟3提交了整體的offset,會(huì)導(dǎo)致失敗的消息丟失了。

解決方案:處理每個(gè)消息的流程中,增加重試機(jī)制(本地消息表),保證該消息能執(zhí)行成功

2.2. 消費(fèi)者組?+ worker線程池

探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

?與2.1方案一的區(qū)別在于,將「消息的獲取」與「消息的處理」解耦開(kāi)

1. 消息的獲?。壕S護(hù)一個(gè)or多個(gè)kafkaConsumer實(shí)例,獲取消息,獲取到消息后,將消息丟到消息處理線程池中

2. 消息的處理:創(chuàng)建一個(gè)線程池,里面存放了worker線程,每個(gè)worker線程處理獲取到的消息

探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

ConsumerWorker類

????????worker線程:執(zhí)行msg的處理,更新offsets信息

package huxi.test.consumer.multithreaded;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
 
import java.util.List;
import java.util.Map;
 
public class ConsumerWorker<K, V> implements Runnable {
 
    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
    public ConsumerWorker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = record;
        this.offsets = offsets;
    }
 
    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            // 獲取到分區(qū)的消息記錄
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            // 遍歷獲取到的消息記錄
            for (ConsumerRecord<K, V> record : partitionRecords) {
                // 消息處理邏輯
            }
 
            /* 下面操作,是更新各個(gè)分區(qū)的offset信息到offsets變量,并沒(méi)有真正的提交位移 */
            /* 真正的更新位移操作,不是在worker線程,而是在消費(fèi)者線程 */

            // 待更新的位移
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // 同步鎖,鎖住offsets位移
            synchronized (offsets) {
                // 如果offsets位移不包含partition這個(gè)key信息,就將位移信息設(shè)置到map集合里面
                if (!offsets.containsKey(partition)) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    // 否則,offsets位移包含partition這個(gè)key信息,獲取到offsets的位置信息
                    long curr = offsets.get(partition).offset();
                    if (curr <= lastOffset + 1) { // 如果獲取到的位置信息小于等于上一次位移信息大小,將這個(gè)partition的位置信息設(shè)置到map集合中。并保存到broker中。
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}

?ConsumerThreadHandler類

????????一個(gè)主線程,定時(shí)去poll消息,然后將消息投遞到worker線程中,最后,當(dāng)offsets信息發(fā)生變更后,提交offset

package huxi.test.consumer.multithreaded;
 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
 
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class ConsumerThreadHandler<K, V> {
 
    // KafkaConsumer實(shí)例
    private final KafkaConsumer<K, V> consumer;
    // ExecutorService實(shí)例
    private ExecutorService executors;
    // 位移信息offsets
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
        // 構(gòu)造kafkaConsumer配置
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false"); // 非自動(dòng)提交位移信息
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // 創(chuàng)建kafkaConsumer,賦值給成員變量consumer
        consumer = new KafkaConsumer<>(props);

        // 消費(fèi)者訂閱消息,并實(shí)現(xiàn)重平衡rebalance
        // rebalance監(jiān)聽(tīng)器,創(chuàng)建一個(gè)匿名內(nèi)部類。使用rebalance監(jiān)聽(tīng)器前提是使用消費(fèi)者組(consumer group)。
        // 監(jiān)聽(tīng)器最常見(jiàn)用法就是手動(dòng)提交位移到第三方存儲(chǔ)以及在rebalance前后執(zhí)行一些必要的審計(jì)操作。
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            // 在coordinator開(kāi)啟新一輪rebalance前onPartitionsRevoked方法會(huì)被調(diào)用。
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(offsets); // 提交位移
            }

            // rebalance完成后會(huì)調(diào)用onPartitionsAssigned方法。
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                offsets.clear(); // 清除位移信息
            }
        });
    }
 
    /**
     * 消費(fèi)主方法
     * @param threadNumber  線程池中線程數(shù)
     */
    public void consume(int threadNumber) {
        // 創(chuàng)建一個(gè)worker線程池,線程數(shù)量為threadNumber個(gè)
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 只有一個(gè)消費(fèi)者線程
        try {
            // 消費(fèi)者一直處于等待狀態(tài),等待消息消費(fèi)
            while (true) {
                // 從主題中獲取消息
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                // 如果獲取到的消息不為空
                if (!records.isEmpty()) {
                    // submit: 將一批msg交給worker線程處理,消息處理完后,更新offsets信息
                    executors.submit(new ConsumerWorker<>(records, offsets));
                }
                // 調(diào)用提交位移信息
                commitOffsets();
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();  // 調(diào)用提交位移信息
            consumer.close(); // 關(guān)閉consumer
        }
    }
 
    private void commitOffsets() {
        Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
        // 保證線程安全、同步鎖,鎖住offsets
        synchronized (offsets) {
            // 判斷如果offsets位移信息為空,直接返回,節(jié)省同步鎖對(duì)offsets的鎖定的時(shí)間
            if (offsets.isEmpty()) {
                return;
            }
            // 如果offsets位移信息不為空,將位移信息offsets放到集合中,方便同步
            unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
            // 清除位移信息offsets
            offsets.clear();
        }
        // 將封裝好的位移信息unmodfiedMap集合進(jìn)行同步提交
        // 手動(dòng)提交位移信息
        consumer.commitSync(unmodfiedMap);
    }
 
    public void close() {
        consumer.wakeup();
        // 關(guān)閉ExecutorService實(shí)例
        executors.shutdown();
    }
}

Main類

????????包裝了所有的工具類,啟動(dòng)整個(gè)程序

package huxi.test.consumer.multithreaded;
 
public class Main {
 
    public static void main(String[] args) {
        // broker列表、topic、group id
        String brokerList = "localhost:9092";
        String topic = "test-topic";
        String groupID = "test-group";
        
        // 1. 根據(jù)ConsumerThreadHandler構(gòu)造方法,創(chuàng)建出消費(fèi)者h(yuǎn)andler
        final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();
 
        // 創(chuàng)建線程的匿名內(nèi)部類
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 執(zhí)行consume,在此線程中執(zhí)行消費(fèi)者消費(fèi)消息。
                handler.consume(cpuCount); 
            }
        };
        new Thread(runnable).start();  // 3. 該函數(shù)會(huì)調(diào)用上面的run()方法
 
        try {
            // 20秒后自動(dòng)停止該測(cè)試程序
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // swallow this exception
        }
        System.out.println("Starting to close the consumer...");
        handler.close();
    }
}  

2.3.方案優(yōu)缺點(diǎn)對(duì)比

優(yōu)點(diǎn) 缺點(diǎn)
方案1

實(shí)現(xiàn)簡(jiǎn)單

速度較快,因?yàn)闊o(wú)線程交互開(kāi)銷

方便位移管理

易于維護(hù)分區(qū)之間的消息消費(fèi)順序

socket連接開(kāi)銷大

consumer受限于topic的分區(qū)數(shù),擴(kuò)展性差

broker端處理負(fù)載高(因?yàn)榘l(fā)往broker的請(qǐng)求較多)

reblance可能性增大

方案2

消息獲取和處理解耦

可獨(dú)立擴(kuò)展consumer和worker數(shù),伸縮性好

實(shí)現(xiàn)負(fù)載

難于維護(hù)分區(qū)內(nèi)的消息順序

處理鏈路變長(zhǎng),導(dǎo)致位移管理困難

worker線程異??赡軐?dǎo)致消費(fèi)數(shù)據(jù)丟失文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-426212.html

到了這里,關(guān)于探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 筆記:配置多個(gè)kafka生產(chǎn)者和消費(fèi)者

    如果只有一個(gè)kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對(duì)應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個(gè)kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費(fèi)者。 適用場(chǎng)景:需要消費(fèi)來(lái)源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka生產(chǎn)者和消費(fèi)者(python版)

    生產(chǎn)者 消費(fèi)者 消費(fèi)者中的組名主要用戶針對(duì)主題的偏移量進(jìn)行更改,也涉及到主題中分區(qū)的問(wèn)題, kafka工具類 此工具類基本上拿過(guò)去就可以用 疑問(wèn) 當(dāng)消費(fèi)者鏈接kafka時(shí)發(fā)現(xiàn)topic沒(méi)有未讀的消息怎樣退出呢,默認(rèn)是在一直等待,但是我期望沒(méi)有要讀的消息的時(shí)候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Java輕松使用Kafka生產(chǎn)者,消費(fèi)者

    Java輕松使用Kafka生產(chǎn)者,消費(fèi)者 一、環(huán)境說(shuō)明 項(xiàng)目中需要下面的依賴: ( 版本自定義 ) 2. yml配置文件設(shè)置 1. 簡(jiǎn)單生產(chǎn)者的書(shū)寫(xiě): 1. 簡(jiǎn)單消費(fèi)者的書(shū)寫(xiě): ? 注:多消費(fèi)者時(shí),需要對(duì)應(yīng)kafka中配置的分區(qū);多少的Partition就有多少個(gè)消費(fèi)者,以免資源浪費(fèi)

    2024年02月15日
    瀏覽(28)
  • Kafka官方生產(chǎn)者和消費(fèi)者腳本簡(jiǎn)單使用

    怎樣使用Kafka官方生產(chǎn)者和消費(fèi)者腳本進(jìn)行消費(fèi)生產(chǎn)和消費(fèi)?這里假設(shè)已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見(jiàn)到測(cè)試kafka對(duì)應(yīng)topic了.

    2024年02月04日
    瀏覽(23)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號(hào)從0開(kāi)始計(jì)算 Partition:分區(qū)數(shù),該主題有3個(gè)分區(qū) Replica:副本數(shù),該主題有3個(gè)副本 Leader:副本數(shù)中的主的序號(hào),生產(chǎn)消費(fèi)的對(duì)象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會(huì)報(bào)錯(cuò) 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    執(zhí)行topic刪除命令時(shí),出現(xiàn)提示 這條命令其實(shí)并不執(zhí)行刪除動(dòng)作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時(shí)也提醒用戶一定要提前打開(kāi)delete.topic.enable開(kāi)關(guān),否則刪除動(dòng)作是不會(huì)執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(28)
  • Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)

    Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)

    Apache Kafka 作為分布式流處理平臺(tái),其架構(gòu)中的生產(chǎn)者和消費(fèi)者是核心組件,負(fù)責(zé)實(shí)現(xiàn)高效的消息生產(chǎn)和消費(fèi)。本文將深入剖析 Kafka 架構(gòu)中生產(chǎn)者和消費(fèi)者的工作原理、核心概念以及高級(jí)功能。 1 發(fā)送消息到 Kafka Kafka 生產(chǎn)者負(fù)責(zé)將消息發(fā)布到指定的主題。以下是一個(gè)簡(jiǎn)單的生

    2024年02月03日
    瀏覽(50)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽(tīng)消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽(tīng)消息)

    簡(jiǎn)單來(lái)說(shuō),就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來(lái)看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(46)
  • kafka配置大全broker、topic、生產(chǎn)者和消費(fèi)者等配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包