国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)

這篇具有很好參考價(jià)值的文章主要介紹了java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1.生產(chǎn)者推送數(shù)據(jù)

常用參數(shù)

bootstrap.servers:Kafka集群中的Broker列表,格式為host1:port1,host2:port2,…。生產(chǎn)者會從這些Broker中選擇一個(gè)可用的Broker作為消息發(fā)送的目標(biāo)Broker。

acks:Broker對消息的確認(rèn)模式??蛇x值為0、1、all。0表示生產(chǎn)者不會等待Broker的任何確認(rèn)消息;1表示生產(chǎn)者會等待Broker的Leader副本確認(rèn)消息;all表示生產(chǎn)者會等待所有副本都確認(rèn)消息。確認(rèn)模式越高,可靠性越高,但延遲也越大。

retries:消息發(fā)送失敗時(shí)的重試次數(shù)。默認(rèn)值為0,表示不進(jìn)行重試??梢詫⑵湓O(shè)置為大于0的值,例如3,表示最多重試3次。

batch.size:消息批量發(fā)送的大小。當(dāng)生產(chǎn)者累積到一定數(shù)量的消息時(shí),會將其打包成一個(gè)批次一次性發(fā)送給Broker。默認(rèn)值為16384字節(jié),即16KB。

linger.ms:消息發(fā)送的延遲時(shí)間。生產(chǎn)者會等待一定的時(shí)間,以便將更多的消息打包成一個(gè)批次一次性發(fā)送給Broker。默認(rèn)值為0,表示立即發(fā)送。設(shè)置較大的值可以提高吞吐量,但可能會增加消息的延遲。

buffer.memory:生產(chǎn)者可用于緩存消息的內(nèi)存大小。默認(rèn)值為33554432字節(jié),即32MB。如果生產(chǎn)者生產(chǎn)消息的速度快于發(fā)送消息的速度,可能會導(dǎo)致緩存溢出。可以調(diào)整該參數(shù)來適應(yīng)生產(chǎn)者的生產(chǎn)速度。

key.serializer:Key的序列化器。Kafka消息可以包含Key和Value,Key和Value都需要進(jìn)行序列化。該參數(shù)指定Key的序列化器。

value.serializer:Value的序列化器。該參數(shù)指定Value的序列化器。

max.block.ms:生產(chǎn)者在發(fā)送消息之前等待Broker元數(shù)據(jù)信息的最長時(shí)間。如果在該時(shí)間內(nèi)無法獲取到Broker元數(shù)據(jù)信息,則會拋出TimeoutException異常。默認(rèn)值為60000毫秒,即60秒。

compression.type:消息壓縮類型??蛇x值為none、gzip、snappy、lz4。默認(rèn)值為none,表示不進(jìn)行壓縮。壓縮可以減少消息的傳輸大小,提高網(wǎng)絡(luò)帶寬的利用率,但會增加CPU的消耗。

interceptor.classes:消息攔截器列表??梢灾付ǘ鄠€(gè)消息攔截器對消息進(jìn)行加工處理。例如,可以在消息中添加時(shí)間戳、添加消息來源等信息。
以上參數(shù)只是一部分,Kafka生產(chǎn)者還有更多參數(shù)可以進(jìn)行配置。需要根據(jù)實(shí)際情況選擇合適的參數(shù)進(jìn)行配置。

例子

下面是一個(gè)單例模式配置 kafka生產(chǎn)者的例子(避免多次創(chuàng)建實(shí)例,減少資源的消耗)

public class SingletonKafkaProducerExample {
    private static SingletonKafkaProducerExample instance;
    private static Producer<String, String> producer;
    private SingletonKafkaProducerExample() {
    //參數(shù)設(shè)置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:端口");
        props.put("acks", "all");
        props.put("max.block.ms",120000);//默認(rèn)60s
        props.put("retries", 3)//默認(rèn)0;
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("request.timeout.ms",60*1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
//sasl認(rèn)證 (根據(jù)實(shí)際情況看是否配置)
		props.put("security.protocol", "SASL_PLAINTEXT");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';");
		producer = new KafkaProducer<>(props);
		logger.info("kafka連接成功");
    }
    public static SingletonKafkaProducerExample getInstance() {
        if (instance == null) {
            synchronized (SingletonKafkaProducerExample.class) {
                if (instance == null) {
                    instance = new SingletonKafkaProducerExample();
                }
            }
        }
        return instance;
    }
    public void sendMessage(String topic, String key, String value) {
        try {
        //這里也可以不用設(shè)置key和partition,例如不設(shè)置分區(qū) 系統(tǒng)會使用輪詢算法自動匹配partition
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("發(fā)送消息到" + metadata.topic() + "失?。? + exception.getMessage());
                } else {
                    System.out.println("發(fā)送消息到" + metadata.topic() + "成功:partition=" + metadata.partition() + ", offset=" + metadata.offset());
                }
            });
            future.get(); // 等待返回?cái)?shù)據(jù)
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("發(fā)送消息失?。? + e.getMessage());
        } 
    }
    public void closeProducer() {
        producer.close();
    }
}


以上參數(shù)配置只是案例,實(shí)際參數(shù)配置需要根據(jù)業(yè)務(wù)情況自己設(shè)置
下面是生產(chǎn)的方法介紹:

close(): 關(guān)閉生產(chǎn)者,釋放相關(guān)資源。
close(Duration timeout): 在指定的超時(shí)時(shí)間內(nèi)關(guān)閉生產(chǎn)者,釋放相關(guān)資源。
initTransactions(): 初始化事務(wù),啟用事務(wù)支持。
beginTransaction(): 開始事務(wù)。
send(ProducerRecord<K, V> record): 發(fā)送一條消息記錄到指定的主題。
send(ProducerRecord<K, V> record, Callback callback): 發(fā)送一條消息記錄,并附帶一個(gè)回調(diào)函數(shù)用于異步處理發(fā)送結(jié)果。
send(ProducerRecord<K, V> record, ProducerCallback<T> callback): 發(fā)送一條消息記錄,并使用自定義的回調(diào)函數(shù)處理發(fā)送結(jié)果。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId): 將消費(fèi)者組的偏移量提交給事務(wù)。
partitionsFor(String topic): 獲取指定主題的分區(qū)信息。
metrics(): 獲取生產(chǎn)者的度量指標(biāo)信息。
flush(): 將所有已掛起的消息立即發(fā)送到Kafka服務(wù)器,等待服務(wù)器確認(rèn)后再返回。
commitTransaction(): 提交當(dāng)前事務(wù)。
abortTransaction(): 中止當(dāng)前事務(wù)。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata): 將消費(fèi)者組的偏移量和消費(fèi)者組元數(shù)據(jù)提交給事務(wù)。

可能遇見的問題

1.多個(gè)topic發(fā)送消息的時(shí)候總有1.2發(fā)送失敗 報(bào)Failed to update metadata after 60000ms
這種情況出現(xiàn)的原因可能是Kafka集群中Broker的元數(shù)據(jù)信息還沒有被更新到Kafka客戶端中,導(dǎo)致Kafka客戶端無法連接到指定的Broker。

解決

增加等待時(shí)間:可以通過設(shè)置max.block.ms屬性來增加等待時(shí)間
提高重試次數(shù):可以通過設(shè)置retries屬性來提高重試次數(shù)
檢查Broker配置
檢查網(wǎng)絡(luò)連接
檢查Kafka版本

如果下面3個(gè)都沒問題,就增加等待時(shí)間和重試次數(shù)。本人遇到這樣的問題解決了

消費(fèi)者 推送數(shù)據(jù)

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置消費(fèi)者參數(shù)
		Properties props = new Properties();
		/*
		bootstrap.servers
		Kafka集群中Broker的地址列表,格式為"hostname:port",例如:"localhost:9092"??梢耘渲枚鄠€(gè)Broker,用逗號分隔。
		*/
		props.put("bootstrap.servers", "ip:port");
		/*
		group.id
		消費(fèi)者組的名稱,同一個(gè)消費(fèi)者組中的消費(fèi)者會共享消費(fèi)消息的責(zé)任。例如:"test"。
		*/
		props.put("group.id", "test");
		/*
		enable.auto.commit
		是否自動提交偏移量,默認(rèn)為true。如果為false,則需要手動提交偏移量。
		*/
		props.put("enable.auto.commit", "true");
		/*
		session.timeout.ms
		消費(fèi)者會話超時(shí)時(shí)間(毫秒),如果消費(fèi)者在該時(shí)間內(nèi)沒有向Kafka Broker發(fā)送心跳,則會被認(rèn)為已經(jīng)失效。默認(rèn)10000毫秒。
		*/
		props.put("session.timeout.ms", "30000");
		/*
		auto.offset.reset
		如果消費(fèi)者在初始化時(shí)沒有指定偏移量或指定的偏移量不存在,則從哪個(gè)位置開始消費(fèi),默認(rèn)latest,即從最新的消息開始消費(fèi)。其他可選值為earliest和none。
		*/
		props.put("auto.offset.reset", "earliest");
		/*
		
		key.deserializer
		key的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
		*/
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		/*
		value.deserializer
		value的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
		*/
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		/*
		max.poll.records
		每次拉取消息的最大記錄數(shù),默認(rèn)500條。
		*/
		props.put("max.poll.records", "10000");
		/*
		fetch.min.bytes
		每次拉取的最小字節(jié)數(shù),默認(rèn)1字節(jié)。
		
		fetch.max.bytes
		每次拉取的最大字節(jié)數(shù),默認(rèn)52428800字節(jié),即50MB。
		
		fetch.max.wait.ms
		最長等待時(shí)間(毫秒),如果在該時(shí)間內(nèi)沒有拉取到任何消息,則返回空結(jié)果。默認(rèn)500毫秒。
		*/
		props.put("fetch.min.bytes", "1024");
		props.put("fetch.max.bytes", "1048576");
		props.put("fetch.max.wait.ms", "500");
		
		/*
		max.partition.fetch.bytes
		每個(gè)分區(qū)最大拉取字節(jié)數(shù),默認(rèn)1048576字節(jié),即1MB。
		*/
		props.put("max.partition.fetch.bytes", "1024");
		/*
		connections.max.idle.ms
		最大空閑連接時(shí)間(毫秒),超過該時(shí)間則連接被認(rèn)為已經(jīng)過期并關(guān)閉。默認(rèn)540000毫秒,即9分鐘。
		*/
		props.put("connections.max.idle.ms", "540000");
		
		/*
		request.timeout.ms
		請求超時(shí)時(shí)間(毫秒),如果在該時(shí)間內(nèi)沒有收到Broker的響應(yīng),則認(rèn)為請求失敗。默認(rèn)30000毫秒。
		*/
		props.put("request.timeout.ms", "40000");
		/*
		retry.backoff.ms
		重試等待時(shí)間(毫秒),如果請求失敗,則等待一段時(shí)間后再次重試。默認(rèn)500毫秒。
		*/
		props.put("retry.backoff.ms", "500");
		
		/*
		security.protocol
		安全協(xié)議類型,例如SSL或SASL_SSL。
		
		ssl.keystore.location
		SSL證書的路徑和名稱。
		
		ssl.keystore.password
		SSL證書的密碼。
		
		ssl.truststore.location
		SSL信任證書庫的路徑和名稱。
		
		ssl.truststore.password
		SSL信任證書庫的密碼。
		*/
		props.put("security.protocol", "SSL");
		props.put("ssl.keystore.location", "/path/to/keystore");
		props.put("ssl.keystore.password", "password");
		props.put("ssl.truststore.location", "/path/to/truststore");
		props.put("ssl.truststore.password", "password");
        // 創(chuàng)建Kafka消費(fèi)者實(shí)例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 訂閱主題
        consumer.subscribe(Arrays.asList("my-topic"));
        // 創(chuàng)建線程池
        ExecutorService executor = Executors.newFixedThreadPool(6);
        // 消費(fèi)消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // 獲取消息所在分區(qū)的編號
                int partition = record.partition();
                // 將消息提交給對應(yīng)的線程進(jìn)行處理
                executor.submit(new MessageHandler(record.value(), partition));
            }
        }
    }
    // 消息處理器
    static class MessageHandler implements Runnable {
        private final String message;
        private final int partition;
        public MessageHandler(String message, int partition) {
            this.message = message;
            this.partition = partition;
        }
        @Override
        public void run() {
            // 對消息進(jìn)行處理
            System.out.printf("Partition %d: Message received: %s%n", partition, message);
        }
    }
}

以上參數(shù)根據(jù)自己需求填寫
可以根據(jù)分區(qū) 使用多線程執(zhí)行

下面是消費(fèi)者的方法講解文章來源地址http://www.zghlxwxcb.cn/news/detail-562168.html

subscribe(Collection<String> topics): 訂閱一個(gè)或多個(gè)主題,開始消費(fèi)這些主題中的消息。
unsubscribe(): 取消訂閱當(dāng)前已經(jīng)訂閱的所有主題,停止消費(fèi)消息。
poll(Duration timeout):Kafka服務(wù)器拉取一批消息記錄,該方法會阻塞指定的超時(shí)時(shí)間,等待服務(wù)器返回消息。如果在超時(shí)時(shí)間內(nèi)沒有收到消息,則返回空記錄。
commitSync(): 同步方式提交消費(fèi)者的消費(fèi)偏移量(offset),表示消息已成功消費(fèi)。
commitSync(Duration timeout): 在指定的超時(shí)時(shí)間內(nèi)同步提交消費(fèi)者的消費(fèi)偏移量。
commitAsync(): 異步方式提交消費(fèi)者的消費(fèi)偏移量,不等待提交結(jié)果。
commitAsync(OffsetCommitCallback callback): 異步方式提交消費(fèi)者的消費(fèi)偏移量,并在提交完成后執(zhí)行回調(diào)函數(shù)。
seek(TopicPartition partition, long offset): 將消費(fèi)者的偏移量(offset)設(shè)置為指定分區(qū)的指定偏移量,以便從指定位置開始消費(fèi)消息。
seekToBeginning(Collection<TopicPartition> partitions): 將消費(fèi)者的偏移量設(shè)置為指定分區(qū)的最早可用偏移量,重新從分區(qū)起始位置開始消費(fèi)消息。
seekToEnd(Collection<TopicPartition> partitions): 將消費(fèi)者的偏移量設(shè)置為指定分區(qū)的最新可用偏移量,繼續(xù)消費(fèi)分區(qū)中尚未消費(fèi)的消息。
seekByTimestamp(Map<TopicPartition, Long> timestampsToSearch): 根據(jù)時(shí)間戳搜索偏移量,并將消費(fèi)者的偏移量設(shè)置為找到的偏移量。
assignment(): 獲取當(dāng)前分配給消費(fèi)者的所有分區(qū)。
pause(Collection<TopicPartition> partitions): 暫停指定分區(qū)的消息消費(fèi),消費(fèi)者將不再繼續(xù)接收這些分區(qū)的消息。
resume(Collection<TopicPartition> partitions): 恢復(fù)被暫停的指定分區(qū)的消息消費(fèi),使消費(fèi)者可以繼續(xù)接收這些分區(qū)的消息。
close(): 關(guān)閉消費(fèi)者,釋放相關(guān)資源。

到了這里,關(guān)于java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka系列之:Kafka生產(chǎn)者和消費(fèi)者

    Kafka系列之:Kafka生產(chǎn)者和消費(fèi)者

    batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會發(fā)送數(shù)據(jù),默認(rèn)16K。 linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時(shí)間到了之后就會發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值是0ms,表示沒有延遲。 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)羅盤應(yīng)答。 1:生產(chǎn)者發(fā)送過來的

    2023年04月09日
    瀏覽(24)
  • kafka生產(chǎn)者和消費(fèi)者配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的broker里尋找其它的broker。 **key

    2024年02月12日
    瀏覽(23)
  • Kafka生產(chǎn)者與消費(fèi)者api示例

    Kafka生產(chǎn)者與消費(fèi)者api示例

    ? 一個(gè)正常的生產(chǎn)邏輯需要具備以下幾個(gè)步驟 配置生產(chǎn)者參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例 構(gòu)建待發(fā)送的消息 發(fā)送消息 關(guān)閉生產(chǎn)者實(shí)例 采用默認(rèn)分區(qū)方式將消息散列的發(fā)送到各個(gè)分區(qū)當(dāng)中 ? ?對于properties配置的第二種寫法,相對來說不會出錯,簡單舉例: ? 1.kafka的生產(chǎn)者可

    2024年02月07日
    瀏覽(24)
  • 筆記:配置多個(gè)kafka生產(chǎn)者和消費(fèi)者

    如果只有一個(gè)kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個(gè)kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費(fèi)者。 適用場景:需要消費(fèi)來源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka生產(chǎn)者和消費(fèi)者(python版)

    生產(chǎn)者 消費(fèi)者 消費(fèi)者中的組名主要用戶針對主題的偏移量進(jìn)行更改,也涉及到主題中分區(qū)的問題, kafka工具類 此工具類基本上拿過去就可以用 疑問 當(dāng)消費(fèi)者鏈接kafka時(shí)發(fā)現(xiàn)topic沒有未讀的消息怎樣退出呢,默認(rèn)是在一直等待,但是我期望沒有要讀的消息的時(shí)候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Kafka官方生產(chǎn)者和消費(fèi)者腳本簡單使用

    怎樣使用Kafka官方生產(chǎn)者和消費(fèi)者腳本進(jìn)行消費(fèi)生產(chǎn)和消費(fèi)?這里假設(shè)已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見到測試kafka對應(yīng)topic了.

    2024年02月04日
    瀏覽(23)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號從0開始計(jì)算 Partition:分區(qū)數(shù),該主題有3個(gè)分區(qū) Replica:副本數(shù),該主題有3個(gè)副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費(fèi)的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會報(bào)錯 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    執(zhí)行topic刪除命令時(shí),出現(xiàn)提示 這條命令其實(shí)并不執(zhí)行刪除動作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時(shí)也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • 探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費(fèi)者是多線程安全的么? 2. 消費(fèi)者規(guī)避多線程安全方案 2.1. 每個(gè)線程維護(hù)一個(gè)kafkaConsumer 2.2. [單/多]kafkaConsumer實(shí)例 + 多worker線程 2.3.方案優(yōu)缺點(diǎn)對比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個(gè)線程中共享一個(gè)

    2023年04月26日
    瀏覽(24)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包