国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱

這篇具有很好參考價(jià)值的文章主要介紹了【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一,新建Spring Boot

最近忙著搞低代碼開發(fā),好久沒(méi)新建spring項(xiàng)目了,結(jié)果今天心血來(lái)潮準(zhǔn)備建個(gè)springboot項(xiàng)目
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
注意Type選Maven,java選8,其他默認(rèn)
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列

1,Maven配置

點(diǎn)下一步后完成就新建了一個(gè)spring boot項(xiàng)目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉(cāng)庫(kù),不然可能依賴下載不下來(lái)
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列

2,無(wú)法識(shí)別為SpringBoot項(xiàng)目

在maven配置沒(méi)問(wèn)題的前提下,IDEA無(wú)法識(shí)別這是一個(gè)Spring Boot項(xiàng)目,倒騰半天,終于發(fā)現(xiàn)問(wèn)題原因所在=======>是Maven版本太高的原因
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
把.mvn/wrapper目錄下的maven-wrapper.properties文件第一行的版本號(hào)降低,比如說(shuō)降為3.5.4,然后重新點(diǎn)下Maven的同步按鈕
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列

3,無(wú)效的源發(fā)行版

接下來(lái)運(yùn)行項(xiàng)目報(bào)錯(cuò):java: 無(wú)效的源發(fā)行版: 14
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
修改pom.xml中java.version值為8,原來(lái)是17

	<properties>
        <java.version>17</java.version>
    </properties>

4,無(wú)法訪問(wèn)SpringApplication

繼續(xù)運(yùn)行,繼續(xù)報(bào)錯(cuò)【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
降低spring-boot-starter-parent版本,原來(lái)是3.1.3,改為2.7.2

5,運(yùn)行直接Finish

繼續(xù)運(yùn)行,沒(méi)報(bào)錯(cuò),服務(wù)直接Finished
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
需要添加web依賴

 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

6,服務(wù)運(yùn)行成功

終于,一個(gè)空的spring boot項(xiàng)目成功跑起來(lái)了,喜極而泣
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列

二,安裝啟動(dòng)Kafka

1,下載

官網(wǎng)=>https://kafka.apache.org/downloads,下載最新版的kafka,目前是3.5.1
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列

2,配置

解壓到D盤Config目錄下即完成安裝,目錄為D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) zookeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,啟動(dòng)

先啟動(dòng)zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties	

再啟動(dòng)kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的時(shí)候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具體信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,創(chuàng)建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生產(chǎn)消費(fèi)消息

1,加入依賴

 		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2,yam配置文件

application.yaml

spring:
  profiles:
    active: dev

application-dev.yaml

server:
  port: 8082
  servlet:
    context-path: /test-kafka

spring:
  cache:
    type: ehcache
    config: classpath:ehcache.xml
  jpa:
    database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      retries: 10

3,報(bào)錯(cuò)enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
這個(gè)錯(cuò)誤我本地測(cè)試下來(lái)是因?yàn)闆](méi)把賬號(hào)密碼配置這塊注釋掉
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列

4,生產(chǎn)者生產(chǎn)消息

@Slf4j
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(String content) {
        String topic = "test_topic";
        kafkaTemplate.send(topic, content).addCallback(success -> {
            String topic = success.getRecordMetadata().topic();
            int partition = success.getRecordMetadata().partition();
            long offset = success.getRecordMetadata().offset();
            log.info("發(fā)送成功:主題:{},分區(qū):{},偏移量:{}",topic,partition,offset);
        }, failure -> {
            log.info("發(fā)送失?。簕}",failure.getMessage());
        });
        return "發(fā)送成功";
    }
}

5,訂閱和消費(fèi)消息

一,訂閱主題
1,獲取消費(fèi)者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * kafka消費(fèi)者配置
 * @author liuxunming
 */
@Configuration
@Component
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    public KafkaConsumer<String, String> createConsumer() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }

}

2,訂閱topic

 		KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();
        consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 for (ConsumerRecord<String, String> record : records) {
		String key = record.key();
		String value = record.value();
		log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}

4,消費(fèi)位移,釋放資源

// 提交消費(fèi)位移
consumer.commitSync();
// 關(guān)閉消費(fèi)者以釋放資源
consumer.close();

二,點(diǎn)對(duì)點(diǎn)模式

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test_topic"})
    public void handlerMsg(String content) {
        log.info("接收到消息:消息值:{} ",content);
    }
}

6,接口

@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestParam String content) {
        kafkaProducer.sendMessage(content);
        return "ok";
    }
}

7,測(cè)試結(jié)果

【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列
接收到消息
【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊(duì)列文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-708058.html

四,參考博文

  1. 解決IDEA無(wú)法識(shí)別SpringBoot項(xiàng)目
  2. SpringBoot從入門到精通(十二)SpringBoot集成Kafka
  3. Kafka的下載安裝以及使用
  4. Kafka消息消費(fèi)流程詳解
  5. Kafka之Consumer使用與基本原理

到了這里,關(guān)于【Spring Boot】集成Kafka實(shí)現(xiàn)消息發(fā)送和訂閱的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 實(shí)戰(zhàn)指南:使用Spring Boot實(shí)現(xiàn)消息的發(fā)送和接收

    當(dāng)涉及到消息發(fā)送和接收的場(chǎng)景時(shí),可以使用Spring Boot和消息中間件RabbitMQ來(lái)實(shí)現(xiàn)。下面是一個(gè)簡(jiǎn)單的示例代碼,展示了如何在Spring Boot應(yīng)用程序中創(chuàng)建消息發(fā)送者和接收者,并發(fā)送和接收一條消息。 首先,你需要進(jìn)行以下準(zhǔn)備工作 確保你已經(jīng)安裝了Java和Maven,并設(shè)置好相應(yīng)

    2024年02月11日
    瀏覽(26)
  • Spring Boot集成WebSocket實(shí)現(xiàn)消息推送

    Spring Boot集成WebSocket實(shí)現(xiàn)消息推送

    項(xiàng)目中經(jīng)常會(huì)用到消息推送功能,關(guān)于推送技術(shù)的實(shí)現(xiàn),我們通常會(huì)聯(lián)想到輪詢、comet長(zhǎng)連接技術(shù),雖然這些技術(shù)能夠?qū)崿F(xiàn),但是需要反復(fù)連接,對(duì)于服務(wù)資源消耗過(guò)大,隨著技術(shù)的發(fā)展,HtML5定義了WebSocket協(xié)議,能更好的節(jié)省服務(wù)器資源和帶寬,并且能夠更實(shí)時(shí)地進(jìn)行通訊。

    2023年04月08日
    瀏覽(20)
  • 使用 Spring Boot 整合 Kafka:實(shí)現(xiàn)高效的消息傳遞

    Kafka 是一種流處理平臺(tái),用于在分布式系統(tǒng)中處理高吞吐量的數(shù)據(jù)流。它是一種基于發(fā)布訂閱模式的消息系統(tǒng),能夠處理來(lái)自多個(gè)應(yīng)用程序的數(shù)據(jù)流。Kafka 具有高度的可擴(kuò)展性、可靠性和性能,使得它成為處理大數(shù)據(jù)的流行選擇。 Spring Boot 是一種開源框架,用于簡(jiǎn)化 Java 應(yīng)用

    2024年02月14日
    瀏覽(22)
  • Spring Boot 集成 WebSocket 實(shí)現(xiàn)服務(wù)端推送消息到客戶端

    Spring Boot 集成 WebSocket 實(shí)現(xiàn)服務(wù)端推送消息到客戶端

    ? ? ? 假設(shè)有這樣一個(gè)場(chǎng)景:服務(wù)端的資源經(jīng)常在更新,客戶端需要盡量及時(shí)地了解到這些更新發(fā)生后展示給用戶,如果是 HTTP 1.1,通常會(huì)開啟 ajax 請(qǐng)求詢問(wèn)服務(wù)端是否有更新,通過(guò)定時(shí)器反復(fù)輪詢服務(wù)端響應(yīng)的資源是否有更新。 ? ? ? ? ? ? ? ?? ? ? ? ?在長(zhǎng)時(shí)間不更新

    2024年02月16日
    瀏覽(86)
  • 【Spring Boot 3】【Redis】消息發(fā)布及訂閱

    軟件開發(fā)是一門實(shí)踐性科學(xué),對(duì)大多數(shù)人來(lái)說(shuō),學(xué)習(xí)一種新技術(shù)不是一開始就去深究其原理,而是先從做出一個(gè)可工作的DEMO入手。但在我個(gè)人學(xué)習(xí)和工作經(jīng)歷中,每次學(xué)習(xí)新技術(shù)總是要花費(fèi)或多或少的時(shí)間、檢索不止一篇資料才能得出一個(gè)可工作的DEMO,這占用了我大量的時(shí)

    2024年01月21日
    瀏覽(22)
  • Spring RabbitMQ那些事(1-交換機(jī)配置&消息發(fā)送訂閱實(shí)操)

    Spring RabbitMQ那些事(1-交換機(jī)配置&消息發(fā)送訂閱實(shí)操)

    在上一節(jié) RabbitMQ中的核心概念和交換機(jī)類型 中我們介紹了RabbitMQ中的一些核心概念,尤其是各種交換機(jī)的類型,接下來(lái)我們將具體講解各種交換機(jī)的配置和消息訂閱實(shí)操。 我們先上應(yīng)用啟動(dòng)配置文件 application.yml ,如下: 備注:這里我們指定了 RabbitListenerContainerFactory 的類型

    2024年01月17日
    瀏覽(23)
  • Spring Boot進(jìn)階(48):【實(shí)戰(zhàn)教程】SpringBoot集成WebSocket輕松實(shí)現(xiàn)實(shí)時(shí)消息推送

    Spring Boot進(jìn)階(48):【實(shí)戰(zhàn)教程】SpringBoot集成WebSocket輕松實(shí)現(xiàn)實(shí)時(shí)消息推送

    ????????WebSocket是一種新型的通信協(xié)議,它可以在客戶端與服務(wù)器端之間實(shí)現(xiàn)雙向通信,具有低延遲、高效性等特點(diǎn),適用于實(shí)時(shí)通信場(chǎng)景。在SpringBoot應(yīng)用中,集成WebSocket可以方便地實(shí)現(xiàn)實(shí)時(shí)通信功能,如即時(shí)聊天、實(shí)時(shí)數(shù)據(jù)傳輸?shù)取?????????本文將介紹如何在Sprin

    2024年02月09日
    瀏覽(97)
  • 小程序?qū)崿F(xiàn)消息訂閱及發(fā)送

    小程序?qū)崿F(xiàn)消息訂閱及發(fā)送

    在我們的家政服務(wù)小程序中,用戶可以新增預(yù)約。一般的場(chǎng)景是新增預(yù)約的時(shí)候提醒用戶接收通知,在狀態(tài)變更的時(shí)候我們來(lái)發(fā)送訂閱消息。本篇我們來(lái)講解一下小程序訂閱消息功能的開發(fā)。 要想發(fā)送訂閱消息,首先需要選用一個(gè)消息模板。打開你自己的小程序后臺(tái),點(diǎn)擊訂

    2024年02月07日
    瀏覽(16)
  • springboot集成kafka詳細(xì)步驟(發(fā)送及監(jiān)聽消息示例)

    1、本機(jī)的kafka環(huán)境配置,不再贅述 2、添加?pom 文件 3、配置application.yml 4、復(fù)寫kafka的相關(guān)配置類:生產(chǎn)、消費(fèi)相關(guān)配置 5、生產(chǎn)、消費(fèi)的偽代碼 6、測(cè)試消息發(fā)送 經(jīng)過(guò)測(cè)試!

    2024年02月11日
    瀏覽(20)
  • Spring Boot進(jìn)階(62):Redis魔法:用發(fā)布訂閱功能打造高效消息隊(duì)列!

    Spring Boot進(jìn)階(62):Redis魔法:用發(fā)布訂閱功能打造高效消息隊(duì)列!

    ? ? ? ? 話說(shuō),玩過(guò)MQ的同學(xué)可能都知道【發(fā)布訂閱】模式,不就是一種消息傳遞方式嘛;如果沒(méi)玩過(guò),那也不打緊,下文我會(huì)簡(jiǎn)單做個(gè)科普。但是對(duì)于Redis如何實(shí)現(xiàn)MQ的【發(fā)布訂閱】功能?這才是問(wèn)題的關(guān)鍵,有的同學(xué)就說(shuō)“壓根沒(méi)玩過(guò)呀!不造” ,哈哈,bug菌既然敢寫便有

    2024年02月09日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包