一、數(shù)據(jù)聚合
1.1 聚合種類
聚合(aggregations)可以實現(xiàn)對文檔數(shù)據(jù)的統(tǒng)計、分析、運算。聚合常見的有三類:
- 桶(Bucket)聚合:用來對文檔做分組
TermAggregation:按照文檔字段值分組
Date Histogram:按照日期階梯分組,例如一周為一組,或者一月為一組 - 度量(Metric)聚合:用以計算一些值,比如:最大值、最小值、平均值等
Avg:求平均值
Max:求最大值
Min:求最小值
Stats:同時求max、min、avg、sum等 - 管道(pipeline)聚合:其它聚合的結(jié)果為基礎(chǔ)做聚合
注意:參與聚合的字段類型必須是:keyword、數(shù)值、日期、布爾,一定不能是可分詞的類型。
1.2 DSL實現(xiàn)聚合
# 使用DSL實現(xiàn)聚合
# 1.bucket桶聚合 + 限定聚合范圍
# 例:根據(jù)酒店品牌名做聚合(并且限定價格不高于200的),并按照結(jié)果的升序排序,顯示前5個品牌
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0, //設(shè)置size為0,結(jié)果中不包含文檔,只包含聚合結(jié)果
"aggs": { // 定義聚合
"brandAgg": { // 定義聚合名
"terms": { // 聚合類型,按照品牌名聚合,所以選擇term
"field": "brand", // 參與聚合字段
"order": {
"_count": "asc" //指定排序規(guī)則 升序
},
"size": 20 //希望獲得聚合結(jié)果數(shù)
}
}
}
}
# 2.Metrics聚合
# 例:獲得每個品牌的用戶評分的min、max、avg,并且按照avg排序(降序)
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"score_stats.avg": "desc"
}
},
"aggs": { //子聚合
"score_stats": { //子聚合名
"stats": { //聚合類型,stats可以計算min、max、avg等
"field": "score" //聚合字段
}
}
}
}
}
}
1.3 RestAPI實現(xiàn)聚合
/**
* 桶bucket聚合
*/
@Test
void testAgg() throws IOException {
// 1.準備請求
SearchRequest request = new SearchRequest("hotel");
// 2.請求參數(shù)
// 2.1.size
request.source().size(0);
// 2.2.聚合
request.source().aggregation(
AggregationBuilders.terms("brandAgg").field("brand").size(20));
// 3.發(fā)出請求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Aggregations aggregations = response.getAggregations();
// 4.1.根據(jù)聚合名稱,獲取聚合結(jié)果
Terms brandAgg = aggregations.get("brandAgg");
// 4.2.獲取buckets
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
// 4.3.遍歷
for (Terms.Bucket bucket : buckets) {
String brandName = bucket.getKeyAsString();
System.out.println("brandName = " + brandName);
long docCount = bucket.getDocCount();
System.out.println("docCount = " + docCount);
}
}
1.4 演示:多條件聚合
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient restHighLevelClient;
@Override
public Map<String, List<String>> filters() {
try {
// 1.準備請求
SearchRequest request = new SearchRequest("hotel");
// 2.請求參數(shù)
// 2.1.size
request.source().size(0);
// 2.2.聚合
buildAggregation(request);
// 3.發(fā)出請求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1.根據(jù)品牌名稱,獲取聚合結(jié)果
List<String> brandList = getAggByName(aggregations, "brandAgg");
// 放入map
result.put("品牌",brandList);
// 4.2.根據(jù)城市名稱,獲取聚合結(jié)果
List<String> cityList = getAggByName(aggregations, "cityAgg");
// 放入map
result.put("城市",cityList);
// 4.3.根據(jù)星級名稱,獲取聚合結(jié)果
List<String> starList = getAggByName(aggregations, "starAgg");
// 放入map
result.put("星級",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1.根據(jù)聚合名稱,獲取聚合結(jié)果
Terms brandAgg = aggregations.get(aggName);
// 4.2.獲取buckets
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
// 4.3.遍歷
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
public void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100));
}
}
測試
@SpringBootTest
public class HotelDemoApplicationTest {
@Autowired
private IHotelService hotelService;
@Test
void contextLoads(){
Map<String, List<String>> filters = hotelService.filters();
System.out.println(filters);
}
}
結(jié)果:
二、自動補全
自動補全如下圖所示:
2.1 拼音分詞器
要實現(xiàn)根據(jù)字母做補全,就必須對文檔按照拼音分詞。在GitHub上恰好有elasticsearch的拼音分詞插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin
安裝方式與IK分詞器一樣,分三步:
- 解壓
- 上傳到虛擬機中,elasticsearch的plugin目錄
- 重啟elasticsearch
- 測試
2.2 自定義分詞器
演示:
# 自定義拼音分詞器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
POST /test/_doc/1
{
"id": 1,
"name": "獅子"
}
POST /test/_doc/2
{
"id": 2,
"name": "虱子"
}
GET /test/_search
{
"query": {
"match": {
"name": "掉入獅子籠咋辦"
}
}
}
注意:
拼音分詞器通常在創(chuàng)建索引庫時使用,搜索時使用普通分詞器即可
2.3 DSL自動補全查詢
查詢語法如下
// 自動補全查詢
POST /test/_search
{
"suggest": {
"title_suggest": { // 自定義補全查詢名稱
"text": "s", // 關(guān)鍵字
"completion": {
"field": "title", // 補全字段
"skip_duplicates": true, // 跳過重復(fù)的
"size": 10 // 獲取前10條結(jié)果
}
}
}
}
演示:
# 2.自動補全
# 2.1 創(chuàng)建一個 自動補全的索引庫 屬性有title
DELETE /test
PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
# 2.2 插入示例數(shù)據(jù)
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}
# 2.3 自動補全查詢
# 例:輸入一個關(guān)鍵字s,看自動補全的結(jié)果
# 結(jié)果:"SK-II"、"Sony"和"switch"
POST /test/_search
{
"suggest": {
"title_suggest": {
"text": "s",
"completion": {
"field": "title",
"skip_duplicates": true,
"size": 10
}
}
}
}
結(jié)果:
2.5 實現(xiàn)酒店搜索框自動補全
2.5.1 修改酒店索引庫數(shù)據(jù)結(jié)構(gòu)
1.修改索引庫結(jié)構(gòu)
# 酒店數(shù)據(jù)索引庫
GET /hotel/_mapping
DELETE /hotel
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
# 自動補全查詢
GET /hotel/_search
{
"suggest": {
"mySuggestion": {
"text": "shang",
"completion": {
"field": "suggestion",
"skip_duplicates": true,
"size": 10
}
}
}
}
2.修改HotelDoc
@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
private Object distance; //新加加字段"距離":酒店距你選擇位置的距離
private Boolean isAD; //新加加字段"標記":給你置頂?shù)木频晏砑右粋€標記
private List<String> suggestion;//新加該字段用于自動補全
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
// 自動補全字段的處理
this.suggestion = new ArrayList<>();
// 添加品牌、城市
this.suggestion.add(this.brand);
this.suggestion.add(this.city);
// 判斷商圈是否包含/
if (this.business.contains("/")) {
// business有多個值,需要切割
String[] arr = this.business.split("/");
// business的每個值都要加入到suggestion中
Collections.addAll(this.suggestion, arr);
}else{
this.suggestion.add(this.business);
}
}
}
3.【重新導(dǎo)入數(shù)據(jù),不演示,參見之前的批量導(dǎo)入文檔功能】查詢結(jié)果
2.5.2 RestAPI實現(xiàn)自動補全查詢
/**
* 自動補全查詢
*/
@Test
void testSuggest() throws IOException {
// 1.準備請求
SearchRequest request = new SearchRequest("hotel");
// 2.請求參數(shù)
request.source().suggest(new SuggestBuilder().addSuggestion(
"hotelSuggest",
SuggestBuilders
.completionSuggestion("suggestion")
.size(10)
.skipDuplicates(true)
.prefix("s")
));
// 3.發(fā)出請求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Suggest suggest = response.getSuggest();
// 4.1.根據(jù)補全查詢名稱,獲取補全結(jié)果
CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
// 4.2.獲取options
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
// 4.3.獲取補全的結(jié)果
String str = option.getText().toString();
System.out.println(str);
}
}
2.5.3 實戰(zhàn)
Mapper層
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
@PostMapping("list")
public PageResult search(@RequestBody RequestParams params) {
return hotelService.search(params);
}
@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
return hotelService.filters(params);
}
@GetMapping("suggestion")
public List<String> getSuggestion(@RequestParam("key") String key) {
return hotelService.getSuggestion(key);
}
}
Service層
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 自動補全查詢
*/
@Override
public List<String> getSuggestion(String key) {
try {
// 1.準備請求
SearchRequest request = new SearchRequest("hotel");
// 2.請求參數(shù)
request.source().suggest(new SuggestBuilder().addSuggestion(
"hotelSuggest",
SuggestBuilders
.completionSuggestion("suggestion")
.size(10)
.skipDuplicates(true)
.prefix(key)
));
// 3.發(fā)出請求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Suggest suggest = response.getSuggest();
// 4.1.根據(jù)補全查詢名稱,獲取補全結(jié)果
CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
// 4.2.獲取options
List<String> result = new ArrayList<>();
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
// 4.3.獲取補全的結(jié)果
String str = option.getText().toString();
result.add(str);
}
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
結(jié)果演示
三、數(shù)據(jù)同步
3.1 實現(xiàn)數(shù)據(jù)同步的方法
3.2 使用消息隊列MQ實現(xiàn)數(shù)據(jù)同步
3.2.1 導(dǎo)入hotel-admin
3.2.2 聲明交換機、隊列、routingkey
由于增和改都相當于插入,所以共用一個隊列;刪除占用一個隊列。
一、對消費者hotel-demo的操作
- 引入amqp依賴和配置rabbitmq的yml文件
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server:
port: 8089
spring:
datasource:
url: jdbc:mysql://mysql:3306/heima?useSSL=false
username: root
password: 123
driver-class-name: com.mysql.jdbc.Driver
rabbitmq:
host: 192.168.150.101
port: 5672
username: itcast
password: 123321
virtual-host: /
logging:
level:
cn.itcast: debug
pattern:
dateformat: HH:mm:ss:SSS
mybatis-plus:
configuration:
map-underscore-to-camel-case: true
type-aliases-package: cn.itcast.hotel.pojo
- 定義mq的一些常量
public class HotelMqConstants {
// 交換機名稱
public static final String EXCHANGE_NAME = "hotel.topic";
// 新增修改隊列
public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
// 刪除隊列
public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
// 新增修改的RoutingKey
public static final String INSERT_KEY = "hotel.insert";
// 刪除的RoutingKey
public static final String DELETE_KEY = "hotel.delete";
}
- 聲明交換機和隊列,并監(jiān)聽MQ消息【注解方式】
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = HotelMqConstants.INSERT_KEY
))
public void listenHotelInsert(Long hotelId){
// 新增
hotelService.saveById(hotelId);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
key = HotelMqConstants.DELETE_KEY
))
public void listenHotelDelete(Long hotelId){
// 刪除
hotelService.deleteById(hotelId);
}
}
【bean方式】
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder
.bind(insertQueue())
.to(topicExchange())
.with(HotelMqConstants.INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder
.bind(deleteQueue())
.to(topicExchange())
.with(HotelMqConstants.DELETE_KEY);
}
}
- RestAPI實現(xiàn)刪改
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 搜索框查詢
*/
@Override
public PageResult search(RequestParams params) {
try {
// 1.準備Request
SearchRequest request = new SearchRequest("hotel");
// 2.準備請求參數(shù)
// 2.1.多條件查詢和過濾
buildBasicQuery(params, request);
// 2.2.分頁
int page = params.getPage();
int size = params.getSize();
request.source().from((page - 1) * size).size(size);
/**
* 2.3.距離排序
*/
String location = params.getLocation();
if (StringUtils.isNotBlank(location)) {// 不為空則查詢
request.source().sort(SortBuilders
.geoDistanceSort("location", new GeoPoint(location))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
// 3.發(fā)送請求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析響應(yīng)
return handleResponse(response);
} catch (IOException e) {
throw new RuntimeException("搜索數(shù)據(jù)失敗", e);
}
}
/**
* 復(fù)合查詢
*/
private void buildBasicQuery(RequestParams params, SearchRequest request) {
// 1.準備Boolean復(fù)合查詢
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
/**
* 1.查詢關(guān)鍵字
* must參與 算分
*/
// 1.1.關(guān)鍵字搜索,match查詢,放到must中
String key = params.getKey();
if (StringUtils.isNotBlank(key)) {
// 不為空,根據(jù)關(guān)鍵字查詢
boolQuery.must(QueryBuilders.matchQuery("all", key));
} else {
// 為空,查詢所有
boolQuery.must(QueryBuilders.matchAllQuery());
}
/**
* 2.條件過濾:多條件復(fù)合查詢
* 根據(jù) “品牌 城市 星級 價格范圍” 過濾數(shù)據(jù)
* filter不參與 算分
*/
// 1.2.品牌
String brand = params.getBrand();
if (StringUtils.isNotBlank(brand)) { // 不為空則查詢
boolQuery.filter(QueryBuilders.termQuery("brand", brand));
}
// 1.3.城市
String city = params.getCity();
if (StringUtils.isNotBlank(city)) {// 不為空則查詢
boolQuery.filter(QueryBuilders.termQuery("city", city));
}
// 1.4.星級
String starName = params.getStarName();
if (StringUtils.isNotBlank(starName)) {// 不為空則查詢
boolQuery.filter(QueryBuilders.termQuery("starName", starName));
}
// 1.5.價格范圍
Integer minPrice = params.getMinPrice();
Integer maxPrice = params.getMaxPrice();
if (minPrice != null && maxPrice != null) {// 不為空則查詢
maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice;
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
}
/**
* 3.算分函數(shù)查詢
* 置頂功能:給你置頂?shù)木频晏砑右粋€標記,并按其算分
*/
FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
boolQuery, // 原始查詢,boolQuery
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function數(shù)組
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
QueryBuilders.termQuery("isAD", true), // 過濾條件
ScoreFunctionBuilders.weightFactorFunction(10) // 算分函數(shù)
)
}
);
/**
* 4.設(shè)置查詢條件
*/
request.source().query(functionScoreQuery);
}
/**
* 結(jié)果解析
*/
private PageResult handleResponse(SearchResponse response) {
SearchHits searchHits = response.getHits();
// 4.1.總條數(shù)
long total = searchHits.getTotalHits().value;
// 4.2.獲取文檔數(shù)組
SearchHit[] hits = searchHits.getHits();
// 4.3.遍歷
List<HotelDoc> hotels = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
// 4.4.獲取source
String json = hit.getSourceAsString();
// 4.5.反序列化,非高亮的
HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
// 4.6.處理高亮結(jié)果
// 1)獲取高亮map
Map<String, HighlightField> map = hit.getHighlightFields();
if (map != null && !map.isEmpty()) {
// 2)根據(jù)字段名,獲取高亮結(jié)果
HighlightField highlightField = map.get("name");
if (highlightField != null) {
// 3)獲取高亮結(jié)果字符串數(shù)組中的第1個元素
String hName = highlightField.getFragments()[0].toString();
// 4)把高亮結(jié)果放到HotelDoc中
hotelDoc.setName(hName);
}
}
// 4.8.排序信息
Object[] sortValues = hit.getSortValues(); // 獲取排序結(jié)果
if (sortValues.length > 0) {
/**
* 由于該程序是根據(jù)距離[酒店距你選擇位置的距離]進行排序,所以排序結(jié)果為距離
*/
hotelDoc.setDistance(sortValues[0]);
}
// 4.9.放入集合
hotels.add(hotelDoc);
}
return new PageResult(total, hotels);
}
/**
* 多條件聚合
*/
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
// 1.準備請求
SearchRequest request = new SearchRequest("hotel");
// 2.請求參數(shù)
// 2.1.query查詢信息
buildBasicQuery(params, request);
// 2.2.size
request.source().size(0);
// 2.3.聚合
buildAggregation(request);
// 3.發(fā)出請求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1.根據(jù)品牌名稱,獲取聚合結(jié)果
List<String> brandList = getAggByName(aggregations, "brandAgg");
// 放入map
result.put("品牌",brandList);
// 4.2.根據(jù)城市名稱,獲取聚合結(jié)果
List<String> cityList = getAggByName(aggregations, "cityAgg");
// 放入map
result.put("城市",cityList);
// 4.3.根據(jù)星級名稱,獲取聚合結(jié)果
List<String> starList = getAggByName(aggregations, "starAgg");
// 放入map
result.put("星級",starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1.根據(jù)聚合名稱,獲取聚合結(jié)果
Terms brandAgg = aggregations.get(aggName);
// 4.2.獲取buckets
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
// 4.3.遍歷
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
public void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100));
}
/**
* 自動補全查詢
*/
@Override
public List<String> getSuggestion(String key) {
try {
// 1.準備請求
SearchRequest request = new SearchRequest("hotel");
// 2.請求參數(shù)
request.source().suggest(new SuggestBuilder().addSuggestion(
"hotelSuggest",
SuggestBuilders
.completionSuggestion("suggestion")
.size(10)
.skipDuplicates(true)
.prefix(key)
));
// 3.發(fā)出請求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Suggest suggest = response.getSuggest();
// 4.1.根據(jù)補全查詢名稱,獲取補全結(jié)果
CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
// 4.2.獲取options
List<String> result = new ArrayList<>();
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
// 4.3.獲取補全的結(jié)果
String str = option.getText().toString();
result.add(str);
}
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteById(Long hotelId) {
try {
// 1.創(chuàng)建request
DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
// 2.發(fā)送請求
restHighLevelClient.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException("刪除酒店數(shù)據(jù)失敗", e);
}
}
@Override
public void saveById(Long hotelId) {
try {
// 查詢酒店數(shù)據(jù),應(yīng)該基于Feign遠程調(diào)用hotel-admin,根據(jù)id查詢酒店數(shù)據(jù)(現(xiàn)在直接去數(shù)據(jù)庫查)
Hotel hotel = getById(hotelId);
// 轉(zhuǎn)換
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.創(chuàng)建Request
IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
// 2.準備參數(shù)
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.發(fā)送請求
restHighLevelClient.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException("新增酒店數(shù)據(jù)失敗", e);
}
}
}
二、對發(fā)送者hotel-admin的操作
- 引入amqp依賴和配置rabbitmq的yml文件【同上】
- 定義mq的一些常量【同上】
- 當發(fā)送者對mysql數(shù)據(jù)庫改動時,發(fā)送消息給MQ
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private IHotelService hotelService;
// 注入發(fā)送消息的api
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 根據(jù)id查詢
*/
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}
/**
* 查詢當前頁內(nèi)容
*/
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
/**
* 新增,并發(fā)送給mq消息
*/
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
// 新增酒店
hotelService.save(hotel);
// 發(fā)送MQ消息
rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
}
/**
* 修改,并發(fā)送給mq消息
*/
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能為空");
}
hotelService.updateById(hotel);
// 發(fā)送MQ消息
rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
}
/**
* 刪除,并發(fā)送給mq消息
*/
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
// 發(fā)送MQ消息
rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
}
}
四、集群
單機的elasticsearch做數(shù)據(jù)存儲,必然面臨兩個問題:海量數(shù)據(jù)存儲問題、單點故障問題。
>> 海量數(shù)據(jù)存儲問題:將索引庫從邏輯上拆分為N個分片(shard),存儲到多個節(jié)點
>> 單點故障問題:將分片數(shù)據(jù)在不同節(jié)點備份(replica )
4.1 搭建ES集群
我們會在單機上利用docker容器運行多個es實例來模擬es集群。不過生產(chǎn)環(huán)境推薦大家每一臺服務(wù)節(jié)點僅部署一個es的實例。
部署es集群可以直接使用docker-compose來完成,但這要求你的Linux虛擬機至少有4G的內(nèi)存空間
- 創(chuàng)建es集群
首先編寫一個docker-compose文件,內(nèi)容如下:
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9202:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
networks:
elastic:
driver: bridge
es運行需要修改一些linux系統(tǒng)權(quán)限,修改/etc/sysctl.conf
文件
vi /etc/sysctl.conf
添加下面的內(nèi)容:
vm.max_map_count=262144
然后執(zhí)行命令,讓配置生效:
sysctl -p
通過docker-compose啟動集群:
docker-compose up -d
- 集群狀態(tài)監(jiān)控
kibana可以監(jiān)控es集群,不過新版本需要依賴es的x-pack 功能,配置比較復(fù)雜。
這里推薦使用cerebro來監(jiān)控es集群狀態(tài),官方網(wǎng)址:https://github.com/lmenezes/cerebro
課前資料已經(jīng)提供了安裝包:
解壓即可使用,非常方便。
解壓好的目錄如下:
進入對應(yīng)的bin目錄:
雙擊其中的cerebro.bat文件即可啟動服務(wù)。
訪問http://localhost:9000 即可進入管理界面:
輸入你的elasticsearch的任意節(jié)點的地址和端口,點擊connect即可:
綠色的條,代表集群處于綠色(健康狀態(tài))。
- 創(chuàng)建索引庫
1)利用kibana的DevTools創(chuàng)建索引庫
在DevTools中輸入指令:
PUT /itcast
{
"settings": {
"number_of_shards": 3, // 分片數(shù)量
"number_of_replicas": 1 // 副本數(shù)量
},
"mappings": {
"properties": {
// mapping映射定義 ...
}
}
}
2)利用cerebro創(chuàng)建索引庫
利用cerebro還可以創(chuàng)建索引庫:
填寫索引庫信息:
點擊右下角的create按鈕:
- 查看分片效果
回到首頁,即可查看索引庫分片效果:
4.2 集群職責和腦裂問題
4.3 集群故障轉(zhuǎn)移
4.4 集群分布式存儲與查詢
文章來源:http://www.zghlxwxcb.cn/news/detail-843587.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-843587.html
到了這里,關(guān)于微服務(wù)技術(shù)棧SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(五):分布式搜索 ES-下的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!