當酒店數據發生增、刪、改時,要求對elasticsearch中的數據也完成相同的操作。
常見的數據同步方案有三種:
- 同步調用
- 異步通知
- 監聽binlog
以下使用異步通知的方式同步elasticsearch的數據。
引入依賴
<!--amqp: Spring Boot AMQP starter (provides RabbitTemplate / @RabbitListener used below)-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--elasticsearch: high-level REST client used to index and delete hotel documents-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<!--fastjson: used to serialize HotelDoc to JSON before indexing-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.66</version>
</dependency>
配置elasticsearch
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
 * Spring Boot entry point of the hotel admin application.
 *
 * <p>Besides bootstrapping the context, this class registers the MyBatis-Plus
 * interceptor and the Elasticsearch {@link RestHighLevelClient} as beans.
 */
@MapperScan(basePackages = "cn.itcast.hotel.mapper")
@SpringBootApplication
public class HotelAdminApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(HotelAdminApplication.class, args);
    }

    /**
     * Builds the MyBatis-Plus interceptor with a MySQL pagination inner interceptor.
     *
     * @return the configured interceptor
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor plugins = new MybatisPlusInterceptor();
        PaginationInnerInterceptor mysqlPaging = new PaginationInnerInterceptor(DbType.MYSQL);
        plugins.addInnerInterceptor(mysqlPaging);
        return plugins;
    }

    /**
     * Creates the Elasticsearch high-level REST client.
     *
     * @return a client pointing at the hard-coded Elasticsearch node
     */
    @Bean
    public RestHighLevelClient client() {
        // NOTE(review): the node address is hard-coded; consider externalizing it to configuration.
        HttpHost esNode = HttpHost.create("http://43.139.59.28:9200");
        return new RestHighLevelClient(RestClient.builder(esNode));
    }
}
配置交換機、隊列
聲明交換機、隊列名稱
在 constatnts 包下新建一個類MqConstants,存儲交換機和隊列的名稱
/**
 * Names of the RabbitMQ exchange, queues and routing keys used for hotel data sync.
 */
public class MqConstants {

    /** Exchange that hotel change events are published to. */
    public static final String HOTEL_EXCHANGE = "hotel.topic";

    /** Queue that receives hotel insert and update events. */
    public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /** Queue that receives hotel delete events. */
    public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /** Routing key for insert or update events. */
    public static final String HOTEL_INSERT_KEY = "hotel.insert";

    /** Routing key for delete events. */
    public static final String HOTEL_DELETE_KEY = "hotel.delete";
}
聲明交換機、隊列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the topic exchange, the two hotel queues and their bindings.
 */
@Configuration
public class MqConfig {

    /**
     * Declares the hotel topic exchange as durable and not auto-deleted.
     *
     * @return the exchange named {@link MqConstants#HOTEL_EXCHANGE}
     */
    @Bean
    public TopicExchange topicExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable, autoDelete);
    }

    /**
     * Declares the durable queue for insert/update events.
     *
     * @return the queue named {@link MqConstants#HOTEL_INSERT_QUEUE}
     */
    @Bean
    public Queue insertQueue() {
        boolean durable = true;
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, durable);
    }

    /**
     * Declares the durable queue for delete events.
     *
     * @return the queue named {@link MqConstants#HOTEL_DELETE_QUEUE}
     */
    @Bean
    public Queue deleteQueue() {
        boolean durable = true;
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, durable);
    }

    /**
     * Binds the insert/update queue to the exchange with the insert routing key.
     *
     * @return the binding for {@link MqConstants#HOTEL_INSERT_KEY}
     */
    @Bean
    public Binding insertQueueBinding() {
        Queue source = insertQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(source).to(exchange).with(MqConstants.HOTEL_INSERT_KEY);
    }

    /**
     * Binds the delete queue to the exchange with the delete routing key.
     *
     * @return the binding for {@link MqConstants#HOTEL_DELETE_KEY}
     */
    @Bean
    public Binding deleteQueueBinding() {
        Queue source = deleteQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(source).to(exchange).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
發送MQ消息
在增、刪、改業務中分別發送MQ消息
// Injected template used by the handlers below to publish hotel change events.
@Resource
private RabbitTemplate rabbitTemplate;
/**
 * Persists a new hotel and publishes an insert event carrying its id.
 *
 * <p>The id is validated up front, mirroring {@code updateById}: the event payload is the
 * hotel id, so a hotel without an id could be stored but never synchronized.
 *
 * @param hotel the hotel to store; its id must be set by the caller
 * @throws InvalidParameterException if the hotel has no id
 */
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
    if (hotel.getId() == null) {
        throw new InvalidParameterException("id不能為空");
    }
    // Publish only when the database write reported success, so consumers are
    // never asked to index a row that was not stored.
    boolean saved = hotelService.save(hotel);
    if (saved) {
        rabbitTemplate.convertAndSend(
                MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }
}
/**
 * Updates an existing hotel and publishes an insert-or-update event carrying its id.
 *
 * @param hotel the hotel to update; its id identifies the row
 * @throws InvalidParameterException if the hotel has no id
 */
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
    if (hotel.getId() == null) {
        throw new InvalidParameterException("id不能為空");
    }
    // Publish only when a row was actually updated: for an unknown id the consumer
    // would otherwise look up a non-existent hotel and fail while building the document.
    boolean updated = hotelService.updateById(hotel);
    if (updated) {
        rabbitTemplate.convertAndSend(
                MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }
}
/**
 * Removes the hotel with the given id and publishes a delete event carrying that id.
 *
 * @param id id of the hotel to remove, taken from the request path
 */
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);
    rabbitTemplate.convertAndSend(
            MqConstants.HOTEL_EXCHANGE,
            MqConstants.HOTEL_DELETE_KEY,
            id);
}
接收MQ消息
接收到MQ消息要做的事情包括:
- 新增消息:根據傳遞的hotel的id查詢hotel信息,然後新增一條數據到索引庫
- 刪除消息:根據傳遞的hotel的id刪除索引庫中的一條數據
文檔類
獲取es傳來的值
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Document representation of a {@link Hotel} as stored in the search index.
 *
 * <p>Mirrors the hotel fields, except that latitude and longitude are merged into a single
 * {@code location} string.
 */
@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    /** Coordinates formatted as {@code "latitude, longitude"}; null when either part is missing. */
    private String location;
    private String pic;
    /** Distance value used when sorting; not populated from {@link Hotel}. */
    private Object distance;

    /**
     * Copies the fields of the given hotel into a new document.
     *
     * @param hotel the source entity; must not be null
     */
    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = formatLocation(hotel.getLatitude(), hotel.getLongitude());
        this.pic = hotel.getPic();
    }

    /**
     * Joins latitude and longitude as {@code "latitude, longitude"}.
     *
     * <p>Returns null when either coordinate is missing, so the document never carries the
     * literal text {@code "null, null"}.
     * NOTE(review): presumably {@code location} is mapped as a geo_point in the index, which
     * would reject such a value — confirm against the index mapping.
     */
    private static String formatLocation(String latitude, String longitude) {
        if (latitude == null || longitude == null) {
            return null;
        }
        return latitude + ", " + longitude;
    }
}
model類
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/**
 * Hotel entity mapped to the {@code tb_hotel} table.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@TableName("tb_hotel")
public class Hotel {
// Primary key. IdType.INPUT: per MyBatis-Plus the application supplies the id before insert.
@TableId(type = IdType.INPUT)
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
// Coordinates are kept as strings; HotelDoc joins them into a single location value.
private String longitude;
private String latitude;
private String pic;
}
service類
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * Hotel service that also mirrors hotel rows into the Elasticsearch hotel index.
 */
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    /** Name of the Elasticsearch index holding hotel documents. */
    private static final String HOTEL_INDEX = "hotel";

    @Resource
    private RestHighLevelClient client;

    /**
     * Deletes the document with the given id from the hotel index.
     *
     * @param id id of the hotel document to delete
     * @throws RuntimeException wrapping the {@link IOException} if the request fails
     */
    @Override
    public void deleteById(Long id) {
        try {
            // 1. Build the delete request for the hotel index.
            DeleteRequest request = new DeleteRequest(HOTEL_INDEX, id.toString());
            // 2. Send it.
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("Failed to delete hotel " + id + " from index " + HOTEL_INDEX, e);
        }
    }

    /**
     * Loads the hotel with the given id from the database and indexes it (insert or overwrite)
     * into the hotel index. Does nothing when no such hotel exists.
     *
     * @param id id of the hotel to index
     * @throws RuntimeException wrapping the {@link IOException} if the request fails
     */
    @Override
    public void insertById(Long id) {
        // 0. Load the hotel row.
        Hotel hotel = getById(id);
        if (hotel == null) {
            // The row may have been removed (or never existed) by the time the event is
            // processed; there is nothing to index, and failing here would only make the
            // message fail again on redelivery.
            return;
        }
        try {
            // Convert the entity to its index document form.
            HotelDoc hotelDoc = new HotelDoc(hotel);
            // 1. Build the index request, using the hotel id as document id.
            IndexRequest request = new IndexRequest(HOTEL_INDEX).id(hotel.getId().toString());
            // 2. Attach the JSON body.
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3. Send it.
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("Failed to index hotel " + id + " into index " + HOTEL_INDEX, e);
        }
    }
}
監聽器
import cn.itcast.hotel.constatnts.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * RabbitMQ listener that applies hotel change events through the hotel service.
 */
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
 * Handles hotel insert or update events by delegating to
 * {@code hotelService.insertById}.
 * @param id hotel id
 */
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
 * Handles hotel delete events by delegating to
 * {@code hotelService.deleteById}.
 * @param id hotel id
 */
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
到了這裡,關於elasticsearch實現mysql數據同步的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持TOY模板網!