国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器)

這篇具有很好參考價(jià)值的文章主要介紹了RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1.初識MQ

1.1.同步和異步通訊

微服務(wù)間通訊有同步和異步兩種方式:

同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。

異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。

兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會有延遲。

1.1.1.同步通訊

我們之前學(xué)習(xí)的Feign調(diào)用就屬于同步方式,雖然調(diào)用可以實(shí)時(shí)得到結(jié)果,但存在下面的問題:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

總結(jié):

同步調(diào)用的優(yōu)點(diǎn):

  • 時(shí)效性較強(qiáng),可以立即得到結(jié)果

同步調(diào)用的問題:

  • 耦合度高 (違背開閉原則)
  • 性能和吞吐能力下降 (按順序一個(gè)個(gè)請求,總耗時(shí)等于所有微服務(wù)請求響應(yīng)時(shí)間之和)
  • 有額外的資源消耗 (等待下游業(yè)務(wù)響應(yīng)得一直干等著,啥也不能干)
  • 有級聯(lián)失敗問題 (某個(gè)微服務(wù)掛了,凡是調(diào)用鏈中包含該微服務(wù)的都會卡死在這里,越來越多,多米諾效應(yīng)(同步請求未成功會卡死在這里,卡死的請求多了就麻煩了))

1.1.2.異步通訊

最常見的就是事件驅(qū)動模式

異步調(diào)用則可以避免上述問題:

我們以購買商品為例,用戶支付后需要調(diào)用訂單服務(wù)完成訂單狀態(tài)修改,調(diào)用物流服務(wù),從倉庫分配響應(yīng)的庫存并準(zhǔn)備發(fā)貨。

在事件模式中,支付服務(wù)是事件發(fā)布者(publisher),在支付完成后只需要發(fā)布一個(gè)支付成功的事件(event),事件中帶上訂單id。

訂單服務(wù)和物流服務(wù)是事件訂閱者(Consumer),訂閱支付成功的事件,監(jiān)聽到事件后完成自己業(yè)務(wù)即可。

為了解除事件發(fā)布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個(gè)中間人(Broker)。發(fā)布者發(fā)布事件到Broker,不關(guān)心誰來訂閱事件。訂閱者從Broker訂閱事件,不關(guān)心誰發(fā)來的消息。

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

Broker 是一個(gè)像數(shù)據(jù)總線一樣的東西,所有的服務(wù)要接收數(shù)據(jù)和發(fā)送數(shù)據(jù)都發(fā)到這個(gè)總線上,這個(gè)總線就像協(xié)議一樣,讓服務(wù)間的通訊變得標(biāo)準(zhǔn)和可控。

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

支付服務(wù)完成支付之后不需要去調(diào)用其他各種關(guān)聯(lián)的服務(wù)了
只需要提交一個(gè)支付成功事件給Broker即可
其他關(guān)聯(lián)微服務(wù)會收到broker的事件通知,從而各自干自己的業(yè)務(wù)
這個(gè)時(shí)候想加一個(gè)新的關(guān)聯(lián)微服務(wù),讓其訂閱這個(gè)事件即可,支付功能代碼不需要修改
想要?jiǎng)h除某個(gè)關(guān)聯(lián)的微服務(wù)(eg:短信費(fèi)錢不要了),取消事件訂閱即可,支付功能代碼不需要改

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

支付完成后發(fā)一個(gè)事件給broker就完事兒了,不需要等待其他服務(wù)都執(zhí)行完畢,直接可以通知用戶支付成功
也就是所有微服務(wù)可以并行執(zhí)行了,之前的調(diào)用形式其實(shí)是串行的

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

我壓根兒就沒有調(diào)用其他微服務(wù)
因此哪個(gè)微服務(wù)掛了,對我本身沒有任何影響

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

好多用戶發(fā)大量請求
.
會先在broker這里緩存一下
各個(gè)微服務(wù)能處理幾個(gè)就拿幾個(gè)進(jìn)行處理
也就是各自按照自己的能力進(jìn)行處理
[不管1s來多少個(gè)請求,各個(gè)微服務(wù)都按照自己的能力1s能處理幾個(gè)就處理幾個(gè)]
所有的壓力都給broker扛著了
(就好像洪水來了,這里有個(gè)叫broker的大壩擋住了)

好處:

  • 吞吐量提升:無需等待訂閱者處理完成,響應(yīng)更快速

  • 故障隔離:服務(wù)沒有直接調(diào)用,不存在級聯(lián)失敗問題

  • 調(diào)用間沒有阻塞,不會造成無效的資源占用

  • 耦合度極低,每個(gè)服務(wù)都可以靈活插拔,可替換

  • 流量削峰:不管發(fā)布事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件

缺點(diǎn):

  • 架構(gòu)復(fù)雜了,業(yè)務(wù)沒有明顯的流程線,不好管理
  • 需要依賴于Broker的可靠、安全、性能 (其實(shí)是把所有的壓力轉(zhuǎn)移給broker了,所以對broker的要求特別特別高)

好在現(xiàn)在開源軟件或云平臺上 Broker 的軟件是非常成熟的,比較常見的一種就是我們今天要學(xué)習(xí)的MQ技術(shù)。

其實(shí)雖然異步通信有很多優(yōu)點(diǎn),但并不是同步通信就被淘汰了
大部份情況下還是同步通信,因?yàn)榇蟛糠萸闆r下并發(fā)度并沒有那么高,這個(gè)時(shí)候時(shí)效性是第一考慮,同步通信時(shí)效性好多了,有實(shí)時(shí)反饋呀
一些不需要反饋(通知你干就完了)的場景,需要追求解耦,吞吐,并發(fā)的場景,(比較少見),再用上異步通信
.
所以現(xiàn)在應(yīng)該是同步通信和異步通信結(jié)合著來用

1.2.技術(shù)對比:

MQ,中文是消息隊(duì)列(MessageQueue),字面來看就是存放消息的隊(duì)列。也就是事件驅(qū)動架構(gòu)中的Broker。

這里的消息其實(shí)就是事件
MQ就是Broker

比較常見的MQ實(shí)現(xiàn):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

幾種常見MQ的對比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社區(qū) Rabbit Apache 阿里 Apache
開發(fā)語言 Erlang Java Java Scala&Java
協(xié)議支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協(xié)議 自定義協(xié)議
可用性 一般
單機(jī)吞吐量 一般 非常高
消息延遲 微秒級 毫秒級 毫秒級 毫秒以內(nèi)
消息可靠性 一般 一般

從上表可以看出,沒有完美的方案,只有適合的方案

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

2.快速入門

2.1.安裝RabbitMQ

鏈接:https://pan.baidu.com/s/1Jq2I-kwj3PAUraW76VGLwA
提取碼:ssah
Rabbit: 兔子啦

安裝RabbitMQ,參考課前資料:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

或者直接參考這篇博客: RabbitMQ部署

MQ的基本結(jié)構(gòu):

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

RabbitMQ中的一些角色:

  • publisher:生產(chǎn)者

  • consumer:消費(fèi)者

  • exchange個(gè):交換機(jī),負(fù)責(zé)消息路由

  • queue:隊(duì)列,存儲消息

  • virtualHost:虛擬主機(jī),隔離不同租戶的exchange、queue、消息的隔離

  • 小結(jié)
    RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

2.2.RabbitMQ消息模型

RabbitMQ官方提供了5個(gè)不同的Demo示例,對應(yīng)了不同的消息模型:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java


查官方文檔
https://www.rabbitmq.com/
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
https://www.rabbitmq.com/getstarted.html


2.3.導(dǎo)入Demo工程

課前資料提供了一個(gè)Demo工程,mq-demo:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

導(dǎo)入后可以看到結(jié)構(gòu)如下:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

包括三部分:

  • mq-demo:父工程,管理項(xiàng)目依賴
  • publisher:消息的發(fā)送者
  • consumer:消息的消費(fèi)者

2.4.入門案例

HelloWorld Demo
完全自己寫RabbitMQ的連接

簡單隊(duì)列模式的模型圖:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

官方的HelloWorld是基于最基礎(chǔ)的消息隊(duì)列模型來實(shí)現(xiàn)的,只包括三個(gè)角色:

  • publisher:消息發(fā)布者,將消息發(fā)送到隊(duì)列queue (自己寫代碼)
  • queue:消息隊(duì)列,負(fù)責(zé)接受并緩存消息 (RabbitMQ管理)
  • consumer:訂閱隊(duì)列,處理隊(duì)列中的消息 (自己寫代碼)

2.4.1.publisher實(shí)現(xiàn)

思路:

  • 建立連接
  • 創(chuàng)建Channel
  • 聲明隊(duì)列
  • 發(fā)送消息
  • 關(guān)閉連接和channel

代碼實(shí)現(xiàn):

注意ip,用戶名、密碼。得寫成自己得

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
        factory.setHost("192.168.141.100");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("whuer");
        factory.setPassword("123321");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();

        // 3.創(chuàng)建隊(duì)列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.發(fā)送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("發(fā)送消息成功:【" + message + "】");

        // 5.關(guān)閉通道和連接
        channel.close();
        connection.close();

    }
}

斷點(diǎn)運(yùn)行,查看控制臺,以充分理解

2.4.2.consumer實(shí)現(xiàn)

代碼思路:

  • 建立連接
  • 創(chuàng)建Channel
  • 聲明隊(duì)列
  • 訂閱消息

代碼實(shí)現(xiàn):

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
        factory.setHost("192.168.141.100");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("whhuer");
        factory.setPassword("123321");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();

        // 3.創(chuàng)建隊(duì)列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.訂閱消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.處理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

2.5.總結(jié)

基本消息隊(duì)列的消息發(fā)送流程:

  1. 建立connection

  2. 創(chuàng)建channel

  3. 利用channel聲明隊(duì)列

  4. 利用channel向隊(duì)列發(fā)送消息

基本消息隊(duì)列的消息接收流程:

  1. 建立connection

  2. 創(chuàng)建channel

  3. 利用channel聲明隊(duì)列

  4. 定義consumer的消費(fèi)行為handleDelivery()

  5. 利用channel將消費(fèi)者與隊(duì)列綁定

3.SpringAMQP

Spring對RabbitMQ的封裝
直接用封裝后的API,方便多了

SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實(shí)現(xiàn)了自動裝配,使用起來非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

SpringAMQP提供了三個(gè)功能:

  • 自動聲明隊(duì)列、交換機(jī)及其綁定關(guān)系
  • 基于注解的監(jiān)聽器模式,異步接收消息
  • 封裝了RabbitTemplate工具,用于發(fā)送消息

3.1.Basic Queue 簡單隊(duì)列模型

SpringAMQP實(shí)現(xiàn)官方案例,helloworld

在父工程mq-demo中引入依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.1.消息發(fā)送

首先配置MQ地址,在publisher服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.141.100 # rabbitMQ的ip地址  (linux端docker安裝的)
    port: 5672 # rabbitMQ使用地址
    username: whuer # 用戶名
    password: 123321 # 密碼
    virtual-host: /  # 虛擬主機(jī) (隔離每個(gè)用戶操作的 瀏覽器端可以設(shè)置)

然后在publisher服務(wù)中編寫測試類SpringAmqpTest,并利用RabbitTemplate實(shí)現(xiàn)消息發(fā)送:
cn.whu.mq.spring.SpringAmqpTest

可能是低版本(2.3)的boot程序, 需要加上@RunWith(SpringRunner.class)才會有SpringBoot環(huán)境(才會幫你啟動spring容器)

package cn.whu.mq.spring;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class) // 加這一行才會啟動Spring容器  才會有spring環(huán)境
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue(){
        String queueName = "simple.queue";
        String message="I love you !";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

執(zhí)行完畢,瀏覽器端查看
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.1.2.消息接收

依賴在父工程里面引入了,所有子工程都繼承了,都有了,不需要重復(fù)引入了

首先配置MQ地址,在consumer服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.141.100 # rabbitMQ的ip地址  (linux端docker安裝的)
    port: 5672 # rabbitMQ使用地址
    username: whuer # 用戶名
    password: 123321 # 密碼
    virtual-host: /  # 虛擬主機(jī) (隔離每個(gè)用戶操作的 瀏覽器端可以設(shè)置)

然后在consumer服務(wù)的cn.whu.mq.listener包中新建一個(gè)類SpringRabbitListener,代碼如下:

package cn.whu.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void ListenSimpleQueue(String msg){//發(fā)送方發(fā)的是啥 這里參數(shù)就寫啥 Spring都能處理
        System.out.println("消費(fèi)者接收到simple.queue的消息【"+msg+"】");
    }

}

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.1.3.測試

啟動consumer服務(wù),然后在publisher服務(wù)中運(yùn)行測試代碼,發(fā)送MQ消息

運(yùn)行ConsumerApplication.main, 把整個(gè)boot程序運(yùn)行起來,Spring開始工作就行了
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.2.WorkQueue

第二個(gè)官方案例

Work queues,也被稱為(Task queues),任務(wù)模型。簡單來說就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長此以往,消息就會堆積越來越多,無法及時(shí)處理。

此時(shí)就可以使用work 模型,多個(gè)消費(fèi)者共同處理消息處理,速度就能大大提高了。

其他和上面BasicQueue一摸一樣
只是消費(fèi)者由一個(gè)變成了多個(gè),防止隊(duì)列被擠滿

3.2.1.消息發(fā)送

這次我們循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。

在publisher服務(wù)中的SpringAmqpTest類中添加一個(gè)測試方法:

// 生產(chǎn)者 每秒產(chǎn)生50條消息  (50*20=1000ms=1s)
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "I love you__";
    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

3.2.2.消息接收

要模擬多個(gè)消費(fèi)者綁定同一個(gè)隊(duì)列,我們在consumer服務(wù)的SpringRabbitListener中添加2個(gè)新的方法:

@RabbitListener(queues = "simple.queue") //參數(shù)就是隊(duì)列名稱
public void ListenWorkQueue(String msg) throws InterruptedException {//發(fā)送方發(fā)的是啥 這里參數(shù)就寫啥 Spring都能處理
    System.out.println("消費(fèi)者1接收到simple.queue的消息【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue") // 參數(shù)就是隊(duì)列名稱
public void ListenWorkQueue2(String msg) throws InterruptedException {//發(fā)送方發(fā)的是啥 這里參數(shù)就寫啥 Spring都能處理
    System.err.println("消費(fèi)者2接收到simple.queue的消息【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);//隊(duì)列2就處理速度慢10倍
}

注意到這個(gè)消費(fèi)者sleep了200秒,模擬任務(wù)耗時(shí)。

3.2.3.測試

啟動ConsumerApplication后,在執(zhí)行publisher服務(wù)中剛剛編寫的發(fā)送測試方法testWorkQueue。

可以看到消費(fèi)者1很快完成了自己的25條消息。消費(fèi)者2卻在緩慢的處理自己的25條消息。

(同時(shí)說明了默認(rèn)情況下消息是平均分配給兩個(gè)隊(duì)列的,運(yùn)行結(jié)果可以看出隊(duì)列1處理所有偶數(shù)號消息,隊(duì)列2處理所有奇數(shù)號消息)(默認(rèn)情況下RabbitMQ內(nèi)部的消息預(yù)取機(jī)制造成的: 輪詢投遞,不管消費(fèi)者有沒有處理完)

也就是說消息是平均分配給每個(gè)消費(fèi)者,并沒有考慮到消費(fèi)者的處理能力。這樣顯然是有問題的。

3.2.4.能者多勞

在spring中有一個(gè)簡單的配置,可以解決這個(gè)問題。我們修改consumer服務(wù)的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條消息,處理完成返回ACK后,才能獲取下一個(gè)消息

重啟后,再次運(yùn)行發(fā)現(xiàn):
剛剛兩個(gè)消費(fèi)者處理完50條消息要5s(主要是消費(fèi)者2拖了后腿)
現(xiàn)在不會了,1s多就處理完了50條消息,因?yàn)槁俚南M(fèi)者2 只被分配到了7條消息
此行配置保證了能者多勞,也就是合理投遞消息,使得整體效率高了好多。

3.2.5.總結(jié)

Work模型的使用:

  • 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會被一個(gè)消費(fèi)者處理
  • 通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量

上面的2種方式,消息取完即在隊(duì)列中被刪除,也就是每個(gè)消息只會被一個(gè)消費(fèi)者消費(fèi)。
但是實(shí)際情況中,訂單支付完成消息需要通知給好多個(gè)微服務(wù)(倉儲、物流、短信),很明顯,取完即刪完成不了這個(gè)需求了
于是就有了接下來的發(fā)布/訂閱模式

3.3.發(fā)布/訂閱

發(fā)布訂閱的模型如圖:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

可以看到,在訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:

  • Publisher:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
  • Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
    • Direct:定向,把消息交給符合指定routing key 的隊(duì)列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
  • Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒有變化
  • Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會丟失!

3.4.Fanout

Fanout,英文翻譯是扇出,我覺得在MQ中叫廣播更合適。

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

在廣播模式下,消息發(fā)送流程是這樣的:

  • 1) 可以有多個(gè)隊(duì)列
  • 2) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
  • 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無法決定
  • 4) 交換機(jī)把消息發(fā)送給綁定過的所有隊(duì)列
  • 5) 訂閱隊(duì)列的消費(fèi)者都能拿到消息

我們的計(jì)劃是這樣的:

  • 創(chuàng)建一個(gè)交換機(jī) itcast.fanout,類型是Fanout
  • 創(chuàng)建兩個(gè)隊(duì)列fanout.queue1和fanout.queue2,綁定到交換機(jī)itcast.fanout

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.4.1.聲明隊(duì)列和交換機(jī)

Spring提供了一個(gè)接口Exchange,來表示所有不同類型的交換機(jī):

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

在consumer中創(chuàng)建一個(gè)類,聲明隊(duì)列和交換機(jī):

package cn.whu.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //whu.fanout    交換機(jī)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("whu.fanout");
    }


    //fanout.queue1 隊(duì)列1
    @Bean
    public Queue fanoutQueue1() { // 注意1: 是ringframework.amqp.core.Queue;   2. 方法名稱是將來Bean的唯一id,別沖突了
        return new Queue("fanout.queue1");
    }

    // 綁定隊(duì)列1到交換機(jī)
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        //注意是springframework.amqp.core.Binding;
        //注意參數(shù)類型名稱不要寫錯(cuò)了  會影響注入
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }


    //fanout.queue2 隊(duì)列2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    // 綁定隊(duì)列2到交換機(jī)
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

重啟consumer微服務(wù),然后瀏覽器端查看
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

接下來就可以使用交換機(jī)了 (不需要知道隊(duì)列的存在,改由RabbitMQ使用隊(duì)列了)

消息的發(fā)送和接收是沒有變化的

3.4.2.消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

// 發(fā)布/訂閱-Fanout廣播方式: 發(fā)送給交換機(jī)   (交換機(jī)怎么發(fā)送給隊(duì)列由RabbitMQ管,程序員不需要管)
@Test
public void testSendFanoutExchange() {
    // 交換機(jī)名稱
    String exchangeName = "whu.fanout";
    // 消息
    String msg = "hello, every one!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "",msg);
    // 第二個(gè)參數(shù) routingKey先不用管了
}

3.4.3.消息接收

在consumer服務(wù)的SpringRabbitListener中添加兩個(gè)方法,作為消費(fèi)者:

@RabbitListener(queues = "fanout.queue1")
public void ListenFanoutQueue1(String msg){
    System.out.println("消費(fèi)者接收到fanout.queue1的消息【"+msg+"】");
}

@RabbitListener(queues = "fanout.queue2")
public void ListenFanoutQueue2(String msg){
    System.out.println("消費(fèi)者接收到fanout.queue2的消息【"+msg+"】");
}

重啟ConsumerApplication,然后運(yùn)行SpringAmqpTest.testSendFanoutExchange方法
可以看到運(yùn)行結(jié)果如下:說明消息發(fā)送者將消息發(fā)送給交換機(jī)后,交換機(jī)將一個(gè)消息廣播到所有綁定的隊(duì)列中了,然后兩個(gè)消費(fèi)者分別從2個(gè)隊(duì)列獲取這個(gè)消息,也就是同一個(gè)消息同時(shí)發(fā)送給了多個(gè)指定的微服務(wù) 實(shí)現(xiàn)了一次發(fā)送,多個(gè)消費(fèi)者都能接收了
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.4.4.總結(jié)

交換機(jī)的作用是什么?

  • 接收publisher發(fā)送的消息
  • 將消息按照規(guī)則路由到與之綁定的隊(duì)列
  • 不能緩存消息,路由失敗,消息丟失
  • FanoutExchange的會將消息路由到每個(gè)綁定的隊(duì)列

聲明隊(duì)列、交換機(jī)、綁定關(guān)系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

3.5.Direct

在Fanout模式中,一條消息,會被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。

  • 每個(gè)隊(duì)列綁定一個(gè)key
    RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
  • 每個(gè)隊(duì)列綁定多個(gè)key
    RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

在Direct模型下:

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會接收到消息

交換機(jī)綁定隊(duì)列時(shí)綁定關(guān)系上有一個(gè)key (或者多個(gè)key)
發(fā)送方發(fā)送消息時(shí),消息上也綁定了一個(gè)key
交換機(jī)路由消息時(shí),就根據(jù)key來路由 (哪個(gè)隊(duì)列的綁定關(guān)系上也有一模一樣的key,我就路由給哪個(gè)隊(duì)列, 當(dāng)然可以路由給多個(gè)) 專業(yè)說法:DirectExchange將消息發(fā)送給RountingKey和BindingKey一致的隊(duì)列

案例需求如下

  1. 利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽direct.queue1和direct.queue2

  3. 在publisher中編寫測試方法,向whu. direct發(fā)送消息

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.5.1.基于注解聲明隊(duì)列和交換機(jī)

基于@Bean的方式聲明隊(duì)列和交換機(jī)比較麻煩,Spring還提供了基于注解方式來聲明。
(可以直接用注解聲明交換機(jī),隊(duì)列名稱,綁定關(guān)系,綁定key)完全不需要寫一大堆的Bean
在consumer的SpringRabbitListener中添加兩個(gè)消費(fèi)者,同時(shí)基于注解來聲明隊(duì)列和交換機(jī):

// 聲明隊(duì)列: 同時(shí)聲明 交換機(jī) 綁定交換機(jī)  綁定key  (也即下面的隊(duì)列需要用到交換機(jī) 但是不用額外配置類寫交換機(jī)bean了)
// 注解寫法: 直接寫 bindings 里面就包含了上述所有信息
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "whu.direct",type = ExchangeTypes.DIRECT),
        key = {"blue","red"}
))
public void ListenDirectQueue1(String msg) {
    System.out.println("消費(fèi)者1接收到direct.queue1的消息【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "whu.direct",type = ExchangeTypes.DIRECT),
        key = {"yellow","red"}
))
public void ListenDirectQueue2(String msg) {
    System.out.println("消費(fèi)者2接收到direct.queue2的消息【" + msg + "】");
}

重啟ConsumerApplication,可以在瀏覽器端看到
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.5.2.消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

代碼和上面Fanout廣播差不多,唯一的區(qū)別就是這里需要指定第二個(gè)參數(shù)routingKey了

// 發(fā)布/訂閱-Direct定向方式: 發(fā)送給交換機(jī)   (交換機(jī)怎么發(fā)送給隊(duì)列RabbitMQ管,程序員不需要管)
@Test
public void testSendDirectExchange() {
    // 交換機(jī)名稱
    String exchangeName = "whu.direct";
    // 消息
    String msg = "hello, blue!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue",msg);
    // 第二個(gè)參數(shù) routingKey 綁定的key可以指定了 定向發(fā)送的唯一標(biāo)識
}

分別發(fā)送blue,yellow,red,共3次,消費(fèi)端運(yùn)行結(jié)果如下
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.5.3.總結(jié)

描述下Direct交換機(jī)與Fanout交換機(jī)的差異?

  • Fanout交換機(jī)將消息路由給每一個(gè)與之綁定的隊(duì)列
  • Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個(gè)隊(duì)列
  • 如果多個(gè)隊(duì)列具有相同的RoutingKey,則與Fanout功能類似

基于@RabbitListener注解聲明隊(duì)列和交換機(jī)有哪些常見注解?

  • @Queue
  • @Exchange

3.6.Topic

3.6.1.說明

Topic類型的ExchangeDirect相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!

Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

#:匹配一個(gè)或多個(gè)詞

*:匹配不多不少恰好1個(gè)詞

舉例:

item.#:能夠匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

?

圖示:

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

解釋:

  • Queue1:綁定的是china.# ,因此凡是以 china.開頭的routing key 都會被匹配到。包括china.news和china.weather
  • Queue2:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會被匹配。包括china.news和japan.news

案例需求:

實(shí)現(xiàn)思路如下:

  1. 并利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽topic.queue1和topic.queue2

  3. 在publisher中編寫測試方法,向whu. topic發(fā)送消息

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

3.6.2.消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測試方法:

// 發(fā)布/訂閱-Topic通配符方式: 發(fā)送給交換機(jī)   (交換機(jī)怎么發(fā)送給隊(duì)列RabbitMQ管,程序員不需要管)
@Test
public void testSendTopicExchange() {
    // 交換機(jī)名稱
    String exchangeName = "whu.topic";
    // 消息
    String msg = "武漢大學(xué)120周年校慶,現(xiàn)場驚現(xiàn)雷布斯!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news",msg);
    // routingKey 寫成.的形式  隊(duì)列那邊通配符匹配
}

3.6.3.消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

// Topic 通配符路由模式
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "whu.topic",type = ExchangeTypes.TOPIC),
        key = {"china.#"} // 所有china.開頭的都匹配  #表示任意多個(gè)單詞
))
public void ListenTopicQueue1(String msg) {
    System.out.println("消費(fèi)者1接收到topic.queue1的消息【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "whu.topic",type = ExchangeTypes.TOPIC),
        key = {"#.news"} // 所有以.news結(jié)尾的都匹配  #表示任意多個(gè)單詞
))
public void ListenTopicQueue2(String msg) {
    System.out.println("消費(fèi)者2接收到topic.queue2的消息【" + msg + "】");
}

重啟ConsumerApplication,然后執(zhí)行SpringAmqpTest.testSendTopicExchange
查看運(yùn)行結(jié)果:
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
確實(shí)china.new對于china.#和#.news 都能匹配上 所以都收到了

3.6.4.總結(jié)

描述下Direct交換機(jī)與Topic交換機(jī)的差異?

  • Topic交換機(jī)接收的消息RoutingKey必須是多個(gè)單詞,以 **.** 分割
  • Topic交換機(jī)與隊(duì)列綁定時(shí)的bindingKey可以指定通配符
  • #:代表0個(gè)或多個(gè)詞
  • *:代表1個(gè)詞

3.7.消息轉(zhuǎn)換器

之前說過,Spring會把你發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時(shí)候,還會把字節(jié)反序列化為Java對象。

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

只不過,默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 數(shù)據(jù)體積過大
  • 有安全漏洞
  • 可讀性差

我們來測試一下。

先在FanoutConfig里添加(聲明)一個(gè)新的隊(duì)列

// 測試序列化和反序列化
@Bean
public Queue objectQueue(){
    return new Queue("object.queue");
}

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

下面測試時(shí)就往這個(gè)新的隊(duì)列里發(fā)送

3.7.1.測試默認(rèn)轉(zhuǎn)換器

我們修改消息發(fā)送的代碼,發(fā)送一個(gè)Map對象:

// 測試消息的系列化和反序列化 (發(fā)送復(fù)雜一點(diǎn)的對象才能測出來)
@Test
public void testSendObjectQueue(){
    String queueName="object.queue";
    // 準(zhǔn)備消息
    Map<String, Object> map = new HashMap<>();
    map.put("name","張三");
    map.put("age",18);
	// 發(fā)送消息
    rabbitTemplate.convertAndSend(queueName, map);
}

停止consumer服務(wù)

發(fā)送消息后查看控制臺:因?yàn)闆]有寫消費(fèi)者,所以消息就一直會被留在隊(duì)列中

RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
亂碼了,控制臺查看不了消息
看下內(nèi)容類型: content_type: application/x-java-serialized-object
默認(rèn)是jdk的序列化

3.7.2.配置JSON轉(zhuǎn)換器(發(fā)送方:序列化器)

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher和consumer兩個(gè)服務(wù)中都引入依賴:(或者直接父工程中引入依賴)

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

配置消息轉(zhuǎn)換器。

在啟動類PublisherApplication中添加一個(gè)Bean即可:這里配置的是序列化器,發(fā)送時(shí)要用,因此在PublisherApplication啟動類里寫

@Bean  // 注意是amqp下的MessageConverter
public MessageConverter messageConverter() {
    // 返回的就是SpringMVC常使用的消息轉(zhuǎn)換器
    return new Jackson2JsonMessageConverter();
}

然后再次執(zhí)行上面的SpringAmqpTest.testSendObjectQueue測試方法
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java
果然現(xiàn)在的序列化結(jié)果好多了

3.7.3.配置JSON轉(zhuǎn)換器(接收方:反序列化器)

發(fā)送的json消息轉(zhuǎn)換器已經(jīng)配好了,接下來配置接收

在publisher和consumer兩個(gè)服務(wù)中都引入依賴:(或者直接父工程中引入依賴)

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

在接收方:ConsumerApplication啟動類中也配置一個(gè)相同的jackson消息轉(zhuǎn)換器,用相同方式反序列化呀

@Bean  // 注意是amqp包下的MessageConverter
public MessageConverter messageConverter() {
    // 返回的就是SpringMVC常使用的消息轉(zhuǎn)換器
    return new Jackson2JsonMessageConverter();
}

然后定義一個(gè)消費(fèi)者,監(jiān)聽object.queue隊(duì)列并消費(fèi)消息

@RabbitListener(queues = "object.queue") //監(jiān)聽object.queue隊(duì)列
public void listenObjectQueue(Map<String, Object> msg) {//參數(shù)和發(fā)送方發(fā)送的消息類型一模一樣
    System.out.println("收到消息:[" + msg + "]");
}

注意重啟ConsumerApplication后再執(zhí)行SpringAmqpTest.testSendObjectQueue發(fā)送消息
運(yùn)行結(jié)果如下:完全ok啦
RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java文章來源地址http://www.zghlxwxcb.cn/news/detail-672928.html

  • 小結(jié)
    RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器),分布式,java-rabbitmq,rabbitmq,java

到了這里,關(guān)于RabbitMQ-同步和異步通訊、安裝和入門案例、SpringAMQP(5個(gè)消息發(fā)送接收Demo,jackson消息轉(zhuǎn)換器)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 服務(wù)異步通訊——RabbitMQ

    服務(wù)異步通訊——RabbitMQ

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊 :就像打電話,需要實(shí)時(shí)響應(yīng)。 異步通訊 :就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是一個(gè)人卻不能跟多個(gè)人同時(shí)通話。而發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會有延

    2024年01月18日
    瀏覽(21)
  • 微服務(wù)——服務(wù)異步通訊RabbitMQ

    微服務(wù)——服務(wù)異步通訊RabbitMQ

    ?前置文章 消息隊(duì)列——RabbitMQ基本概念+容器化部署和簡單工作模式程序_北嶺山腳鼠鼠的博客-CSDN博客 消息隊(duì)列——rabbitmq的不同工作模式_北嶺山腳鼠鼠的博客-CSDN博客 消息隊(duì)列——spring和springboot整合rabbitmq_北嶺山腳鼠鼠的博客-CSDN博客 目錄 Work queues 工作隊(duì)列模式? 案例

    2024年02月15日
    瀏覽(22)
  • RabbitMQ入門 消息隊(duì)列快速入門 SpringAMQP WorkQueue 隊(duì)列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    RabbitMQ入門 消息隊(duì)列快速入門 SpringAMQP WorkQueue 隊(duì)列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實(shí)時(shí)響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會有延遲。 1.

    2024年04月08日
    瀏覽(19)
  • SpringCloud學(xué)習(xí)路線(9)——服務(wù)異步通訊RabbitMQ

    SpringCloud學(xué)習(xí)路線(9)——服務(wù)異步通訊RabbitMQ

    一、初見MQ (一)什么是MQ? MQ(MessageQueue) ,意思是 消息隊(duì)列 ,也就是事件驅(qū)動架構(gòu)中的Broker。 (二)同步調(diào)用 1、概念: 同步調(diào)用是指,某一服務(wù)需要多個(gè)服務(wù)共同參與,但多個(gè)服務(wù)之間有一定的執(zhí)行順序,當(dāng)每一個(gè)服務(wù)都需要等待前面一個(gè)服務(wù)完成才能繼續(xù)執(zhí)行。

    2024年02月15日
    瀏覽(20)
  • Java分布式微服務(wù)4——異步服務(wù)通訊(RabbitMQ)中間件

    Java分布式微服務(wù)4——異步服務(wù)通訊(RabbitMQ)中間件

    為什么需要異步調(diào)用? 故障隔離 :支付服務(wù)不負(fù)責(zé)調(diào)用其他三個(gè)服務(wù),只負(fù)責(zé)通知Broker支付成功這個(gè)事件,然后就返回結(jié)果,后面的服務(wù)故障了和前面發(fā)布事件的服務(wù)無關(guān),前面的服務(wù)發(fā)布完事件就結(jié)束了 吞吐量提升 :Broker將支付成功的事件廣播給訂閱了這個(gè)事件的那些服

    2024年02月13日
    瀏覽(28)
  • ElasticSearch - 在 微服務(wù)項(xiàng)目 中基于 RabbitMQ 實(shí)現(xiàn) ES 和 MySQL 數(shù)據(jù)異步同步(考點(diǎn))

    ElasticSearch - 在 微服務(wù)項(xiàng)目 中基于 RabbitMQ 實(shí)現(xiàn) ES 和 MySQL 數(shù)據(jù)異步同步(考點(diǎn))

    目錄 一、數(shù)據(jù)同步 1.1、什么是數(shù)據(jù)同步 1.2、解決數(shù)據(jù)同步面臨的問題 1.3、解決辦法 1.3.1、同步調(diào)用 1.3.2、異步通知(推薦) 1.3.3、監(jiān)聽 binlog 1.3、基于 RabbitMQ 實(shí)現(xiàn)數(shù)據(jù)同步 1.3.1、需求 1.3.2、在“酒店搜索服務(wù)”中 聲明 exchange、queue、routingKey,同時(shí)開啟監(jiān)聽 1.3.3、在“酒店

    2024年02月08日
    瀏覽(31)
  • kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)

    引入依賴 回調(diào)函數(shù)會在producer收到ack時(shí)調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果Exception為null,說明信息發(fā)送失敗 注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。 只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get(

    2024年02月11日
    瀏覽(29)
  • RabbitMQ從入門到精通之安裝、通訊方式詳解

    RabbitMQ從入門到精通之安裝、通訊方式詳解

    1.1 現(xiàn)存問題 服務(wù)異步調(diào)用: 服務(wù)A如何保證異步請求一定能被服務(wù)B接收到并處理 削峰: 海量請求,如何實(shí)現(xiàn)削峰的效果,將請求全部放到一個(gè)隊(duì)列中,慢慢的消費(fèi),這個(gè)隊(duì)列怎么實(shí)現(xiàn)? 服務(wù)解耦: 如何盡量的降低服務(wù)之間的耦合問題,如果在訂單與積分和商家服務(wù)解構(gòu),需

    2024年02月09日
    瀏覽(14)
  • RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)

    RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)

    提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 SpringBoot 已經(jīng)提供了對 AMQP 協(xié)議完全支持的 spring-boot-starter-amqp 依賴,引入此依賴即可快速方便的在 SpringBoot 中使用 RabbitMQ。 https://spring.io/projects/spring-amqp RabbitTemplate 是 SpringBoot AMQP 提供的快速發(fā) Rabbit

    2024年03月21日
    瀏覽(22)
  • Ajax_4(進(jìn)階)同步異步+ 宏任務(wù)微任務(wù) + Promise鏈 + async終極解決方案 +事件循環(huán)原理 + 綜合案例

    01-同步代碼和異步代碼 什么是同步代碼? 同步代碼:逐行執(zhí)行,需要原地等待結(jié)果后,才繼續(xù)向下執(zhí)行。 什么是異步代碼? 調(diào)用后耗時(shí),不阻塞代碼繼續(xù)執(zhí)行,(不必原地等待),在將來完成后 觸發(fā)一個(gè) 回調(diào)函數(shù) 。 代碼閱讀 目標(biāo):閱讀并回答代碼執(zhí)行和打印的順序 打印

    2024年02月13日
    瀏覽(51)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包