国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring Cloud Stream集成Kafka

這篇具有很好參考價值的文章主要介紹了Spring Cloud Stream集成Kafka。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Spring Cloud Stream是一個構(gòu)建消息驅(qū)動微服務(wù)的框架,抽象了MQ的使用方式, 提供統(tǒng)一的API操作。Spring Cloud Stream通過Binder(綁定器)、inputs/outputs Channel完成應(yīng)用程序和MQ的解耦。

  • Binder
    負(fù)責(zé)綁定應(yīng)用程序和MQ中間件,即指定應(yīng)用程序是和KafKa交互還是和RabbitMQ交互或者和其他的MQ中間件交互

  • inputs/outputs Channel
    inputs/outputs Channel抽象發(fā)布訂閱消息的方式,即無論是什么類型的MQ應(yīng)用程序都通過統(tǒng)一的方式發(fā)布訂閱消息

我們已經(jīng)搭建好了Kafka(參考Kafka單節(jié)點安裝),本文主要介紹一下Spring Cloud Stream與Kafka進(jìn)行集成實現(xiàn)消息的生產(chǎn)及消費。

項目創(chuàng)建

首先需要創(chuàng)建一個SpringBoot項目,命名為:spring-integration-kafka,在配置文件中導(dǎo)入相關(guān)的依賴。
項目情況為:

  • 構(gòu)建工具:Gradle
  • SpringBoot版本:2.7.5
  • SpringBoot依賴管理版本:1.0.15.RELEASE
  • SpringCloud依賴管理版本:2021.0.5

項目依賴

配置文件build.gradle.kts的關(guān)鍵配置項如下:

plugins {
    id("org.springframework.boot") version "2.7.5"
    id("io.spring.dependency-management") version "1.0.15.RELEASE"
}

apply(plugin = "org.springframework.boot")
apply(plugin = "io.spring.dependency-management")
apply(plugin = "java")


extra["springCloudVersion"] = "2021.0.5"

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

dependencies {
	implementation("org.springframework.boot:spring-boot-starter-web")
	implementation("org.springframework.boot:spring-boot-starter-actuator")
	
    implementation("org.springframework.cloud:spring-cloud-starter-bootstrap")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
    
    implementation("io.springfox:springfox-boot-starter:3.0.0")
    implementation("com.github.xiaoymin:swagger-bootstrap-ui:1.9.6")
}

集成配置

定義配置文件application.yml,配置文件中主要配置Kafka的地址、以及Spring Colud Stream的Binder和inputs/outputs Channel,其中:kafkaChannel1用于向Kafka發(fā)送消息;kafkaChannel2用于消費Kafka的消息。

spring:
  kafka:
    bootstrap-servers: wux-labs-vm:9092  # 定義Kafka的地址
    producer:
      acks: 1
  cloud:
    stream:
      binders:
        kafkaBiner1:   # 定義一個Binder,名稱隨意
          type: kafka   # Binder的類型是 kafka
          environment:
            spring:
              kafka: ${spring.kafka}  # Binder的配置使用前面配置的Kafka的信息
      default-binder: kafkaBiner1    # 默認(rèn)Binder,是前面配置的Binder的名稱
      bindings:
        kafkaChannel1:      # 定義一個(作為outputs Channel)通道,名稱隨意,在代碼中使用該通道名稱即可
          binder: kafkaBiner1 # 使用kafkaBiner1
          destination: KafkaFirstTopic # 定義目標(biāo)Topic的名稱
        kafkaChannel2:       # 定義一個(作為inputs Channel)通道,名稱隨意,在代碼中使用該通道名稱即可
          binder: kafkaBiner1 # 使用kafkaBiner1
          destination: KafkaFirstTopic # 定義目標(biāo)Topic的名稱
          group: group0   # 作為消息的消費方,需要指定group

集成生產(chǎn)者

下面開發(fā)一個生產(chǎn)者,發(fā)送消息需要通過outputs Channel進(jìn)行,使用kafkaChannel1發(fā)送消息到Kafka。

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Api("生產(chǎn)者接口")
@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private StreamBridge bridge;

    @ApiOperation("向Kafka發(fā)送數(shù)據(jù)")
    @PostMapping("/kafka")
    public String sendToKafka(String message) {
        boolean status = bridge.send("kafkaChannel1", message);
        return "發(fā)送消息:" + message + "=====>" + status;
    }
}

集成消費者

消費消息需要通過inputs Channel進(jìn)行,定義一個Processor,指定訂閱通道為kafkaChannel2,這個通道被用于進(jìn)行消息消費,需要定義group。

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface ConsumerProcessor {
    @Input("kafkaChannel2")
    SubscribableChannel subscribableChannel();
}

啟用通道并監(jiān)聽。

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(ConsumerProcessor.class)
public class ConsumerProcessorImpl {
    @StreamListener("kafkaChannel2")
    public void kafkaStreamListener(Object message) {
        System.out.println("接收到Kafka消息:" + new String((byte[]) message));
    }
}

集成驗證

生產(chǎn)者驗證

首先啟動一個Kafka自帶的消費者,監(jiān)聽KafkaFirstTopic。

springboot集成kafka stream,# SpringCloud,# Kafka,kafka,java,java-rabbitmq

接下來啟動SpringBoot項目并發(fā)送消息。在消費者那里可以看到接收到的消息。

springboot集成kafka stream,# SpringCloud,# Kafka,kafka,java,java-rabbitmq

消費者驗證

前面消息已經(jīng)發(fā)送到了Kafka的Topic了,可以看到控制臺直接打印出了監(jiān)聽到的消息。

springboot集成kafka stream,# SpringCloud,# Kafka,kafka,java,java-rabbitmq
至此,Spring Cloud Stream集成Kafka完成。文章來源地址http://www.zghlxwxcb.cn/news/detail-643671.html

到了這里,關(guān)于Spring Cloud Stream集成Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Spring cloud stream 結(jié)合 rabbitMq使用

    之前的開發(fā)主要是底層開發(fā),沒有深入涉及到消息方面?,F(xiàn)在面對的是一個這樣的場景: 假設(shè)公司項目A用了RabbitMQ,而項目B用了Kafka。這時候就會出現(xiàn)有兩個消息框架,這兩個消息框架可能編碼有所不同,且結(jié)構(gòu)也有所不同,而且之前甚至可能使用的是別的框架,造成了一個

    2024年02月04日
    瀏覽(23)
  • SpringBoot 如何使用 Spring Cloud Stream 處理事件

    SpringBoot 如何使用 Spring Cloud Stream 處理事件

    在分布式系統(tǒng)中,事件驅(qū)動架構(gòu)(Event-Driven Architecture,EDA)已經(jīng)成為一種非常流行的架構(gòu)模式。事件驅(qū)動架構(gòu)將系統(tǒng)中的各個組件連接在一起,以便它們可以相互協(xié)作,響應(yīng)事件并執(zhí)行相應(yīng)的操作。SpringBoot 也提供了一種方便的方式來處理事件——使用 Spring Cloud Stream。 Spr

    2024年02月10日
    瀏覽(28)
  • 實戰(zhàn):Spring Cloud Stream消息驅(qū)動框架整合rabbitMq

    實戰(zhàn):Spring Cloud Stream消息驅(qū)動框架整合rabbitMq

    相信很多同學(xué)都開發(fā)過WEB服務(wù),在WEB服務(wù)的開發(fā)中一般是通過緩存、隊列、讀寫分離、削峰填谷、限流降級等手段來提高服務(wù)性能和保證服務(wù)的正常投用。對于削峰填谷就不得不用到我們的MQ消息中間件,比如適用于大數(shù)據(jù)的kafka,性能較高支持事務(wù)活躍度高的rabbitmq等等,MQ的

    2024年02月08日
    瀏覽(27)
  • 《微服務(wù)實戰(zhàn)》 第十六章 Spring cloud stream應(yīng)用

    《微服務(wù)實戰(zhàn)》 第十六章 Spring cloud stream應(yīng)用

    第十六章 Spring cloud stream應(yīng)用 第十五章 RabbitMQ 延遲隊列 第十四章 RabbitMQ應(yīng)用 https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit 官方定義Spring Cloud Stream是一個構(gòu)建消息驅(qū)動微服務(wù)的框架。應(yīng)用程序通過inputs或者outputs來與Spring Cloud Stream中binder對象交互。通過我們配置來bindin

    2024年02月06日
    瀏覽(19)
  • Spring Cloud Stream 4.0.4 rabbitmq 發(fā)送消息多function

    Spring Cloud Stream 4.0.4 rabbitmq 發(fā)送消息多function

    注意當(dāng)多個消費者時,需要添加配置項:spring.cloud.function.definition 啟動日志 交換機(jī)名稱對應(yīng): spring.cloud.stream.bindings.demo-in-0.destination配置項的值 隊列名稱是交換機(jī)名稱+分組名 http://localhost:8080/sendMsg?delay=10000name=zhangsan 問題總結(jié) 問題一 解決辦法: 查看配置是否正確: spring

    2024年02月19日
    瀏覽(22)
  • Spring Cloud Stream解密:流式數(shù)據(jù)在微服務(wù)中的魔力

    Spring Cloud Stream解密:流式數(shù)據(jù)在微服務(wù)中的魔力

    歡迎來到我的博客,代碼的世界里,每一行都是一個故事 在微服務(wù)的大舞臺上,數(shù)據(jù)流就像一曲美妙的交響樂,而Spring Cloud Stream正是指揮家,將音符有序地傳遞給每個微服務(wù)。在這篇文章中,我們將揭開Spring Cloud Stream的神秘面紗,一起探索在微服務(wù)體系結(jié)構(gòu)中如何通過流式

    2024年02月20日
    瀏覽(23)
  • 【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驅(qū)動架構(gòu)(MDA)解析,實現(xiàn)異步處理與解耦合!

    【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驅(qū)動架構(gòu)(MDA)解析,實現(xiàn)異步處理與解耦合!

    ???? 歡迎光臨,終于等到你啦 ???? ??我是 蘇澤 ,一位對技術(shù)充滿熱情的探索者和分享者。???? ??持續(xù)更新的專欄 《Spring 狂野之旅:從入門到入魔》 ?? 本專欄帶你從Spring入門到入魔 ? 這是蘇澤的個人主頁可以看到我其他的內(nèi)容哦???? 努力的蘇澤 http://suzee.blog.

    2024年03月10日
    瀏覽(29)
  • 【微服務(wù)學(xué)習(xí)】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    【微服務(wù)學(xué)習(xí)】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)

    @[TOC](【微服務(wù)學(xué)習(xí)】spring-cloud-starter-stream 4.x 版本的使用(rocketmq 版)) 2.1 消息發(fā)送者 2.1.1 使用 StreamBridge streamBridge; 往指定信道發(fā)送消息 2.1.2 通過隱式綁定信道, 注冊 Bean 發(fā)送消息 2.2 消息接收者 注意: 多個方法之間可以使用 “|” 間隔, 但是綁定時 多個需要按順序?qū)? 其中

    2024年02月03日
    瀏覽(25)
  • SpringCloud(17)之SpringCloud Stream

    SpringCloud(17)之SpringCloud Stream

    ????????Spring Cloud Stream是一個框架,用于構(gòu)建與共享消息系統(tǒng)連接的高度可擴(kuò)展的事件驅(qū)動微服務(wù)。該框架提供了一個靈活的編程模型,該模型建立在已經(jīng)建立和熟悉的Spring習(xí)慣用法和最佳實踐之上,包括對持久發(fā)布/子語義、使用者組和有狀態(tài)分區(qū)的支持。?? ??????

    2024年03月12日
    瀏覽(25)
  • 消息驅(qū)動 —— SpringCloud Stream

    消息驅(qū)動 —— SpringCloud Stream

    Spring Cloud Stream 是用于構(gòu)建消息驅(qū)動的微服務(wù)應(yīng)用程序的框架,提供了多種中間件的合理配置 Spring Cloud Stream 包含以下核心概念: Destination Binders:目標(biāo)綁定器,目標(biāo)指的是 Kafka 或者 RabbitMQ,綁定器就是封裝了目標(biāo)中間件的包,如果操作的是 Kafka,就使用 Kafka Binder,如果操作

    2024年02月08日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包