MQ
同步通訊
- 優(yōu)點(diǎn):
- 時(shí)效性強(qiáng), 可以立即得到結(jié)果
- 缺點(diǎn):
- 耦合度高
- 性能和吞吐能力下降
- 有額外的資源消耗
- 有級聯(lián)失敗問題
異步通訊
- 優(yōu)點(diǎn):
- 耦合度低
- 吞吐量提升
- 故障隔離
- 流量削峰
- 缺點(diǎn):
- 依賴于Broker的可靠性, 安全性, 吞吐能力
- 架構(gòu)復(fù)雜, 業(yè)務(wù)沒有明顯的流程線, 不好追蹤管理
mq常見技術(shù)
RabbitMq | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | 阿里 | Apache |
開發(fā)語言 | Erlang | java | java | Scala&java |
協(xié)議支持 | AMQP, XMPP, SMTP, STOMP | OpenWire, STOMP, REST, XMPP, AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機(jī)吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ
下載安裝
- docker下載
docker pull rabbitmq:3-management
- docker運(yùn)行
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
介紹
- channel: 操作MQ的工具
- exchange: 路由消息到隊(duì)列中
- queue: 緩存消息
- virtual host: 虛擬主機(jī), 是對queue, exchange等資源的邏輯分組
SimpleQueue模型
- publisher
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
factory.setHost("192.168.174.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創(chuàng)建通道Channel
Channel channel = connection.createChannel();
// 3.創(chuàng)建隊(duì)列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發(fā)送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發(fā)送消息成功:【" + message + "】");
// 5.關(guān)閉通道和連接
channel.close();
connection.close();
}
- consumer
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號、vhost、用戶名、密碼
factory.setHost("192.168.174.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創(chuàng)建通道Channel
Channel channel = connection.createChannel();
// 3.創(chuàng)建隊(duì)列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
SpringAMQP
介紹
- AMQP: 應(yīng)用間消息通信的一種協(xié)議, 與語言和平臺無關(guān).
- 依賴:
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
SimpleQueue模型
- 配置
spring: rabbitmq: host: 192.168.174.133 # 主機(jī)名S port: 5672 # 端口 virtual-host: / # 虛擬主機(jī) username: itcast # 用戶名 password: 123321 # 密碼
- publisher
@SpringBootTest @RunWith(SpringRunner.class) public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
- comsumer
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { System.out.println("spring 消費(fèi)者接收到消息: ["+msg+"]"); } }
WorkQueue模型
- publisher
@Test public void testWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, spring amqp!"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend(queueName, message); Thread.sleep(20); } }
- consumer
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消費(fèi)者1接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消費(fèi)者2接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(200); }
- 配置
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能獲取下一消息, 處理完成才能獲取下一個(gè)消息
- work模型總結(jié):
- 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列, 同一條消息只會被一個(gè)消費(fèi)者處理
- consumer會預(yù)取消息, 會導(dǎo)致性能差的consumer堆積消息, 可以通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量.
發(fā)布訂閱模型
介紹
- 發(fā)布訂閱模式與之前案例的區(qū)別就是允許將同一消息發(fā)送給多個(gè)消費(fèi)者, 實(shí)現(xiàn)方式就是加入了exchange(交換機(jī)).
- 常見exchange類型:
- Fanout: 廣播
- Direct: 路由
- Topic: 話題
FanoutExchange
- config
@Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("itcast.fanout"); } @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
- consumer
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) throws InterruptedException { System.out.println("消費(fèi)者1接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) throws InterruptedException { System.err.println("消費(fèi)者2接收到消息: ["+msg+"]"+ LocalTime.now()); Thread.sleep(200); }
- publisher
@Test public void testSendFanoutExchange() { // 交換機(jī)名稱 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, every one!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
- 總結(jié):
- 交換機(jī)的作用
- 接收publisher發(fā)送的消息
- 將消息按照規(guī)則路由到與之綁定的隊(duì)列
- 不能緩存消息, 路由失敗, 消息丟失
- FanoutExchange的會將消息路由到每個(gè)綁定的隊(duì)列
- 聲明隊(duì)列的Bean: Queue
- 聲明交換機(jī)的Bean: FanoutExchange
- 聲明綁定關(guān)系的Bean: Binding
- 交換機(jī)的作用
Direct Exchange
- 介紹
- 每一個(gè)Queue都與Exchange設(shè)置一個(gè)BindingKey
- 發(fā)布者發(fā)送消息時(shí), 指定消息的RoutingKey
- Exchange將消息路由到BindingKey與消息RoutingKey一致的隊(duì)列
- consumer
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg) throws InterruptedException { System.out.println("消費(fèi)者1接收到消息: ["+msg+"]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg) throws InterruptedException { System.err.println("消費(fèi)者2接收到消息: ["+msg+"]"); }
- publisher
@Test public void testSendDirectExchange() { // 交換機(jī)名稱 String exchangeName = "itcast.direct"; // 消息 String message = "hello, every blue!"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); }
Topic Exchange
- consumer
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg) throws InterruptedException { System.out.println("消費(fèi)者1接收到消息: ["+msg+"]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg) throws InterruptedException { System.err.println("消費(fèi)者2接收到消息: ["+msg+"]"); }
- publisher
@Test public void testSendTopicExchange() { // 交換機(jī)名稱 String exchangeName = "itcast.topic"; // 消息 String message = "hello, china.news"; // 發(fā)送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
- 總結(jié)
- TopicExchange與DirectExchange類似, 區(qū)別在于routingKey必須是多個(gè)單詞的列表, 并且以
.
分割.
- TopicExchange與DirectExchange類似, 區(qū)別在于routingKey必須是多個(gè)單詞的列表, 并且以
消息轉(zhuǎn)換器
- 依賴
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
- MessageConverter
@Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); }
- publisher
@Test public void testSendObjectQueue() { Map<String, Object> msg = new HashMap<>(); msg.put("name", "柳巖"); msg.put("age", 21); rabbitTemplate.convertAndSend("object.queue", msg); }
- consumer
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String, Object> msg) { System.out.println("接收到object.queue的消息: "+msg); }
來源
黑馬程序員. SpringCloud微服務(wù)文章來源地址http://www.zghlxwxcb.cn/news/detail-602873.html
文章來源:http://www.zghlxwxcb.cn/news/detail-602873.html
到了這里,關(guān)于JavaWeb_SpringCloud微服務(wù)_Day4-MQ, RabbitMQ, SpringAMQP的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!