前言
相信很多同學都開發(fā)過WEB服務,在WEB服務的開發(fā)中一般是通過緩存、隊列、讀寫分離、削峰填谷、限流降級等手段來提高服務性能和保證服務的正常投用。對于削峰填谷就不得不用到我們的MQ消息中間件,比如適用于大數(shù)據(jù)的kafka,性能較高支持事務活躍度高的rabbitmq等等,MQ的選用和整合已經是JAVA WEB開發(fā)中不可或缺對的一部分。當然,作為號稱JAVA微服務框架全家桶的Spring Cloud也提供了良好的適配中間件的功能。今天我們就來整合一下微服務全家桶Spring Cloud提供的消息驅動——Spring Cloud Stream。
Spring Cloud Stream簡析
Spring Cloud Stream是用于構建微服務具有消息驅動能力的框架,應用程序通過inputs、outputs通道與binder進行交互,binder與消息中間件進行通信。
binder的作用是將消息中間件進行粘合,相當于對第三方中間件進行封裝整合,讓開發(fā)人員不用關心底層消息中間件如何運行。
inputs是消息輸入通道,類似于消息中間件的consumer消費者;outputs是消息輸出通道,類似于消息中間件的producer生產者。應用程序收發(fā)消息不再直接調用消息中間件的接口或者邏輯代碼,直接使用Spring Cloud Stream 的OUTPUT與INPUT通道進行處理。
可以通過binder綁定選用各種消息中間件,用binding進行中間件的相關參數(shù)配置,讓應用程序達到靈活配置和切換消息中間件的目的。
Spring Cloud Stream與rabbitmq整合
本次整合直接與rabbitmq整合,如果是使用kafka的同學,可以直接移植配置修改對應粘接mq即可。
本次整合加入了消費重試機制、死信隊列,并提供死信隊列消費監(jiān)聽方法,可直接移植到生產環(huán)境。
1、添加pom依賴
引入spring-cloud-starter-stream-rabbit 需要從Spring Cloud中引入,注意dependencyManagement的配置。
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR10</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
2、application.yml增加mq配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
cloud:
stream:
binders: #stream框架粘接的mq
myRabbit: #自定義個人mq名稱
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
bindings: #stream綁定信道
output_channel: #自定義發(fā)送信道名稱
destination: assExchange #目的地 交換機/主題
content-type: application/json
binder: myRabbit #粘接到的mq
group: assGroup
input_channel: #自定義接收信道
destination: assExchange #目的地 交換機/主題
content-type: application/json
binder: myRabbit #粘接到的mq
group: assGroup
consumer:
maxAttempts: 3 # 嘗試消費該消息的最大次數(shù)(消息消費失敗后,發(fā)布者會重新投遞)。默認3
backOffInitialInterval: 1000 # 重試消費消息的初始化間隔時間。默認1s,即第一次重試消費會在1s后進行
backOffMultiplier: 2 # 相鄰兩次重試之間的間隔時間的倍數(shù)。默認2
backOffMaxInterval: 10000 # 下一次嘗試重試的最大時間間隔,默認為10000ms,即10s
rabbit: #stream mq配置
bindings:
input_channel:
consumer:
concurrency: 1 #消費者數(shù)量
max-concurrency: 5 #最大消費者數(shù)量
durable-subscription: true #持久化隊列
recovery-interval: 3000 #3s 重連
acknowledge-mode: MANUAL #手動
requeue-rejected: false #是否重新放入隊列
auto-bind-dlq: true #開啟死信隊列
requeueRejected: true #異常放入死信
3、定義輸入輸出信道
/**
* MqChannel
* @author senfel
* @version 1.0
* @date 2023/6/2 15:46
*/
public interface MqChannel {
/**
* 消息目的地 RabbitMQ中為交換機名稱
*/
String destination = "assExchange";
/**
* 輸出信道
*/
String OUTPUT_CHANNEL = "output_channel";
/**
* 輸入信道
*/
String INPUT_CHANNEL = "input_channel";
/**
* 死信隊列
*/
String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";
@Output(MqChannel.OUTPUT_CHANNEL)
MessageChannel output();
@Input(MqChannel.INPUT_CHANNEL)
SubscribableChannel input();
}
4、使用輸入輸出信道收發(fā)消息
TestMQService
/**
* TestMQService
* @author senfel
* @version 1.0
* @date 2023/6/2 15:47
*/
public interface TestMQService {
/**
* 發(fā)送消息
*/
void send(String str);
}
TestMQServiceImpl
/**
* TestMQServiceImpl
* @author senfel
* @version 1.0
* @date 2023/6/2 15:49
*/
@Service
@Slf4j
@EnableBinding(MqChannel.class)
public class TestMQServiceImpl implements TestMQService {
@Resource
private MqChannel mqChannel;
@Override
public void send(String str) {
mqChannel.output().send(MessageBuilder.withPayload("測試=========="+str).build());
}
/**
* 接收消息監(jiān)聽
* @param message 消息體
* @param channel 信道
* @param tag 標簽
* @param death
* @author senfel
* @date 2023/6/5 9:25
* @return void
*/
@StreamListener(MqChannel.INPUT_CHANNEL)
public void process(String message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("message : "+message);
if(message.contains("9")){
// 參數(shù)1為消息的tag 參數(shù)2為是否多條處理 參數(shù)3為是否重發(fā)
//channel.basicNack(tag,false,false);
System.err.println("--------------消費者消費異常--------------------------------------");
System.err.println(message);
throw new RuntimeException("拋出異常");
}else{
System.err.println("--------------消費者--------------------------------------");
System.err.println(message);
channel.basicAck(tag,false);
}
}
/**
* 死信監(jiān)聽
* @param message 消息體
* @param channel 信道
* @param tag 標簽
* @param death
* @author senfel
* @date 2023/6/5 14:30
* @return void
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(MqChannel.INPUT_CHANNEL_DLQ)
, exchange = @Exchange(MqChannel.destination)
),
concurrency = "1-5"
)
public void processByDlq(String message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("message : "+message);
System.err.println("---------------死信消費者------------------------------------");
System.err.println(message);
}
}
controller
/**
* @author senfel
* @version 1.0
* @date 2023/6/2 17:27
*/
@RestController
public class TestController{
@Resource
private TestMQService testMQService;
@GetMapping("/test")
public String testMq(String str){
testMQService.send(str);
return str;
}
}
5、模擬正常消息消費
文章來源:http://www.zghlxwxcb.cn/news/detail-475447.html
6、模擬異常消息
異常消息重試滿足3次投遞后進入死信消費文章來源地址http://www.zghlxwxcb.cn/news/detail-475447.html
到了這里,關于實戰(zhàn):Spring Cloud Stream消息驅動框架整合rabbitMq的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!