国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

(三)RabbitMQ七種模式介紹與代碼演示

這篇具有很好參考價值的文章主要介紹了(三)RabbitMQ七種模式介紹與代碼演示。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Lison <dreamlison@163.com>, v1.0.0, 2023.06.22

七種模式介紹與代碼演示

四大交換機

交換機概念

  • 交換機可以理解成具有路由表的路由程序,僅此而已。每個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字符串。
  • 最新版本的RabbitMQ有四種交換機類型,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
  • 交換機的作用: 生產(chǎn)者向broker(rabbitmq服務器)發(fā)送消息,交換機通過生產(chǎn)者綁定的路由鍵,將消息推送到不同的消息隊列中。而消費者,只綁定隊列,從隊列中獲取消息。
  • 以上三種類型的發(fā)送精準順序為:點對點類型 > 通配符類型 > 廣播類型

四種交換機介紹

  • 直連交換機(Direct exchange): 具有路由功能的交換機,綁定到此交換機的時候需要指定一個routing_key,交換機發(fā)送消息的時候需要routing_key,會將消息發(fā)送道對應的隊列
  • 扇形交換機(Fanout exchange): 廣播消息到所有隊列,沒有任何處理,速度最快
  • 主題交換機(Topic exchange): 在直連交換機基礎上增加模式匹配,也就是對routing_key進行模式匹配,*代表一個單詞,#代表多個單詞
  • 首部交換機(Headers exchange): 忽略routing_key,使用Headers信息(一個Hash的數(shù)據(jù)結(jié)構)進行匹配,優(yōu)勢在于可以有更多更靈活的匹配規(guī)則

交換機參數(shù)

  1. name: 交換機名稱
  2. type: 交換機類型 direct,topic,fanout,headers
  3. durability: 是否需要持久化,true 為持久化
  4. auto delete: 當最后一個綁定到 Exchange 上的隊列被刪除后,Exchange 就沒有綁定的隊列了,自動刪除該 Exchange
  5. internal: 當前 Exchange 是否為內(nèi)部交換機,默認為 false,客戶端無法直接發(fā)送消息到這個交換機中,只能通過交換機路由到交換機這種方式
  6. arguments: 擴展參數(shù),用于擴展 AMQP 協(xié)議自制定化使用

工作模式

簡單模式(Hello World)

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

做最簡單的事情,一個生產(chǎn)者對應一個消費者,RabbitMQ相當于一個消息代理,負責將A的消息轉(zhuǎn)發(fā)給B

應用場景: 將發(fā)送的電子郵件放到消息隊列,然后郵件服務在隊列中獲取郵件并發(fā)送給收件人

工作隊列模式(Work queues)

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

在多個消費者之間分配任務(競爭的消費者模式),一個生產(chǎn)者對應多個消費者,一般適用于執(zhí)行資源密集型任務,單個消費者處理不過來,需要多個消費者進行處理

應用場景: 一個訂單的處理需要10s,有多個訂單可以同時放到消息隊列,然后讓多個消費者同時處理,這樣就是并行了,而不是單個消費者的串行情況

訂閱模式(Publish/Subscribe)

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

一次向許多消費者發(fā)送消息,一個生產(chǎn)者發(fā)送的消息會被多個消費者獲取,也就是將消息將廣播到所有的消費者中。

應用場景: 更新商品庫存后需要通知多個緩存和多個數(shù)據(jù)庫,這里的結(jié)構應該是:

  • 一個fanout類型交換機扇出兩個個消息隊列,分別為緩存消息隊列、數(shù)據(jù)庫消息隊列
  • 一個緩存消息隊列對應著多個緩存消費者
  • 一個數(shù)據(jù)庫消息隊列對應著多個數(shù)據(jù)庫消費者

路由模式(Routing)

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

有選擇地(Routing key)接收消息,發(fā)送消息到交換機并且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key,僅消費指定路由key的消息

應用場景: 如在商品庫存中增加了1臺iphone12,iphone12促銷活動消費者指定routing key為iphone12,只有此促銷活動會接收到消息,其它促銷活動不關心也不會消費此routing key的消息

主題模式(Topics)

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

根據(jù)主題(Topics)來接收消息,將路由key和某模式進行匹配,此時隊列需要綁定在一個模式上,#匹配一個詞或多個詞,*只匹配一個詞。

應用場景: 同上,iphone促銷活動可以接收主題為iphone的消息,如iphone12、iphone13等

遠程過程調(diào)用(RPC)

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

如果我們需要在遠程計算機上運行功能并等待結(jié)果就可以使用RPC,具體流程可以看圖。應用場景:需要等待接口返回數(shù)據(jù),如訂單支付

發(fā)布者確認(Publisher Confirms)

與發(fā)布者進行可靠的發(fā)布確認,發(fā)布者確認是RabbitMQ擴展,可以實現(xiàn)可靠的發(fā)布。在通道上啟用發(fā)布者確認后,RabbitMQ將異步確認發(fā)送者發(fā)布的消息,這意味著它們已在服務器端處理。

應用場景: 對于消息可靠性要求較高,比如錢包扣款

代碼演示

代碼中沒有對后面兩種模式演示,有興趣可以自己研究

簡單模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class SimpleQueueSender {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        // queue:隊列名
        // durable:是否持久化
        // exclusive:是否排外  即只允許該channel訪問該隊列   一般等于true的話用于一個隊列只能有一個消費者來消費的場景
        // autoDelete:是否自動刪除  消費完刪除
        // arguments:其他屬性
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //消息內(nèi)容
        String message = "simplest mode message";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("[x]Sent '" + message + "'");

        //最后關閉通關和連接
        channel.close();
        connection.close();

    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class SimpleQueueReceiver {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // 獲取連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received  '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });

        System.out.println("----");
    }
}

工作隊列模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class QueueWorkReceiver1 {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
          // 獲取連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一時刻服務器只會發(fā)送一條消息給消費者
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });

    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class QueueWorkReceiver2 {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
          // 獲取連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一時刻服務器只會發(fā)送一條消息給消費者
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });

    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class QueueWorkSender {

    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            String message = "work mode message" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

發(fā)布訂閱模式

package com.lison.upgrade.middleware.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class PublishSubscribeReceive1 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 訂閱消息的回調(diào)函數(shù)
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };

        // 消費者,有消息時出發(fā)訂閱回調(diào)函數(shù)
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
package com.lison.upgrade.middleware.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class PublishSubscribeReceive2 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 訂閱消息的回調(diào)函數(shù)
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received2 '" + message + "'");
        };

        // 消費者,有消息時出發(fā)訂閱回調(diào)函數(shù)
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PublishSubscribeSender {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "publish subscribe message";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

路由模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class QueueRoutingReceiver1 {

    private final static String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 指定路由的key,接收key和key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }

}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class QueueRoutingReceiver2 {

    private final static String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();


        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 僅接收key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });


    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class QueueRoutingSender {

    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 交換機聲明
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        // 只有routingKey相同的才會消費
        String message = "routing mode message";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");
//        channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());
//        System.out.println("[x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

主題模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Receiver1 {

    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 可以接收key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver2 {

    private final static String QUEUE_NAME = "queue_topic2";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // *號代表單個單詞,可以接收key.1
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
        // #號代表多個單詞,可以接收key.1.2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Sender {

    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        String message = "topics model message with key.1";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");
        String message2 = "topics model message with key.1.2";
        channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());
        System.out.println("[x] Sent '" + message2 + "'");

        channel.close();
        connection.close();
    }
}

SpringBoot整合RabbitMQ

引入依賴基礎配置

1、創(chuàng)建SpringBoot項目,引入RabbitMQ起步依賴

<!-- RabbitMQ起步依賴 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、寫配置文件

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 123456
   virtual-host: /
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'

SpringBoot整合RabbitMQ時,需要在配置類創(chuàng)建隊列和交換機,寫法如下:

@Configuration
public class RabbitConfig {
    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";
    // 創(chuàng)建交換機
    @Bean("bootExchange")
    public Exchange getExchange() {
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME) // 交換機類型
               .durable(true) // 是否持久化
               .build();
          }
    // 創(chuàng)建隊列
    @Bean("bootQueue")
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 隊列名
   }
    // 交換機綁定隊列
    @Bean
    public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("#.message.#")
               .noargs();
   }
}



生產(chǎn)者

SpringBoot整合RabbitMQ時,提供了工具類RabbitTemplate發(fā)送消息,編寫生產(chǎn)者時只需要注入RabbitTemplate即可發(fā)送消息。

@SpringBootTest
public class TestProducer {
    // 注入RabbitTemplate工具類
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage(){
        /**
         * 發(fā)送消息
         * 參數(shù)1:交換機
         * 參數(shù)2:路由key
         * 參數(shù)3:要發(fā)送的消息
         */
      rabbitTemplate.convertAndSend("boot_topic_exchange","message","這個是生產(chǎn)者發(fā)送消息!");
   }
}

(三)RabbitMQ七種模式介紹與代碼演示,中間件組件實戰(zhàn)應用,# RabbitMq,rabbitmq,分布式,中間件

消費者

@Component
public class Consumer {
    // 監(jiān)聽隊列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("發(fā)送短信:"+message);
   }
}

啟動項目,可以看到消費者會消費隊列中的消息文章來源地址http://www.zghlxwxcb.cn/news/detail-607151.html

到了這里,關于(三)RabbitMQ七種模式介紹與代碼演示的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 【云原生進階之PaaS中間件】第四章RabbitMQ-1-簡介及工作模式

    【云原生進階之PaaS中間件】第四章RabbitMQ-1-簡介及工作模式

    ????????RabbitMQ 是一個由 Erlang 語言開發(fā)的 AMQP 的開源實現(xiàn)。AMQP(Advanced Message Queue:高級消息隊列協(xié)議)它是應用層協(xié)議的一個開放標準,為面向消息的中間件設計,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。RabbitMQ 最初起源于

    2024年02月21日
    瀏覽(92)
  • 消息中間件學習筆記--RabbitMQ(二、模式,一次違反常規(guī)的Java大廠面試經(jīng)歷

    消息中間件學習筆記--RabbitMQ(二、模式,一次違反常規(guī)的Java大廠面試經(jīng)歷

    .Fanout:轉(zhuǎn)發(fā)消息到所有綁定隊列 比較常用的是Direct、Topic、Fanout. Fanout 這種Fanout模式不處理路由鍵,只·需要簡單的將隊列綁定到exchange上,一個發(fā)送到exchange的消息都會被轉(zhuǎn)發(fā)到與該exchange綁定的所有隊列上。很像廣播子網(wǎng),每臺子網(wǎng)內(nèi)的主機都獲得了一份復制的消息。Fan

    2024年04月09日
    瀏覽(29)
  • 消息中間件RabbitMQ

    消息中間件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,從字面意思上看,本質(zhì)是個隊列,F(xiàn)IFO 先入先出,只不過隊列中存放的內(nèi)容是message 而已,還是一種跨進程的通信機制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構中,MQ 是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務。使用了 MQ 之后,

    2024年01月17日
    瀏覽(104)
  • RabbitMQ消息中間件

    RabbitMQ消息中間件 RabbitMQ簡介 windows下安裝RabbitMQ RabbitMQ基本概念 RabbitMQ簡單模式 RabbitMQ工作隊列模式 RabbitMQ發(fā)布訂閱模式 RabbitMQ路由模式 RabbitMQ主題模式 RabbitMQ RPC模式 RabbitMQ發(fā)布確認模式

    2024年02月10日
    瀏覽(104)
  • 消息中間件之RabbitMQ

    消息中間件之RabbitMQ

    1.基于AMQP協(xié)議Erlang語言開發(fā)的一款消息中間件,客戶端語言支持比較多, 比如Python,Java,Ruby,PHP,JS,Swift.運維簡單,靈活路由,但是性能不高, 可以滿足一般場景下的業(yè)務需要,三高場景下吞吐量不高,消息持久化沒有采取 零拷貝技術,消息堆積時,性能會下降 2.消息吞吐量在

    2024年01月19日
    瀏覽(22)
  • 消息中間件RabbitMQ詳解

    消息中間件RabbitMQ詳解

    消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環(huán)境下擴展進程間的通信。 消息中間件適用于需要可靠的數(shù)據(jù)傳送的分布式環(huán)境。采用消息中間件機制的系統(tǒng)中

    2024年02月16日
    瀏覽(104)
  • 【JAVA開發(fā)面試】如何處理并發(fā)訪問如何進行代碼的單元測試Java多線程編程消息中間件設計模式技術難題是如何解決的

    【 點我-這里送書 】 本人詳解 作者:王文峰,參加過 CSDN 2020年度博客之星,《Java王大師王天師》 公眾號:JAVA開發(fā)王大師,專注于天道酬勤的 Java 開發(fā)問題 中國國學、傳統(tǒng)文化和代碼愛好者的程序人生,期待你的關注和支持!本人外號:神秘小峯 山峯 轉(zhuǎn)載說明:務必注明

    2024年02月03日
    瀏覽(131)
  • RabbitMQ 消息中間件 消息隊列

    RabbitMQ 消息中間件 消息隊列

    RabbitMQ 1、RabbitMQ簡介 RabbiMQ是?Erang開發(fā)的,集群?常?便,因為Erlang天?就是??分布式語?,但其本身并 不?持負載均衡。支持高并發(fā),支持可擴展。支持AJAX,持久化,用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。 2、RabbitMQ 特點 可

    2024年02月03日
    瀏覽(93)
  • 高性能消息中間件 RabbitMQ

    高性能消息中間件 RabbitMQ

    消息隊列 MQ全稱Message Queue(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于系統(tǒng)之間的 異步通信 。 同步通信相當于兩個人當面對話,你一言我一語。必須及時回復: 異步通信相當于通過第三方轉(zhuǎn)述對話,可能有消息的延遲,但不需要二人時刻保持聯(lián)系。 消息

    2024年02月11日
    瀏覽(32)
  • Springboot整合RabbitMQ消息中間件

    spring-boot-rabbitmq–消息中間件整合 前言:RabbitMQ的各種交換機說明 1、直連交換機 生產(chǎn)者發(fā)布消息時必須帶著routing-key,隊列綁定到交換機時必須指定binding-key ,且routing-key和binding-key必須完全相同,如此才能將消息路由到隊列中 直連交換機通常用來循環(huán)分發(fā)任務給多個workers,

    2024年02月11日
    瀏覽(33)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包