国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

(五)kafka從入門到精通之topic介紹

這篇具有很好參考價(jià)值的文章主要介紹了(五)kafka從入門到精通之topic介紹。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1、kafka簡(jiǎn)介

Kafka是一個(gè)流行的分布式消息系統(tǒng),它的核心是一個(gè)由多個(gè)節(jié)點(diǎn)組成的分布式集群。在Kafka中,數(shù)據(jù)被分割成多個(gè)小塊,并通過(guò)一些復(fù)雜的算法在節(jié)點(diǎn)之間傳遞。這些小塊被稱為Kafka Topic。

2、topic知識(shí)

一個(gè)Topic是一組具有相同主題的消息。可以將Topic看作是一個(gè)數(shù)據(jù)倉(cāng)庫(kù),在這個(gè)倉(cāng)庫(kù)中存儲(chǔ)著具有相同主題的數(shù)據(jù)。比如,一個(gè)Topic可以存儲(chǔ)所有關(guān)于“股票”的數(shù)據(jù),另一個(gè)Topic可以存儲(chǔ)所有關(guān)于“天氣”的數(shù)據(jù)。

Kafka Topic的設(shè)計(jì)非常簡(jiǎn)單,但是它的功能卻非常強(qiáng)大。Kafka Topics可以實(shí)現(xiàn)數(shù)據(jù)的發(fā)布、訂閱和消費(fèi)。在發(fā)布數(shù)據(jù)時(shí),可以將數(shù)據(jù)放到一個(gè)Topic中,其他節(jié)點(diǎn)可以訂閱這個(gè)Topic,并且獲取其中的數(shù)據(jù)。在訂閱數(shù)據(jù)時(shí),可以將一個(gè)Topic的地址放到消費(fèi)者的地址中,這樣消費(fèi)者就可以獲取到該Topic中的數(shù)據(jù)。

Kafka Topis的數(shù)據(jù)結(jié)構(gòu)非常特殊,它是一個(gè)由多個(gè)分區(qū)組成的集合。每個(gè)分區(qū)都是一個(gè)獨(dú)立的數(shù)據(jù)流,并且可以使用不同的策略來(lái)處理數(shù)據(jù)的分配和復(fù)制。這種數(shù)據(jù)結(jié)構(gòu)可以提高數(shù)據(jù)的可靠性和安全性,并且可以支持大規(guī)模的數(shù)據(jù)傳輸。

Kafka Topic的分區(qū)結(jié)構(gòu)非常重要,它可以將數(shù)據(jù)分成多個(gè)部分,并且可以使用不同的策略來(lái)處理數(shù)據(jù)的分配和復(fù)制。每個(gè)分區(qū)都有一個(gè)唯一的標(biāo)識(shí)符,叫做分區(qū)ID。可以使用不同的分區(qū)ID來(lái)創(chuàng)建多個(gè)分區(qū),每個(gè)分區(qū)可以存儲(chǔ)不同的數(shù)據(jù)。

3、簡(jiǎn)單使用

在使用Kafka Topics時(shí),需要注意一些事項(xiàng)。首先,要?jiǎng)?chuàng)建一個(gè)Topic,并且指定該Topic的主題和相關(guān)參數(shù)。其次,要?jiǎng)?chuàng)建一些消費(fèi)者,并且將它們添加到該Topic的訂閱列表中。最后,當(dāng)數(shù)據(jù)被發(fā)布到Topic中時(shí),消費(fèi)者會(huì)自動(dòng)訂閱這個(gè)Topic,并且獲取其中的數(shù)據(jù)。

首先,您需要在項(xiàng)目中添加 Kafka 依賴項(xiàng):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>

然后,您需要編寫一個(gè)生產(chǎn)者,以將消息發(fā)布到指定的主題中:

在創(chuàng)建Topic時(shí),可以指定該Topic的分區(qū)數(shù)和每個(gè)分區(qū)的大小。分區(qū)數(shù)表示要將數(shù)據(jù)分成多少個(gè)部分,每個(gè)部分可以使用不同的策略來(lái)處理數(shù)據(jù)的分配和復(fù)制。每個(gè)分區(qū)的大小表示每個(gè)部分可以存儲(chǔ)多少數(shù)據(jù)。

package com.yinfeng.test.demo.kafka;

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaProducerDemo {
    @SneakyThrows
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 發(fā)送3條消息
        for (int i = 0; i < 3; i++) {
            ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "key"+i, "hello"+i);
            producer.send(record1, (metadata, exception) -> {
                System.out.println("消息發(fā)送成功 topic="+metadata.topic()+", msg=>" + record1.value());
            });
        }

        // kafka異步發(fā)送,延時(shí)等待執(zhí)行完成
        Thread.sleep(5000);

    }
}

kafka的topic,kafka專區(qū),java消息中間件筆記,kafka,分布式,大數(shù)據(jù),云原生,java,原力計(jì)劃

當(dāng)數(shù)據(jù)被發(fā)布到Topic中時(shí),可以將數(shù)據(jù)放到一個(gè)Topic中,其他節(jié)點(diǎn)可以訂閱這個(gè)Topic,并且獲取其中的數(shù)據(jù)。訂閱一個(gè)Topic的過(guò)程可以用以下代碼表示:

在消費(fèi)Topic中的數(shù)據(jù)時(shí),需要指定要消費(fèi)的主題名稱和消費(fèi)者的地址。消費(fèi)者的地址包括一個(gè)主機(jī)名和一個(gè)端口號(hào),以及一個(gè)唯一的標(biāo)識(shí)符,叫做消費(fèi)者ID。消費(fèi)者ID可以使用環(huán)境變量來(lái)設(shè)置,也可以在消費(fèi)者的地址中直接指定。

package com.yinfeng.test.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        consumer.subscribe(Collections.singleton("test"));

        // 循環(huán)拉取消息
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }

    }
}

kafka的topic,kafka專區(qū),java消息中間件筆記,kafka,分布式,大數(shù)據(jù),云原生,java,原力計(jì)劃

在上面的代碼中,我們首先創(chuàng)建了一個(gè)Kafka集群,然后創(chuàng)建了一個(gè)Topic,并且指定了該Topic的分區(qū)ID。接著,我們創(chuàng)建了一個(gè)Kafka集群,并且指定了該Topic的分區(qū)ID。接著,我們創(chuàng)建了一個(gè)消費(fèi)者,并且將該消費(fèi)者添加到該Topic的訂閱列表中。最后,我們使用該消費(fèi)者來(lái)消費(fèi)該Topic中的數(shù)據(jù)。

在消費(fèi)數(shù)據(jù)時(shí),我們使用了Kafka提供的ConsumerRecords類來(lái)獲取數(shù)據(jù)。我們首先使用該類的poll方法來(lái)獲取一個(gè)消費(fèi)者的數(shù)據(jù),然后使用該類的其他方法來(lái)對(duì)數(shù)據(jù)進(jìn)行處理。

在設(shè)置消費(fèi)者的偏移量時(shí),我們使用了Kafka提供的OffsetRequest類來(lái)向Kafka集群中提交消費(fèi)者的偏移量。我們首先創(chuàng)建了一個(gè)OffsetRequest對(duì)象,然后使用該類的setOffset方法來(lái)將該對(duì)象設(shè)置為要求的偏移量。最后,我們調(diào)用該類的commitSync方法來(lái)提交該偏移量。不過(guò)由于我們?cè)O(shè)置自動(dòng)提交,所以這步可以不用操作。

4、注意事項(xiàng)

在使用Kafka Topics時(shí),還需要注意一些其他的事項(xiàng)。

例如,在創(chuàng)建Topic時(shí),可以指定該Topic的備份策略,以確保數(shù)據(jù)的可靠性和安全性。備份策略包括多種不同的方法,如備份到本地文件、備份到數(shù)據(jù)庫(kù)、備份到其他Kafka集群等。

另外,在使用Kafka Topics時(shí),還可以使用Kafka提供的一些API和工具來(lái)對(duì)Topic進(jìn)行操作和管理。例如,可以使用Kafka提供的AdminClient來(lái)管理Kafka集群中的所有Topic,可以使用Kafka提供的ConsumerGroupClient來(lái)管理Kafka集群中的所有ConsumerGroup,可以使用Kafka提供的KafkaConsumer來(lái)消費(fèi)Kafka集群中的數(shù)據(jù)等。

總之,Kafka Topics是Kafka中非常重要的一個(gè)概念,它可以實(shí)現(xiàn)數(shù)據(jù)的發(fā)布、訂閱和消費(fèi)。在使用Kafka Topics時(shí),需要注意一些事項(xiàng),以確保數(shù)據(jù)的可靠性和安全性。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-656313.html

到了這里,關(guān)于(五)kafka從入門到精通之topic介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka刪除topic消息的三種方式

    kafka刪除topic消息的三種方式 方法一:快速配置刪除法(確保topic數(shù)據(jù)不要了) 1.kafka啟動(dòng)之前,在server.properties配置delete.topic.enable=true 2.執(zhí)行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具刪除 注意:如果kafka啟動(dòng)之前沒(méi)有配置delete.topic.e

    2024年02月16日
    瀏覽(26)
  • Docker 安裝kafka 并創(chuàng)建topic 進(jìn)行消息通信

    Docker 安裝kafka 并創(chuàng)建topic 進(jìn)行消息通信

    ????????Apache Kafka是一個(gè)分布式流處理平臺(tái),用于構(gòu)建高性能、可擴(kuò)展的實(shí)時(shí)數(shù)據(jù)流應(yīng)用程序。本文將介紹如何使用Docker容器化技術(shù)來(lái)安裝和配置Apache Kafka。 1、kafka安裝必須先安裝Zookpper 2、下載鏡像 3、查看下載好的鏡像 4、啟動(dòng)Kafka 5、查看是否創(chuàng)建好Kafka容器 6、進(jìn)入到

    2024年03月15日
    瀏覽(18)
  • Kafka消息中間件(Kafka與MQTT區(qū)別)

    Kafka消息中間件(Kafka與MQTT區(qū)別)

    Kafka是一個(gè)分布式流處理平臺(tái),它可以快速地處理大量的數(shù)據(jù)流。Kafka的核心原理是基于 發(fā)布/訂閱 模式的消息隊(duì)列。Kafka允許多個(gè)生產(chǎn)者將數(shù)據(jù)寫入主題(topic)中,同時(shí)也允許多個(gè)消費(fèi)者從主題中讀取數(shù)據(jù)。 Kafka重要原理 Kafka的設(shè)計(jì)原則之一是高可用性和可擴(kuò)展性,因此它

    2024年02月03日
    瀏覽(30)
  • 消息中間件(二)——kafka

    消息中間件(二)——kafka

    在大數(shù)據(jù)中,會(huì)使用到大量的數(shù)據(jù)。面對(duì)這些海量的數(shù)據(jù),我們一是需要做到能夠 收集 這些數(shù)據(jù),其次是要能夠 分析和處理 這些海量數(shù)據(jù)。在此過(guò)程中,需要一套消息系統(tǒng)。 Kafka專門為分 布式高吞吐量 系統(tǒng)設(shè)計(jì)。作為一個(gè)消息代理的替代品,Kafka往往做的比其他消息中間

    2024年02月07日
    瀏覽(29)
  • 消息中間件 —— 初識(shí)Kafka

    消息中間件 —— 初識(shí)Kafka

    1.1.1、為什么要有消息隊(duì)列? 1.1.2、消息隊(duì)列 消息 Message 網(wǎng)絡(luò)中的兩臺(tái)計(jì)算機(jī)或者兩個(gè)通訊設(shè)備之間傳遞的數(shù)據(jù)。例如說(shuō):文本、音樂(lè)、視頻等內(nèi)容。 隊(duì)列 Queue 一種特殊的線性表(數(shù)據(jù)元素首尾相接),特殊之處在于只允許在首部刪除元素和在尾部追加元素(FIFO)。 入隊(duì)、出

    2024年02月13日
    瀏覽(24)
  • 【中間件】消息中間件之Kafka

    一、概念介紹 Apache Kafka是一個(gè)分布式流處理平臺(tái),用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流應(yīng)用。它可以處理網(wǎng)站、應(yīng)用或其他來(lái)源產(chǎn)生的大量數(shù)據(jù)流,并能實(shí)時(shí)地將這些數(shù)據(jù)流傳輸?shù)搅硪粋€(gè)系統(tǒng)或應(yīng)用中進(jìn)行處理。 核心概念: Topic(主題) :消息的分類,用于區(qū)分不同的業(yè)務(wù)消息。

    2024年01月20日
    瀏覽(43)
  • 消息中間件之Kafka(一)

    消息中間件之Kafka(一)

    高性能的消息中間件,在大數(shù)據(jù)的業(yè)務(wù)場(chǎng)景下性能比較好,kafka本身不維護(hù)消息位點(diǎn),而是交由Consumer來(lái)維護(hù),消息可以重復(fù)消費(fèi),并且內(nèi)部使用了零拷貝技術(shù),性能比較好 Broker持久化消息時(shí)采用了MMAP的技術(shù),Consumer拉取消息時(shí)使用的sendfile技術(shù) Kafka是最初由Linkedin公司開(kāi)發(fā),

    2024年01月20日
    瀏覽(53)
  • 消息中間件之Kafka(二)

    消息中間件之Kafka(二)

    1.1 為什么要對(duì)topic下數(shù)據(jù)進(jìn)行分區(qū)存儲(chǔ)? 1.commit log文件會(huì)受到所在機(jī)器的文件系統(tǒng)大小的限制,分區(qū)之后可以將不同的分區(qū)放在不同的機(jī)器上, 相當(dāng)于對(duì)數(shù)據(jù)做了分布式存儲(chǔ),理論上一個(gè)topic可以處理任意數(shù)量的數(shù)據(jù) 2.提高并行度 1.2 如何在多個(gè)partition中保證順序消費(fèi)? 方案一

    2024年01月21日
    瀏覽(29)
  • kafka 基礎(chǔ)概念、命令行操作(查看所有topic、創(chuàng)建topic、刪除topic、查看某個(gè)Topic的詳情、修改分區(qū)數(shù)、發(fā)送消息、消費(fèi)消息、 查看消費(fèi)者組 、更新消費(fèi)者的偏移位置)

    kafka 基礎(chǔ)概念、命令行操作(查看所有topic、創(chuàng)建topic、刪除topic、查看某個(gè)Topic的詳情、修改分區(qū)數(shù)、發(fā)送消息、消費(fèi)消息、 查看消費(fèi)者組 、更新消費(fèi)者的偏移位置)

    kafka官網(wǎng) Broker ??一臺(tái)kafka服務(wù)器就是一個(gè)broker,可容納多個(gè)topic。一個(gè)集群由多個(gè)broker組成; Producer ??生產(chǎn)者,即向kafka的broker-list發(fā)送消息的客戶端; Consumer ??消費(fèi)者,即向kafka的broker-list訂閱消息的客戶端; Consumer Group ??消費(fèi)者組是 邏輯上的一個(gè)訂閱者 ,由多個(gè)

    2024年02月01日
    瀏覽(121)
  • Kafka - 主題Topic與消費(fèi)者消息Offset日志記錄機(jī)制

    Kafka - 主題Topic與消費(fèi)者消息Offset日志記錄機(jī)制

    可以根據(jù)業(yè)務(wù)類型,分發(fā)到不同的Topic中,對(duì)于每一個(gè)Topic,下面可以有多個(gè)分區(qū)(Partition)日志文件: kafka 下的Topic的多個(gè)分區(qū),每一個(gè)分區(qū)實(shí)質(zhì)上就是一個(gè)隊(duì)列,將接收到的消息暫時(shí)存儲(chǔ)到隊(duì)列中,根據(jù)配置以及消息消費(fèi)情況來(lái)對(duì)隊(duì)列消息刪除。 可以這么來(lái)理解Topic,Partitio

    2024年02月03日
    瀏覽(23)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包