?? 1 參考文檔
RabbitMQ實現(xiàn)數(shù)據(jù)庫與ElasticSearch的數(shù)據(jù)同步 | Hannya。-CSDN
企業(yè)級開發(fā)項目實戰(zhàn)——基于RabbitMQ實現(xiàn)數(shù)據(jù)庫、elasticsearch的數(shù)據(jù)同步 | 波總說先賺它一個小目標-CSDN
SPringBoot集成RabbitMQ實現(xiàn)30秒過期刪除功能 | 軍大君-CSDN
?? 2 個人需求
-
當進行文件上傳、文件創(chuàng)建、文件重命名等操作時:
通過RabbitMQ:
- 生產(chǎn)者:文件服務,執(zhí)行上傳、創(chuàng)建、重命名等文件操作,將用戶文件信息(例如文件名、文件ID等)發(fā)送到RabbitMQ新增隊列。
- 消費者:查詢服務,監(jiān)聽RabbitMQ新增隊列,一旦收到消息,將用戶文件信息新增或更新到Elasticsearch中。
-
文件刪除時:
通過RabbitMQ:
- 生產(chǎn)者:文件服務,執(zhí)行文件刪除操作,將用戶文件ID發(fā)送到RabbitMQ刪除隊列。
- 消費者:查詢服務,監(jiān)聽 RabbitMQ 隊列,一旦收到消息,通過用戶文件ID從Elasticsearch中刪除相應的用戶文件信息。
-
根據(jù)文件名進行文件模糊查詢:
通過OpenFeign:
- 生產(chǎn)者:文件服務,查詢服務調(diào)用文件服務提供的OpenFeign接口,通過用戶文件ID從查詢該用戶文件是否存在。
- 消費者:查詢服務,如果不存在,將數(shù)據(jù)根據(jù)用戶文件ID從Elasticsearch中刪除。
-
分享文件時間到期處理:
通過RabbitMQ的TTL(生存時間) + 死信隊列:
- 生產(chǎn)者:文件服務, 使用TTL模擬一個“延時隊列”,在文件分享時間到期后,將消息傳遞到死信隊列。
- 消費者:文件服務,死信監(jiān)聽器監(jiān)聽到之后,將分享文件的分享狀態(tài)改為已過期狀態(tài)。
??3 聲明
只是提供思路,代碼不是很完整,直接復制運行不了。
最后面有完整網(wǎng)盤項目代碼。文章來源:http://www.zghlxwxcb.cn/news/detail-707159.html
??4 OpenFeign相關部分(查詢服務)
4.1 引入依賴
<!-- nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- openfeign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- loadbalancer -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
4.2 application.yml
spring:
# nacos注冊的服務名
application:
name: netdisk-search
cloud:
nacos:
discovery:
# 配置注冊服務的IP地址
server-addr: (IP地址):8848
username: nacos
password: nacos
4.3 FileFeignService 接口
@FeignClient(name = "netdisk-file", configuration = FeignInterceptor.class)
public interface FileFeignService {
@RequestMapping("/file/getUserFile/{userFileId}")
ResultResponse<Boolean> getUserFile(@PathVariable Long userFileId);
}
4.4 @EnableFeignClients 注解
@ComponentScan(value = "com.cauli.search.*")
@EnableFeignClients(basePackages = "com.cauli.search")
@SpringBootApplication
public class NetdiskSearchApplication {
public static void main(String[] args) {
SpringApplication.run(NetdiskSearchApplication.class, args);
}
}
??5 Elasticsearch相關部分(查詢服務)
5.1 引入依賴
<!-- elasticsearch -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.0.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
5.2 application.yml
# elasticsearch相關的配置
elasticsearch:
# ES網(wǎng)關地址
hostname: (IP地址)
# ES網(wǎng)關端口
port: 9200
# ES網(wǎng)官方方案
scheme: http
5.3 ElasticSearchConfig 配置類
@Configuration
public class ElasticSearchConfig {
@Value("${elasticsearch.hostname}")
String hostname;
@Value("${elasticsearch.port}")
int port;
@Value("${elasticsearch.scheme}")
String scheme;
@Bean
public ElasticsearchClient elasticsearchClient(){
// 創(chuàng)建低級客戶端
RestClient client = RestClient.builder(new HttpHost(hostname, port,scheme)).build();
// 創(chuàng)建API客戶端,使用Jackson映射器創(chuàng)建傳輸層
ElasticsearchTransport transport = new RestClientTransport(client,new JacksonJsonpMapper());
return new ElasticsearchClient(transport);
}
}
5.4 Elasticsearch 服務類和服務實現(xiàn)類
public interface ElasticsearchService {
/**
* 更新ES數(shù)據(jù)
*
* @param fileSearchDTO
*/
void uploadES(FileSearchDTO fileSearchDTO);
/**
* 刪除ES數(shù)據(jù)
*
* @param userFileId
*/
void deleteES(Long userFileId);
/**
* 搜索ES數(shù)據(jù)
*
* @return
*/
List<SearchFileVO> searchES(SearchFileQueryDTO searchFileVO);
}
@Slf4j
@Service
public class ElasticsearchServiceImpl implements ElasticsearchService {
@Autowired
private ElasticsearchClient elasticsearchClient;
@Resource
private FileFeignService feignService;
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
12, // 核心線程數(shù)
20, // 最大線程數(shù)
1, // 線程存活時間
TimeUnit.SECONDS, // 存活時間單位
new ArrayBlockingQueue<>(1000) // 任務隊列
);
public void uploadES(FileSearchDTO fileSearchDTO) {
executor.execute(() -> {
try {
elasticsearchClient.index(i -> i.index("file_search")
.id(fileSearchDTO.getUserFileId())
.document(fileSearchDTO));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
public void deleteES(Long userFileId) {
executor.execute(() -> {
try {
elasticsearchClient.delete(d -> d
.index("file_search")
.id(String.valueOf(userFileId)));
} catch (Exception e) {
log.debug("ES刪除操作失敗,請檢查配置");
}
});
}
@Override
public List<SearchFileVO> searchES(SearchFileQueryDTO searchFileQueryDTO) {
int pageNum = (int) searchFileQueryDTO.getPageNum() - 1;
int pageSize = (int) (searchFileQueryDTO.getPageSize() == 0 ? 10 : searchFileQueryDTO.getPageSize());
SearchResponse<FileSearchDTO> search = null;
try {
search = elasticsearchClient.search(s -> s
.index("file_search")
.query(_1 -> _1
.bool(_2 -> _2
.must(_3 -> _3
.bool(_4 -> _4
.should(_5 -> _5
.match(_6 -> _6
.field("fileName")
.query(searchFileQueryDTO.getFileName())))
.should(_5 -> _5
.wildcard(_6 -> _6
.field("fileName")
.wildcard("*" + searchFileQueryDTO.getFileName() + "*")))
))
.must(_3 -> _3
.term(_4 -> _4
.field("userId")
.value(StpUtil.getLoginIdAsLong())))
))
.from(pageNum)
.size(pageSize)
.highlight(h -> h
.fields("fileName", f -> f.type("plain")
.preTags("<span class='keyword'>").postTags("</span>"))
.encoder(HighlighterEncoder.Html)), FileSearchDTO.class);
} catch (IOException e) {
e.printStackTrace();
}
List<SearchFileVO> searchFileVOList = new ArrayList<>();
if (search != null) {
for (Hit<FileSearchDTO> hit : search.hits().hits()) {
SearchFileVO searchFileVO = new SearchFileVO();
BeanUtil.copyProperties(hit.source(), searchFileVO);
searchFileVO.setHighLight(hit.highlight());
searchFileVOList.add(searchFileVO);
// 如果文件不存在,也從ES中刪除
if (!feignService.getUserFile(searchFileVO.getUserFileId()).getData()) {
executor.execute(() -> this.deleteES(searchFileVO.getUserFileId()));
}
}
}
return searchFileVOList;
}
}
5.5 ElasticsearchController 前端控制器
@RestController
@RequestMapping("/search")
public class ElasticsearchController {
@Autowired
private ElasticsearchService elasticService;
@GetMapping(value = "/searchFile")
public RestResult<SearchFileVO> searchFile(SearchFileQueryDTO searchFileQueryDTO) {
List<SearchFileVO> searchFileVOList = elasticService.searchES(searchFileQueryDTO);
return RestResult.success().dataList(searchFileVOList, searchFileVOList.size());
}
}
5.6 相關實體類
/**
* 文件搜索VO
*/
@Data
public class SearchFileVO {
@JsonSerialize(using = ToStringSerializer.class)
private Long userFileId;
private String fileName;
private String filePath;
private String extendName;
private Long fileSize;
private String fileUrl;
private Map<String, List<String>> highLight;
private Integer isDir;
}
/**
* 文件搜索DTO
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class FileSearchDTO {
private String indexName;
private String userFileId;
private String fileId;
private String fileName;
private String content;
private String fileUrl;
private Long fileSize;
private Integer storageType;
private String identifier;
private Long userId;
private String filePath;
private String extendName;
private Integer isDir;
private String deleteTime;
private String deleteBatchNum;
}
/**
* 文件查詢條件DTO
*/
@Data
public class SearchFileQueryDTO {
@ApiModelProperty("文件名")
private String fileName;
@ApiModelProperty("當前頁")
private long pageNum;
@ApiModelProperty("每頁數(shù)量")
private long pageSize;
}
??6 RabbitMQ相關部分
6.1 生產(chǎn)者部分(文件服務)
6.1.1 引入依賴
<!-- nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- RabbitMQ(我的SpringBoot是2.6.8的) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.1.2 完整application.yml
server:
port: 8083
spring:
# MySQL配置
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/(數(shù)據(jù)庫名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: (MySQL密碼)
# nacos注冊的服務名
application:
name: netdisk-file
cloud:
nacos:
discovery:
# 配置注冊服務的IP地址
server-addr: (IP地址):8848
username: nacos
password: nacos
# rabbitmq相關的配置
rabbitmq:
host: (IP地址)
port: 5672
virtual-host: (虛擬主機名,比如:/file)
username: (用戶名,默認:guest)
password: (密碼,默認:guest)
6.1.3 RabbitMQConfig 配置類
@Configuration
public class RabbitMQConfig {
// 普通交換機
public static final String FILE_EXCHANGE = "file.exchange";
// 文件保存相關
public static final String QUEUE_FILE_SAVE = "queue.file.save";
public static final String KEY_FILE_SAVE = "key.file.save";
// 文件刪除相關
public static final String QUEUE_FILE_REMOVE = "queue.file.remove";
public static final String KEY_FILE_REMOVE = "key.file.remove";
// 死信相關
public static final String DEAD_LETTER_EXCHANGE = "deadLetter.exchange";
public static final String DEAD_LETTER_QUEUE = "deadLetter.queue";
public static final String KEY_FILE_DEAD_LETTER = "key.file.dead.letter";
//延遲隊列
public static final String DELAY_QUEUE = "delay.queue";
/**
* 文件保存隊列
*
* @return
*/
@Bean
public Queue queueFileSave() {
return new Queue(QUEUE_FILE_SAVE);
}
/**
* 文件刪除隊列
*
* @return
*/
@Bean
public Queue queueFileRemove() {
return new Queue(QUEUE_FILE_REMOVE);
}
/**
* 交換機
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(FILE_EXCHANGE);
}
/**
* 綁定文件保存隊列到交換機
*
* @return
*/
@Bean
public Binding bindFileSave() {
return BindingBuilder.bind(queueFileSave()).to(topicExchange()).with(KEY_FILE_SAVE);
}
/**
* 綁定文件刪除隊列到交換機
*
* @return
*/
@Bean
public Binding bindFileRemove() {
return BindingBuilder.bind(queueFileRemove()).to(topicExchange()).with(KEY_FILE_REMOVE);
}
/**
* 定義延時隊列
*
* @return
*/
@Bean
public Queue delayQueue() {
//設置死信交換機和路由key
return QueueBuilder.durable(DELAY_QUEUE)
//如果消息過時,則會被投遞到當前對應的死信交換機
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
//如果消息過時,死信交換機會根據(jù)routing-key投遞消息到對應的隊列
.withArgument("x-dead-letter-routing-key", KEY_FILE_DEAD_LETTER)
.build();
}
/**
* 定義死信交換機
*
* @return
*/
@Bean
public TopicExchange deadLetterExchange() {
return new TopicExchange(DEAD_LETTER_EXCHANGE);
}
/**
* 定義死信隊列
*
* @return
*/
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
/**
* 綁定死信隊列到死信交換機
*
* @return
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(KEY_FILE_DEAD_LETTER);
}
}
6.1.4 FileDealComp 文件邏輯處理組件偽代碼
/**
* 文件邏輯處理組件
*/
@Slf4j
@Component
public class FileDealComp {
@Autowired
private RabbitTemplate rabbitTemplate;
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
12, // 核心線程數(shù)
20, // 最大線程數(shù)
1, // 線程存活時間
TimeUnit.SECONDS, // 存活時間單位
new ArrayBlockingQueue<>(1000) // 任務隊列
);
/**
* 更新ES數(shù)據(jù)
*
* @param userFileId
*/
public void uploadES(Long userFileId) {
executor.execute(() -> {
FileSearchDTO fileSearchDTO = new FileSearchDTO();
// 通過用戶文件ID查詢用戶文件信息
...
// 通過文件ID查詢文件信息
...
// 將用戶文件信息和文件信息同步到fileSearchDTO對象
...
// 消息隊列更新ES
rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE, RabbitMQConfig.KEY_FILE_SAVE, fileSearchDTO);
});
}
/**
* 刪除ES數(shù)據(jù)
*
* @param userFileId
*/
public void deleteES(Long userFileId) {
// 消息隊列刪除ES
rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE, RabbitMQConfig.KEY_FILE_REMOVE, userFileId);
}
/**
* 分享文件過期處理
*
* @param shareBatchNum 分享批次號
*/
public void expiredShareFile(String shareBatchNum) {
Share share = new Share();
// 根據(jù)分享批次號獲取分享信息
...
// 將分享信息同步到share對象
...
// 定義日期格式
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long differenceInMillis = 0;
try {
// 解析日期字符串為日期對象
Date shareDate = sdf.parse(share.getShareTime());
Date endDate = sdf.parse(share.getEndTime());
// 計算時間差(毫秒數(shù))
differenceInMillis = endDate.getTime() - shareDate.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
// 存活時間
String expiration = Long.toString(differenceInMillis);
// 延時隊列
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_QUEUE, share, message -> {
message.getMessageProperties().setExpiration(expiration);
return message;
});
}
}
6.1.5 ExpiredShareFileListener 過期的分享文件處理監(jiān)聽器
@Slf4j
@Component
@RabbitListener(queues = "my-dlx-queue")
public class ExpiredShareFileListener {
@Autowired
private ShareService shareService;
// 死信相關
public static final String DEAD_LETTER_EXCHANGE = "deadLetter.exchange";
public static final String DEAD_LETTER_QUEUE = "deadLetter.queue";
public static final String KEY_FILE_DEAD_LETTER = "key.file.dead.letter";
@RabbitListener(bindings = {
@QueueBinding(
key = KEY_FILE_DEAD_LETTER,
value = @Queue(value = DEAD_LETTER_QUEUE, durable = "true"),
exchange = @Exchange(value = DEAD_LETTER_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true")
)})
public void receiveShareMessage(Share share) {
log.info("監(jiān)聽到文件過期處理操作:{}", share);
// 將share的分享狀態(tài)改為已過期 → 將share的shareStatus由0改為1
...
log.info("操作完成:{}", share);
}
}
6.2 消費者部分(查詢服務)
6.2.1 引入依賴
<!-- RabbitMQ (我的SpringBoot是2.6.8的) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.2.2 完整application.yml
server:
port: 8084
spring:
# MySQL配置
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/(數(shù)據(jù)庫名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: (MySQL密碼)
# nacos注冊的服務名
application:
name: netdisk-search
cloud:
nacos:
discovery:
# 配置注冊服務的IP地址
server-addr: (IP地址):8848
username: nacos
password: nacos
mvc:
path match:
matching-strategy: ant_path_matcher
servlet:
multipart:
enabled: true
# 單個文件最大限制
max-file-size: 1024MB
# 多個文件最大限制
max-request-size: 2048MB
# rabbitmq相關的配置
rabbitmq:
host: (IP地址)
port: 5672
virtual-host: (虛擬主機名,比如:/file)
username: (用戶名,默認:guest)
password: (密碼,默認:guest)
# elasticsearch相關的配置
elasticsearch:
# ES網(wǎng)關地址
hostname: (IP地址)
# ES網(wǎng)關端口
port: 9200
# ES網(wǎng)官方方案
scheme: http
6.2.3 FileMQListener 文件處理消息隊列監(jiān)聽
@Slf4j
@Component
public class FileMQListener {
// 普通交換機
public static final String FILE_EXCHANGE = "file.exchange";
// 文件保存相關
public static final String QUEUE_FILE_SAVE = "queue.file.save";
public static final String KEY_FILE_SAVE = "key.file.save";
// 文件刪除相關
public static final String QUEUE_FILE_REMOVE = "queue.file.remove";
public static final String KEY_FILE_REMOVE = "key.file.remove";
@Autowired
private ElasticsearchService elasticsearchService;
/**
* 監(jiān)聽文件信息添加操作
*
* @param fileSearchDTO
*/
@RabbitListener(bindings = {@QueueBinding(
key = KEY_FILE_SAVE,
value = @Queue(value = QUEUE_FILE_SAVE, durable = "true"),
exchange = @Exchange(value = FILE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"))})
public void receiveFileSaveMessage(FileSearchDTO fileSearchDTO) {
try {
log.info("監(jiān)聽到文件信息添加操作:{}", fileSearchDTO);
// 更新ES數(shù)據(jù)
elasticsearchService.uploadES(fileSearchDTO);
log.info("添加完成:{}", fileSearchDTO);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 監(jiān)聽文件信息刪除操作
*
* @param userFileId
*/
@RabbitListener(bindings = {@QueueBinding(
key = KEY_FILE_REMOVE,
value = @Queue(value = QUEUE_FILE_REMOVE, durable = "true"),
exchange = @Exchange(value = FILE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"))})
public void receiveFileDeleteMessage(Long userFileId) {
try {
log.info("監(jiān)聽到文件信息刪除操作:{}", userFileId);
// 刪除ES數(shù)據(jù)
elasticsearchService.deleteES(userFileId);
log.info("文件信息刪除完成:{}", userFileId);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
??7 代碼倉庫
netdisk-cloud | Gitee文章來源地址http://www.zghlxwxcb.cn/news/detail-707159.html
到了這里,關于RabbitMQ實現(xiàn)數(shù)據(jù)庫與ElasticSearch的數(shù)據(jù)同步和分享文件過期處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!