1、kafka簡(jiǎn)介
Kafka是一個(gè)流行的分布式消息系統(tǒng),它的核心是一個(gè)由多個(gè)節(jié)點(diǎn)組成的分布式集群。在Kafka中,數(shù)據(jù)被分割成多個(gè)小塊,并通過(guò)一些復(fù)雜的算法在節(jié)點(diǎn)之間傳遞。這些小塊被稱為Kafka Topic。
2、topic知識(shí)
一個(gè)Topic是一組具有相同主題的消息。可以將Topic看作是一個(gè)數(shù)據(jù)倉(cāng)庫(kù),在這個(gè)倉(cāng)庫(kù)中存儲(chǔ)著具有相同主題的數(shù)據(jù)。比如,一個(gè)Topic可以存儲(chǔ)所有關(guān)于“股票”的數(shù)據(jù),另一個(gè)Topic可以存儲(chǔ)所有關(guān)于“天氣”的數(shù)據(jù)。
Kafka Topic的設(shè)計(jì)非常簡(jiǎn)單,但是它的功能卻非常強(qiáng)大。Kafka Topics可以實(shí)現(xiàn)數(shù)據(jù)的發(fā)布、訂閱和消費(fèi)。在發(fā)布數(shù)據(jù)時(shí),可以將數(shù)據(jù)放到一個(gè)Topic中,其他節(jié)點(diǎn)可以訂閱這個(gè)Topic,并且獲取其中的數(shù)據(jù)。在訂閱數(shù)據(jù)時(shí),可以將一個(gè)Topic的地址放到消費(fèi)者的地址中,這樣消費(fèi)者就可以獲取到該Topic中的數(shù)據(jù)。
Kafka Topis的數(shù)據(jù)結(jié)構(gòu)非常特殊,它是一個(gè)由多個(gè)分區(qū)組成的集合。每個(gè)分區(qū)都是一個(gè)獨(dú)立的數(shù)據(jù)流,并且可以使用不同的策略來(lái)處理數(shù)據(jù)的分配和復(fù)制。這種數(shù)據(jù)結(jié)構(gòu)可以提高數(shù)據(jù)的可靠性和安全性,并且可以支持大規(guī)模的數(shù)據(jù)傳輸。
Kafka Topic的分區(qū)結(jié)構(gòu)非常重要,它可以將數(shù)據(jù)分成多個(gè)部分,并且可以使用不同的策略來(lái)處理數(shù)據(jù)的分配和復(fù)制。每個(gè)分區(qū)都有一個(gè)唯一的標(biāo)識(shí)符,叫做分區(qū)ID。可以使用不同的分區(qū)ID來(lái)創(chuàng)建多個(gè)分區(qū),每個(gè)分區(qū)可以存儲(chǔ)不同的數(shù)據(jù)。
3、簡(jiǎn)單使用
在使用Kafka Topics時(shí),需要注意一些事項(xiàng)。首先,要?jiǎng)?chuàng)建一個(gè)Topic,并且指定該Topic的主題和相關(guān)參數(shù)。其次,要?jiǎng)?chuàng)建一些消費(fèi)者,并且將它們添加到該Topic的訂閱列表中。最后,當(dāng)數(shù)據(jù)被發(fā)布到Topic中時(shí),消費(fèi)者會(huì)自動(dòng)訂閱這個(gè)Topic,并且獲取其中的數(shù)據(jù)。
首先,您需要在項(xiàng)目中添加 Kafka 依賴項(xiàng):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
然后,您需要編寫一個(gè)生產(chǎn)者,以將消息發(fā)布到指定的主題中:
在創(chuàng)建Topic時(shí),可以指定該Topic的分區(qū)數(shù)和每個(gè)分區(qū)的大小。分區(qū)數(shù)表示要將數(shù)據(jù)分成多少個(gè)部分,每個(gè)部分可以使用不同的策略來(lái)處理數(shù)據(jù)的分配和復(fù)制。每個(gè)分區(qū)的大小表示每個(gè)部分可以存儲(chǔ)多少數(shù)據(jù)。
package com.yinfeng.test.demo.kafka;
import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @author admin
* @date 2023/7/2 19:02
* @description
*/
public class KafkaProducerDemo {
@SneakyThrows
public static void main(String[] args) {
Properties props = new Properties();
// Kafka 集群地址
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 發(fā)送3條消息
for (int i = 0; i < 3; i++) {
ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "key"+i, "hello"+i);
producer.send(record1, (metadata, exception) -> {
System.out.println("消息發(fā)送成功 topic="+metadata.topic()+", msg=>" + record1.value());
});
}
// kafka異步發(fā)送,延時(shí)等待執(zhí)行完成
Thread.sleep(5000);
}
}
當(dāng)數(shù)據(jù)被發(fā)布到Topic中時(shí),可以將數(shù)據(jù)放到一個(gè)Topic中,其他節(jié)點(diǎn)可以訂閱這個(gè)Topic,并且獲取其中的數(shù)據(jù)。訂閱一個(gè)Topic的過(guò)程可以用以下代碼表示:
在消費(fèi)Topic中的數(shù)據(jù)時(shí),需要指定要消費(fèi)的主題名稱和消費(fèi)者的地址。消費(fèi)者的地址包括一個(gè)主機(jī)名和一個(gè)端口號(hào),以及一個(gè)唯一的標(biāo)識(shí)符,叫做消費(fèi)者ID。消費(fèi)者ID可以使用環(huán)境變量來(lái)設(shè)置,也可以在消費(fèi)者的地址中直接指定。
package com.yinfeng.test.demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @author admin
* @date 2023/7/2 19:02
* @description
*/
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka 集群地址
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singleton("test"));
// 循環(huán)拉取消息
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
在上面的代碼中,我們首先創(chuàng)建了一個(gè)Kafka集群,然后創(chuàng)建了一個(gè)Topic,并且指定了該Topic的分區(qū)ID。接著,我們創(chuàng)建了一個(gè)Kafka集群,并且指定了該Topic的分區(qū)ID。接著,我們創(chuàng)建了一個(gè)消費(fèi)者,并且將該消費(fèi)者添加到該Topic的訂閱列表中。最后,我們使用該消費(fèi)者來(lái)消費(fèi)該Topic中的數(shù)據(jù)。
在消費(fèi)數(shù)據(jù)時(shí),我們使用了Kafka提供的ConsumerRecords類來(lái)獲取數(shù)據(jù)。我們首先使用該類的poll方法來(lái)獲取一個(gè)消費(fèi)者的數(shù)據(jù),然后使用該類的其他方法來(lái)對(duì)數(shù)據(jù)進(jìn)行處理。
在設(shè)置消費(fèi)者的偏移量時(shí),我們使用了Kafka提供的OffsetRequest類來(lái)向Kafka集群中提交消費(fèi)者的偏移量。我們首先創(chuàng)建了一個(gè)OffsetRequest對(duì)象,然后使用該類的setOffset方法來(lái)將該對(duì)象設(shè)置為要求的偏移量。最后,我們調(diào)用該類的commitSync方法來(lái)提交該偏移量。不過(guò)由于我們?cè)O(shè)置自動(dòng)提交,所以這步可以不用操作。
4、注意事項(xiàng)
在使用Kafka Topics時(shí),還需要注意一些其他的事項(xiàng)。
例如,在創(chuàng)建Topic時(shí),可以指定該Topic的備份策略,以確保數(shù)據(jù)的可靠性和安全性。備份策略包括多種不同的方法,如備份到本地文件、備份到數(shù)據(jù)庫(kù)、備份到其他Kafka集群等。
另外,在使用Kafka Topics時(shí),還可以使用Kafka提供的一些API和工具來(lái)對(duì)Topic進(jìn)行操作和管理。例如,可以使用Kafka提供的AdminClient來(lái)管理Kafka集群中的所有Topic,可以使用Kafka提供的ConsumerGroupClient來(lái)管理Kafka集群中的所有ConsumerGroup,可以使用Kafka提供的KafkaConsumer來(lái)消費(fèi)Kafka集群中的數(shù)據(jù)等。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-656313.html
總之,Kafka Topics是Kafka中非常重要的一個(gè)概念,它可以實(shí)現(xiàn)數(shù)據(jù)的發(fā)布、訂閱和消費(fèi)。在使用Kafka Topics時(shí),需要注意一些事項(xiàng),以確保數(shù)據(jù)的可靠性和安全性。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-656313.html
到了這里,關(guān)于(五)kafka從入門到精通之topic介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!