一、數(shù)據(jù)同步問(wèn)題分析
之前測(cè)試的數(shù)據(jù)都是一次從mysql導(dǎo)入到es,隨著時(shí)間的推移,每天都有可能發(fā)生增刪改查,不可能每次都全量同步,所以需要考慮增量同步問(wèn)題。
二、解決方案
1、同步調(diào)用
缺點(diǎn):
耦合性高,服務(wù)之間會(huì)相互影響
2、異步通知
依賴消息隊(duì)列的可靠性
3、監(jiān)聽(tīng)binlog
4、方案對(duì)比
三、案例-利用MQ實(shí)現(xiàn)Mysql與Elasticsearch數(shù)據(jù)同步
1、導(dǎo)入hotel-admin項(xiàng)目
啟動(dòng):端口8099文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-510827.html
2、申明exchange、queue、RoutingKey
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-510827.html
/**
* 定義隊(duì)列和exchange
* @author edevp
*/
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.EXCHANGE_NAME, true, false);
}
@Bean
public Queue InsertQueue() {
return new Queue(MqConstants.INSERT_QUEUE_NAME, true);
}
@Bean
public Queue DeleteQueue() {
return new Queue(MqConstants.DELETE_QUEUE_NAME, true);
}
@Bean
public Binding insertBinding() {
return BindingBuilder.bind(InsertQueue()).to(topicExchange()).with(MqConstants.INSERT_KEY);
}
@Bean
public Binding deleteBinding() {
return BindingBuilder.bind(DeleteQueue()).to(topicExchange()).with(MqConstants.DELETE_KEY);
}
}
3、在hotel-admin中完成增刪改查的消息推送
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能為空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_NAME,MqConstants.DELETE_KEY,id);
}
4、在hotel-demo中完成消息監(jiān)聽(tīng)并更新到es中
```java
/**
* 消息監(jiān)聽(tīng)
* @author edevp
**/
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.INSERT_QUEUE_NAME),
exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = MqConstants.INSERT_KEY
))
public void listenHotelInsert(Long hotelId){
// 新增
hotelService.saveById(hotelId);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.DELETE_QUEUE_NAME),
exchange = @Exchange(name = MqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = MqConstants.DELETE_KEY
))
public void listenHotelDelete(Long hotelId){
// 刪除
hotelService.deleteById(hotelId);
}
}
# 四、代碼倉(cāng)庫(kù)
hotel-admin:[https://gitee.com/edevp/hotel-admin](https://gitee.com/edevp/hotel-admin)(hotel-db-sync分支)
hotel-demo:[https://gitee.com/edevp/hotel-demo](https://gitee.com/edevp/hotel-demo)
到了這里,關(guān)于Elasticsearch實(shí)戰(zhàn)-數(shù)據(jù)同步(解決es數(shù)據(jù)增量同步)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!