国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

JavaWeb_SpringCloud微服務(wù)_Day4-MQ, RabbitMQ, SpringAMQP

這篇具有很好參考價(jià)值的文章主要介紹了JavaWeb_SpringCloud微服務(wù)_Day4-MQ, RabbitMQ, SpringAMQP。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

MQ

同步通訊

  • 優(yōu)點(diǎn):
    • 時(shí)效性強(qiáng), 可以立即得到結(jié)果
  • 缺點(diǎn):
    • 耦合度高
    • 性能和吞吐能力下降
    • 有額外的資源消耗
    • 有級聯(lián)失敗問題

異步通訊

  • 優(yōu)點(diǎn):
    • 耦合度低
    • 吞吐量提升
    • 故障隔離
    • 流量削峰
  • 缺點(diǎn):
    • 依賴于Broker的可靠性, 安全性, 吞吐能力
    • 架構(gòu)復(fù)雜, 業(yè)務(wù)沒有明顯的流程線, 不好追蹤管理

mq常見技術(shù)

RabbitMq ActiveMQ RocketMQ Kafka
公司/社區(qū) Rabbit Apache 阿里 Apache
開發(fā)語言 Erlang java java Scala&java
協(xié)議支持 AMQP, XMPP, SMTP, STOMP OpenWire, STOMP, REST, XMPP, AMQP 自定義協(xié)議 自定義協(xié)議
可用性 一般
單機(jī)吞吐量 一般 非常高
消息延遲 微秒級 毫秒級 毫秒級 毫秒以內(nèi)
消息可靠性 一般 一般

RabbitMQ

下載安裝

  • docker下載
docker pull rabbitmq:3-management
  • docker運(yùn)行
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

介紹

  • channel: 操作MQ的工具
  • exchange: 路由消息到隊(duì)列中
  • queue: 緩存消息
  • virtual host: 虛擬主機(jī), 是對queue, exchange等資源的邏輯分組

SimpleQueue模型

  • publisher
public void testSendMessage() throws IOException, TimeoutException {
    // 1.建立連接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
    factory.setHost("192.168.174.133");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123321");
    // 1.2.建立連接
    Connection connection = factory.newConnection();

    // 2.創(chuàng)建通道Channel
    Channel channel = connection.createChannel();

    // 3.創(chuàng)建隊(duì)列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.發(fā)送消息
    String message = "hello, rabbitmq!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("發(fā)送消息成功:【" + message + "】");

    // 5.關(guān)閉通道和連接
    channel.close();
    connection.close();
}
  • consumer
public static void main(String[] args) throws IOException, TimeoutException {
    // 1.建立連接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
    factory.setHost("192.168.174.133");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123321");
    // 1.2.建立連接
    Connection connection = factory.newConnection();

    // 2.創(chuàng)建通道Channel
    Channel channel = connection.createChannel();

    // 3.創(chuàng)建隊(duì)列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.訂閱消息
    channel.basicConsume(queueName, true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                    AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 5.處理消息
            String message = new String(body);
            System.out.println("接收到消息:【" + message + "】");
        }
    });
    System.out.println("等待接收消息。。。。");
}

SpringAMQP

介紹

  • AMQP: 應(yīng)用間消息通信的一種協(xié)議, 與語言和平臺無關(guān).
  • 依賴:
    <!--AMQP依賴,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

SimpleQueue模型

  • 配置
    spring:
    rabbitmq:
        host: 192.168.174.133 # 主機(jī)名S
        port: 5672 # 端口
        virtual-host: / # 虛擬主機(jī)
        username: itcast # 用戶名
        password: 123321 # 密碼
    
  • publisher
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue()
        {
            String queueName = "simple.queue";
            String message = "hello, spring amqp!";
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    
  • comsumer
    @Component
    public class SpringRabbitListener {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg)
        {
            System.out.println("spring 消費(fèi)者接收到消息: ["+msg+"]");
        }
    }
    

WorkQueue模型

  • publisher
    @Test
    public void testWorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message);
            Thread.sleep(20);
        }
    }
    
  • consumer
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消費(fèi)者1接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消費(fèi)者2接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(200);
    }
    
  • 配置
    spring:
    rabbitmq:
        listener:
        simple:
            prefetch: 1 # 每次只能獲取下一消息, 處理完成才能獲取下一個(gè)消息
    
  • work模型總結(jié):
    • 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列, 同一條消息只會被一個(gè)消費(fèi)者處理
    • consumer會預(yù)取消息, 會導(dǎo)致性能差的consumer堆積消息, 可以通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量.

發(fā)布訂閱模型

介紹

  • 發(fā)布訂閱模式與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個(gè)消費(fèi)者, 實(shí)現(xiàn)方式就是加入了exchange(交換機(jī)).
  • 常見exchange類型:
    • Fanout: 廣播
    • Direct: 路由
    • Topic: 話題

FanoutExchange

  • config
    @Configuration
    public class FanoutConfig {
    
        @Bean
        public FanoutExchange fanoutExchange()
        {
            return new FanoutExchange("itcast.fanout");
        }
    
        @Bean
        public Queue fanoutQueue1()
        {
            return new Queue("fanout.queue1");
        }
    
        @Bean
        public Queue fanoutQueue2()
        {
            return new Queue("fanout.queue2");
        }
    
        @Bean
        public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange)
        {
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
        @Bean
        public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange)
        {
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
    
  • consumer
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消費(fèi)者1接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.err.println("消費(fèi)者2接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(200);
    }
    
  • publisher
    @Test
    public void testSendFanoutExchange()
    {
        // 交換機(jī)名稱
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, every one!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
    
  • 總結(jié):
    • 交換機(jī)的作用
      • 接收publisher發(fā)送的消息
      • 將消息按照規(guī)則路由到與之綁定的隊(duì)列
      • 不能緩存消息, 路由失敗, 消息丟失
      • FanoutExchange的會將消息路由到每個(gè)綁定的隊(duì)列
    • 聲明隊(duì)列的Bean: Queue
    • 聲明交換機(jī)的Bean: FanoutExchange
    • 聲明綁定關(guān)系的Bean: Binding

Direct Exchange

  • 介紹
    • 每一個(gè)Queue都與Exchange設(shè)置一個(gè)BindingKey
    • 發(fā)布者發(fā)送消息時(shí), 指定消息的RoutingKey
    • Exchange將消息路由到BindingKey與消息RoutingKey一致的隊(duì)列
  • consumer
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) throws InterruptedException {
        System.out.println("消費(fèi)者1接收到消息: ["+msg+"]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) throws InterruptedException {
        System.err.println("消費(fèi)者2接收到消息: ["+msg+"]");
    }
    
  • publisher
    @Test
    public void testSendDirectExchange()
    {
        // 交換機(jī)名稱
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, every blue!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
    

Topic Exchange

  • consumer
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg) throws InterruptedException {
        System.out.println("消費(fèi)者1接收到消息: ["+msg+"]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.err.println("消費(fèi)者2接收到消息: ["+msg+"]");
    }
    
  • publisher
    @Test
    public void testSendTopicExchange()
    {
        // 交換機(jī)名稱
        String exchangeName = "itcast.topic";
        // 消息
        String message = "hello, china.news";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
    
  • 總結(jié)
    • TopicExchange與DirectExchange類似, 區(qū)別在于routingKey必須是多個(gè)單詞的列表, 并且以.分割.

消息轉(zhuǎn)換器

  • 依賴
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
  • MessageConverter
    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }
    
  • publisher
    @Test
    public void testSendObjectQueue()
    {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "柳巖");
        msg.put("age", 21);
        rabbitTemplate.convertAndSend("object.queue", msg);
    }
    
  • consumer
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String, Object> msg)
    {
        System.out.println("接收到object.queue的消息: "+msg);
    }
    

來源

黑馬程序員. SpringCloud微服務(wù)文章來源地址http://www.zghlxwxcb.cn/news/detail-602873.html

到了這里,關(guān)于JavaWeb_SpringCloud微服務(wù)_Day4-MQ, RabbitMQ, SpringAMQP的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包