国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列

這篇具有很好參考價(jià)值的文章主要介紹了SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列

上一篇直通車(chē)

SpringBoot整合SpringCloudStream3.1+版本Kafka

實(shí)現(xiàn)死信隊(duì)列步驟

  1. 添加死信隊(duì)列配置文件,添加對(duì)應(yīng)channel
  2. 通道綁定配置對(duì)應(yīng)的channel位置添加重試配置

結(jié)果

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列
SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列
SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列

配置文件

Kafka基本配置(application-mq.yml)

server:
  port: 7105
spring:
  application:
	name: betrice-message-queue
  config:
	import:
	- classpath:application-bindings.yml
  cloud:
	stream:
	  kafka:
		binder:
		  brokers: localhost:9092
		  configuration:
			key-serializer: org.apache.kafka.common.serialization.StringSerializer
			value-serializer: org.apache.kafka.common.serialization.StringSerializer
		  consumer-properties:
			enable.auto.commit: false
	  binders:
		betrice-kafka:
		  type: kafka
		  environment:
			spring.kafka:
		  bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

創(chuàng)建死信隊(duì)列配置文件(application-dql.yml)

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列

spring:
  cloud:
	stream:
	  kafka:
		bindings:
		  dqlTransfer-in-0:
			consumer:
			  # When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>.
			  # messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].
			  # By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
			  enableDlq: true
			  dlqName: Evad05-message-dlq
			  keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
#              valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
			  valueSerde: com.devilvan.pojo.Evad05MessageSerde
			  autoCommitOnError: true
			  autoCommitOffset: true

注意:這里的valueSerde使用了對(duì)象類(lèi)型,需要搭配application/json使用,consumer接收到消息后會(huì)轉(zhuǎn)化為json字符串

通道綁定文件添加配置(application-bindings.yml)

channel對(duì)應(yīng)上方配置文件的dqlTransfer-in-0

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列

spring:
  cloud:
	stream:
	  betrice-default-binder: betrice-kafka
	  function:
		# 聲明兩個(gè)channel,transfer接收生產(chǎn)者的消息,處理完后給sink
		definition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumer
	  bindings:
		# 添加生產(chǎn)者bindiing,輸出到destination對(duì)應(yīng)的topic
		dqlTransfer-in-0:
		  destination: Evad10
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  group: evad05DlqConsumer # 使用死信隊(duì)列必須要有g(shù)roup
		  content-type: application/json
		  consumer:
			maxAttempts: 2 # 當(dāng)消息消費(fèi)失敗時(shí),嘗試消費(fèi)該消息的最大次數(shù)(消息消費(fèi)失敗后,發(fā)布者會(huì)重新投遞)。默認(rèn)3
			backOffInitialInterval: 1000 # 消息消費(fèi)失敗后重試消費(fèi)消息的初始化間隔時(shí)間。默認(rèn)1s,即第一次重試消費(fèi)會(huì)在1s后進(jìn)行
			backOffMultiplier: 2 # 相鄰兩次重試之間的間隔時(shí)間的倍數(shù)。默認(rèn)2,即第二次是第一次間隔時(shí)間的2倍,第三次是第二次的2倍
			backOffMaxInterval: 10000 # 下一次嘗試重試的最大時(shí)間間隔,默認(rèn)為10000ms,即10s。
		dqlTransfer-out-0:
		  destination: Evad10
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain
		# 消費(fèi)死信隊(duì)列中的消息
		evad05DlqConsumer-in-0:
		  destination: Evad05-message-dlq
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain

Controller

發(fā)送消息并將消息引入死信隊(duì)列

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {
	@Resource(name = "streamBridgeUtils")
	private StreamBridge streamBridge;

	@PostMapping("streamSend")
	public void streamSend(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("發(fā)送消息:" + message);
		} catch (Exception e) {
			log.error("異常消息:" + e);
		}
	}

	@PostMapping("streamSendDql")
	public void streamSendDql(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("發(fā)送消息:" + message);
		} catch (Exception e) {
			log.error("異常消息:" + e);
		}
	}

	@PostMapping("streamSendJsonDql")
	public void streamSendJsonDql(String topic) {
		try {
			Evad05MessageSerde message = new Evad05MessageSerde();
			message.setData("evad05 test dql");
			message.setCount(1);
			streamBridge.send(topic, message);
			log.info("發(fā)送消息:" + message);
		} catch (Exception e) {
			log.error("異常消息:" + e);
		}
	}
}

Channel

這里使用了transfer通道,消息從Evad10(topic)傳來(lái),經(jīng)過(guò)transfer()方法后拋出異常,隨后進(jìn)入對(duì)應(yīng)的死信隊(duì)列

@Configuration
public class BetriceMqSubChannel {
	@Bean
	public Function<String, String> dqlTransfer() {
		return message -> {
			System.out.println("transfer: " + message);
			throw new RuntimeException("死信隊(duì)列測(cè)試!");
		};
	}

	@Bean
	public Consumer<String> evad05DlqConsumer() {
		return message -> {
			System.out.println("Topic: evad05 Dlq Consumer: " + message);
		};
	}
}

將自定義序列化類(lèi)型轉(zhuǎn)換為JSON消息

步驟

1. 通道綁定文件(application-bindings.yml)的valueSerde屬性添加自定義的序列化

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列

2. BetriceMqController中封裝該自定義類(lèi)型的對(duì)象,并作為消息發(fā)送

@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {
	try {
		Evad05MessageSerde message = new Evad05MessageSerde();
		message.setData("evad05 test dql");
		message.setCount(1);
		streamBridge.send(topic, message);
		log.info("發(fā)送消息:" + message);
	} catch (Exception e) {
		log.error("異常消息:" + e);
	}
}

3. channel(BetriceMqSubChannel)接收到該消息并反序列化

@Bean
public Consumer<String> evad05DlqConsumer() {
	return message -> {
		System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));
	};
}

4. 結(jié)果

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列
SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列,SpringBoot,SpringCloud,spring boot,kafka,死信隊(duì)列

參考網(wǎng)址

Kafka 消費(fèi)端消費(fèi)重試和死信隊(duì)列 - Java小強(qiáng)技術(shù)博客 (javacui.com)
spring cloud stream kafka rabbit 實(shí)現(xiàn)死信隊(duì)列_spring cloud stream kafka 死信隊(duì)列_it噩夢(mèng)的博客-CSDN博客文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-601023.html

到了這里,關(guān)于SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • springboot:整合Kafka

    springboot:整合Kafka

    依賴(lài) yaml配置 簡(jiǎn)單demo 下面示例創(chuàng)建了一個(gè)生產(chǎn)者,發(fā)送消息到topic1,消費(fèi)者監(jiān)聽(tīng)topic1消費(fèi)消息。監(jiān)聽(tīng)器用@KafkaListener注解,topics表示監(jiān)聽(tīng)的topic,支持同時(shí)監(jiān)聽(tīng)多個(gè),用英文逗號(hào)分隔。 KafkaTemplate調(diào)用send時(shí) 默認(rèn)采用異步發(fā)送 ,如果需要同步獲取發(fā)送結(jié)果,調(diào)用get方法 帶回調(diào)

    2024年02月08日
    瀏覽(21)
  • springboot整合kafka-筆記

    這里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 測(cè)試發(fā)送kafka消息-控制臺(tái)日志

    2024年02月12日
    瀏覽(27)
  • SpringBoot整合Kafka

    SpringBoot整合Kafka

    官網(wǎng):Apache Kafka cmd進(jìn)入到kafka安裝目錄: 1:cmd啟動(dòng)zookeeer .binwindowszookeeper-server-start.bat .configzookeeper.properties 2:cmd啟動(dòng)kafka server .binwindowszookeeper-server-start.bat .configzookeeper.properties 3:使用cmd窗口啟動(dòng)一個(gè)生產(chǎn)者命令: .binwindowskafka-console-producer.bat --bootstrap-server local

    2024年01月20日
    瀏覽(21)
  • 【SpringBoot系列】SpringBoot整合Kafka(含源碼)

    【SpringBoot系列】SpringBoot整合Kafka(含源碼)

    前言 在現(xiàn)代的微服務(wù)架構(gòu)中,消息隊(duì)列已經(jīng)成為了一個(gè)不可或缺的組件。 它能夠幫助我們?cè)诓煌姆?wù)之間傳遞消息,并且能夠確保這些消息不會(huì)丟失。 在眾多的消息隊(duì)列中,Kafka 是一個(gè)非常出色的選擇。 它能夠處理大量的實(shí)時(shí)數(shù)據(jù),并且提供了強(qiáng)大的持久化能力。 在本

    2024年02月05日
    瀏覽(25)
  • 什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    什么是kafka,如何學(xué)習(xí)kafka,整合SpringBoot

    目錄 一、什么是Kafka,如何學(xué)習(xí) 二、如何整合SpringBoot 三、Kafka的優(yōu)勢(shì) ? Kafka是一種分布式的消息隊(duì)列系統(tǒng),它可以用于處理大量實(shí)時(shí)數(shù)據(jù)流 。學(xué)習(xí)Kafka需要掌握如何安裝、配置和運(yùn)行Kafka集群,以及如何使用Kafka API編寫(xiě)生產(chǎn)者和消費(fèi)者代碼來(lái)讀寫(xiě)數(shù)據(jù)。此外,還需要了解Ka

    2024年02月10日
    瀏覽(26)
  • Kafka 基礎(chǔ)整理、 Springboot 簡(jiǎn)單整合

    Kafka 基礎(chǔ)整理、 Springboot 簡(jiǎn)單整合

    定義: Kafka 是一個(gè)分布式的基于發(fā)布/訂閱默認(rèn)的消息隊(duì)列 是一個(gè)開(kāi)源的分布式事件流平臺(tái),被常用用于數(shù)據(jù)管道、流分析、數(shù)據(jù)集成、關(guān)鍵任務(wù)應(yīng)用 消費(fèi)模式: 點(diǎn)對(duì)點(diǎn)模式 (少用) 消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除消息 發(fā)布/訂閱模式 生產(chǎn)者推送消息到隊(duì)列,都消費(fèi)者

    2024年02月03日
    瀏覽(21)
  • Kafka入門(mén)(安裝和SpringBoot整合)

    Kafka入門(mén)(安裝和SpringBoot整合)

    app-tier:網(wǎng)絡(luò)名稱(chēng) –driver:網(wǎng)絡(luò)類(lèi)型為bridge Kafka依賴(lài)zookeeper所以先安裝zookeeper -p:設(shè)置映射端口(默認(rèn)2181) -d:后臺(tái)啟動(dòng) 安裝并運(yùn)行Kafka, –name:容器名稱(chēng) -p:設(shè)置映射端口(默認(rèn)9092 ) -d:后臺(tái)啟動(dòng) ALLOW_PLAINTEXT_LISTENER任何人可以訪問(wèn) KAFKA_CFG_ZOOKEEPER_CONNECT鏈接的zookeeper K

    2024年02月11日
    瀏覽(21)
  • 實(shí)戰(zhàn):徹底搞定 SpringBoot 整合 Kafka

    kafka是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個(gè)項(xiàng)目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項(xiàng)目里快速集成kafka。 除了簡(jiǎn)單的收發(fā)消息外,Spring-kafka還提供了很多高級(jí)功能,下面我們就來(lái)一一探秘這些用法。

    2024年02月10日
    瀏覽(26)
  • springboot整合ELK+kafka采集日志

    springboot整合ELK+kafka采集日志

    在分布式的項(xiàng)目中,各功能模塊產(chǎn)生的日志比較分散,同時(shí)為滿足性能要求,同一個(gè)微服務(wù)會(huì)集群化部署,當(dāng)某一次業(yè)務(wù)報(bào)錯(cuò)后,如果不能確定產(chǎn)生的節(jié)點(diǎn),那么只能逐個(gè)節(jié)點(diǎn)去查看日志文件;logback中RollingFileAppender,ConsoleAppender這類(lèi)同步化記錄器也降低系統(tǒng)性能,綜上一些

    2024年02月15日
    瀏覽(23)
  • springboot整合kafka多數(shù)據(jù)源

    springboot整合kafka多數(shù)據(jù)源

    在很多與第三方公司對(duì)接的時(shí)候,或者處在不同的網(wǎng)絡(luò)環(huán)境下,比如在互聯(lián)網(wǎng)和政務(wù)外網(wǎng)的分布部署服務(wù)的時(shí)候,我們需要對(duì)接多臺(tái)kafka來(lái)達(dá)到我們的業(yè)務(wù)需求,那么當(dāng)kafka存在多數(shù)據(jù)源的情況,就與單機(jī)的情況有所不同。 單機(jī)的情況 如果是單機(jī)的kafka我們直接通過(guò)springboot自

    2024年02月13日
    瀏覽(29)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包