本片文章只是對(duì)之前寫(xiě)的文章的補(bǔ)充,
es與mysql之間的數(shù)據(jù)同步
http://t.csdn.cn/npHt4
補(bǔ)充一:
之前的文章對(duì)于交換機(jī)、隊(duì)列、綁定,使用的是@bean,
而這里使用的是純注解版
在消費(fèi)方,聲明交換機(jī):
package com.hmall.search.mq;
import com.hmall.search.service.IsearchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.events.Event;
/**
* @author ning
* @since 2022/12/12 0:16
*/
@Slf4j
@Component
public class ItemListener {
@Autowired
private IsearchService isearchService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "up.queue"),//聲明隊(duì)列
exchange = @Exchange(name = "item.topic",type = ExchangeTypes.TOPIC),//聲明交換機(jī)
key = "item.up"http://聲明綁定關(guān)系
))
private void listenItemUp(Long id){
log.info("監(jiān)聽(tīng)到上架消息:"+ id);
isearchService.saveitById(id);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "down.queue"),
exchange = @Exchange(name = "item.topic",type = ExchangeTypes.TOPIC),
key = "item.down"
))
private void listenItemDown(Long id){
log.info("監(jiān)聽(tīng)到下架消息:"+ id);
isearchService.deleteItemById(id);
}
}
補(bǔ)充二:
之前的文章是直接使用es操作數(shù)據(jù),新增和修改,這樣做不是很合適,而且沒(méi)有遵守單一原則,所以這里使用feign遠(yuǎn)程調(diào)用其他模塊的接口方法,
1、新建一個(gè)feign模塊(如果沒(méi)有的話)
可以參考
http://t.csdn.cn/GqMVN
2、在模塊中新建一個(gè)接口ItemClient(使用哪個(gè)模塊就用哪個(gè)模塊名+Client),在模塊中定義你要在es中調(diào)用的方法,(也就是es需要操作數(shù)據(jù)庫(kù),但是其他模塊已經(jīng)寫(xiě)完的方法,就不需要再寫(xiě)一遍了)
例如:需要根據(jù)id查詢數(shù)據(jù)庫(kù),就把其他模塊寫(xiě)完的根據(jù)id查詢數(shù)據(jù)庫(kù)的方法寫(xiě)到接口里
package com.hmall.client;
import com.hmall.common.dto.Item;
import com.hmall.common.dto.PageDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
/**
* 商品模塊的遠(yuǎn)程調(diào)用
*
* @author ning
* @since 2022/12/9 18:39
*/
//表示對(duì)應(yīng)的是itemservice服務(wù)器
@FeignClient("itemservice")
public interface ItemClient {
//分頁(yè)查詢
@GetMapping("/item/list")
public PageDTO<Item> list(@RequestParam("page") Integer page, @RequestParam("size") Integer size);
//根據(jù)id查詢數(shù)據(jù)
@GetMapping("/item/{id}")
public Item selectById(@PathVariable("id") Long id);
}
以上接口中還有分頁(yè)查詢的內(nèi)容,詳情可以參考文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-527455.html
使用分頁(yè)導(dǎo)入的方式把大量數(shù)據(jù)從mysql導(dǎo)入es
http://t.csdn.cn/XECXD文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-527455.html
3、生產(chǎn)方和消費(fèi)方的代碼,只有消費(fèi)方新增的代碼有一點(diǎn)不同,其他的都一樣
//注入es的工具類
@Autowired
private RestHighLevelClient client;
//注入feign遠(yuǎn)程調(diào)用的接口
@Autowired
private ItemClient itemClient;
@Override
public void saveitById(Long id) {
try {
//使用feign的遠(yuǎn)程調(diào)用接口,查詢數(shù)據(jù)庫(kù)
//查詢一條商品數(shù)據(jù),并轉(zhuǎn)為json
Item item = itemClient.selectById(id);
ItemDoc itemDoc = new ItemDoc(item);
String jsonString = JSON.toJSONString(itemDoc);
//創(chuàng)建請(qǐng)求
IndexRequest request = new IndexRequest("item").id(id.toString());
//設(shè)置參數(shù)
request.source(jsonString, XContentType.JSON);
//執(zhí)行請(qǐng)求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
到了這里,關(guān)于補(bǔ)充:es與mysql之間的數(shù)據(jù)同步 2 使用分頁(yè)導(dǎo)入的方式把大量數(shù)據(jù)從mysql導(dǎo)入es的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!