使用StreamBridge實現RabbitMq 消息收發 && ack確認 && 延時消息
Maven依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
延時消息需要安裝 rabbitmq_delayed_message_exchange 插件
下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
1.下載完成放到rabbitmq安裝目錄plugins下
2.執行命令啟用插件
3.重啟mq(文章來源:http://www.zghlxwxcb.cn/news/detail-512066.html)
# Enable the delayed-message exchange plugin (the plugin file must already be in the plugins directory).
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Restart the broker. NOTE(review): if RabbitMQ runs as an OS service, restart it through the
# service manager instead (e.g. systemctl restart rabbitmq-server, or rabbitmq-service on Windows).
rabbitmqctl stop
rabbitmq-server -detached
Exchanges -> add a new exchange -> type 出現 x-delayed-message 即安裝成功
文章來源地址:http://www.zghlxwxcb.cn/news/detail-512066.html
yml配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: xxxx
    password: xxxx
  cloud:
    function:
      # Must match the consumer bean (method) names
      definition: ackMessage;normal;delay
    stream:
      # RabbitMQ-binder-specific settings, keyed by binding name
      rabbit:
        bindings:
          ackMessage-in-0:
            consumer:
              acknowledge-mode: manual # manual = acknowledge in code, auto = acknowledged automatically
          delay-in-0:
            consumer:
              delayedExchange: true # enable delayed delivery on this exchange
          delay-out-0:
            producer:
              delayedExchange: true # enable delayed delivery on this exchange
      # Binder-independent binding settings, keyed by binding name
      bindings:
        delay-in-0:
          destination: delay.exchange.cloud # exchange name on the broker
          content-type: application/json
          consumer:
            # NOTE(review): acknowledge-mode is a Rabbit binder property (see stream.rabbit.bindings
            # above); confirm whether it has any effect at this level.
            acknowledge-mode: auto # manual = acknowledge in code, auto = acknowledged automatically
          group: delay-group # consumer group
          binder: rabbit
        delay-out-0:
          destination: delay.exchange.cloud
          content-type: application/json
          group: delay-group
          binder: rabbit
        ackMessage-in-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: manual # manual = acknowledge in code, auto = acknowledged automatically
          group: ackMessage-group
          binder: rabbit
        ackMessage-out-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          group: ackMessage-group
          binder: rabbit
        normal-in-0:
          destination: normal.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual = acknowledge in code, auto = acknowledged automatically
          group: normal-group
          binder: rabbit
        normal-out-0:
          destination: normal.exchange.cloud
          content-type: application/json
          group: normal-group
          binder: rabbit
接口controller
import com.alibaba.fastjson2.JSON;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
/**
 * REST endpoints under {@code /mq} that hand messages to {@link RabbitMqProducer} for publishing.
 */
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/mq")
public class MqController {

    /** Message producer; supplied through the Lombok-generated all-args constructor. */
    private final RabbitMqProducer rabbitMqProducer;

    /**
     * Sends a plain (non-delayed) message.
     *
     * @param msg         message text taken from the request path
     * @param bindingName binding name taken from the request path and passed to the producer
     * @return the result of {@code R.ok()}
     */
    @GetMapping("/sendMessage/{msg}/{bindingName}")
    public R<Void> sendMessage(@PathVariable("msg") String msg, @PathVariable("bindingName") String bindingName) {
        // Placeholders instead of concatenation: the text is only built when INFO is enabled.
        log.info("{}發送消息: {}", bindingName, msg);
        rabbitMqProducer.sendMsg(msg, bindingName);
        return R.ok();
    }

    /**
     * Sends a delayed message.
     *
     * <p>The request body is serialized to JSON and used as the message text; the binding name
     * and the delay (in seconds) are read from the same body.
     *
     * @param message request body carrying the binding name and the delay in seconds
     * @return the result of {@code R.ok()}
     */
    @PostMapping("/sendDelayedMessage")
    public R<Void> sendDelayedMessage(@RequestBody Message message) {
        log.info("{}發送延時消息: {} {}", MqTExchangesEnum.delay, LocalDateTime.now(), message);
        rabbitMqProducer.sendDelayMsg(
                JSON.toJSONString(message),
                message.getBindingName(),
                message.getSeconds()); // delay in seconds
        return R.ok();
    }
}
發送者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Publishes messages through {@link StreamBridge}.
 *
 * <p>The boolean result of {@code StreamBridge.send} is not inspected by this class.
 */
@Component
public class RabbitMqProducer {

    /** Header carrying the per-message delay in milliseconds. */
    private static final String DELAY_HEADER = "x-delay";

    private static final int MILLIS_PER_SECOND = 1000;

    @Autowired
    private StreamBridge streamBridge;

    /**
     * Sends a message without delay.
     *
     * @param msg         message text
     * @param bindingName binding name passed to {@code StreamBridge.send}
     */
    public void sendMsg(String msg, String bindingName) {
        Message<Messaging> message = MessageBuilder.withPayload(newPayload(msg)).build();
        streamBridge.send(bindingName, message);
    }

    /**
     * Sends a message with an {@code x-delay} header of {@code seconds * 1000} milliseconds.
     *
     * @param msg         message text
     * @param bindingName binding name passed to {@code StreamBridge.send}
     * @param seconds     delay in seconds; must not be {@code null}
     * @throws NullPointerException if {@code seconds} is {@code null}
     * @throws ArithmeticException  if the delay in milliseconds does not fit in an {@code int}
     */
    public void sendDelayMsg(String msg, String bindingName, Integer seconds) {
        java.util.Objects.requireNonNull(seconds, "seconds must not be null");
        // multiplyExact fails loudly instead of wrapping to a negative delay on overflow.
        int delayMillis = Math.multiplyExact(seconds.intValue(), MILLIS_PER_SECOND);
        Message<Messaging> message = MessageBuilder.withPayload(newPayload(msg))
                .setHeader(DELAY_HEADER, delayMillis)
                .build();
        streamBridge.send(bindingName, message);
    }

    /** Wraps the text in a {@code Messaging} payload with a fresh random UUID as message id. */
    private Messaging newPayload(String msg) {
        return new Messaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);
    }
}
消費者
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import java.time.LocalDateTime;
import java.util.function.Consumer;
/**
* RabbitMq消息消費(fèi)者
*/
@Component
@Slf4j
public class RabbitMqConsumer {
/**
* mq接收ackMessage消息/手動(dòng)ack確認(rèn)
* @methodName 配置文件對(duì)應(yīng)
**/
@Bean
Consumer<Message<Messaging>> ackMessage() {
log.info("ackMessage-初始化訂閱");
return obj -> {
Channel channel = obj.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliveryTag = obj.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
try {
log.info("ackMessage-消息接收成功:" + obj.getPayload());
//業(yè)務(wù)邏輯處理
//ack確認(rèn)
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//重新回隊(duì)列-true則重新入隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列。
// channel.basicReject(deliveryTag, true);
log.error(e.getMessage());
}
};
}
/**
* mq接收normal消息
**/
@Bean
Consumer<Messaging> normal() {
log.info("normal-初始化訂閱");
return obj -> {
log.info("normal-消息接收成功:" + obj);
//業(yè)務(wù)邏輯處理
};
}
/**
* mq接收延時(shí)消息
* Messaging 發(fā)送實(shí)體消息接收實(shí)體消息
**/
@Bean
Consumer<Message<Messaging>> delay() {
log.info("delay-初始化訂閱");
return obj -> {
Messaging payload = obj.getPayload();
log.info("delay-消息接收成功:" + LocalDateTime.now() + " " + payload);
//業(yè)務(wù)邏輯處理
};
}
}
到了這里,關于使用StreamBridge實現RabbitMq 消息收發 && ack確認 && 延時消息的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!