Spring整合RabbitMQ
spring-amqp是對AMQP一些概念的抽象,spring-rabbit是對RabbitMQ操作的封裝實現。
主要有幾個核心類 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等。
RabbitAdmin 類完成對Exchange,Queue,Binding的操作,在容器中管理了 RabbitAdmin 類的時候,可以對Exchange,Queue,Binding進行自動聲明。
RabbitTemplate 類是發送和接收消息的工具類。
SimpleMessageListenerContainer 是消費消息的容器。
目前比較新的一些項目都會選擇基于注解方式,而比較老的一些項目可能還是基于配置文件的。
基于配置文件的整合
-
創建maven項目(文章來源:http://www.zghlxwxcb.cn/news/detail-814761.html)
-
配置pom.xml,添加rabbit的spring依賴(文章來源地址:http://www.zghlxwxcb.cn/news/detail-814761.html)
<dependencies>
    <!-- spring-rabbit: Spring's wrapper around RabbitMQ operations -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.7.RELEASE</version>
    </dependency>
</dependencies>
- rabbit-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Broker connection settings. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="node1"
                               port="5672"
                               virtual-host="/"
                               username="root"
                               password="123456"/>

    <!-- Template object (org.springframework.amqp.rabbit.core.RabbitTemplate) used to access the broker.
         Note that no exchange attribute is configured on it. -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

    <!-- The admin looks up beans of type Queue, Exchange and Binding and declares them on
         RabbitMQ for the user, so they need not be declared explicitly in Java code. -->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

    <!-- Queue for the consumer: an existing queue of the same name on the broker is reused,
         otherwise a new one is created. -->
    <rabbit:queue name="myqueue"/>

    <!-- Messages are published through an exchange. This direct exchange routes
         routing key "dir.ex" to the queue bean "myqueue". -->
    <rabbit:direct-exchange name="direct.biz.ex" auto-declare="true" auto-delete="false" durable="false">
        <rabbit:bindings>
            <!-- exchange: name of another exchange bound to this exchange -->
            <!-- queue:    bean name of the queue bound to this exchange -->
            <!-- key:      explicitly declared routing key -->
            <rabbit:binding queue="myqueue" key="dir.ex"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>
- App.java
package com.lagou.rabbitmq.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;
/**
 * Sends and receives messages using the Spring XML configuration
 * ({@code rabbit-context.xml}).
 *
 * <p>Messages are published to the {@code direct.biz.ex} exchange with routing key
 * {@code dir.ex}, which is the binding that {@code rabbit-context.xml} declares for
 * {@code myqueue}. The template in that file has no exchange configured, so the
 * two-argument {@code convertAndSend(routingKey, message)} would publish to the
 * nameless default exchange, where routing key {@code dir.ex} matches no queue.
 */
public class App {

    /** Direct exchange declared in rabbit-context.xml. */
    private static final String EXCHANGE = "direct.biz.ex";

    /** Routing key that binds {@link #QUEUE} to {@link #EXCHANGE}. */
    private static final String ROUTING_KEY = "dir.ex";

    /** Queue declared in rabbit-context.xml. */
    private static final String QUEUE = "myqueue";

    /** How long to wait for a message before giving up, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /**
     * Publishes 1000 messages, pulls one back from the queue and prints it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the context (and its connections) is closed
        // even when sending or receiving throws.
        try (AbstractApplicationContext context =
                new GenericXmlApplicationContext("classpath:/rabbit-context.xml")) {
            AmqpTemplate template = context.getBean(AmqpTemplate.class);
            for (int i = 0; i < 1000; i++) {
                // arguments: exchange, routing key, message payload
                template.convertAndSend(EXCHANGE, ROUTING_KEY, "foo" + i);
            }
            // Actively pull one message; wait briefly because publishing is asynchronous.
            // The result is null if nothing arrives within the timeout.
            String foo = (String) template.receiveAndConvert(QUEUE, RECEIVE_TIMEOUT_MILLIS);
            System.out.println(foo);
        }
    }
}
啟動RabbitMQ之后,直接運行即可。
基于注解的整合
-
創建maven項目
-
配置pom.xml,添加rabbit的spring依賴
<dependencies>
    <!-- Same spring-rabbit dependency as in the XML-based setup -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.2.7.RELEASE</version>
    </dependency>
</dependencies>
- 添加配置類RabbitConfiguration.java
package ai.flkj.material.server.util;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Java-based Spring configuration that registers the RabbitMQ connection factories,
 * an {@link AmqpAdmin}, a {@link RabbitTemplate} and the {@code myqueue} queue as beans.
 */
@Configuration
public class RabbitConfiguration {

    /**
     * Builds the RabbitMQ client connection factory carrying the broker address
     * and credentials.
     *
     * @return the configured client-level connection factory
     */
    @Bean
    public com.rabbitmq.client.ConnectionFactory rabbitFactory() {
        com.rabbitmq.client.ConnectionFactory clientFactory = new com.rabbitmq.client.ConnectionFactory();
        clientFactory.setHost("node1");
        clientFactory.setPort(5672);
        clientFactory.setVirtualHost("/");
        clientFactory.setUsername("root");
        clientFactory.setPassword("123456");
        return clientFactory;
    }

    /**
     * Wraps the client factory in a {@link CachingConnectionFactory}.
     *
     * @param rabbitFactory the client-level connection factory
     * @return the Spring AMQP connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitFactory) {
        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * Creates the admin bean as a {@link RabbitAdmin} on the given connection factory.
     *
     * @param factory the Spring AMQP connection factory
     * @return the admin bean
     */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /**
     * Creates the template used to send and receive messages.
     *
     * @param factory the Spring AMQP connection factory
     * @return the template bean
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /**
     * Declares the queue named {@code myqueue}.
     *
     * @return the queue bean
     */
    @Bean
    public Queue queue() {
        return new Queue("myqueue");
    }
}
- 主入口類SpringAnnotationDemo.java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
/**
 * Sends and receives a message using Spring's annotation-based configuration.
 *
 * <p>NOTE(review): {@code RabbitConfiguration} is declared in package
 * {@code ai.flkj.material.server.util}; this class must live in that package or
 * import it for the reference below to resolve.
 */
public class SpringAnnotationDemo {

    /** Queue declared by the configuration class. */
    private static final String QUEUE = "myqueue";

    /** How long to wait for the message to arrive, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 5000L;

    /**
     * Publishes one message to {@code myqueue} and reads it back.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources closes the context even if send/receive throws.
        try (AbstractApplicationContext context =
                new AnnotationConfigApplicationContext(RabbitConfiguration.class)) {
            AmqpTemplate template = context.getBean(AmqpTemplate.class);
            // Two-argument form: the first argument is the routing key; here it is the queue name.
            template.convertAndSend(QUEUE, "foo");
            // Publishing is asynchronous, so an immediate non-blocking receive may see an
            // empty queue and return null; wait up to the timeout instead.
            String foo = (String) template.receiveAndConvert(QUEUE, RECEIVE_TIMEOUT_MILLIS);
            System.out.println(foo);
        }
    }
}
SpringBoot整合RabbitMQ
- 添加starter依賴
<!-- Spring Boot starter for AMQP / RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Required by the @RestController / @RequestMapping / @PathVariable sample in this tutorial -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
- application.properties中添加連接信息
# Application name
spring.application.name=springboot_rabbitmq

# RabbitMQ broker location
spring.rabbitmq.host=node1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

# RabbitMQ credentials
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
- 主入口類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/** Spring Boot entry point of the RabbitMQ demo application. */
@SpringBootApplication
public class RabbitqmDemo {

    /**
     * Boots the application with this class as the primary source.
     *
     * @param args command-line arguments handed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication launcher = new SpringApplication(RabbitqmDemo.class);
        launcher.run(args);
    }
}
- RabbitConfig類
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Declares the queue, the exchange and the binding between them as Spring beans.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue. */
    private static final String QUEUE_NAME = "myqueue";

    /** Name of the direct exchange. */
    private static final String EXCHANGE_NAME = "myex";

    /** Routing key that binds the queue to the exchange. */
    private static final String ROUTING_KEY = "direct.biz.ex";

    /**
     * Declares the queue.
     *
     * @return the queue named {@code myqueue}
     */
    @Bean
    public Queue myQueue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * Declares the exchange.
     *
     * <p>Other exchange types take the same trailing arguments, e.g.
     * {@code new TopicExchange("topic.biz.ex", false, false, null)},
     * {@code new FanoutExchange("fanout.biz.ex", false, false, null)} or
     * {@code new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, false, false, null)}
     * (name, type, durable, auto-delete, arguments map).
     *
     * @return a direct exchange named {@code myex}
     */
    @Bean
    public Exchange myExchange() {
        final boolean durable = false;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete, null);
    }

    /**
     * Declares the binding.
     *
     * <p>Constructor arguments are: destination, destination type (exchange or queue),
     * exchange name, routing key, binding arguments.
     *
     * @return a binding of queue {@code myqueue} to exchange {@code myex}
     *         with routing key {@code direct.biz.ex}
     */
    @Bean
    public Binding myBinding() {
        return new Binding(
                QUEUE_NAME,
                Binding.DestinationType.QUEUE,
                EXCHANGE_NAME,
                ROUTING_KEY,
                null);
    }
}
- 使用RestController發送消息
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes the path variable as a message.
 */
@RestController
public class HelloController {

    /** Exchange the message is published to. */
    private static final String EXCHANGE = "myex";

    /** Routing key used when publishing. */
    private static final String ROUTING_KEY = "direct.biz.ex";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Publishes {@code message} to the exchange and acknowledges the request.
     *
     * @param message text taken from the URL path
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        this.rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message);
        return "ok";
    }
}
- 使用監聽器,用于消費消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener component that consumes messages from {@code myqueue}.
 */
@Component
public class HelloConsumer {

    /**
     * Prints each message delivered from the queue.
     *
     * @param message the message body as text
     */
    @RabbitListener(queues = "myqueue")
    public void service(String message) {
        final String line = "消息隊列推送來的消息:" + message;
        System.out.println(line);
    }
}
到了這里,關于【RabbitMQ】4 Spring/SpringBoot整合RabbitMQ的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!