国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

這篇具有很好參考價值的文章主要介紹了分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1. Kafka 生產(chǎn)者

不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應用程序。

Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多個主題(topic)。生產(chǎn)者可以將消息發(fā)送到指定的主題,也可以根據(jù)分區(qū)策略將消息發(fā)送到多個分區(qū)中。生產(chǎn)者可以以異步或同步方式發(fā)送消息,并且可以配置消息的可靠性和持久性等屬性。在 Kafka 中,生產(chǎn)者是消息的源頭,它們將消息發(fā)送到 Kafka 集群中,供消費者消費。

2. kafaka 命令行操作

① 啟動 Zookeeper 集群:

[root@master01 bin]# pwd
/root/ch/soft/zk/zk-01/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-02/bin
[root@master01 bin]# ./zkServer.sh start
[root@master01 bin]# pwd
/root/ch/soft/zk/zk-03/bin
[root@master01 bin]# ./zkServer.sh start

② 啟動 kafka 集群:

[root@master01 kafka01]# pwd
/root/ch/soft/kafka/kafka01
[root@master01 kafka01]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka02]# pwd
/root/ch/soft/kafka/kafka02
[root@master01 kafka02]# bin/kafka-server-start.sh config/server.properties
[root@master01 kafka03]# pwd
/root/ch/soft/kafka/kafka03
[root@master01 kafka03]# bin/kafka-server-start.sh config/server.properties

③ 創(chuàng)建主題 test:

[root@master01 kafka01]#  bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
Topic: test     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: test     Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
Topic: test     Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0

④ 生產(chǎn)者發(fā)送消息到主題test:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic test
>hello
>你好,kafka!
>

⑤ 消費者消費主題test的消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
hello
你好,kafka!

3. kafka 生產(chǎn)者發(fā)送消息流程

分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式,【分布式-消息隊列Kafka】,分布式,kafka,linq

先從創(chuàng)建一個ProducerRecord對象開始,其中需要包含目標主題和要發(fā)送的內(nèi)容。另外,還可以指定鍵、分區(qū)、時間戳或標頭。在發(fā)送ProducerRecord對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。

接下來,如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基于ProducerRecord對象的鍵選擇一個分區(qū)。選好分區(qū)以后,生產(chǎn)者就知道該往哪個主題和分區(qū)發(fā)送這條消息了。緊接著,該消息會被添加到一個消息批次里,這個批次里的所有消息都將被發(fā)送給同一個主題和分區(qū)。有一個獨立的線程負責把這些消息批次發(fā)送給目標broker。

broker在收到這些消息時會返回一個響應。如果消息寫入成功,就返回一個RecordMetaData對象,其中包含了主題和分區(qū)信息,以及消息在分區(qū)中的偏移量。如果消息寫入失敗,則會返回一個錯誤。生產(chǎn)者在收到錯誤之后會嘗試重新發(fā)送消息,重試幾次之后如果還是失敗,則會放棄重試,并返回錯誤信息。

4. Kafka 生產(chǎn)者的創(chuàng)建

要向Kafka寫入消息,首先需要創(chuàng)建一個生產(chǎn)者對象,并設(shè)置一些屬性。Kafka生產(chǎn)者有3個必須設(shè)置的屬性。

① bootstrap.servers

broker的地址。可以由多個host:port組成,生產(chǎn)者用它們來建立初始的Kafka集群連接。它不需要包含所有的broker地址,因為生產(chǎn)者在建立初始連接之后可以從給定的broker那里找到其他broker的信息。不過還是建議至少提供兩個broker地址,因為一旦其中一個停機,則生產(chǎn)者仍然可以連接到集群。

② key.serializer

一個類名,用來序列化消息的鍵。broker 希望接收到的消息的鍵和值都是字節(jié)數(shù)組。生產(chǎn)者可以把任意Java對象作為鍵和值發(fā)送給broker,但它需要知道如何把這些Java對象轉(zhuǎn)換成字節(jié)數(shù)組。key.serializer 必須被設(shè)置為一個實現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口的類,生產(chǎn)者會用這個類把鍵序列化成字節(jié)數(shù)組。Kafka客戶端默認提供了ByteArraySerializer、StringSerializer和IntegerSerializer等,如果你只使用常見的幾種Java對象類型,就沒有必要實現(xiàn)自己的序列化器。

需要注意的是,必須設(shè)置key.serializer這個屬性,盡管你可能只需要將值發(fā)送給Kafka。如果只需要發(fā)送值,則可以將Void作為鍵的類型,然后將這個屬性設(shè)置為VoidSerializer。

③ value.serializer

一個類名,用來序列化消息的值。與設(shè)置key.serializer屬性一樣,需要將value.serializer設(shè)置成可以序列化消息值對象的類。

public class CustomProducer01 {
    public static void main(String[] args) {
        // kafka生產(chǎn)者屬性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 創(chuàng)建kafka生產(chǎn)者 
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    }
}  

5. Kafka 生產(chǎn)者發(fā)送消息

實例化好生產(chǎn)者對象后,接下來就可以開始發(fā)送消息了。KafkaProducer 的 send() 方法用于向 Kafka 集群發(fā)送消息。該方法的語法如下:

public interface Producer<K, V> extends Closeable {
    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

其中,ProducerRecord<K, V> 表示要發(fā)送的消息記錄,K 和 V 分別表示鍵和值的類型。send() 方法返回一個 Future 對象,表示異步發(fā)送消息的結(jié)果。

發(fā)送消息主要有以下3種方式:

① 發(fā)送并忘記

把消息發(fā)送給服務(wù)器,但并不關(guān)心它是否成功送達。大多數(shù)情況下,消息可以成功送達,因為Kafka是高可用的,而且生產(chǎn)者有自動嘗試重發(fā)的機制。但是,如果發(fā)生了不可重試的錯誤或超時,那么消息將會丟失,應用程序?qū)⒉粫盏饺魏涡畔⒒虍惓!?/p>

② 同步發(fā)送

一般來說,生產(chǎn)者是異步的——我們調(diào)用send()方法發(fā)送消息,它會返回一個Future對象??梢哉{(diào)用get()方法等待Future完成,這樣就可以在發(fā)送下一條消息之前知道當前消息是否發(fā)送成功。

③ 異步發(fā)送

調(diào)用send()方法,并指定一個回調(diào)函數(shù),當服務(wù)器返回響應時,這個函數(shù)會被觸發(fā)。

1. 發(fā)送即忘記

發(fā)送即忘記,生產(chǎn)者發(fā)送消息后不會等待服務(wù)器的響應,直接發(fā)送下一條消息。它只管往Kafka中發(fā)送消息而并不關(guān)心消息是否正確到達。在大多數(shù)情況下,這種發(fā)送方式?jīng)]有什么問題,不過在某些時候(比如發(fā)生不可重試異常時)會造成消息的丟失。這種發(fā)送方式的性能最高,可靠性也最差。

public class CustomProducer01 {
    private static final String brokerList "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生產(chǎn)者屬性配置
        Properties properties = initConfig();
        // kafka生產(chǎn)者發(fā)送消息,默認是異步發(fā)送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka!");
        try{
            // 發(fā)送消息
            kafkaProducer.send(producerRecord);
        }catch (Exception e){
            e.printStackTrace();
        }
        // 關(guān)閉資源
        kafkaProducer.close();
    }
}

cmd命令行窗口開啟 kafka 消息者,觀察消費者是否接收到消息:

[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!

2. 同步發(fā)送

同步發(fā)送消息很簡單,當Kafka返回錯誤或重試次數(shù)達到上限時,生產(chǎn)者可以捕獲到異常。這里需要考慮性能問題。根據(jù)Kafka集群繁忙程度的不同,broker可能需要2毫秒或更長的時間來響應請求。如果采用同步發(fā)送方式,那么發(fā)送線程在這段時間內(nèi)就只能等待,什么也不做,甚至都不發(fā)送其他消息,這將導致糟糕的性能。因此,同步發(fā)送方式通常不會被用在生產(chǎn)環(huán)境中(但會經(jīng)常被用在示例代碼中)。

send() 方法本身就是異步的,send() 方法返回的Future對象可以使調(diào)用方稍后獲得發(fā)送的結(jié)果。在執(zhí)行send() 方法之后可以調(diào)用 get() 方法來阻塞等待Kafka的響應,直到消息發(fā)送成功,或者發(fā)生異常。如果發(fā)生異常,那么就需要捕獲異常并交由外層邏輯處理。

Future 接口源碼:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口是Java中用于表示異步計算結(jié)果的接口。它定義了一些方法,用于查詢異步計算是否完成、獲取計算結(jié)果等操作。

  • cancel方法用于取消異步計算;
  • isCancelled方法用于判斷異步計算是否已經(jīng)被取消;
  • isDone方法用于判斷異步計算是否已經(jīng)完成。
  • get方法用于獲取異步計算的結(jié)果,如果計算還沒有完成,則該方法會阻塞直到計算完成。如果計算被取消,則該方法會拋出CancellationException異常。如果計算拋出異常,則該方法會拋出ExecutionException異常。
  • get(long timeout, TimeUnit unit)方法與get方法類似,但是它會在指定的時間內(nèi)等待計算完成,如果超時則會拋出TimeoutException異常。

Future 表示一個任務(wù)的生命周期,并提供了相應的方法來判斷任務(wù)是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。既然KafkaProducer.send() 方法的返回值是一個Future類型的對象,那么完全可以用Java語言層面的技巧來豐富應用的實現(xiàn),比如使用Future中的 get(long timeout,TimeUnit unit)方法實現(xiàn)可超時的阻塞。

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生產(chǎn)者屬性配置
        Properties properties = initConfig();
        // kafka生產(chǎn)者發(fā)送消息,默認是異步發(fā)送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,同步發(fā)送!");
        try{
            // 發(fā)送消息
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // 獲取異步計算的結(jié)果,如果計算還沒有完成,則該方法會阻塞直到計算完成
            RecordMetadata recordMetadata = future.get();
            System.out.println("metadata.topic() = " + recordMetadata.topic());
        }catch (Exception e){
            e.printStackTrace();
        }
        // 關(guān)閉資源
        kafkaProducer.close();
    }
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步發(fā)送!

調(diào)用Future.get()方法等待Kafka響應。如果消息沒有發(fā)送成功,那么這個方法將拋出一個異常。如果沒有發(fā)生錯誤,那么我們將得到一個RecordMetadata對象,并能從中獲取消息的偏移量和其他元數(shù)據(jù)。

KafkaProducer一般會出現(xiàn)兩種錯誤。一種是可重試錯誤,這種錯誤可以通過重發(fā)消息來解決。例如,對于連接錯誤,只要再次建立連接就可以解決。對于“not leader for partition”(非分區(qū)首領(lǐng))錯誤,只要重新為分區(qū)選舉首領(lǐng)就可以解決,此時元數(shù)據(jù)也會被刷新??梢酝ㄟ^配置啟用KafkaProducer的自動重試機制。如果在多次重試后仍無法解決問題,則應用程序會收到重試異常。另一種錯誤則無法通過重試解決,比如“Message size too large”(消息太大)。對于這種錯誤,KafkaProducer不會進行任何重試,而會立即拋出異常。

3. 異步發(fā)送

假設(shè)一條消息在應用程序和Kafka集群之間往返需要10毫秒。如果在發(fā)送完每條消息后都需要等待響應,那么發(fā)送100條消息將需要1秒。如果只發(fā)送消息但不需要等待響應,那么發(fā)送100條消息所需要的時間就會少很多。大多數(shù)時候,并不需要等待響應——盡管Kafka會把消息的目標主題、分區(qū)信息和偏移量返回給客戶端,但對客戶端應用程序來說可能不是必需的。不過,當消息發(fā)送失敗,需要拋出異常、記錄錯誤日志或者把消息寫入“錯誤消息”文件以便日后分析診斷時,就需要用到這些信息了。為了能夠在異步發(fā)送消息時處理異常情況,生產(chǎn)者提供了回調(diào)機制。

生產(chǎn)者發(fā)送消息后不會等待服務(wù)器的響應,而是通過回調(diào)函數(shù)來處理服務(wù)器的響應?;卣{(diào)函數(shù)會在 producer 收到 ack 時調(diào)用,該方法有兩個參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說明消息發(fā)送成功,如果 Exception 不為 null,說明消息發(fā)送失敗。

注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // kafka生產(chǎn)者屬性配置
        Properties properties = initConfig();
        // kafka生產(chǎn)者發(fā)送消息,默認是異步發(fā)送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "你好,kafka,異步發(fā)送帶返回值!");
        try{
            // 發(fā)送消息
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 說明消息發(fā)送成功
                    if(e==null){
                        System.out.println("metadata.topic() = " + recordMetadata.topic());
                        System.out.println("metadata.partition() = " + recordMetadata.partition());
                    }
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
        // 關(guān)閉資源
        kafkaProducer.close();
    }
}
[root@master01 kafka01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic test --from-beginning
你好,kafka!
你好,kafka,同步發(fā)送!
你好,kafka,異步發(fā)送帶回調(diào)函數(shù)!

Kafka生產(chǎn)者異步發(fā)送消息時,可以通過指定回調(diào)函數(shù)來處理發(fā)送結(jié)果。當消息發(fā)送完成后,回調(diào)函數(shù)會被調(diào)用,以通知應用程序消息發(fā)送的結(jié)果。具體來說,當生產(chǎn)者成功發(fā)送消息時,回調(diào)函數(shù)會被傳遞一個RecordMetadata對象,該對象包含了發(fā)送消息的相關(guān)信息,如消息所在的分區(qū)、消息在分區(qū)中的偏移量等。如果發(fā)送消息失敗,則回調(diào)函數(shù)會被傳遞一個非空的Exception對象,以指示發(fā)送失敗的原因。

注意:回調(diào)的執(zhí)行將在生產(chǎn)者主線程中進行,如果有兩條消息被發(fā)送給同一個分區(qū),則這可以保證它們的回調(diào)是按照發(fā)送的順序執(zhí)行的。這就要求回調(diào)的執(zhí)行要快,避免生產(chǎn)者出現(xiàn)延遲或影響其他消息的發(fā)送。不建議在回調(diào)中執(zhí)行阻塞操作,阻塞操作應該被放在其他線程中執(zhí)行。

6. Kafka 消息對象 ProducerRecord

① ProducerRecord 成員變量:

public class ProducerRecord<K, V> {
    // 消息要發(fā)送到的主題
    private final String topic;
    // 消息要發(fā)送到的分區(qū)號,如果為null,則由Kafka自動選擇分區(qū)
    private final Integer partition;
    // 消息的鍵
    private final K key;
    // 消息的值
    private final V value;
    // 消息的時間戳,如果為null,則使用當前時間戳
    private final Long timestamp;
    // 消息的頭部信息
    private final Headers headers;
    
    // .....
}
  • topic和partition字段分別代表消息要發(fā)往的主題和分區(qū)號。
  • key是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計算分區(qū)號進而可以讓消息發(fā)往特定的分區(qū)。前面提及消息以主題為單位進行歸類,而這個key可以讓消息再進行二次歸類,同一個key的消息會被劃分到同一個分區(qū)中。
  • value是指消息體,一般不為空,如果為空則表示特定的消息。
  • timestamp是指消息的時間戳,它有CreateTime和LogAppendTime兩種類型,前者表示消息創(chuàng)建的時間,后者表示消息追加到日志文件的時間。

② ProducerRecord 構(gòu)造函數(shù):文章來源地址http://www.zghlxwxcb.cn/news/detail-637296.html

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
}

到了這里,關(guān)于分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 分布式消息隊列Kafka(四)- 消費者

    分布式消息隊列Kafka(四)- 消費者

    1.Kafka消費方式 2.Kafka消費者工作流程 (1)總體工作流程 (2)消費者組工作流程 3.消費者API (1)單個消費者消費 實現(xiàn)代碼 (2)單個消費者指定分區(qū)消費 代碼實現(xiàn): (3)消費者組消費 復制上面CustomConsumer三個,同時去訂閱統(tǒng)一個主題,消費數(shù)據(jù),發(fā)現(xiàn)一個分區(qū)只能被一個

    2023年04月26日
    瀏覽(33)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    01. 創(chuàng)建消費者 在讀取消息之前,需要先創(chuàng)建一個KafkaConsumer對象。創(chuàng)建KafkaConsumer對象與創(chuàng)建KafkaProducer對象非常相似——把想要傳給消費者的屬性放在Properties對象里。 為簡單起見,這里只提供4個必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • 分布式應用之zookeeper集群+消息隊列Kafka

    分布式應用之zookeeper集群+消息隊列Kafka

    ? ? ? ?ZooKeeper是一個分布式的,開放源碼的分布式應用程序協(xié)調(diào)服務(wù),是Google的Chubby一個開源的實現(xiàn),是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務(wù)的軟件,提供的功能包括:配置維護、域名服務(wù)、分布式同步、組服務(wù)等。為分布式框架提供協(xié)調(diào)服務(wù)的

    2024年02月06日
    瀏覽(139)
  • zookeeper+kafka分布式消息隊列集群的部署

    zookeeper+kafka分布式消息隊列集群的部署

    目錄 一、zookeeper 1.Zookeeper 定義 2.Zookeeper 工作機制 3.Zookeeper 特點 4.Zookeeper 數(shù)據(jù)結(jié)構(gòu) 5.Zookeeper 應用場景 (1)統(tǒng)一命名服務(wù) (2)統(tǒng)一配置管理 (3)統(tǒng)一集群管理 (4)服務(wù)器動態(tài)上下線 6.Zookeeper 選舉機制 (1)第一次啟動選舉機制 (2)非第一次啟動選舉機制 7.部署zookeepe

    2024年02月14日
    瀏覽(25)
  • 分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

    分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

    01. Kafka 分區(qū)位移 對于Kafka中的分區(qū)而言,它的每條消息都有唯一的offset,用來表示消息在分區(qū)中對應的位置。偏移量從0開始,每個新消息的偏移量比前一個消息的偏移量大1。 每條消息在分區(qū)中的位置信息由一個叫位移(Offset)的數(shù)據(jù)來表征。分區(qū)位移總是從 0 開始,假設(shè)一

    2024年02月12日
    瀏覽(27)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準備 創(chuàng)建主題 test 有5個分區(qū),準備 3 個消費者并進行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 【簡單認識zookeeper+kafka分布式消息隊列集群的部署】

    【簡單認識zookeeper+kafka分布式消息隊列集群的部署】

    Zookeeper是一個開源的分布式的,為分布式框架提供協(xié)調(diào)服務(wù)的Apache項目。 Zookeeper從設(shè)計模式角度來理解:是一個基于觀察者模式設(shè)計的分布式服務(wù)管理框架,它負責存儲和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper就將負責通知已

    2024年02月13日
    瀏覽(24)
  • 分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    1. Kafka 消費者是什么? 消費者負責訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費理念中還有一層消費組的概念,每個消費者都有一個對應的消費組。當消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    最簡單的提交方式是讓消費者自動提交偏移量,自動提交 offset 的相關(guān)參數(shù): enable.auto.commit:是否開啟自動提交 offset 功能,默認為 true; auto.commit.interval.ms:自動提交 offset 的時間間隔,默認為5秒; 如果 enable.auto.commit 被設(shè)置為true,那么每過5秒,消費者就會自動提交 poll() 返

    2024年02月12日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包