MQ
同步調(diào)用和異步調(diào)用
同步調(diào)用優(yōu)點:
時效性強,立即得到結(jié)果
缺點:
- 耦合度高 新業(yè)務(wù)新需求到來時,需要修改代碼
- 性能和吞吐能力下降 調(diào)用服務(wù)的響應(yīng)時間為所有服務(wù)的時間之和
- 資源浪費 調(diào)用鏈中的服務(wù)在等待時不會釋放請求占用的資源
- 級聯(lián)失敗 一個服務(wù)執(zhí)行失敗會導(dǎo)致調(diào)用鏈后續(xù)所有服務(wù)失敗
異步調(diào)用優(yōu)點:
- 服務(wù)解耦 便于擴展
- 性能提高 吞吐量提高
- 不會級聯(lián)失敗
- 流量削峰
RabbitMQ
基礎(chǔ)概念
- channel: 操作MQ工具
- exchange: 交換機, 將消息路由到隊列中
- queue: 保存消息的隊列
- virtual host: 虛擬主機, 相當于namespace,隔離的環(huán)境,對queue和exchange的邏輯分組
模型
基于 Spring Amqp
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
簡單隊列 (Hello-World)
消息發(fā)送者和接收者都需要以下配置:
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: /
username: guest
password: guest
發(fā)送消息:
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void helloWorldModel(){
String queueName="zbq.queue1";
String message="hello, spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
}
消息接受:
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "zbq.queue1")
public void listenSimpleQueue(String msg){
log.info("收到消息: "+msg);
}
}
工作隊列 (Work Queue)
發(fā)送者:
@Test
public void workQueueModel(){
String queueName="zbq.work.queue";
String msg="hello, amqp ";
for(int i=0;i<1000;i++){
rabbitTemplate.convertAndSend(queueName,msg+i);
}
}
接收者:
@RabbitListener(queues = "zbq.work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
log.info("消費者1號接收到消息: "+msg);
Thread.sleep(20);
}
@RabbitListener(queues = "zbq.work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
log.info("消費者2號接收到消息: "+msg);
Thread.sleep(100);
}
Pub/Sub (Fanout exchange)
Fanout交換將將消息發(fā)送到每一個綁定到它的隊列中
- 聲明一個FanoutExchange,聲明2個隊列, 綁定隊列到FanoutExchange上
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("zbq.fanout");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("zbq.fanout.queue1");
}
@Bean
public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2(){
return new Queue("zbq.fanout.queue2");
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.消費者監(jiān)聽這兩個隊列
@RabbitListener(queues = "zbq.fanout.queue1")
public void listenFanoutQueue1(String msg){
log.info("消費者1收到Fanout消息: "+msg);
}
@RabbitListener(queues = "zbq.fanout.queue2")
public void listenFanoutQueue2(String msg){
log.info("消費者2收到Fanout消息: "+msg);
}
3.發(fā)消息到fanoutexchange
@Test
public void fanoutModel(){
String exchangeName="zbq.fanout";
String msg="hello, fanout ";
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchangeName,"",msg+i);
}
}
查看消費者輸出信息
Direct Exchange
Direct交換機會將消息按照路由規(guī)則發(fā)送到指定的隊列
1.聲明交換機, 隊列,并綁定,添加routingkey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "zbq.direct.queue1"),
exchange = @Exchange(name="zbq.direct",type = ExchangeTypes.DIRECT),
key = {"girlfriend","family"}
))
public void listenDirectQueue1(String msg){
log.info("消費者1收到Direct消息: "+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "zbq.direct.queue2"),
exchange = @Exchange(name="zbq.direct",type = ExchangeTypes.DIRECT),
key = {"friend","family"}
))
public void listenDirectQueue2(String msg){
log.info("消費者2收到Direct消息: "+msg);
}
2.發(fā)送消息
@Test
public void directModel(){
String exchangeName="zbq.direct";
String msg="晚上回去吃飯 ";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(exchangeName,"family",msg+i);
}
}
@Test
public void directModel2(){
String exchangeName="zbq.direct";
String msg="hello, direct";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(exchangeName,"girlfriend",msg+i);
}
}
Topic Exchange
話題交換機的routingkey 必須是多個單詞的列表,并以.
分隔
可以使用通配符#
和*
#
:代表0個或者多個單詞
*
:代表1個單詞
1.定義
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "zbq.topic.queue1"),
exchange = @Exchange(name="zbq.topic",type = ExchangeTypes.TOPIC),
key = "China.#"
))
public void listenTopicQueue1(String msg){
log.info("消費者1收到Topic消息: "+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "zbq.topic.queue2"),
exchange = @Exchange(name="zbq.topic",type = ExchangeTypes.TOPIC),
key = "#.weather"
))
public void listenTopicQueue2(String msg){
log.info("消費者2收到Topic消息: "+msg);
}
2.發(fā)消息
@Test
public void topicModel(){
String exchangeName="zbq.topic";
String msg="首都北京,今日氣溫10攝氏度";
rabbitTemplate.convertAndSend(exchangeName,"China.weather",msg);
}
序列化方式
@Test
public void test(){
Map<String,Object> map=new HashMap<>();
map.put("hair","long");
map.put("eyes","big");
rabbitTemplate.convertAndSend("zbq.queue1",map);
}
發(fā)送對象類型過去, 查看序列化后的值
RabbitMQ默認使用JDK自帶序列化
引入以下依賴修改序列化方法:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
注入Bean文章來源:http://www.zghlxwxcb.cn/news/detail-810928.html
@Bean
public MessageConverter customMC(){
return new Jackson2JsonMessageConverter();
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-810928.html
到了這里,關(guān)于RabbitMQ 消息隊列使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!