RabbitMQ六種工作模式簡單說明
1、simple簡單模式
- 消息產(chǎn)生著§將消息放入隊列
- 消息的消費者(consumer) 監(jiān)聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經(jīng)從隊列中消失了,造成消息的丟失)應(yīng)用場景:聊天(中間有一個過度的服務(wù)器;p端,c端)
2、work工作模式(資源的競爭)
- 消息產(chǎn)生者將消息放入隊列消費者可以有多個,消費者1,消費者2,同時監(jiān)聽同一個隊列,消息被消費?C1 C2共同爭搶當前的消息隊列內(nèi)容,誰先拿到誰負責(zé)消費消息(隱患,高并發(fā)情況下,默認會產(chǎn)生某一個消息被多個消費者共同使用,可以設(shè)置一個開關(guān)(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個消費者使用)
- 應(yīng)用場景:紅包;大項目中的資源調(diào)度(任務(wù)分配系統(tǒng)不需知道哪一個任務(wù)執(zhí)行系統(tǒng)在空閑,直接將任務(wù)扔到消息隊列中,空閑的系統(tǒng)自動爭搶)
3、publish/subscribe發(fā)布訂閱(共享資源)
- X代表交換機rabbitMQ內(nèi)部組件,erlang 消息產(chǎn)生者是代碼完成,代碼的執(zhí)行效率不高,消息產(chǎn)生者將消息放入交換機,交換機發(fā)布訂閱把消息發(fā)送到所有消息隊列中,對應(yīng)消息隊列的消費者拿到消息進行消費
- 相關(guān)場景:郵件群發(fā),群聊天,廣播(廣告)
4、routing路由模式
- 消息生產(chǎn)者將消息發(fā)送給交換機按照路由判斷,路由是字符串(info) 當前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機根據(jù)路由的key,只能匹配上路由key對應(yīng)的消息隊列,對應(yīng)的消費者才能消費消息;
- 根據(jù)業(yè)務(wù)功能定義路由字符串
- 從系統(tǒng)的代碼邏輯中獲取對應(yīng)的功能字符串,將消息任務(wù)扔到對應(yīng)的隊列中業(yè)務(wù)場景:error 通知;EXCEPTION;錯誤通知的功能;傳統(tǒng)意義的錯誤通知;客戶通知;利用key路由,可以將程序中的錯誤封裝成消息傳入到消息隊列中,開發(fā)者可以自定義消費者,實時接收錯誤;
5、topic 主題模式(路由模式的一種)
- 星號井號代表通配符
- 星號代表多個單詞,井號代表一個單詞
- 路由功能添加模糊匹配
- 消息產(chǎn)生者產(chǎn)生消息,把消息交給交換機
- 交換機根據(jù)key的規(guī)則模糊匹配到對應(yīng)的隊列,由隊列的監(jiān)聽消費者接收消息消費
6、遠程調(diào)用rpc模式
- 客戶端發(fā)送一個請求消息然后服務(wù)器回復(fù)一個響應(yīng)消息。為了收到一個響應(yīng),我們需要發(fā)送一個’回調(diào)’的請求的隊列地址。
本文使用的是Topic主題模式,完整代碼地址在結(jié)尾!!
第一步,在pom.xml加入依賴,如下
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步,編寫application.yml配置文件,如下
server:
port: 8184
spring:
application:
name: rabbitmq-demo-server
rabbitmq:
host: xxx # ip地址
port: 5672
username: xxx # 連接賬號
password: xxx # 連接密碼
template:
retry:
enabled: true # 開啟失敗重試
initial-interval: 10000ms # 第一次重試的間隔時長
max-interval: 300000ms # 最長重試間隔,超過這個間隔將不再重試
multiplier: 2 # 下次重試間隔的倍數(shù),此處是2即下次重試間隔是上次的2倍
exchange: topic.exchange # 缺省的交換機名稱,此處配置后,發(fā)送消息如果不指定交換機就會使用這個
publisher-confirm-type: correlated # 生產(chǎn)者確認機制,確保消息會正確發(fā)送,如果發(fā)送失敗會有錯誤回執(zhí),從而觸發(fā)重試
publisher-returns: true
listener:
type: simple
simple:
acknowledge-mode: manual
prefetch: 1 # 限制每次發(fā)送一條數(shù)據(jù)。
concurrency: 3 # 同一個隊列啟動幾個消費者
max-concurrency: 3 # 啟動消費者最大數(shù)量
# 重試策略相關(guān)配置
retry:
enabled: true # 是否支持重試
max-attempts: 5
stateless: false
multiplier: 1.0 # 時間策略乘數(shù)因子
initial-interval: 1000ms
max-interval: 10000ms
default-requeue-rejected: true
第三步,創(chuàng)建RabbitMqConstants類,如下
/**
* RabbitMqConstants
*
* @author luoyu
* @date 2019/03/16 22:12
* @description
*/
public class RabbitMqConstants {
public final static String TEST1_QUEUE = "test1-queue";
public final static String TEST2_QUEUE = "test2-queue";
public final static String EXCHANGE_NAME = "test.topic.exchange";
public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*";
public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test";
public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*";
public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";
}
第四步,創(chuàng)建RabbitMqConfig配置類,如下
import com.luoyu.rabbitmq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* RabbitMQConfig
*
* @author luoyu
* @date 2019/03/16 21:59
* @description
*/
@Slf4j
@Configuration
public class RabbitMqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 聲明交換機
*/
@Bean(RabbitMqConstants.EXCHANGE_NAME)
public Exchange exchange(){
//durable(true) 持久化,mq重啟之后交換機還在
return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();
}
/**
* 聲明隊列
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重啟的時候不需要創(chuàng)建新的隊列
* auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false
* exclusive 表示該消息隊列是否只在當前connection生效,默認是false
*/
@Bean(RabbitMqConstants.TEST1_QUEUE)
public Queue esQueue() {
return new Queue(RabbitMqConstants.TEST1_QUEUE);
}
/**
* 聲明隊列
*/
@Bean(RabbitMqConstants.TEST2_QUEUE)
public Queue gitalkQueue() {
return new Queue(RabbitMqConstants.TEST2_QUEUE);
}
/**
* TEST1_QUEUE隊列綁定交換機,指定routingKey
*/
@Bean
public Binding bindingEs(@Qualifier(RabbitMqConstants.TEST1_QUEUE) Queue queue,
@Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs();
}
/**
* TEST2_QUEUE隊列綁定交換機,指定routingKey
*/
@Bean
public Binding bindingGitalk(@Qualifier(RabbitMqConstants.TEST2_QUEUE) Queue queue,
@Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs();
}
/**
* 如果需要在生產(chǎn)者需要消息發(fā)送后的回調(diào),
* 需要對rabbitTemplate設(shè)置ConfirmCallback對象,
* 由于不同的生產(chǎn)者需要對應(yīng)不同的ConfirmCallback,
* 如果rabbitTemplate設(shè)置為單例bean,
* 則所有的rabbitTemplate實際的ConfirmCallback為最后一次申明的ConfirmCallback。
* @return
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
}
第五步,創(chuàng)建RabbitMqUtils工具類,如下
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* RabbitMqUtils
*
* @author luoyu
* @date 2019/03/16 22:08
* @description
*/
@Slf4j
@Component
public class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
private RabbitTemplate rabbitTemplate;
/**
* 構(gòu)造方法注入
*/
@Autowired
public RabbitMqUtils(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
//這是是設(shè)置回調(diào)能收到發(fā)送到響應(yīng)
rabbitTemplate.setConfirmCallback(this);
//如果設(shè)置備份隊列則不起作用
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
/**
* 回調(diào)確認
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}else{
log.info("消息發(fā)送失?。篶orrelationData({}),ack({}),cause({})",correlationData,ack,cause);
}
}
/**
* 消息發(fā)送到轉(zhuǎn)換器的時候沒有對列,配置了備份對列該回調(diào)則不生效
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
/**
* 發(fā)送到指定Queue
* @param queueName
* @param obj
*/
public void send(String queueName, Object obj){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(queueName, obj, correlationId);
}
/**
* 1、交換機名稱
* 2、routingKey
* 3、消息內(nèi)容
*/
public void sendByRoutingKey(String exChange, String routingKey, Object obj){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId);
}
}
第六步,編寫RabbitMqListener消費者監(jiān)聽器,如下
import com.luoyu.rabbitmq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Slf4j
@Component
public class RabbitMqListener {
@RabbitListener(queues = RabbitMqConstants.TEST1_QUEUE)
public void test1Consumer(Message message, Channel channel) {
try {
//手動確認消息已經(jīng)被消費
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("Test1消費消息:" + message.toString() + "。成功!");
} catch (Exception e) {
e.printStackTrace();
log.info("Test1消費消息:" + message.toString() + "。失??!");
}
}
@RabbitListener(queues = RabbitMqConstants.TEST2_QUEUE)
public void test2Consumer(Message message, Channel channel) {
try {
//手動確認消息已經(jīng)被消費
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("Test2消費消息:" + message.toString() + "。成功!");
} catch (Exception e) {
e.printStackTrace();
log.info("Test2消費消息:" + message.toString() + "。失敗!");
}
}
}
第七步,編寫生產(chǎn)者服務(wù)類,TestService,TestServiceImpl,如下
TestService
public interface TestService {
String sendTest1(String content);
String sendTest2(String content);
}
TestServiceImpl
import com.luoyu.rabbitmq.constants.RabbitMqConstants;
import com.luoyu.rabbitmq.service.TestService;
import com.luoyu.rabbitmq.util.RabbitMqUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class TestServiceImpl implements TestService {
@Autowired
private RabbitMqUtils rabbitMqUtils;
@Override
public String sendTest1(String content) {
rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content);
return "發(fā)送成功!";
}
@Override
public String sendTest2(String content) {
rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,
RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content);
return "發(fā)送成功!";
}
}
第八步,編寫TestController類,如下
import com.luoyu.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author :jhx
* @date :2020/12/26
* @desc :
*/
@RestController
@Slf4j
@RequestMapping(value = "/message")
public class TestController {
@Autowired
private TestService testService;
/**
* 發(fā)送消息test1
* @param content
* @return
*/
@PostMapping(value = "/test1")
public String sendTest1(@RequestBody String content) {
return testService.sendTest1(content);
}
/**
* 發(fā)送消息test2
* @param content
* @return
*/
@PostMapping(value = "/test2")
public String sendTest2(@RequestBody String content) {
return testService.sendTest2(content);
}
}
第九步,啟動項目,使用postman調(diào)用不同Topic主題的發(fā)送消息接口,通過控制臺打印日志,可知消息成功發(fā)送并成功被消費。
完整代碼地址:https://github.com/Jinhx128/springboot-demo
注:此工程包含多個module,本文所用代碼均在rabbitmq-demo模塊下
文章來源地址http://www.zghlxwxcb.cn/news/detail-783411.html
文章來源:http://www.zghlxwxcb.cn/news/detail-783411.html
到了這里,關(guān)于SpringBoot 2.2.5 整合RabbitMQ,實現(xiàn)Topic主題模式的消息發(fā)送及消費的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!