目錄
一、前期項(xiàng)目環(huán)境準(zhǔn)備
1.1父項(xiàng)目以及子項(xiàng)目
1.2配置pom.xml
1.3配置application.yml
二、扇出(Fanout)?交換機(jī)實(shí)現(xiàn)消息的發(fā)送和接收
2.1編寫子項(xiàng)目consumer(消費(fèi)者,接收消息)的代碼實(shí)現(xiàn)扇出(Fanout)交換機(jī)接收消息
2.1.1consumer子項(xiàng)目結(jié)構(gòu)
2.1.2FanoutConfig類的實(shí)現(xiàn)扇出(Fanout)交換機(jī)、隊(duì)列以及交換機(jī)和隊(duì)列的綁定的創(chuàng)建
2.1.3springRabbitListener類實(shí)現(xiàn)Fanout消息的接收
2.1.4運(yùn)行consumer子項(xiàng)目
2.2編寫子項(xiàng)目publisher的代碼實(shí)現(xiàn)扇出(Fanout)交換機(jī)的發(fā)送消息
2.2.1publisher子項(xiàng)目結(jié)構(gòu)
2.2.2編寫SpringAmqpTest測(cè)試類代碼
2.2.3運(yùn)行該測(cè)試類,然后回到consumer子項(xiàng)目的兩個(gè)隊(duì)列是否都能夠接收到該消息
三、訂閱(Direct)交換機(jī)實(shí)現(xiàn)消息的發(fā)送和接收
3.1編寫子項(xiàng)目consumer(消費(fèi)者,接收消息)的代碼實(shí)現(xiàn)扇出訂閱(Direct)交換機(jī)接收消息
3.1.1在springRabbitListener類添加以下兩個(gè)Direct隊(duì)列方法
3.1.2再次重新運(yùn)行consumer子項(xiàng)目
?3.2編寫子項(xiàng)目publisher的代碼實(shí)現(xiàn)扇出(Direct)交換機(jī)的發(fā)送消息
3.2.1在SpringAmqpTest測(cè)試類繼續(xù)添加下面代碼
3.2.2運(yùn)行該測(cè)試類,然后回到consumer子項(xiàng)目的key擁有red的監(jiān)聽(tīng)隊(duì)列是否都?jí)蚪邮盏皆撓?/p>
四、主題(Topic)交換機(jī)實(shí)現(xiàn)消息的發(fā)送和接收
4.1編寫子項(xiàng)目consumer(消費(fèi)者,接收消息)的代碼實(shí)現(xiàn)扇出主題(Topic)交換機(jī)接收消息
4.1.1在springRabbitListener類添加以下兩個(gè)Topic隊(duì)列方法
4.1.2再次重新運(yùn)行consumer子項(xiàng)目
4.2編寫子項(xiàng)目publisher的代碼實(shí)現(xiàn)扇出(Topic)交換機(jī)的發(fā)送消息
4.2.1在SpringAmqpTest測(cè)試類繼續(xù)添加下面代碼
4.2.2運(yùn)行該測(cè)試類,然后回到consumer子項(xiàng)目的key擁有.news的監(jiān)聽(tīng)隊(duì)列是否都?jí)蚪邮盏皆撓?/p>
五、總結(jié)
5.1交換機(jī)的作用是什么?
5.2聲明隊(duì)列、交換機(jī)、綁定關(guān)系的Bean是什么?
5.3描述下Direct交換機(jī)與Fanout交換機(jī)的差異?
5.4基于@RabbitListener注解聲明隊(duì)列和交換機(jī)有哪些常見(jiàn)注解?
5.5描述下Direct交換機(jī)與Topic交換機(jī)的差異?
一、前期項(xiàng)目環(huán)境準(zhǔn)備
1.1父項(xiàng)目以及子項(xiàng)目
創(chuàng)建父項(xiàng)目mq-demo,以及兩個(gè)子項(xiàng)目publisher(生產(chǎn)者,發(fā)送消息)和consumer(消費(fèi)者,接收消息)
1.2配置pom.xml
在父項(xiàng)目mq-demo的pom.xml引入以下的amqp依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.3配置application.yml
在兩個(gè)子項(xiàng)目的application.yml都進(jìn)行以下的配置(設(shè)置連接參數(shù),分別是:主機(jī)名、端口號(hào)、用戶名、密碼、virtual-host):
spring:
rabbitmq:
host: 192.168.22.134
port: 5672 #發(fā)送信息的端口,不是啟動(dòng)rabbitmq管理頁(yè)面的端口(15672)
username: itcast
password: 123321
virtual-host: /
二、扇出(Fanout)?交換機(jī)實(shí)現(xiàn)消息的發(fā)送和接收
在扇出(廣播)模式下,消息發(fā)送流程是這樣的:
-
1) 可以有多個(gè)隊(duì)列
-
2) 每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
-
3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī),交換機(jī)來(lái)決定要發(fā)給哪個(gè)隊(duì)列,生產(chǎn)者無(wú)法決定
-
4) 交換機(jī)把消息發(fā)送給綁定過(guò)的所有隊(duì)列
-
5) 訂閱隊(duì)列的消費(fèi)者都能拿到消息 ?
2.1編寫子項(xiàng)目consumer(消費(fèi)者,接收消息)的代碼實(shí)現(xiàn)扇出(Fanout)交換機(jī)接收消息
2.1.1consumer子項(xiàng)目結(jié)構(gòu)
其中config包是配置包,listener包是監(jiān)聽(tīng)消息以便接收的包
2.1.2FanoutConfig類的實(shí)現(xiàn)扇出(Fanout)交換機(jī)、隊(duì)列以及交換機(jī)和隊(duì)列的綁定的創(chuàng)建
分別創(chuàng)建一個(gè)名為itcast.fanout的交換機(jī)、一個(gè)名為fanout.queue1的隊(duì)列、一個(gè)名為fanout.queue2d1隊(duì)列以及交換機(jī)和兩個(gè)隊(duì)列之間的綁定。
@Configuration
public class FanoutConfig {
// 聲明FanoutExchange交換機(jī)
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// 聲明第1個(gè)隊(duì)列
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
//綁定隊(duì)列1和交換機(jī)
@Bean
public Binding fanoutBing1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 聲明第2個(gè)隊(duì)列
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
//綁定隊(duì)列2和交換機(jī)
@Bean
public Binding fanoutBing2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.1.3springRabbitListener類實(shí)現(xiàn)Fanout消息的接收
?利用@RabbitListeneer注解的queues屬性="隊(duì)列名稱",來(lái)對(duì)隊(duì)列的消息進(jìn)行監(jiān)聽(tīng)。
@Component
public class springRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void ListenerFanoutQueue1(String msg) {
System.out.println("消費(fèi)者接收到fanout.queue1的消息【"+msg+"】");
}
@RabbitListener(queues = "fanout.queue2")
public void ListenerFanoutQueue2(String msg) {
System.out.println("消費(fèi)者接收到fanout.queue2的消息【"+msg+"】");
}
}
2.1.4運(yùn)行consumer子項(xiàng)目
在http://192.168.22.134:15672/(這里輸入自己的ip地址和自己的RabbitMQ端口號(hào))查看名為itcast.fanout交換機(jī)、名為fanout.queue1隊(duì)列和名為fanout.queue2是否創(chuàng)建成功。
2.2編寫子項(xiàng)目publisher的代碼實(shí)現(xiàn)扇出(Fanout)交換機(jī)的發(fā)送消息
2.2.1publisher子項(xiàng)目結(jié)構(gòu)
?publisher子項(xiàng)目結(jié)構(gòu)比consumer子項(xiàng)目結(jié)構(gòu)而言相對(duì)簡(jiǎn)單了,在publisher子項(xiàng)目結(jié)構(gòu)中我們只需編寫@Test類來(lái)完成消息的發(fā)送,然后看看consumer子項(xiàng)目是否能夠接收到該消息即可。
2.2.2編寫SpringAmqpTest測(cè)試類代碼
需要指定交換機(jī)的名稱和發(fā)送的消息,然后利用rabbitTemplate.convertAndSend()方法進(jìn)行消息的發(fā)送
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendFanoutExchange(){
//交換機(jī)名稱
String exchangeName ="itcast.fanout";
//消息
String message = "hello, every one!";
// 發(fā)送消息,參數(shù)分別是:交互機(jī)名稱、RoutingKey(暫時(shí)為空)、消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
}
2.2.3運(yùn)行該測(cè)試類,然后回到consumer子項(xiàng)目的兩個(gè)隊(duì)列是否都能夠接收到該消息
三、訂閱(Direct)交換機(jī)實(shí)現(xiàn)消息的發(fā)送和接收
在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。
在Direct模型下:
-
隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)
RoutingKey
(路由key) -
消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的
RoutingKey
。 -
Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
Routing Key
進(jìn)行判斷,只有隊(duì)列的Routingkey
與消息的Routing key
完全一致,才會(huì)接收到消息
3.1編寫子項(xiàng)目consumer(消費(fèi)者,接收消息)的代碼實(shí)現(xiàn)扇出訂閱(Direct)交換機(jī)接收消息
?為了編碼的方便,我們就直接在子項(xiàng)目consumer下的listener包里面的springRabbitListener類使用注解的方式進(jìn)行交換機(jī)、隊(duì)列的創(chuàng)建和消息的監(jiān)聽(tīng)了。
3.1.1在springRabbitListener類添加以下兩個(gè)Direct隊(duì)列方法
這里我們直接使用@RabbitListener注解進(jìn)行交換機(jī)、隊(duì)列的創(chuàng)建和消息的監(jiān)聽(tīng)。這里不是使用binding而是使用bindings進(jìn)行構(gòu)建,其中:
@Queue里面的name屬性是隊(duì)列的名稱
@Exchange里面的name屬性是交換機(jī)的名稱,type屬性是交換機(jī)的類型(這里是direct類型)
?key屬性是指定消息的 RoutingKey,接收擁有相同的RoutingKey隊(duì)列的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct",
type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void ListenerDirectQueue1(String msg){
System.out.println("消費(fèi)者接收到direct.queue1的消息【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",
type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void ListenerDirectQueue2(String msg){
System.out.println("消費(fèi)者接收到direct.queue2的消息【"+msg+"】");
}
3.1.2再次重新運(yùn)行consumer子項(xiàng)目
?在http://192.168.22.134:15672/(這里輸入自己的ip地址和自己的RabbitMQ端口號(hào))查看名為itcast.direct交換機(jī)、名為direct.queue1隊(duì)列和名為direct.queue2是否創(chuàng)建成功。
3.2編寫子項(xiàng)目publisher的代碼實(shí)現(xiàn)扇出(Direct)交換機(jī)的發(fā)送消息
??為了編碼的方便,我們就直接在子項(xiàng)目publisher下的SpringAmqpTest測(cè)試類進(jìn)行編碼。
3.2.1在SpringAmqpTest測(cè)試類繼續(xù)添加下面代碼
這里的?rabbitTemplate.convertAndSend()方法相對(duì)于Fanout的測(cè)試類來(lái)說(shuō),要在中間多填一個(gè)參數(shù)的名稱,該名稱就是RoutingKey(指定隊(duì)列key),向監(jiān)聽(tīng)消息隊(duì)列也是該key的發(fā)送消息,比如這里的發(fā)送消息的RoutingKey為red,那么只有在監(jiān)聽(tīng)消息隊(duì)列的key也擁有red才能接收到這個(gè)消息
@Test
public void testSendDirectExchange(){
//交換機(jī)名稱
String exchangeName ="itcast.direct";
//消息
String message = "hello, red!";
// 發(fā)送消息,參數(shù)分別是:交互機(jī)名稱、RoutingKey(指定隊(duì)列key)、消息
rabbitTemplate.convertAndSend(exchangeName,"red",message);
}
3.2.2運(yùn)行該測(cè)試類,然后回到consumer子項(xiàng)目的key擁有red的監(jiān)聽(tīng)隊(duì)列是否都?jí)蚪邮盏皆撓?/h4>
從上面的3.1.1編碼可知,我們兩個(gè)隊(duì)列的key都擁有red,所以兩個(gè)隊(duì)列都接受到了該消息
四、主題(Topic)交換機(jī)實(shí)現(xiàn)消息的發(fā)送和接收
Topic類型的Exchange與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!
Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
????????#:匹配一個(gè)或多個(gè)詞
????????*:匹配不多不少恰好1個(gè)詞
舉例:
???????? item.#:能夠匹配item.spu.insert 或者 item.spu
? ? ? ? ?item.*:只能匹配item.spu?
解釋:
????????Queue1:綁定的是china.# ,因此凡是以 china.開(kāi)頭的routing key 都會(huì)被匹配到。包括china.news和china.weather
????????Queue4:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配。包括china.news和japan.news
4.1編寫子項(xiàng)目consumer(消費(fèi)者,接收消息)的代碼實(shí)現(xiàn)扇出主題(Topic)交換機(jī)接收消息
??為了編碼的方便,我們就直接在子項(xiàng)目consumer下的listener包里面的springRabbitListener類使用注解的方式進(jìn)行交換機(jī)、隊(duì)列的創(chuàng)建和消息的監(jiān)聽(tīng)了。
4.1.1在springRabbitListener類添加以下兩個(gè)Topic隊(duì)列方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic",
type = ExchangeTypes.TOPIC),
key = "China.#"
))
public void ListenerTopicQueue1(String msg){
System.out.println("消費(fèi)者接收到topic.queue1的消息【"+msg+"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic",
type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void ListenerTopicQueue2(String msg){
System.out.println("消費(fèi)者接收到topic.queue2的消息【"+msg+"】");
}
這里我們直接使用@RabbitListener注解進(jìn)行交換機(jī)、隊(duì)列的創(chuàng)建和消息的監(jiān)聽(tīng)。這里不是使用binding而是使用bindings進(jìn)行構(gòu)建,其中:
@Queue里面的name屬性是隊(duì)列的名稱
@Exchange里面的name屬性是交換機(jī)的名稱,type屬性是交換機(jī)的類型(這里是topic類型)
?key屬性是指定消息的 RoutingKey,接收擁有相同的RoutingKey隊(duì)列的消息,
但是不同的是,這里我們可以使用#或者*的通配符進(jìn)行更方便的匹配發(fā)送和接收消息的隊(duì)列
。
4.1.2再次重新運(yùn)行consumer子項(xiàng)目
?在http://192.168.22.134:15672/(這里輸入自己的ip地址和自己的RabbitMQ端口號(hào))查看名為itcast.topic交換機(jī)、名為topic.queue1隊(duì)列和名為topic.queue2是否創(chuàng)建成功。?
4.2編寫子項(xiàng)目publisher的代碼實(shí)現(xiàn)扇出(Topic)交換機(jī)的發(fā)送消息
??為了編碼的方便,我們就直接在子項(xiàng)目publisher下的SpringAmqpTest測(cè)試類進(jìn)行編碼。
4.2.1在SpringAmqpTest測(cè)試類繼續(xù)添加下面代碼
這里的?rabbitTemplate.convertAndSend()方法相對(duì)于Fanout的測(cè)試類來(lái)說(shuō),要在中間多填一個(gè)參數(shù)的名稱,該名稱就是RoutingKey(指定隊(duì)列key),向監(jiān)聽(tīng)消息隊(duì)列也是該key的發(fā)送消息,比如這里的發(fā)送消息的RoutingKey為USA.news,那么只有在監(jiān)聽(tīng)消息隊(duì)列的key也擁有.news才能接收到這個(gè)消息
@Test
public void testSendTopicExchange(){
//交換機(jī)名稱
String exchangeName ="itcast.topic";
//消息
String message = "今天是個(gè)好日子!";
// 發(fā)送消息,參數(shù)分別是:交互機(jī)名稱、RoutingKey(通配符(*代表一個(gè),#代碼0個(gè)或多個(gè)))、消息
//rabbitTemplate.convertAndSend(exchangeName,"China.news",message);//兩個(gè)隊(duì)列都收到消息
//rabbitTemplate.convertAndSend(exchangeName,"China.weather",message);//隊(duì)列1收到消息
rabbitTemplate.convertAndSend(exchangeName,"USA.news",message);//隊(duì)列2收到消息
}
4.2.2運(yùn)行該測(cè)試類,然后回到consumer子項(xiàng)目的key擁有.news的監(jiān)聽(tīng)隊(duì)列是否都?jí)蚪邮盏皆撓?/h4>
從上面的3.1.1編碼可知,只有隊(duì)列2是#.news,所以只有對(duì)了2接受到了該消息
五、總結(jié)
5.1交換機(jī)的作用是什么?
???????接收publisher發(fā)送的消息
???????將消息按照規(guī)則路由到與之綁定的隊(duì)列
???????不能緩存消息,路由失敗,消息丟失
???????FanoutExchange的會(huì)將消息路由到每個(gè)綁定的隊(duì)列
5.2聲明隊(duì)列、交換機(jī)、綁定關(guān)系的Bean是什么?
????????Queue、FanoutExchange、Binding?
5.3描述下Direct交換機(jī)與Fanout交換機(jī)的差異?
????????Fanout交換機(jī)將消息路由給每一個(gè)與之綁定的隊(duì)列
????????Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個(gè)隊(duì)列
????????如果多個(gè)隊(duì)列具有相同的RoutingKey,則與Fanout功能類似
5.4基于@RabbitListener注解聲明隊(duì)列和交換機(jī)有哪些常見(jiàn)注解?
????????@Queue和@Exchange
5.5描述下Direct交換機(jī)與Topic交換機(jī)的差異?
????????Topic交換機(jī)接收的消息RoutingKey必須是多個(gè)單詞,以.分割
????????Topic交換機(jī)與隊(duì)列綁定時(shí)的bindingKey可以指定通配符
? ?#
:代表0個(gè)或多個(gè)詞
? ?*
:代表1個(gè)詞文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-754307.html
今天的知識(shí)就分享到這了,希望能夠幫助到你~?文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-754307.html
到了這里,關(guān)于利用消息中間件RabbitMQ創(chuàng)建隊(duì)列以及扇出(Fanout)、訂閱(Direct)、主題(Topic)交換機(jī)來(lái)完成消息的發(fā)送和監(jiān)聽(tīng)接收(完整版)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!