?? @ 作者: Eric
?? @ 主頁(yè): https://blog.csdn.net/weixin_47316183?type=blog
?? @ 主題:SpringBoot實(shí)戰(zhàn)項(xiàng)目整合RabbitMQ+ElaticSearch實(shí)現(xiàn)SKU上下架功能
?? @ 創(chuàng)作時(shí)間: 2023年07月03日
前言
最終實(shí)現(xiàn)效果:針對(duì)SKU的上下架
上架效果:
1、后臺(tái)選擇SKU
,點(diǎn)擊上架,該SKU
修改為上架
狀態(tài)
2、同時(shí)向MQ
發(fā)送消息
3、服務(wù)監(jiān)聽(tīng)收到消息后向Es
中新增該SKU
基本信息
下架效果:
1、后臺(tái)選擇SKU
,點(diǎn)擊下架,該SKU
修改為下架
狀態(tài)
2、同時(shí)向MQ
發(fā)送消息
3、服務(wù)監(jiān)聽(tīng)收到消息后向Es
中刪除該SKU
基本信息
那為什么一個(gè)上架的功能要使用這么多知識(shí)點(diǎn)呢,最簡(jiǎn)單的方式不就是修改SKU的狀態(tài),然后用戶端只顯示已上架的SKU就可以么。
原因有兩點(diǎn):
1.提高效率
2.還是效率
我們知道,在用戶端搜索SKU信息,最簡(jiǎn)單的方式就是通過(guò)sku關(guān)鍵詞直接模糊查詢數(shù)據(jù)庫(kù)中的sku信息,但是每次查詢數(shù)據(jù)庫(kù)都會(huì)發(fā)生IO的碰撞,顯然不是最優(yōu)的解決方法,而ElasticSearch就是為了檢索而生,所以我們上架后自動(dòng)將SKU數(shù)據(jù)存放到ElasticSearch中,然后用戶端直接檢索es中的數(shù)據(jù)即可,這樣檢索速度就哐哐哐的提上去了。
那為什么還要用到消息隊(duì)列MQ呢,這是因?yàn)镽abbitMQ是異步發(fā)送,當(dāng)我們一個(gè)人操作sku可能沒(méi)什么,那如果我們的系統(tǒng)有100萬(wàn)個(gè)商戶,并且這100萬(wàn)個(gè)商戶同時(shí)操作上下架呢?那么想想就是很恐怖的,當(dāng)然,真到了這個(gè)量級(jí)肯定還需要再次調(diào)優(yōu)。這個(gè)時(shí)候異步的重要的不言而喻。
主要使用技術(shù)點(diǎn):
- JDK(版本為:1.8)
- SpringBoot(版本為:2.3.6.RELEASE)
- SpringData(用來(lái)操作ES)
- RabbitMQ(版本為:3.8)
- ElaticSearch(版本為:7.8)
- Spring Cloud(版本為:Hoxton.SR8)
- Nacos(版本為:2.2.3)
1、前置條件
- Nacos的運(yùn)行
- ElasticSearch的運(yùn)行(我這里使用的是IK分詞器和Kibana工具)
- RabbitMQ的運(yùn)行
- 項(xiàng)目使用的是SpringBoot
關(guān)于如何安裝和運(yùn)行這些軟件大家可以去百度搜索下,很簡(jiǎn)單的。
2、搭建service-search模塊
1、創(chuàng)建一個(gè)模塊專門用來(lái)操作ES,根據(jù)自己項(xiàng)目接口來(lái)放,我這里放在service業(yè)務(wù)模塊下
2、引入ES依賴
<dependencies>
<!-- ES依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- 引入遠(yuǎn)程調(diào)用模塊 - 商品模塊 -->
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>service-product-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
3、es模塊的配置文件
server:
port: 8204
feign:
sentinel:
enabled: true
client:
config:
default: #配置全局的feign的調(diào)用超時(shí)時(shí)間 如果 有指定的服務(wù)配置 默認(rèn)的配置不會(huì)生效
connectTimeout: 30000 # 指定的是 消費(fèi)者 連接服務(wù)提供者的連接超時(shí)時(shí)間 是否能連接 單位是毫秒
readTimeout: 50000 # 指定的是調(diào)用服務(wù)提供者的 服務(wù) 的超時(shí)時(shí)間() 單位是毫秒
spring:
main:
allow-bean-definition-overriding: true #當(dāng)遇到同樣名字的時(shí)候,是否允許覆蓋注冊(cè)
elasticsearch: # ElaticSearch
rest:
uris: http://localhost:9200
rabbitmq:
host: 192.168.64.109
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED
publisher-returns: true
listener:
simple:
prefetch: 1
concurrency: 3
acknowledge-mode: manual
redis:
host: localhost
port: 6379
database: 0
timeout: 1800000
password:
lettuce:
pool:
max-active: 20 #最大連接數(shù)
max-wait: -1 #最大阻塞等待時(shí)間(負(fù)數(shù)表示沒(méi)限制)
max-idle: 5 #最大空閑
min-idle: 0 #最小空閑
4、記得在ES啟動(dòng)類上加上服務(wù)注冊(cè)和遠(yuǎn)程調(diào)用注解
這里需要引入兩個(gè)依賴,一個(gè)是nacos服務(wù)注冊(cè)
,一個(gè)是服務(wù)調(diào)用openfeign
,因?yàn)檫@兩個(gè)依賴我是放在了es模塊的父工程中,所以es模塊我是不需要引入的,大家根據(jù)自己項(xiàng)目結(jié)構(gòu)來(lái)即可
@EnableDiscoveryClient //服務(wù)注冊(cè)
@EnableFeignClients //服務(wù)調(diào)用
3、開(kāi)發(fā)功能接口
3.1 添加遠(yuǎn)程調(diào)用方法
說(shuō)明:因?yàn)槲疫@里的設(shè)計(jì)是在商品上架的時(shí)候,向MQ發(fā)送的是SKU主鍵ID,并不是該sku的所有基本信息,所以我需要額外寫(xiě)接口來(lái)根據(jù)sku主鍵id查詢基本信息,這里大家根據(jù)自身情況來(lái)寫(xiě)即可。
找到我們的商品模塊,先創(chuàng)建一個(gè)api包,然后將提供給遠(yuǎn)程調(diào)用的接口放在api包下
/**
* 遠(yuǎn)程調(diào)用API(生產(chǎn)者)
* @author Eric
* @date 2023-06-29 10:00
*/
@RestController
@RequestMapping("/api/product")
public class ProductInnnerController {
@Autowired
private CategoryService categoryService;
@Autowired
private SkuInfoService skuInfoService;
@ApiOperation(value = "根據(jù)分類id獲取分類信息")
@GetMapping("/inner/getCategory/{categoryId}")
public Category getCategory(@PathVariable Long categoryId) {
return categoryService.getById(categoryId);
}
@ApiOperation(value = "根據(jù)skuId獲取sku信息")
@GetMapping("/ inner/getSkuInfo/{skuId}")
public SkuInfo getSkuInfo(@PathVariable("skuId") Long skuId) {
return skuInfoService.getById(skuId);
}
}
3.2、創(chuàng)建遠(yuǎn)程調(diào)用模塊
1、創(chuàng)建service-client模塊(我這里的設(shè)定是所有遠(yuǎn)程調(diào)用的服務(wù)都放在該模塊中,但該模塊并不負(fù)責(zé)調(diào)用,而是由各自對(duì)應(yīng)的子模塊服務(wù)來(lái)進(jìn)行調(diào)用)
2、service-client模塊下再創(chuàng)建子模塊 service-product-client定義接口
我的結(jié)構(gòu)如下:
3、service-client模塊引入依賴:(主要還是openfeign的依賴)
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>common-util</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>model</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<!-- 服務(wù)調(diào)用feign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
4、ProductFeignClient添加定義方法
/**
* 遠(yuǎn)程調(diào)用其他模塊中的api
* @author Eric
* @date 2023-06-29 10:04
*/
@FeignClient(value = "service-product") //指定調(diào)用模塊
public interface ProductFeignClient {
//根據(jù)分類id獲取分類信息
@GetMapping("/api/product/inner/getCategory/{categoryId}")
public Category getCategory(@PathVariable("categoryId") Long categoryId);
//根據(jù)skuId獲取sku信息
@GetMapping("/api/product/inner/getSkuInfo/{skuId}")
public SkuInfo getSkuInfo(@PathVariable("skuId") Long skuId);
}
3.3、開(kāi)發(fā)service-search 模塊接口
1、controller
import com.atguigu.ssyx.common.result.Result;
import com.atguigu.ssyx.search.service.SkuService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 商品搜索列表接口
*
* @author Eric
* @date 2023-06-29 10:15
*/
@RestController
@RequestMapping("api/search/sku")
public class SkuApiController {
@Autowired
private SkuService skuService;
@ApiOperation(value = "上架商品")
@GetMapping("inner/upperSku/{skuId}")
public Result upperGoods(@PathVariable("skuId") Long skuId) {
skuService.upperSku(skuId);
return Result.ok();
}
@ApiOperation(value = "下架商品")
@GetMapping("inner/lowerSku/{skuId}")
public Result lowerGoods(@PathVariable("skuId") Long skuId) {
skuService.lowerSku(skuId);
return Result.ok();
}
}
2、service接口
/**
* @author Eric
* @date 2023-06-29 10:16
*/
public interface SkuService {
/**
* 上架商品列表
* @param skuId
*/
void upperSku(Long skuId);
/**
* 下架商品列表
* @param skuId
*/
void lowerSku(Long skuId);
}
3、impl
import com.alibaba.fastjson.JSON;
import com.atguigu.ssyx.enums.SkuType;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.model.search.SkuEs;
import com.atguigu.ssyx.product.client.ProductFeignClient;
import com.atguigu.ssyx.search.repository.SkuRepository;
import com.atguigu.ssyx.search.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author Eric
* @date 2023-06-29 10:16
*/
@Slf4j
@Service
public class SkuServiceImpl implements SkuService {
@Autowired
private ProductFeignClient productFeignClient;
@Autowired
private SkuRepository skuEsRepository;
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 上架商品列表
*
* @param skuId
*/
@Override
public void upperSku(Long skuId) {
log.info("upperSku:" + skuId);
SkuEs skuEs = new SkuEs();
//查詢sku信息
SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
if (null == skuInfo) return;
// 查詢分類
Category category = productFeignClient.getCategory(skuInfo.getCategoryId());
if (category != null) {
skuEs.setCategoryId(category.getId());
skuEs.setCategoryName(category.getName());
}
skuEs.setId(skuInfo.getId());
skuEs.setKeyword(skuInfo.getSkuName() + "," + skuEs.getCategoryName());
skuEs.setWareId(skuInfo.getWareId());
skuEs.setIsNewPerson(skuInfo.getIsNewPerson());
skuEs.setImgUrl(skuInfo.getImgUrl());
skuEs.setTitle(skuInfo.getSkuName());
if (skuInfo.getSkuType() == SkuType.COMMON.getCode()) {
skuEs.setSkuType(0);
skuEs.setPrice(skuInfo.getPrice().doubleValue());
skuEs.setStock(skuInfo.getStock());
skuEs.setSale(skuInfo.getSale());
skuEs.setPerLimit(skuInfo.getPerLimit());
} else {
//TODO 待完善-秒殺商品
}
SkuEs save = skuEsRepository.save(skuEs);//往Es中新增數(shù)據(jù)
log.info("upperSku:" + JSON.toJSONString(save));
}
/**
* 下架商品列表
*
* @param skuId
*/
@Override
public void lowerSku(Long skuId) {
this.skuEsRepository.deleteById(skuId);//刪除Es中的數(shù)據(jù)
}
}
4、創(chuàng)建SkuRepository(用來(lái)操作Es,這里使用的是SpringData技術(shù))
/**
* @author Eric
* @date 2023-06-29 10:19
*/
// 參數(shù)一:泛型 參數(shù)二:類型,是由泛型中的主鍵類型而決定的
public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}
4、RabbitMQ
1、創(chuàng)建mq模塊
因?yàn)閙q的作用類似于工具類,并且多處需要使用,所以我這里選擇放在common模塊下
2、引入MQ依賴
<dependencies>
<!--rabbitmq消息隊(duì)列-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
</dependencies>
3、添加service方法
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author Eric
* @date 2023-06-30 22:57
*/
@Service
public class RabbitService {
// 引入操作rabbitmq 的模板
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息
*
* @param exchange 交換機(jī)
* @param routingKey 路由鍵(路由key)
* @param message 消息
* @return
*/
public boolean sendMessage(String exchange, String routingKey, Object message) {
// 調(diào)用發(fā)送數(shù)據(jù)的方法
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
/**
* 發(fā)送延遲消息的方法
*
* @param exchange 交換機(jī)
* @param routingKey 路由鍵
* @param message 消息內(nèi)容
* @param delayTime 延遲時(shí)間
* @return
*/
public boolean sendDelayMessage(String exchange, String routingKey, Object message, int delayTime) {
// 在發(fā)送消息的時(shí)候設(shè)置延遲時(shí)間
rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設(shè)置一個(gè)延遲時(shí)間
message.getMessageProperties().setDelay(delayTime * 1000);
return message;
}
});
return true;
}
}
4、配置mq消息轉(zhuǎn)換器
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* mq消息轉(zhuǎn)換器(默認(rèn)是字符串轉(zhuǎn)換器)
*/
@Configuration
public class MQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
5、添加消息的確認(rèn)配置(我這里配置的是手動(dòng)確認(rèn)模式)
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {
// 我們發(fā)送消息使用的是 private RabbitTemplate rabbitTemplate; 對(duì)象
// 如果不做設(shè)置的話 當(dāng)前的rabbitTemplate 與當(dāng)前的配置類沒(méi)有任何關(guān)系!
@Autowired
private RabbitTemplate rabbitTemplate;
// 設(shè)置 表示修飾一個(gè)非靜態(tài)的void方法,在服務(wù)器加載Servlet的時(shí)候運(yùn)行。并且只執(zhí)行一次!
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
/**
* 表示消息是否正確發(fā)送到了交換機(jī)上
* @param correlationData 消息的載體
* @param ack 判斷是否發(fā)送到交換機(jī)上
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息發(fā)送成功!");
}else {
System.out.println("消息發(fā)送失?。?+cause);
}
}
/**
* 消息如果沒(méi)有正確發(fā)送到隊(duì)列中,則會(huì)走這個(gè)方法!如果消息被正常處理,則這個(gè)方法不會(huì)走!
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主體: " + new String(message.getBody()));
System.out.println("應(yīng)答碼: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("消息使用的交換器 exchange : " + exchange);
System.out.println("消息使用的路由鍵 routing : " + routingKey);
}
}
6、RabbitMQ常量類
/**
* 消息隊(duì)列常量類
*/
public class MqConst {
/**
* 消息補(bǔ)償
*/
public static final String MQ_KEY_PREFIX = "ssyx.mq:list";
public static final int RETRY_COUNT = 3;
/**
* 商品上下架
*/
public static final String EXCHANGE_GOODS_DIRECT = "ssyx.goods.direct";
public static final String ROUTING_GOODS_UPPER = "ssyx.goods.upper";
public static final String ROUTING_GOODS_LOWER = "ssyx.goods.lower";
//隊(duì)列
public static final String QUEUE_GOODS_UPPER = "ssyx.goods.upper";
public static final String QUEUE_GOODS_LOWER = "ssyx.goods.lower";
/**
* 團(tuán)長(zhǎng)上下線
*/
public static final String EXCHANGE_LEADER_DIRECT = "ssyx.leader.direct";
public static final String ROUTING_LEADER_UPPER = "ssyx.leader.upper";
public static final String ROUTING_LEADER_LOWER = "ssyx.leader.lower";
//隊(duì)列
public static final String QUEUE_LEADER_UPPER = "ssyx.leader.upper";
public static final String QUEUE_LEADER_LOWER = "ssyx.leader.lower";
//訂單
public static final String EXCHANGE_ORDER_DIRECT = "ssyx.order.direct";
public static final String ROUTING_ROLLBACK_STOCK = "ssyx.rollback.stock";
public static final String ROUTING_MINUS_STOCK = "ssyx.minus.stock";
public static final String ROUTING_DELETE_CART = "ssyx.delete.cart";
//解鎖普通商品庫(kù)存
public static final String QUEUE_ROLLBACK_STOCK = "ssyx.rollback.stock";
public static final String QUEUE_SECKILL_ROLLBACK_STOCK = "ssyx.seckill.rollback.stock";
public static final String QUEUE_MINUS_STOCK = "ssyx.minus.stock";
public static final String QUEUE_DELETE_CART = "ssyx.delete.cart";
//支付
public static final String EXCHANGE_PAY_DIRECT = "ssyx.pay.direct";
public static final String ROUTING_PAY_SUCCESS = "ssyx.pay.success";
public static final String QUEUE_ORDER_PAY = "ssyx.order.pay";
public static final String QUEUE_LEADER_BILL = "ssyx.leader.bill";
//取消訂單
public static final String EXCHANGE_CANCEL_ORDER_DIRECT = "ssyx.cancel.order.direct";
public static final String ROUTING_CANCEL_ORDER = "ssyx.cancel.order";
//延遲取消訂單隊(duì)列
public static final String QUEUE_CANCEL_ORDER = "ssyx.cancel.order";
/**
* 定時(shí)任務(wù)
*/
public static final String EXCHANGE_DIRECT_TASK = "ssyx.exchange.direct.task";
public static final String ROUTING_TASK_23 = "ssyx.task.23";
//隊(duì)列
public static final String QUEUE_TASK_23 = "ssyx.queue.task.23";
}
5、完善SKU管理商品上下架
5.1、商品服務(wù)
1、找到自己的商品服務(wù),引入rabbit-util模塊
<!-- 引入MQ服務(wù) -->
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>rabbit_util</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2、同時(shí)在配置文件添加MQ配置
rabbitmq:
host: 192.168.56.101
port: 5672
username: guest
password: guest
publisher-confirm-type: CORRELATED #發(fā)布確認(rèn)模式,消息是否被成功發(fā)送到交換機(jī)
publisher-returns: true
listener:
simple:
prefetch: 1
concurrency: 3
acknowledge-mode: manual #消費(fèi)端手動(dòng)確認(rèn)
3、修改SkuInfoServiceImpl的publish方法(上下架方法)
@Autowired
private RabbitService rabbitService;
@Transactional(rollbackFor = {Exception.class})
@Override
public void publish(Long skuId, Integer status) {
// 更改發(fā)布狀態(tài)
if(status == 1) {
SkuInfo skuInfoUp = new SkuInfo();
skuInfoUp.setId(skuId);
skuInfoUp.setPublishStatus(1);
skuInfoMapper.updateById(skuInfoUp);
//商品上架:發(fā)送mq消息同步es
rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT, MqConst.ROUTING_GOODS_UPPER, skuId);
} else {
SkuInfo skuInfoUp = new SkuInfo();
skuInfoUp.setId(skuId);
skuInfoUp.setPublishStatus(0);
skuInfoMapper.updateById(skuInfoUp);
//商品下架:發(fā)送mq消息同步es
rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT, MqConst.ROUTING_GOODS_LOWER, skuId);
}
}
5.2、es服務(wù)
在service-search服務(wù)也引入消息依賴
<!-- 引入mq消息服務(wù) -->
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>rabbit_util</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2、添加SkuReceiver接收MQ消息方法
import com.atguigu.ssyx.rabbit.constant.MqConst;
import com.atguigu.ssyx.search.service.SkuService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class SkuReceiver {
@Autowired
private SkuService skuService;
/**
* 商品上架
* RabbitListener 自動(dòng)監(jiān)聽(tīng)
*
* @param skuId
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT),
key = {MqConst.ROUTING_GOODS_UPPER}
))
public void upperSku(Long skuId, Message message, Channel channel) throws IOException {
System.out.println("上架:消息消費(fèi)成功!");
if (null != skuId) {
skuService.upperSku(skuId);
}
/**
* 第一個(gè)參數(shù):表示收到的消息的標(biāo)號(hào)
* 第二個(gè)參數(shù):如果為true表示可以簽收多個(gè)消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 商品下架
*
* @param skuId
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT),
key = {MqConst.ROUTING_GOODS_LOWER}
))
public void lowerSku(Long skuId, Message message, Channel channel) throws IOException {
System.out.println("下架:消息消費(fèi)成功!");
if (null != skuId) {
skuService.lowerSku(skuId);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
6、最終測(cè)試
測(cè)試:需要啟動(dòng)如下服務(wù):
1、Nginx
2、Naocs
3、ElaticSearch
4、kibana
5、RabbitMQ
6、相關(guān)微服務(wù)模塊
7、前端服務(wù)
我們先使用Kibana查詢Es中的數(shù)據(jù)
GET /_cat/indices?v
POST /skues/_search
{
"query": {
"match_all": {}
}
}
發(fā)現(xiàn)此時(shí)為空
此時(shí)我們?nèi)我膺x擇一個(gè)SKU點(diǎn)擊上架,上架成功,此時(shí)我們?nèi)サ組Q中查看消息,發(fā)現(xiàn)消息發(fā)送成功
注意,我這里能看到消息是因?yàn)槲以谙M(fèi)前斷點(diǎn)了,如果沒(méi)有斷點(diǎn)的話消息是會(huì)被立刻消費(fèi)掉的,
此刻,我們?cè)偃s中查看,發(fā)現(xiàn)sku數(shù)據(jù)成功新增
此時(shí),我們?cè)冱c(diǎn)擊下架。
再重新查詢,發(fā)現(xiàn)es中的sku數(shù)據(jù)成功刪除!文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-665929.html
總結(jié)
寫(xiě)完我們會(huì)發(fā)現(xiàn),整個(gè)過(guò)程其實(shí)并不難,總的調(diào)用過(guò)程大概是:后臺(tái)服務(wù)選擇SKU,點(diǎn)擊上架 -》此時(shí)調(diào)用商品服務(wù) -》然后商品服務(wù)發(fā)送消息給RabbitMQ -》而我們的ES服務(wù)在監(jiān)聽(tīng)著 -》收到消息后 立刻進(jìn)行消費(fèi),同時(shí)操作ES
整個(gè)過(guò)程如果用一張圖來(lái)解釋的話,大概如下:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-665929.html
到了這里,關(guān)于SpringBoot實(shí)戰(zhàn)項(xiàng)目整合RabbitMQ+ElaticSearch實(shí)現(xiàn)SKU上下架功能的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!