国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱

這篇具有很好參考價值的文章主要介紹了使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一,新建Spring Boot

最近忙著搞低代碼開發(fā),好久沒新建spring項目了,結(jié)果今天心血來潮準備建個springboot項目
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
注意Type選Maven,java選8,其他默認
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列

1,Maven配置

點下一步后完成就新建了一個spring boot項目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉庫,不然可能依賴下載不下來
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列

2,無法識別為SpringBoot項目

在maven配置沒問題的前提下,IDEA無法識別這是一個Spring Boot項目,倒騰半天,終于發(fā)現(xiàn)問題原因所在=======>是Maven版本太高的原因
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
把.mvn/wrapper目錄下的maven-wrapper.properties文件第一行的版本號降低,比如說降為3.5.4,然后重新點下Maven的同步按鈕
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列

3,無效的源發(fā)行版

接下來運行項目報錯:java: 無效的源發(fā)行版: 14
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
修改pom.xml中java.version值為8,原來是17

	<properties>
        <java.version>17</java.version>
    </properties>

4,無法訪問SpringApplication

繼續(xù)運行,繼續(xù)報錯使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
降低spring-boot-starter-parent版本,原來是3.1.3,改為2.7.2

5,運行直接Finish

繼續(xù)運行,沒報錯,服務(wù)直接Finished
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
需要添加web依賴

 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

6,服務(wù)運行成功

終于,一個空的spring boot項目成功跑起來了,喜極而泣
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列

二,安裝啟動Kafka

1,下載

官網(wǎng)=>https://kafka.apache.org/downloads,下載最新版的kafka,目前是3.5.1
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列

2,配置

解壓到D盤Config目錄下即完成安裝,目錄為D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) zookeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,啟動

先啟動zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties	

再啟動kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的時候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具體信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,創(chuàng)建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生產(chǎn)消費消息

1,加入依賴

 		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2,yam配置文件

application.yaml

spring:
  profiles:
    active: dev

application-dev.yaml

server:
  port: 8082
  servlet:
    context-path: /test-kafka

spring:
  cache:
    type: ehcache
    config: classpath:ehcache.xml
  jpa:
    database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      retries: 10

3,報錯enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
這個錯誤我本地測試下來是因為沒把賬號密碼配置這塊注釋掉
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列

4,生產(chǎn)者生產(chǎn)消息

@Slf4j
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(String content) {
        String topic = "test_topic";
        kafkaTemplate.send(topic, content).addCallback(success -> {
            String topic = success.getRecordMetadata().topic();
            int partition = success.getRecordMetadata().partition();
            long offset = success.getRecordMetadata().offset();
            log.info("發(fā)送成功:主題:{},分區(qū):{},偏移量:{}",topic,partition,offset);
        }, failure -> {
            log.info("發(fā)送失敗:{}",failure.getMessage());
        });
        return "發(fā)送成功";
    }
}

5,訂閱和消費消息

一,訂閱主題
1,獲取消費者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * kafka消費者配置
 * @author liuxunming
 */
@Configuration
@Component
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    public KafkaConsumer<String, String> createConsumer() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }

}

2,訂閱topic

 		KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();
        consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 for (ConsumerRecord<String, String> record : records) {
		String key = record.key();
		String value = record.value();
		log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}

4,消費位移,釋放資源

// 提交消費位移
consumer.commitSync();
// 關(guān)閉消費者以釋放資源
consumer.close();

二,點對點模式

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test_topic"})
    public void handlerMsg(String content) {
        log.info("接收到消息:消息值:{} ",content);
    }
}

6,接口

@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestParam String content) {
        kafkaProducer.sendMessage(content);
        return "ok";
    }
}

7,測試結(jié)果

使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列
接收到消息
使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱,后端,spring boot,kafka,后端,訂閱,消息隊列文章來源地址http://www.zghlxwxcb.cn/news/detail-682415.html

四,參考博文

  1. 解決IDEA無法識別SpringBoot項目
  2. SpringBoot從入門到精通(十二)SpringBoot集成Kafka
  3. Kafka的下載安裝以及使用
  4. Kafka消息消費流程詳解
  5. Kafka之Consumer使用與基本原理

到了這里,關(guān)于使用Spring Boot和Kafka實現(xiàn)消息發(fā)送和訂閱的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【Spring Boot】Spring Boot 集成 RocketMQ 實現(xiàn)簡單的消息發(fā)送和消費

    【Spring Boot】Spring Boot 集成 RocketMQ 實現(xiàn)簡單的消息發(fā)送和消費

    本文主要有以下內(nèi)容: 簡單消息的發(fā)送 順序消息的發(fā)送 RocketMQTemplate的API介紹 環(huán)境搭建: RocketMQ的安裝教程:在官網(wǎng)上下載bin文件,解壓到本地,并配置環(huán)境變量,如下圖所示: 在 Spring boot 項目中引入 RocketMQ 依賴: 在application.yml增加相關(guān)配置: 在 Spring Boot 中使用RocketM

    2024年02月14日
    瀏覽(93)
  • spring boot集成mqtt協(xié)議發(fā)送和訂閱數(shù)據(jù)

    maven的pom.xml引入包 mqtt.yml配置文件 初始化MQTT配置bean mqtt發(fā)送數(shù)據(jù)網(wǎng)關(guān)配置 發(fā)送數(shù)據(jù)到mqtt偽代碼 參考鏈接: https://blog.csdn.net/sinat_21184471/article/details/87186186 https://blog.csdn.net/qq_29467891/article/details/107043225?utm_source=app https://blog.csdn.net/myinsert/article/details/107715538

    2024年02月11日
    瀏覽(47)
  • 【Spring Boot 3】【Redis】消息發(fā)布及訂閱

    軟件開發(fā)是一門實踐性科學(xué),對大多數(shù)人來說,學(xué)習(xí)一種新技術(shù)不是一開始就去深究其原理,而是先從做出一個可工作的DEMO入手。但在我個人學(xué)習(xí)和工作經(jīng)歷中,每次學(xué)習(xí)新技術(shù)總是要花費或多或少的時間、檢索不止一篇資料才能得出一個可工作的DEMO,這占用了我大量的時

    2024年01月21日
    瀏覽(22)
  • Spring RabbitMQ那些事(1-交換機配置&消息發(fā)送訂閱實操)

    Spring RabbitMQ那些事(1-交換機配置&消息發(fā)送訂閱實操)

    在上一節(jié) RabbitMQ中的核心概念和交換機類型 中我們介紹了RabbitMQ中的一些核心概念,尤其是各種交換機的類型,接下來我們將具體講解各種交換機的配置和消息訂閱實操。 我們先上應(yīng)用啟動配置文件 application.yml ,如下: 備注:這里我們指定了 RabbitListenerContainerFactory 的類型

    2024年01月17日
    瀏覽(23)
  • 小程序?qū)崿F(xiàn)消息訂閱及發(fā)送

    小程序?qū)崿F(xiàn)消息訂閱及發(fā)送

    在我們的家政服務(wù)小程序中,用戶可以新增預(yù)約。一般的場景是新增預(yù)約的時候提醒用戶接收通知,在狀態(tài)變更的時候我們來發(fā)送訂閱消息。本篇我們來講解一下小程序訂閱消息功能的開發(fā)。 要想發(fā)送訂閱消息,首先需要選用一個消息模板。打開你自己的小程序后臺,點擊訂

    2024年02月07日
    瀏覽(16)
  • java后端使用websocket實現(xiàn)與客戶端之間接收及發(fā)送消息

    客戶端請求websocket接口,連接通道=》我這邊業(yè)務(wù)成功客戶端發(fā)消息=》客戶端自動刷新。 接口:ws://localhost:8080/websocket/xx 經(jīng)測試,成功 如果是線上服務(wù)器連接,則需要在nginx里配置websocket相關(guān)內(nèi)容,再重啟nginx,代碼如下 本地連接的時候用的是ws://,因為是http鏈接,但是如果是

    2024年02月16日
    瀏覽(26)
  • Springbootg整合RocketMQ ——使用 rocketmq-spring-boot-starter 來配置發(fā)送和消費 RocketMQ 消息

    ? ? ? ?本文解析將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個 spring-boot-starter 工具包來配置,發(fā)送和消費 RocketMQ 消息。 添加maven依賴: 修改application.properties 注意: 請將上述示例配置中的 127.0.0.1:9876 替換

    2024年03月22日
    瀏覽(28)
  • Spring Boot進階(62):Redis魔法:用發(fā)布訂閱功能打造高效消息隊列!

    Spring Boot進階(62):Redis魔法:用發(fā)布訂閱功能打造高效消息隊列!

    ? ? ? ? 話說,玩過MQ的同學(xué)可能都知道【發(fā)布訂閱】模式,不就是一種消息傳遞方式嘛;如果沒玩過,那也不打緊,下文我會簡單做個科普。但是對于Redis如何實現(xiàn)MQ的【發(fā)布訂閱】功能?這才是問題的關(guān)鍵,有的同學(xué)就說“壓根沒玩過呀!不造” ,哈哈,bug菌既然敢寫便有

    2024年02月09日
    瀏覽(27)
  • Spring Boot與Apache Kafka實現(xiàn)高吞吐量消息處理:解決大規(guī)模數(shù)據(jù)處理問題

    現(xiàn)代數(shù)據(jù)量越來越龐大對數(shù)據(jù)處理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息隊列之一。Spring Boot是現(xiàn)代Java應(yīng)用程序快速開發(fā)的首選框架。綜合使用Spring Boot和Apache Kafka可以實現(xiàn)高吞吐量消息處理。 Apache Kafka采用分布式發(fā)布-訂閱模式具有高度的可擴展性和可

    2024年02月05日
    瀏覽(24)
  • Spring Boot 中的 RabbitMQ 消息發(fā)送配置

    Spring Boot 中的 RabbitMQ 消息發(fā)送配置

    RabbitMQ 是一個開源的消息代理系統(tǒng),它實現(xiàn)了 AMQP(高級消息隊列協(xié)議)標準,并支持多種消息傳輸協(xié)議。它具有高可用性、可擴展性和可靠性,廣泛應(yīng)用于分布式系統(tǒng)、微服務(wù)架構(gòu)、異步任務(wù)處理、日志收集等場景。 RabbitMQ 的核心概念包括: Producer:消息生產(chǎn)者,負責(zé)將消

    2024年02月07日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包