1,版本說(shuō)明
erlang 和 rabbitmq 版本說(shuō)明
https://www.rabbitmq.com/which-erlang.html
確認(rèn)需要安裝的mq版本以及對(duì)應(yīng)的erlang版本。
2,下載安裝文件
RabbitMQ下載地址:
https://packagecloud.io/rabbitmq/rabbitmq-server
Erlang下載地址:
https://packagecloud.io/rabbitmq/erlang
RabbitMQ延遲消息插件下載
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下載文件如圖
3,安裝步驟
3.1, 查詢(xún)是否有安裝過(guò)erlang、rabbitmq, 查詢(xún)到有的話(huà)需要?jiǎng)h除。
rpm -qa | grep rabbitmq-server
rpm -qa | grep erlang
# 刪除
yum -y remove rabbitmq-server.noarch
3.2, 本地安裝erlang
yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
# 查詢(xún)安裝的版本
erl -version
# Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx
3.3, 本地安裝rabbitmq
yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm
# 啟動(dòng)rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq狀態(tài)
systemctl status rabbitmq-server
# 設(shè)置rabbitmq服務(wù)開(kāi)機(jī)自啟動(dòng)
systemctl enable rabbitmq-server
# 關(guān)閉rabbitmq服務(wù)
systemctl stop rabbitmq-server
# 重啟rabbitmq服務(wù)
systemctl restart rabbitmq-server
3.4, mq 端口開(kāi)放:
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
firewall-cmd --zone=public --list-ports
3.5, 安裝mq管理界面
# 啟用管理界面插件
rabbitmq-plugins enable rabbitmq_management
curl http://localhost:15672 就可以打開(kāi)web管理頁(yè)面
# rabbitmq有一個(gè)默認(rèn)的賬號(hào)密碼guest,但該情況僅限于本機(jī)localhost進(jìn)行訪(fǎng)問(wèn),所以需要添加一個(gè)遠(yuǎn)程登錄的用戶(hù)
# 添加用戶(hù)
rabbitmqctl add_user 用戶(hù)名 密碼
rabbitmqctl add_user admin 123456
# 設(shè)置用戶(hù)角色,分配操作權(quán)限
rabbitmqctl set_user_tags 用戶(hù)名 角色
rabbitmqctl set_user_tags admin administrator
# 為用戶(hù)添加資源權(quán)限(授予訪(fǎng)問(wèn)虛擬機(jī)根節(jié)點(diǎn)的所有權(quán)限)
rabbitmqctl set_permissions -p / 用戶(hù)名 ".*" ".*" ".*"
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 角色有四種:
# administrator:可以登錄控制臺(tái)、查看所有信息、并對(duì)rabbitmq進(jìn)行管理
# monToring:監(jiān)控者;登錄控制臺(tái),查看所有信息
# policymaker:策略制定者;登錄控制臺(tái)指定策略
# managment:普通管理員;登錄控制
# 修改密碼
rabbitmqctl change_ password 用戶(hù)名 新密碼
# 刪除用戶(hù)
rabbitmqctl delete_user 用戶(hù)名
# 查看用戶(hù)清單
rabbitmqctl list_users
3.6, 延遲消息插件安裝:
# 把插件包先復(fù)制到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重啟mq
systemctl restart rabbitmq-server
rabbitmq-plugins list
3.7,登錄測(cè)試
訪(fǎng)問(wèn)地址: ip:15672 賬號(hào)密碼: admin 123456
找到交換機(jī) exchange,看看類(lèi)型是否有延遲消息類(lèi)型的
然后就可以寫(xiě)代碼去連接發(fā)消息了。
4, Java代碼
4.1, pom 引入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2, 配置類(lèi):
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
4.3, 消息定義配置類(lèi):
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class OrderRabbitMQConfig {
@Autowired
private RabbitAdmin rabbitAdmin;
//================================訂單延時(shí)=================================
@Bean
CustomExchange order_pay_delay_exchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue order_pay_delay_queue() {
Queue queue = new Queue("order_pay_delay_queue", true, false, false);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
public Binding order_pay_delay_binding() {
return BindingBuilder.bind(order_pay_delay_queue())
.to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
}
//================================訂單支付通知======================================
@Bean
public DirectExchange order_pay_notify_exchange() {
return new DirectExchange("order_pay_notify_exchange", true, false);
}
@Bean
public Queue order_pay_notify_direct_queue() {
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-max-priority", 5);
Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
rabbitAdmin.declareQueue(queue);
return queue;
}
@Bean
public Binding ctc_bidding_auction_pay_notify_binding() {
return BindingBuilder.bind(order_pay_notify_direct_queue())
.to(order_pay_notify_exchange()).with("order_pay_notify_routing");
}
}
4.4, 消息發(fā)送類(lèi):文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-715957.html
import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitMQSendUtils {
private static RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 訂單支付延時(shí)通知、發(fā)送MQ消息
*/
public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
//給延遲隊(duì)列發(fā)送消息
String msg = JSONUtil.toJsonStr(dto);
log.info("訂單支付延時(shí)通知、發(fā)送MQ消息: {}, delayTimes={}", msg, delayTimes);
rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//給消息設(shè)置延遲毫秒值
message.getMessageProperties().setDelay(delayTimes);
return message;
}
});
}
/**
* 訂單支付通知,發(fā)送MQ消息
*/
public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
log.info("訂單支付通知,發(fā)送MQ消息: {}", dto);
rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
}
}
4.5, 消息監(jiān)聽(tīng)消費(fèi)類(lèi):文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-715957.html
import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* MQ消費(fèi)監(jiān)聽(tīng)
*/
@Slf4j
@Component
public class OrderMQListener {
/**
* 訂單延時(shí)通知 消息
*/
@RabbitListener(queues = {"order_pay_delay_queue"})
public void payDelayNotify(Message message) {
try {
String msg = new String(message.getBody());
log.info("【消費(fèi)】訂單延時(shí)通知 MQ 消息內(nèi)容: {}, Message={}", msg, message);
//支付訂單改成超時(shí)未支付》取消
PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
} catch (Exception e) {
log.error("訂單延時(shí)通知 消息消費(fèi)失?。?, e);
}
}
/**
* 訂單支付通知 消息
*/
@RabbitListener(queues = {"order_pay_notify_queue"})
public void payNotify(Message message) {
try {
String msg = new String(message.getBody());
log.info("訂單支付通知 MQ 消息內(nèi)容:{}, {}", msg, message);
PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
} catch (Exception e) {
log.error("訂單支付通知 消息消費(fèi)失敗:", e);
}
}
}
到了這里,關(guān)于Centos安裝RabbitMQ,JavaSpring發(fā)送RabbitMQ延遲延時(shí)消息,JavaSpring消費(fèi)RabbitMQ消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!