Integrating a Kafka Dead-Letter Queue with Spring Boot and Spring Cloud Stream 3.1+
Link to the previous post:
Integrating Kafka with Spring Boot and Spring Cloud Stream 3.1+
Steps to implement the dead-letter queue
- Add a dead-letter queue configuration file and the corresponding channel
- Add retry settings to the corresponding channel in the binding configuration file
Result
Configuration files
Basic Kafka configuration (application-mq.yml)
server:
  port: 7105
spring:
  application:
    name: betrice-message-queue
  config:
    import:
      - classpath:application-bindings.yml
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          configuration:
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            enable.auto.commit: false
      binders:
        betrice-kafka:
          type: kafka
          environment:
            spring.kafka:
              bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}
Create the dead-letter queue configuration file (application-dql.yml)
spring:
  cloud:
    stream:
      kafka:
        bindings:
          dqlTransfer-in-0:
            consumer:
              # When set to true, enables DLQ behavior for this consumer. By default, records
              # that result in errors are forwarded to a topic named error.<destination>.<group>.
              # Records sent to the DLQ topic are enhanced with the following headers:
              # x-original-topic, x-exception-message, and x-exception-stacktrace (as byte[]).
              # By default, a failed record is sent to the same partition number in the
              # DLQ topic as the original record.
              enableDlq: true
              dlqName: Evad05-message-dlq
              keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
              # valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
              valueSerde: com.devilvan.pojo.Evad05MessageSerde
              autoCommitOnError: true
              autoCommitOffset: true
Note: valueSerde here uses a custom object type, so it must be paired with the application/json content type; after the consumer receives the message it is converted into a JSON string.
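The payload class referenced by valueSerde is never shown in the post. A minimal sketch, assuming it is a plain POJO with the data and count fields used by the Controller below (the real com.devilvan.pojo.Evad05MessageSerde presumably also implements Kafka's Serde interface, since the config names it in valueSerde — that part is omitted here):

```java
// Hypothetical sketch of the Evad05MessageSerde payload class.
// Field names are taken from the Controller example in this post.
public class Evad05MessageSerde {
    private String data;
    private int count;

    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public String toString() {
        return "Evad05MessageSerde{data='" + data + "', count=" + count + "}";
    }

    public static void main(String[] args) {
        // Same values the Controller sends below
        Evad05MessageSerde m = new Evad05MessageSerde();
        m.setData("evad05 test dql");
        m.setCount(1);
        System.out.println(m);
    }
}
```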
Add configuration to the binding file (application-bindings.yml)
The channel here corresponds to dqlTransfer-in-0 in the configuration file above.
spring:
  cloud:
    stream:
      betrice-default-binder: betrice-kafka
      function:
        # Declare the channels; transfer receives the producer's messages and hands them to sink
        definition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumer
      bindings:
        # Consumer binding; reads from the topic given by destination
        dqlTransfer-in-0:
          destination: Evad10
          binder: ${spring.cloud.stream.betrice-default-binder}
          group: evad05DlqConsumer # a consumer group is required to use the dead-letter queue
          content-type: application/json
          consumer:
            maxAttempts: 2 # maximum number of attempts to consume a failing message (default 3)
            backOffInitialInterval: 1000 # initial retry interval after a failure; default 1 s, i.e. the first retry happens 1 s later
            backOffMultiplier: 2 # ratio between consecutive retry intervals; default 2, i.e. each interval is double the previous one
            backOffMaxInterval: 10000 # upper bound on the retry interval; default 10000 ms (10 s)
        dqlTransfer-out-0:
          destination: Evad10
          binder: ${spring.cloud.stream.betrice-default-binder}
          content-type: text/plain
        # Consume messages from the dead-letter queue
        evad05DlqConsumer-in-0:
          destination: Evad05-message-dlq
          binder: ${spring.cloud.stream.betrice-default-binder}
          content-type: text/plain
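The four retry settings combine into an exponential backoff schedule: each retry waits backOffMultiplier times longer than the previous one, capped at backOffMaxInterval, and maxAttempts counts the original delivery plus the retries. A small illustrative calculation (not framework code, just the arithmetic these properties imply):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates the retry schedule implied by maxAttempts, backOffInitialInterval,
// backOffMultiplier and backOffMaxInterval (exponential backoff with a cap).
public class RetryBackoff {
    static List<Long> intervals(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        long interval = initialMs;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 retries
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min(interval, maxMs));
            interval = (long) (interval * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings from application-bindings.yml: one retry, 1 s after the failure
        System.out.println(intervals(2, 1000, 2.0, 10000)); // [1000]
        // With the default maxAttempts=3 there would be two retries
        System.out.println(intervals(3, 1000, 2.0, 10000)); // [1000, 2000]
    }
}
```

With maxAttempts: 2 the message is tried twice in total, then handed to the DLQ.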
Controller
Send a message and route it to the dead-letter queue
@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {
    @Resource(name = "streamBridgeUtils")
    private StreamBridge streamBridge;

    @PostMapping("streamSend")
    public void streamSend(String topic, String message) {
        try {
            streamBridge.send(topic, message);
            log.info("Sent message: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }

    @PostMapping("streamSendDql")
    public void streamSendDql(String topic, String message) {
        try {
            streamBridge.send(topic, message);
            log.info("Sent message: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }

    @PostMapping("streamSendJsonDql")
    public void streamSendJsonDql(String topic) {
        try {
            Evad05MessageSerde message = new Evad05MessageSerde();
            message.setData("evad05 test dql");
            message.setCount(1);
            streamBridge.send(topic, message);
            log.info("Sent message: " + message);
        } catch (Exception e) {
            log.error("Exception: " + e);
        }
    }
}
Channel
The dqlTransfer channel is used here: a message arrives from the Evad10 topic, the dqlTransfer() method throws an exception, and the message then goes to the corresponding dead-letter queue.
@Configuration
public class BetriceMqSubChannel {
    @Bean
    public Function<String, String> dqlTransfer() {
        return message -> {
            System.out.println("transfer: " + message);
            // Every delivery fails, so after maxAttempts the record is sent to the DLQ
            throw new RuntimeException("Dead-letter queue test!");
        };
    }

    @Bean
    public Consumer<String> evad05DlqConsumer() {
        return message -> {
            System.out.println("Topic: evad05 Dlq Consumer: " + message);
        };
    }
}
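Spring Cloud Stream 3.x derives the binding names used in application-bindings.yml from the functional bean names: an input binding is named <functionName>-in-<index> and an output binding <functionName>-out-<index>. A tiny illustrative helper (not a framework API, just the naming convention):

```java
// Sketch of Spring Cloud Stream's functional binding-name convention.
public class BindingNames {
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // The dqlTransfer Function<String, String> bean gets one input and one output binding
        System.out.println(input("dqlTransfer", 0));  // dqlTransfer-in-0
        System.out.println(output("dqlTransfer", 0)); // dqlTransfer-out-0
        // The evad05DlqConsumer Consumer<String> bean only gets an input binding
        System.out.println(input("evad05DlqConsumer", 0)); // evad05DlqConsumer-in-0
    }
}
```

This is why the bindings section above configures dqlTransfer-in-0, dqlTransfer-out-0 and evad05DlqConsumer-in-0 for the beans declared in spring.cloud.stream.function.definition.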
Converting a custom serialized type into a JSON message
Steps
1. Add the custom serializer to the valueSerde property in the binding file (application-bindings.yml)
2. In BetriceMqController, build an object of the custom type and send it as the message
@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {
    try {
        Evad05MessageSerde message = new Evad05MessageSerde();
        message.setData("evad05 test dql");
        message.setCount(1);
        streamBridge.send(topic, message);
        log.info("Sent message: " + message);
    } catch (Exception e) {
        log.error("Exception: " + e);
    }
}
3. The channel (BetriceMqSubChannel) receives the message and deserializes it
@Bean
public Consumer<String> evad05DlqConsumer() {
    return message -> {
        System.out.println("Topic: evad05 Dlq Consumer: "
                + JSON.parseObject(message, Evad05MessageSerde.class));
    };
}
4. Result