国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring RabbitMQ那些事(1-交換機(jī)配置&消息發(fā)送訂閱實(shí)操)

這篇具有很好參考價(jià)值的文章主要介紹了Spring RabbitMQ那些事(1-交換機(jī)配置&消息發(fā)送訂閱實(shí)操)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、序言

在上一節(jié) RabbitMQ中的核心概念和交換機(jī)類型 中我們介紹了RabbitMQ中的一些核心概念,尤其是各種交換機(jī)的類型,接下來我們將具體講解各種交換機(jī)的配置和消息訂閱實(shí)操。


二、配置文件application.yml

我們先上應(yīng)用啟動配置文件application.yml,如下:

server:
  port: 8080
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto
        concurrency: 5
        max-concurrency: 20
        prefetch: 5

備注:這里我們指定了RabbitListenerContainerFactory的類型為SimpleRabbitListenerContainerFactory,并且指定消息確認(rèn)模式為自動確認(rèn)

三、RabbitMQ交換機(jī)和隊(duì)列配置

Spring官方提供了一套 流式API 來定義隊(duì)列交換機(jī)綁定關(guān)系,非常的方便,接下來我們定義4種類型的交換機(jī)和相應(yīng)隊(duì)列的綁定關(guān)系。

1、定義4個(gè)隊(duì)列

/**
 * 定義4個(gè)隊(duì)列
 */
@Configuration
protected static class QueueConfig {

	@Bean
	public Queue queue1() {
		return QueueBuilder.durable("queue-1").build();
	}

	@Bean
	public Queue queue2() {
		return QueueBuilder.durable("queue-2").build();
	}

	@Bean
	public Queue queue3() {
		return QueueBuilder.durable("queue-3").build();
	}

	@Bean
	public Queue queue4() {
		return QueueBuilder.durable("queue-4").build();
	}
}

2、定義Fanout交換機(jī)和隊(duì)列綁定關(guān)系

/**
 * 定義Fanout交換機(jī)和對應(yīng)的綁定關(guān)系
 */
@Configuration
protected static class FanoutExchangeBindingConfig {

	@Bean
	public FanoutExchange fanoutExchange() {
		return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
	}

	/**
	 * 定義多個(gè)Fanout交換機(jī)和隊(duì)列的綁定關(guān)系
	 * @param fanoutExchange
	 * @param queue1
	 * @param queue2
	 * @param queue3
	 * @param queue4
	 * @return
	 */
	@Bean
	public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
		Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
		Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
		Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
		return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
	}

}

備注:這里我們將4個(gè)隊(duì)列綁定到了名為fanout-exchange的交換機(jī)上。

2、定義Direct交換機(jī)和隊(duì)列綁定關(guān)系

@Configuration
protected static class DirectExchangeBindingConfig {

	@Bean
	public DirectExchange directExchange() {
		return ExchangeBuilder.directExchange("direct-exchange").build();
	}

	@Bean
	public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
		return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
	}
}

備注:這里我們定義了名為direct-exchange的交換機(jī)并通過路由keyqueue3-route-keyqueue-3綁定到了該交換機(jī)上。


3、定義Topic交換機(jī)和隊(duì)列綁定關(guān)系

@Configuration
protected static class TopicExchangeBindingConfig {

	@Bean
	public TopicExchange topicExchange() {
		return ExchangeBuilder.topicExchange("topic-exchange").build();
	}

	@Bean
	public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
		Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
		return new Declarables(queue1Binding, queue2Binding);
	}
}

這里我們定義了名為topic-exchange類型的交換機(jī),該類型交換機(jī)支持路由key通配符匹配,*代表一個(gè)任意字符,#代表一個(gè)或多個(gè)任意字符。

備注:

  1. 通過路由keycom.order.*queue-1綁定到了該交換機(jī)上。
  2. 通過路由key com.#queue-2也綁定到了該交換機(jī)上。

4、定義Header交換機(jī)和隊(duì)列綁定關(guān)系

@Configuration
protected static class HeaderExchangeBinding {

	@Bean
	public HeadersExchange headersExchange() {
		return ExchangeBuilder.headersExchange("headers-exchange").build();
	}

	@Bean
	public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
		return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
	}
}

備注:這里我們定義了名為headers-exchange類型的交換機(jī),并通過參數(shù)function=loggingqueue-4綁定到了該交換機(jī)上。


四、RabbitMQ消費(fèi)者配置

Spring RabbitMQ中支持注解式監(jiān)聽端點(diǎn)配置,用于異步接收消息,如下:

@Slf4j
@Component
public class RabbitMqConsumer {

	@RabbitListener(queues = "queue-1")
	public void handleMsgFromQueue1(String msg) {
		log.info("Message received from queue-1, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-2")
	public void handleMsgFromQueue2(String msg) {
		log.info("Message received from queue-2, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-3")
	public void handleMsgFromQueue3(String msg) {
		log.info("Message received from queue-3, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-4")
	public void handleMsgFromQueue4(String msg) {
		log.info("Message received from queue-4, message body: {}", msg);
	}
}

備注:這里我們分別定義了4個(gè)消費(fèi)者,分別用來接受4個(gè)隊(duì)列的消息。

五、RabbitMQ生產(chǎn)者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {

	private final RabbitTemplate rabbitTemplate;

	public void sendMsgToFanoutExchange(String body) {
		log.info("開始發(fā)送消息到fanout-exchange, 消息體:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
	}

	public void sendMsgToDirectExchange(String body) {
		log.info("開始發(fā)送消息到direct-exchange, 消息體:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
	}

	public void sendMsgToTopicExchange(String routingKey, String body) {
		log.info("開始發(fā)送消息到topic-exchange, 消息體:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("topic-exchange", routingKey, message);
	}

	public void sendMsgToHeadersExchange(String body) {
		log.info("開始發(fā)送消息到headers-exchange, 消息體:{}", body);

		MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
		rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
	}

}


六、測試用例

這里寫了個(gè)簡單的Controller用來測試,如下:

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {

	private final RabbitMqProducer rabbitMqProducer;

	@RequestMapping("/exchange/fanout")
	public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
		rabbitMqProducer.sendMsgToFanoutExchange(body);
		return ResponseEntity.ok("廣播消息發(fā)送成功");
	}

	@RequestMapping("/exchange/direct")
	public ResponseEntity<String> sendMsgToDirectExchange(String body) {
		rabbitMqProducer.sendMsgToDirectExchange(body);
		return ResponseEntity.ok("消息發(fā)送到Direct交換成功");
	}

	@RequestMapping("/exchange/topic")
	public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
		rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
		return ResponseEntity.ok("消息發(fā)送到Topic交換機(jī)成功");
	}

	@RequestMapping("/exchange/headers")
	public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
		rabbitMqProducer.sendMsgToHeadersExchange(body);
		return ResponseEntity.ok("消息發(fā)送到Headers交換機(jī)成功");
	}

}

1、發(fā)送到FanoutExchage

直接訪問http://localhost:8080/exchange/fanout?body=hello,可以看到該消息廣播到了4個(gè)隊(duì)列上。

2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 開始發(fā)送消息到fanout-exchange, 消息體:hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

2、發(fā)送到DirectExchage

訪問http://localhost:8080/exchange/direct?body=hello,可以看到消息通過路由keyqueue3-route-key發(fā)送到了queue-3上。

2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 開始發(fā)送消息到direct-exchange, 消息體:hello
2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello

3、發(fā)送到TopicExchange

訪問http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create,路由key為 com.order.create的消息分別發(fā)送到了queue-1queue-2上。

2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 開始發(fā)送消息到topic-exchange, 消息體:hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

4、發(fā)動到HeadersExchage

訪問http://localhost:8080/exchange/headers?body=hello,消息通過頭部信息function=logging發(fā)送到了headers-exchange上。

2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 開始發(fā)送消息到headers-exchange, 消息體:hello
2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello

七、結(jié)語

下一節(jié)我們將會介紹通過兩種方式借由RabbitMQ實(shí)現(xiàn)延遲消息發(fā)送和訂閱,敬請期待。
rabbitmq生產(chǎn)者發(fā)送配置,中間件專題,RabbitMQ,java-rabbitmq,spring,rabbitmq文章來源地址http://www.zghlxwxcb.cn/news/detail-797735.html

到了這里,關(guān)于Spring RabbitMQ那些事(1-交換機(jī)配置&消息發(fā)送訂閱實(shí)操)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包