
前言
前面說(shuō)了很多Kafka的性能優(yōu)點(diǎn),有些童鞋要說(shuō)了,這Kafka在企業(yè)開(kāi)發(fā)或者企業(yè)級(jí)應(yīng)用中要怎么用呢?今天咱們就來(lái)簡(jiǎn)單探究一下。
1、 使用 Kafka 進(jìn)行消息的異步處理
Kafka 提供了一個(gè)可靠的消息傳遞機(jī)制,使得企業(yè)能夠?qū)⒉煌M件之間的通信解耦,實(shí)現(xiàn)高效的異步處理。在企業(yè)級(jí)應(yīng)用中,可以通過(guò)以下步驟來(lái)使用 Kafka 進(jìn)行消息的異步處理:
-
創(chuàng)建一個(gè)或多個(gè)主題(topic)用于存儲(chǔ)消息。主題可以按照業(yè)務(wù)邏輯進(jìn)行劃分,每個(gè)主題可以有多個(gè)分區(qū)(partition)。 -
生產(chǎn)者(Producer)將消息發(fā)送到指定的主題中。 -
消費(fèi)者(Consumer)從主題訂閱消息,并將其處理邏輯與生產(chǎn)者解耦。消費(fèi)者可以根據(jù)需求選擇不同的消費(fèi)模式,如訂閱所有消息或只訂閱特定分區(qū)的消息。 -
消費(fèi)者可以將處理結(jié)果發(fā)送到其他系統(tǒng),或者將消息轉(zhuǎn)發(fā)到其他 Kafka 主題中進(jìn)行進(jìn)一步處理。
通過(guò)使用 Kafka 進(jìn)行消息的異步處理,企業(yè)可以實(shí)現(xiàn)高效、可伸縮的系統(tǒng)架構(gòu),并且降低各個(gè)組件之間的耦合程度。
2、 Kafka 的消息轉(zhuǎn)發(fā)和備份機(jī)制
Kafka 借助其分布式的架構(gòu)和復(fù)制機(jī)制,實(shí)現(xiàn)了消息的轉(zhuǎn)發(fā)和備份,確保數(shù)據(jù)的可靠性和持久性:
-
消息轉(zhuǎn)發(fā):Kafka 通過(guò)將消息分發(fā)到多個(gè)分區(qū)來(lái)實(shí)現(xiàn)消息的轉(zhuǎn)發(fā),每個(gè)分區(qū)可以由多個(gè)消費(fèi)者訂閱。分區(qū)之間的消息轉(zhuǎn)發(fā)通過(guò)消費(fèi)者群組協(xié)調(diào)器(Consumer Group Coordinator)來(lái)實(shí)現(xiàn),協(xié)調(diào)器負(fù)責(zé)將消息均勻地分發(fā)給消費(fèi)者。 -
備份機(jī)制:Kafka 將每個(gè)分區(qū)的消息進(jìn)行副本(Replica)備份,并將副本分布在不同的 Broker 節(jié)點(diǎn)上。如果某個(gè) Broker 節(jié)點(diǎn)發(fā)生故障,可以通過(guò)副本在其他節(jié)點(diǎn)上進(jìn)行數(shù)據(jù)的恢復(fù),確保數(shù)據(jù)的可靠性和持久性。
通過(guò)消息轉(zhuǎn)發(fā)和備份機(jī)制,Kafka 實(shí)現(xiàn)了高可用性和數(shù)據(jù)冗余,保證了數(shù)據(jù)流的可靠性和持久性。
3、 Kafka Connect 和 Kafka Streams 的用途和特性
-
Kafka Connect:是 Kafka 提供的一個(gè)工具,用于將外部系統(tǒng)和 Kafka 進(jìn)行連接。通過(guò) Kafka Connect,企業(yè)可以輕松地實(shí)現(xiàn)數(shù)據(jù)的導(dǎo)入和導(dǎo)出,與各種數(shù)據(jù)源(如數(shù)據(jù)庫(kù)、文件系統(tǒng))進(jìn)行集成,并且可以自定義開(kāi)發(fā) Connectors,與特定的數(shù)據(jù)源進(jìn)行交互。Kafka Connect 實(shí)現(xiàn)了高性能、可伸縮的數(shù)據(jù)傳輸,并且提供了故障恢復(fù)和數(shù)據(jù)轉(zhuǎn)換等功能。
使用 Kafka Connect 在 Java 中有兩種方式:Standalone 模式和分布式模式。
-
Standalone 模式:
import?org.apache.kafka.connect.runtime.ConnectorConfig;
import?org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import?org.apache.kafka.connect.runtime.Connect;
import?java.util.Properties;
public?class?KafkaConnectStandaloneApp?{
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//?創(chuàng)建配置
????????Properties?props?=?new?Properties();
????????props.setProperty(StandaloneConfig.BOOTSTRAP_SERVERS_CONFIG,?"localhost:9092");
????????props.setProperty(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG,?"org.apache.kafka.connect.json.JsonConverter");
????????props.setProperty(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG,?"org.apache.kafka.connect.json.JsonConverter");
????????
????????//?創(chuàng)建?Standalone?模式的?Kafka?Connect
????????Connect?connect?=?new?Connect(new?StandaloneConfig(props));
????????connect.start();?//?啟動(dòng)?Kafka?Connect
????????Thread.sleep(5000);?//?等待一段時(shí)間
????????
????????//?停止?Kafka?Connect
????????connect.stop();
????}
}
-
分布式模式:
import?org.apache.kafka.connect.runtime.ConnectorConfig;
import?org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import?org.apache.kafka.connect.runtime.Connect;
import?java.util.Properties;
public?class?KafkaConnectDistributedApp?{
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//?創(chuàng)建配置
????????Properties?props?=?new?Properties();
????????props.setProperty(DistributedConfig.BOOTSTRAP_SERVERS_CONFIG,?"localhost:9092");
????????
????????//?創(chuàng)建分布式模式的?Kafka?Connect
????????Connect?connect?=?new?Connect(new?DistributedConfig(props));
????????connect.start();?//?啟動(dòng)?Kafka?Connect
????????Thread.sleep(5000);?//?等待一段時(shí)間
????????
????????//?停止?Kafka?Connect
????????connect.stop();
????}
}
注意:上述示例代碼中的配置項(xiàng)可以根據(jù)實(shí)際需要進(jìn)行調(diào)整,例如連接到的 Kafka 服務(wù)器地址,序列化器等。
2. Kafka Streams:是一個(gè)輕量級(jí)的流處理庫(kù),用于對(duì) Kafka 主題的數(shù)據(jù)進(jìn)行實(shí)時(shí)處理和轉(zhuǎn)換。通過(guò) Kafka Streams,企業(yè)可以構(gòu)建實(shí)時(shí)的數(shù)據(jù)處理應(yīng)用程序,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)計(jì)算、流合并、按鍵分組和聚合等功能。
Kafka Streams 提供了高性能的流處理和事件驅(qū)動(dòng)的架構(gòu),并且與 Kafka 生態(tài)系統(tǒng)的其他組件無(wú)縫集成,提供了可擴(kuò)展、容錯(cuò)的流處理解。引入jar包文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-681146.html
<dependencies>
????<dependency>
????????<groupId>org.apache.kafka</groupId>
????????<artifactId>kafka-streams</artifactId>
????????<version>2.8.0</version>
????</dependency>
</dependencies>
import?org.apache.kafka.clients.consumer.ConsumerConfig;
import?org.apache.kafka.clients.producer.ProducerConfig;
import?org.apache.kafka.common.serialization.Serdes;
import?org.apache.kafka.streams.KafkaStreams;
import?org.apache.kafka.streams.StreamsBuilder;
import?org.apache.kafka.streams.StreamsConfig;
import?org.apache.kafka.streams.kstream.Consumed;
import?org.apache.kafka.streams.kstream.Printed;
import?org.apache.kafka.streams.kstream.Produced;
import?java.util.Properties;
public?class?KafkaStreamsApp?{
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建配置
????????Properties?props?=?new?Properties();
????????props.put(StreamsConfig.APPLICATION_ID_CONFIG,?"kafka-streams-app");
????????props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,?"localhost:9092");
????????props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,?Serdes.String().getClass());
????????props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,?Serdes.String().getClass());
????????props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,?"earliest");
????????//?創(chuàng)建流構(gòu)建器
????????StreamsBuilder?builder?=?new?StreamsBuilder();
????????//?從輸入主題接收數(shù)據(jù)
????????builder.stream("input-topic",?Consumed.with(Serdes.String(),?Serdes.String()))
????????????????.peek((k,?v)?->?System.out.println("Received:?key="?+?k?+?",?value="?+?v))
????????????????.to("output-topic",?Produced.with(Serdes.String(),?Serdes.String()));
????????//?創(chuàng)建?Kafka?Streams?應(yīng)用程序
????????KafkaStreams?streams?=?new?KafkaStreams(builder.build(),?props);
????????//?啟動(dòng)應(yīng)用程序
????????streams.start();
????????//?添加關(guān)閉鉤子以優(yōu)雅地關(guān)閉應(yīng)用程序
????????Runtime.getRuntime().addShutdownHook(new?Thread(streams::close));
????}
}
作者|小年輕在奮斗文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-681146.html
到了這里,關(guān)于Kafka在企業(yè)級(jí)應(yīng)用中的實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!