国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka

這篇具有很好參考價(jià)值的文章主要介紹了Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

系列文章目錄

上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle
架構(gòu)必備能力——kafka的選型對(duì)比及應(yīng)用場(chǎng)景
Kafka存取原理與實(shí)現(xiàn)分析,打破面試難關(guān)
防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實(shí)踐



kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐
繼上一次教大家手把手安裝kafka后,今天我們直接來(lái)到入門實(shí)操教程,也就是使用SpringBoot該怎么對(duì)接和使用kafka。當(dāng)然,在一開(kāi)始我們也會(huì)比較細(xì)致的介紹一下kafka本身。那么話不多說(shuō),馬上開(kāi)始今天的學(xué)習(xí)吧

??作者簡(jiǎn)介:戰(zhàn)斧,從事金融IT行業(yè),有著多年一線開(kāi)發(fā)、架構(gòu)經(jīng)驗(yàn);愛(ài)好廣泛,樂(lè)于分享,致力于創(chuàng)作更多高質(zhì)量?jī)?nèi)容
??本文收錄于 kafka 專欄,有需要者,可直接訂閱專欄實(shí)時(shí)獲取更新
??高質(zhì)量專欄 云原生、RabbitMQ、Spring全家桶 等仍在更新,歡迎指導(dǎo)
??Zookeeper Redis dubbo docker netty等諸多框架,以及架構(gòu)與分布式專題即將上線,敬請(qǐng)期待

一、Kafka與流處理

我們先來(lái)看看比較正式的介紹:Kafka是一種流處理平臺(tái),由LinkedIn公司創(chuàng)建,現(xiàn)在是Apache下的開(kāi)源項(xiàng)目。Kafka通過(guò)發(fā)布/訂閱機(jī)制實(shí)現(xiàn)消息的異步傳輸和處理。它具有高吞吐量、低延遲、可伸縮性和可靠性等優(yōu)點(diǎn),使其成為了流處理和實(shí)時(shí)數(shù)據(jù)管道的首選解決方案

介紹其實(shí)是比較清晰的,如果你是第一次接觸“流處理”概念,我們也可以做一點(diǎn)解釋,流處理指的是對(duì)連續(xù)、實(shí)時(shí)產(chǎn)生的數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理、計(jì)算和分析的過(guò)程。

假設(shè)你正在玩一款在線游戲,其他玩家的動(dòng)作和游戲事件會(huì)實(shí)時(shí)地傳到服務(wù)器上。這些事件就形成了一條數(shù)據(jù)流。在流處理中,我們會(huì)對(duì)這條數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理,例如計(jì)算每個(gè)玩家的分?jǐn)?shù)、監(jiān)控游戲區(qū)域內(nèi)的異常情況、統(tǒng)計(jì)玩家在線時(shí)長(zhǎng)等等。這樣,游戲管理員就可以實(shí)時(shí)地監(jiān)控和管理游戲,而不需要等到游戲結(jié)束才進(jìn)行操作。
類似的,流處理還可以應(yīng)用在其他實(shí)時(shí)性要求比較高的場(chǎng)景中,例如金融交易、物聯(lián)網(wǎng)、實(shí)時(shí)監(jiān)測(cè)等。通過(guò)對(duì)數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理,我們可以更加精準(zhǔn)地掌握數(shù)據(jù)變化的情況,并及時(shí)做出反應(yīng)和調(diào)整,

二、Spring Boot與Kafka的整合Demo

1. 新建springboot工程

如果你沒(méi)有現(xiàn)成的Spring boot項(xiàng)目,那么我們可以使用IDEA自帶的Spring Initializr 來(lái)創(chuàng)建一個(gè)spring-boot的項(xiàng)目

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

此時(shí)我們可以直接選擇使用Apache Kafka,另外項(xiàng)目還可以加個(gè)Spring Web準(zhǔn)備讓前臺(tái)調(diào)用

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

2. 添加Kafka依賴

如果你不是像上述一樣新建的項(xiàng)目,那你也可以選擇在已有的Spring Boot應(yīng)用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依賴:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

這里我們指定了Kafka服務(wù)器的地址和端口,并配置了消費(fèi)者組的ID,關(guān)于消費(fèi)者組的概念,其實(shí)就是某一些消費(fèi)者具備相同的功能,因此會(huì)把他們?cè)O(shè)為同一個(gè)消費(fèi)者組,這樣他們就不會(huì)重復(fù)消費(fèi)同一條消息了。更具體地原理,我們會(huì)在之后地篇章中介紹。

4. 創(chuàng)建Kafka生產(chǎn)者

在Kafka中,生產(chǎn)者是發(fā)送消息的應(yīng)用程序或服務(wù)。在Spring Boot中,我們可以使用KafkaTemplate類來(lái)創(chuàng)建Kafka生產(chǎn)者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

這里我們使用@Autowired注解來(lái)自動(dòng)注入KafkaTemplate,并使用send方法將消息發(fā)送到名為“test_topic”的Kafka主題中。


5. 創(chuàng)建Kafka消費(fèi)者

在Kafka中,消費(fèi)者是接收并處理訂閱主題消息的應(yīng)用程序或服務(wù)。在Spring Boot中,我們可以使用@KafkaListener注解來(lái)創(chuàng)建Kafka消費(fèi)者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 應(yīng)用程序入口

現(xiàn)在我們已經(jīng)完成了Spring Boot和Kafka的整合。我們可以啟動(dòng)Spring Boot應(yīng)用程序,然后發(fā)送消息并消費(fèi)它,以測(cè)試我們的應(yīng)用程序是否正確地與Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在這個(gè)例子中,我們使用@Autowired注解來(lái)自動(dòng)注入KafkaProducer,并通過(guò)發(fā)送消息的方法來(lái)調(diào)用sendMessage方法。最終項(xiàng)目整體框架如圖:

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

三、啟動(dòng)與驗(yàn)證

首先自然是啟動(dòng) Kafka ,怎么啟動(dòng)可參考 《上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle》,然后是啟動(dòng)我們的Spring Boot項(xiàng)目

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

然后在瀏覽器中輸入

http://127.0.0.1:8080/send/hello

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

最后檢查我們的項(xiàng)目日志:

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

可以看到,整個(gè)發(fā)送和接收的流程都走通了

四、KafkaTemplate 介紹

不難看出,在Springboot中,使用kafka的關(guān)鍵在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生產(chǎn)者模版,用于向 Kafka 集群發(fā)送消息。并且把 Kafka 的生產(chǎn)者客戶端封裝成了一個(gè) Spring Bean,提供更加方便易用的 API。

它有三個(gè)主要屬性:

  • producerFactory:生產(chǎn)者工廠類,用于創(chuàng)建 KafkaProducer 實(shí)例。
  • defaultTopic:默認(rèn)主題名稱,如果在發(fā)送消息時(shí)沒(méi)有指定主題名稱,則使用該默認(rèn)主題。
  • messageConverter:消息轉(zhuǎn)換器,用于將消息對(duì)象轉(zhuǎn)換為 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主題發(fā)送一條消息。ProducerRecord 包含了主題名稱、分區(qū)編號(hào)、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主題發(fā)送一條消息。
  • send(String topic, K key, V data):向指定的 Kafka 主題發(fā)送一條消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回調(diào)方式發(fā)送消息,可以自定義消息的創(chuàng)建過(guò)程和錯(cuò)誤處理過(guò)程。
  • inTransaction():?jiǎn)⒂檬聞?wù),多個(gè) send 方法調(diào)用將被包裝在一個(gè)事務(wù)中,保證 Kafka 事務(wù)的原子性。

除了上述方法外,KafkaTemplate 還提供了其他方法,如 sendDefault()、sendOffsetsToTransaction() 等,可以根據(jù)實(shí)際需要進(jìn)行選擇和使用。

需要注意的是,在使用 KafkaTemplate 發(fā)送消息時(shí)應(yīng)該注意消息的序列化方式、主題和分區(qū)的選擇以及錯(cuò)誤處理等問(wèn)題,以保證消息的可靠性和正確性。

當(dāng)然,很多同學(xué)可能還注意到一個(gè)細(xì)節(jié),我們?cè)谏厦娴腄emo中,我們直接將其 @Autowired進(jìn)我們的代碼中,這是怎么做到的呢?換句話說(shuō),這個(gè) KafkaTemplate 為什么自己就會(huì)被spring 容器管理的呢?其實(shí)這得益于SpringBoot中對(duì)Kafka有了很多自動(dòng)配置的內(nèi)容。如下:

kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐
kafka對(duì)接,kafka,kafka,spring boot,分布式,KafkaTemplate,kafka實(shí)踐

如上圖,相信對(duì)Spring Boot熟悉的同學(xué)看到 ConditionalOnClass 、 ConditionalOnMissingBean 應(yīng)該就明白了。其實(shí)Spring Boot 早就貼心的為我們預(yù)留了這些自動(dòng)配置,只要我們引入了 spring-kafka 包,使得項(xiàng)目中出現(xiàn)了 KafkaTemplate 類,那么它就能被自動(dòng)配置并存入Spring 容器內(nèi)

總結(jié)

今天我們通過(guò)一個(gè)Demo講解了在SpringBoot中如何對(duì)接Kafka,也介紹了下關(guān)鍵類 KafkaTemplate ,得益于Spring Boot 的自動(dòng)配置,開(kāi)發(fā)者要做的配置內(nèi)容其實(shí)并不多,使用也主要是依賴其提供的API,相對(duì)簡(jiǎn)單,相信大家很容易也都學(xué)會(huì)了,那么在后面的過(guò)程中,我們將繼續(xù)學(xué)習(xí)其使用,并且會(huì)著重講解 Kafka 的原理與結(jié)構(gòu)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-712792.html

到了這里,關(guān)于Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • SpringBoot對(duì)接阿里云OSS上傳文件以及回調(diào)(有坑)

    SpringBoot對(duì)接阿里云OSS上傳文件以及回調(diào)(有坑)

    今天在對(duì)接阿里云OSS對(duì)象存儲(chǔ), 把這過(guò)程記錄下來(lái) 阿里云的內(nèi)容很多,文檔是真的難找又難懂 本文主要是用的PostObject API 加上 Callback參數(shù) PostObject - https://help.aliyun.com/document_detail/31988.html?spm=a2c4g.31989.0.0 Callback - https://help.aliyun.com/document_detail/31989.html?spm=a2c4g.31988.0.0 前端向后

    2024年02月11日
    瀏覽(20)
  • Springboot實(shí)戰(zhàn)14 消息驅(qū)動(dòng):如何使用 KafkaTemplate 集成 Kafka?

    從今天開(kāi)始,我們將進(jìn)入 Spring Boot 中另一個(gè)重要話題的討論,即消息通信。 消息通信是 Web 應(yīng)用程序中間層組件中的代表性技術(shù)體系,主要用于構(gòu)建復(fù)雜而又靈活的業(yè)務(wù)流程。在互聯(lián)網(wǎng)應(yīng)用中,消息通信被認(rèn)為是實(shí)現(xiàn)系統(tǒng)解耦和高并發(fā)的關(guān)鍵技術(shù)體系。本節(jié)課我們將在 Spri

    2024年02月04日
    瀏覽(22)
  • 什么是Flink CDC,以及如何使用

    什么是Flink CDC,以及如何使用

    數(shù)據(jù)庫(kù)中的CDC(Change Data Capture,變更數(shù)據(jù)捕獲)是一種用于實(shí)時(shí)跟蹤數(shù)據(jù)庫(kù)中數(shù)據(jù)變化的技術(shù)。CDC的主要目的是在數(shù)據(jù)庫(kù)中捕獲增量數(shù)據(jù),以便在需要時(shí)可以輕松地將這些數(shù)據(jù)合并到其他系統(tǒng)或應(yīng)用程序中。CDC在數(shù)據(jù)庫(kù)管理、數(shù)據(jù)同步、數(shù)據(jù)集成和數(shù)據(jù)備份等方面具有廣泛的

    2024年02月11日
    瀏覽(18)
  • 【JAVA】為什么要使用封裝以及如何封裝

    【JAVA】為什么要使用封裝以及如何封裝

    個(gè)人主頁(yè):【??個(gè)人主頁(yè)】 系列專欄:【??初識(shí)JAVA】 Java的封裝指的是在一個(gè)類中將數(shù)據(jù)和方法進(jìn)行封裝,使其可以保護(hù)起來(lái),只能在該類內(nèi)部訪問(wèn),而不允許外部直接訪問(wèn)和修改。這是Java面向?qū)ο缶幊痰娜齻€(gè)基本特性之一,另外兩個(gè)是繼承和多態(tài)。在此之前我們已經(jīng)學(xué)

    2024年02月08日
    瀏覽(19)
  • 實(shí)戰(zhàn)指南,SpringBoot + Mybatis 如何對(duì)接多數(shù)據(jù)源

    實(shí)戰(zhàn)指南,SpringBoot + Mybatis 如何對(duì)接多數(shù)據(jù)源

    MyBatis緩存原理 Mybatis plugin 的使用及原理 MyBatis+Springboot 啟動(dòng)到SQL執(zhí)行全流程 數(shù)據(jù)庫(kù)操作不再困難,MyBatis動(dòng)態(tài)Sql標(biāo)簽解析 從零開(kāi)始,手把手教你搭建Spring Boot后臺(tái)工程并說(shuō)明 Spring框架與SpringBoot的關(guān)聯(lián)與區(qū)別 Spring監(jiān)聽(tīng)器用法與原理詳解 Spring事務(wù)暢談 —— 由淺入深徹底弄懂

    2024年02月12日
    瀏覽(28)
  • 【機(jī)器學(xué)習(xí) | 深度學(xué)習(xí)】Colab是什么?以及如何使用它?

    【機(jī)器學(xué)習(xí) | 深度學(xué)習(xí)】Colab是什么?以及如何使用它?

    Colaboratory(簡(jiǎn)稱為Colab)是由Google開(kāi)發(fā)的一種基于云端的交互式筆記本環(huán)境。它提供了免費(fèi)的計(jì)算資源(包括CPU、GPU和TPU),可讓用戶在瀏覽器中編寫和執(zhí)行代碼,而無(wú)需進(jìn)行任何配置和安裝。 Colab的目標(biāo)是使機(jī)器學(xué)習(xí)和數(shù)據(jù)科學(xué)的工作更加便捷、靈活和可共享。 下面是Col

    2024年02月09日
    瀏覽(25)
  • 什么是OpenVino?以及如何使用OpenVino運(yùn)行yolo

    什么是OpenVino?以及如何使用OpenVino運(yùn)行yolo

    目錄 Openvino簡(jiǎn)介 如何使用它? 構(gòu)建源代碼 Openvino IR模型 第一個(gè)Openvino示例 C語(yǔ)言示例 C++示例 使用OpenVino跑Yolo模型 Openvino 是由 Intel 開(kāi)發(fā)的專門用于優(yōu)化和部署人工智能推理的半開(kāi)源的工具包,主要用于對(duì)深度 推理做優(yōu)化 。 Openvino內(nèi)部集成了 Opencv 、 TensorFlow 模塊,除此之外

    2023年04月26日
    瀏覽(23)
  • Logback日志框架使用詳解以及如何Springboot快速集成

    Logback日志框架使用詳解以及如何Springboot快速集成

    ? 日志系統(tǒng)是用于記錄程序的運(yùn)行過(guò)程中產(chǎn)生的運(yùn)行信息、異常信息等,一般有8個(gè)級(jí)別,從低到高為All Trace Debug Info Warn Error Fatal OFF off 最高等級(jí),用于關(guān)閉所有日志記錄 fatal 指出每個(gè)嚴(yán)重的錯(cuò)誤事件將會(huì)導(dǎo)致應(yīng)用程序的退出。 error 指出雖然發(fā)生錯(cuò)誤事件,但仍然不影響系統(tǒng)

    2024年02月07日
    瀏覽(32)
  • Spring Boot中的SimpMessagingTemplate是什么,原理,以及如何使用

    Spring Boot中的SimpMessagingTemplate是什么,原理,以及如何使用

    SimpMessagingTemplate是Spring Framework中的一個(gè)類,用于向WebSocket客戶端發(fā)送消息。在Spring Boot應(yīng)用程序中,可以使用SimpMessagingTemplate來(lái)實(shí)現(xiàn)WebSocket通信的消息發(fā)送功能。本文將介紹SimpMessagingTemplate的原理和使用方法。 SimpMessagingTemplate是Spring Framework中的一個(gè)類,用于向WebSocket客戶端

    2024年02月09日
    瀏覽(38)
  • Spring Boot中的@EnableWebSocketMessageBroker注解是什么,原理,以及如何使用

    Spring Boot中的@EnableWebSocketMessageBroker注解是什么,原理,以及如何使用

    WebSocket是一種在Web瀏覽器和Web服務(wù)器之間進(jìn)行雙向通信的技術(shù)。在傳統(tǒng)的HTTP通信中,客戶端向服務(wù)器發(fā)送請(qǐng)求,服務(wù)器響應(yīng)請(qǐng)求,然后關(guān)閉連接。而在WebSocket中,客戶端和服務(wù)器之間的連接始終保持打開(kāi)狀態(tài),可以隨時(shí)互相發(fā)送消息,實(shí)現(xiàn)實(shí)時(shí)通信。 Spring Boot提供了對(duì)WebSo

    2024年02月12日
    瀏覽(28)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包