分布式搜索引擎03
0.學(xué)習(xí)目標(biāo)
1.數(shù)據(jù)聚合
**聚合(aggregations)**可以讓我們極其方便的實現(xiàn)對數(shù)據(jù)的統(tǒng)計、分析、運算。例如:
- 什么品牌的手機最受歡迎?
- 這些手機的平均價格、最高價格、最低價格?
- 這些手機每月的銷售情況如何?
實現(xiàn)這些統(tǒng)計功能的比數(shù)據(jù)庫的sql要方便的多,而且查詢速度非常快,可以實現(xiàn)近實時搜索效果。
1.1.聚合的種類
聚合常見的有三類:
-
**桶(Bucket)**聚合:用來對文檔做分組
- TermAggregation:按照文檔字段值分組,例如按照品牌值分組、按照國家分組
- Date Histogram:按照日期階梯分組,例如一周為一組,或者一月為一組
-
**度量(Metric)**聚合:用以計算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同時求max、min、avg、sum等
-
**管道(pipeline)**聚合:其它聚合的結(jié)果為基礎(chǔ)做聚合
**注意:**參加聚合的字段必須是keyword、日期、數(shù)值、布爾類型
1.2.DSL實現(xiàn)聚合
現(xiàn)在,我們要統(tǒng)計所有數(shù)據(jù)中的酒店品牌有幾種,其實就是按照品牌對數(shù)據(jù)分組。此時可以根據(jù)酒店品牌的名稱做聚合,也就是Bucket聚合。
1.2.1.Bucket聚合語法
語法如下:
GET /hotel/_search
{
"size": 0, // 設(shè)置size為0,結(jié)果中不包含文檔,只包含聚合結(jié)果
"aggs": { // 定義聚合
"brandAgg": { //給聚合起個名字
"terms": { // 聚合的類型,按照品牌值聚合,所以選擇term
"field": "brand", // 參與聚合的字段
"size": 20 // 希望獲取的聚合結(jié)果數(shù)量
}
}
}
}
結(jié)果如圖:
1.2.2.聚合結(jié)果排序
默認(rèn)情況下,Bucket聚合會統(tǒng)計Bucket內(nèi)的文檔數(shù)量,記為_count,并且按照_count降序排序。
我們可以指定order屬性,自定義聚合的排序方式:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" // 按照_count升序排列
},
"size": 20
}
}
}
}
1.2.3.限定聚合范圍
默認(rèn)情況下,Bucket聚合是對索引庫的所有文檔做聚合,但真實場景下,用戶會輸入搜索條件,因此聚合必須是對搜索結(jié)果聚合。那么聚合必須添加限定條件。
我們可以限定要聚合的文檔范圍,只要添加query條件即可:
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 // 只對200元以下的文檔聚合
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
這次,聚合得到的品牌明顯變少了:
1.2.4.Metric聚合語法
上節(jié)課,我們對酒店按照品牌分組,形成了一個個桶?,F(xiàn)在我們需要對桶內(nèi)的酒店做運算,獲取每個品牌的用戶評分的min、max、avg等值。
這就要用到Metric聚合了,例如stat聚合:就可以獲取min、max、avg等結(jié)果。
語法如下:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { // 是brands聚合的子聚合,也就是分組后對每組分別計算
"score_stats": { // 聚合名稱
"stats": { // 聚合類型,這里stats可以計算min、max、avg等
"field": "score" // 聚合字段,這里是score
}
}
}
}
}
}
這次的score_stats聚合是在brandAgg的聚合內(nèi)部嵌套的子聚合。因為我們需要在每個桶分別計算。
另外,我們還可以給聚合結(jié)果做個排序,例如按照每個桶的酒店平均分做排序:
1.2.5.小結(jié)
aggs代表聚合,與query同級,此時query的作用是?
- 限定聚合的的文檔范圍
聚合必須的三要素:
- 聚合名稱
- 聚合類型
- 聚合字段
聚合可配置屬性有:
- size:指定聚合結(jié)果數(shù)量
- order:指定聚合結(jié)果排序方式
- field:指定聚合字段
1.3.RestAPI實現(xiàn)聚合
1.3.1.API語法
聚合條件與query條件同級別,因此需要使用request.source()來指定聚合條件。
聚合條件的語法:
聚合的結(jié)果也與查詢結(jié)果不同,API也比較特殊。不過同樣是JSON逐層解析:
1.3.2.業(yè)務(wù)需求
需求:搜索頁面的品牌、城市等信息不應(yīng)該是在頁面寫死,而是通過聚合索引庫中的酒店數(shù)據(jù)得來的:
分析:
目前,頁面的城市列表、星級列表、品牌列表都是寫死的,并不會隨著搜索結(jié)果的變化而變化。但是用戶搜索條件改變時,搜索結(jié)果會跟著變化。
例如:用戶搜索“東方明珠”,那搜索的酒店肯定是在上海東方明珠附近,因此,城市只能是上海,此時城市列表中就不應(yīng)該顯示北京、深圳、杭州這些信息了。
也就是說,搜索結(jié)果中包含哪些城市,頁面就應(yīng)該列出哪些城市;搜索結(jié)果中包含哪些品牌,頁面就應(yīng)該列出哪些品牌。
如何得知搜索結(jié)果中包含哪些品牌?如何得知搜索結(jié)果中包含哪些城市?
使用聚合功能,利用Bucket聚合,對搜索結(jié)果中的文檔基于品牌分組、基于城市分組,就能得知包含哪些品牌、哪些城市了。
因為是對搜索結(jié)果聚合,因此聚合是限定范圍的聚合,也就是說聚合的限定條件跟搜索文檔的條件一致。
查看瀏覽器可以發(fā)現(xiàn),前端其實已經(jīng)發(fā)出了這樣的一個請求:
請求參數(shù)與搜索文檔的參數(shù)完全一致。
返回值類型就是頁面要展示的最終結(jié)果:
結(jié)果是一個Map結(jié)構(gòu):
- key是字符串,城市、星級、品牌、價格
- value是集合,例如多個城市的名稱
1.3.3.業(yè)務(wù)實現(xiàn)
在cn.itcast.hotel.web
包的HotelController
中添加一個方法,遵循下面的要求:
- 請求方式:
POST
- 請求路徑:
/hotel/filters
- 請求參數(shù):
RequestParams
,與搜索文檔的參數(shù)一致 - 返回值類型:
Map<String, List<String>>
代碼:
@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
return hotelService.getFilters(params);
}
這里調(diào)用了IHotelService中的getFilters方法,尚未實現(xiàn)。
在cn.itcast.hotel.service.IHotelService
中定義新方法:
Map<String, List<String>> filters(RequestParams params);
在cn.itcast.hotel.service.impl.HotelService
中實現(xiàn)該方法:
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
// 1.準(zhǔn)備Request
SearchRequest request = new SearchRequest("hotel");
// 2.準(zhǔn)備DSL
// 2.1.query
buildBasicQuery(params, request);
// 2.2.設(shè)置size
request.source().size(0);
// 2.3.聚合
buildAggregation(request);
// 3.發(fā)出請求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1.根據(jù)品牌名稱,獲取品牌結(jié)果
List<String> brandList = getAggByName(aggregations, "brandAgg");
result.put("品牌", brandList);
// 4.2.根據(jù)品牌名稱,獲取品牌結(jié)果
List<String> cityList = getAggByName(aggregations, "cityAgg");
result.put("城市", cityList);
// 4.3.根據(jù)品牌名稱,獲取品牌結(jié)果
List<String> starList = getAggByName(aggregations, "starAgg");
result.put("星級", starList);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}
private List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1.根據(jù)聚合名稱獲取聚合結(jié)果
Terms brandTerms = aggregations.get(aggName);
// 4.2.獲取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3.遍歷
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 4.4.獲取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}
2.自動補全
當(dāng)用戶在搜索框輸入字符時,我們應(yīng)該提示出與該字符有關(guān)的搜索項,如圖:
這種根據(jù)用戶輸入的字母,提示完整詞條的功能,就是自動補全了。
因為需要根據(jù)拼音字母來推斷,因此要用到拼音分詞功能。
2.1.拼音分詞器
要實現(xiàn)根據(jù)字母做補全,就必須對文檔按照拼音分詞。在GitHub上恰好有elasticsearch的拼音分詞插件。地址:
https://github.com/medcl/elasticsearch-analysis-pinyin
課前資料中也提供了拼音分詞器的安裝包:
安裝方式與IK分詞器一樣,分三步:
? ①解壓
? ②上傳到虛擬機中,elasticsearch的plugin目錄
? ③重啟elasticsearch
? ④測試
詳細(xì)安裝步驟可以參考IK分詞器的安裝過程。
測試用法如下:
POST /_analyze
{
"text": "如家酒店還不錯",
"analyzer": "pinyin"
}
結(jié)果:
2.2.自定義分詞器
默認(rèn)的拼音分詞器會將每個漢字單獨分為拼音,而我們希望的是每個詞條形成一組拼音,需要對拼音分詞器做個性化定制,形成自定義分詞器。
elasticsearch中分詞器(analyzer)的組成包含三部分:
- character filters:在tokenizer之前對文本進(jìn)行處理。例如刪除字符、替換字符
- tokenizer:將文本按照一定的規(guī)則切割成詞條(term)。例如keyword,就是不分詞;還有ik_smart
- tokenizer filter:將tokenizer輸出的詞條做進(jìn)一步處理。例如大小寫轉(zhuǎn)換、同義詞處理、拼音處理等
文檔分詞時會依次由這三部分來處理文檔:
聲明自定義分詞器的語法如下:
PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定義分詞器
"my_analyzer": { // 分詞器名稱
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": { // 自定義tokenizer filter
"py": { // 過濾器名稱
"type": "pinyin", // 過濾器類型,這里是pinyin
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
測試:
總結(jié):
如何使用拼音分詞器?
-
①下載pinyin分詞器
-
②解壓并放到elasticsearch的plugin目錄
-
③重啟即可
如何自定義分詞器?
-
①創(chuàng)建索引庫時,在settings中配置,可以包含三部分
-
②character filter
-
③tokenizer
-
④filter
拼音分詞器注意事項?
- 為了避免搜索到同音字,搜索時不要使用拼音分詞器
2.3.自動補全查詢
elasticsearch提供了Completion Suggester查詢來實現(xiàn)自動補全功能。這個查詢會匹配以用戶輸入內(nèi)容開頭的詞條并返回。為了提高補全查詢的效率,對于文檔中字段的類型有一些約束:
-
參與補全查詢的字段必須是completion類型。
-
字段的內(nèi)容一般是用來補全的多個詞條形成的數(shù)組。
比如,一個這樣的索引庫:
// 創(chuàng)建索引庫
PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
然后插入下面的數(shù)據(jù):
// 示例數(shù)據(jù)
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}
查詢的DSL語句如下:
// 自動補全查詢
GET /test/_search
{
"suggest": {
"title_suggest": {
"text": "s", // 關(guān)鍵字
"completion": {
"field": "title", // 補全查詢的字段
"skip_duplicates": true, // 跳過重復(fù)的
"size": 10 // 獲取前10條結(jié)果
}
}
}
}
2.4.實現(xiàn)酒店搜索框自動補全
現(xiàn)在,我們的hotel索引庫還沒有設(shè)置拼音分詞器,需要修改索引庫中的配置。但是我們知道索引庫是無法修改的,只能刪除然后重新創(chuàng)建。
另外,我們需要添加一個字段,用來做自動補全,將brand、suggestion、city等都放進(jìn)去,作為自動補全的提示。
因此,總結(jié)一下,我們需要做的事情包括:
-
修改hotel索引庫結(jié)構(gòu),設(shè)置自定義拼音分詞器
-
修改索引庫的name、all字段,使用自定義分詞器
-
索引庫添加一個新字段suggestion,類型為completion類型,使用自定義的分詞器
-
給HotelDoc類添加suggestion字段,內(nèi)容包含brand、business
-
重新導(dǎo)入數(shù)據(jù)到hotel庫
2.4.1.修改酒店映射結(jié)構(gòu)
代碼如下:
// 酒店數(shù)據(jù)索引庫
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
2.4.2.修改HotelDoc實體
HotelDoc中要添加一個字段,用來做自動補全,內(nèi)容可以是酒店品牌、城市、商圈等信息。按照自動補全字段的要求,最好是這些字段的數(shù)組。
因此我們在HotelDoc中添加一個suggestion字段,類型為List<String>
,然后將brand、city、business等信息放到里面。
代碼如下:
package cn.itcast.hotel.pojo;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@Data
@NoArgsConstructor
public class HotelDoc {
private Long id;
private String name;
private String address;
private Integer price;
private Integer score;
private String brand;
private String city;
private String starName;
private String business;
private String location;
private String pic;
private Object distance;
private Boolean isAD;
private List<String> suggestion;
public HotelDoc(Hotel hotel) {
this.id = hotel.getId();
this.name = hotel.getName();
this.address = hotel.getAddress();
this.price = hotel.getPrice();
this.score = hotel.getScore();
this.brand = hotel.getBrand();
this.city = hotel.getCity();
this.starName = hotel.getStarName();
this.business = hotel.getBusiness();
this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
this.pic = hotel.getPic();
// 組裝suggestion
if(this.business.contains("/")){
// business有多個值,需要切割
String[] arr = this.business.split("/");
// 添加元素
this.suggestion = new ArrayList<>();
this.suggestion.add(this.brand);
Collections.addAll(this.suggestion, arr);
}else {
this.suggestion = Arrays.asList(this.brand, this.business);
}
}
}
2.4.3.重新導(dǎo)入
重新執(zhí)行之前編寫的導(dǎo)入數(shù)據(jù)功能,可以看到新的酒店數(shù)據(jù)中包含了suggestion:
2.4.4.自動補全查詢的JavaAPI
之前我們學(xué)習(xí)了自動補全查詢的DSL,而沒有學(xué)習(xí)對應(yīng)的JavaAPI,這里給出一個示例:
而自動補全的結(jié)果也比較特殊,解析的代碼如下:
2.4.5.實現(xiàn)搜索框自動補全
查看前端頁面,可以發(fā)現(xiàn)當(dāng)我們在輸入框鍵入時,前端會發(fā)起ajax請求:
返回值是補全詞條的集合,類型為List<String>
1)在cn.itcast.hotel.web
包下的HotelController
中添加新接口,接收新的請求:
@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key") String prefix) {
return hotelService.getSuggestions(prefix);
}
2)在cn.itcast.hotel.service
包下的IhotelService
中添加方法:
List<String> getSuggestions(String prefix);
3)在cn.itcast.hotel.service.impl.HotelService
中實現(xiàn)該方法:
@Override
public List<String> getSuggestions(String prefix) {
try {
// 1.準(zhǔn)備Request
SearchRequest request = new SearchRequest("hotel");
// 2.準(zhǔn)備DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(prefix)
.skipDuplicates(true)
.size(10)
));
// 3.發(fā)起請求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析結(jié)果
Suggest suggest = response.getSuggest();
// 4.1.根據(jù)補全查詢名稱,獲取補全結(jié)果
CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
// 4.2.獲取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
// 4.3.遍歷
List<String> list = new ArrayList<>(options.size());
for (CompletionSuggestion.Entry.Option option : options) {
String text = option.getText().toString();
list.add(text);
}
return list;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
3.數(shù)據(jù)同步
elasticsearch中的酒店數(shù)據(jù)來自于mysql數(shù)據(jù)庫,因此mysql數(shù)據(jù)發(fā)生改變時,elasticsearch也必須跟著改變,這個就是elasticsearch與mysql之間的數(shù)據(jù)同步。
3.1.思路分析
常見的數(shù)據(jù)同步方案有三種:
- 同步調(diào)用
- 異步通知
- 監(jiān)聽binlog
3.1.1.同步調(diào)用
方案一:同步調(diào)用
基本步驟如下:
- hotel-demo對外提供接口,用來修改elasticsearch中的數(shù)據(jù)
- 酒店管理服務(wù)在完成數(shù)據(jù)庫操作后,直接調(diào)用hotel-demo提供的接口,
3.1.2.異步通知
方案二:異步通知
流程如下:
- hotel-admin對mysql數(shù)據(jù)庫數(shù)據(jù)完成增、刪、改后,發(fā)送MQ消息
- hotel-demo監(jiān)聽MQ,接收到消息后完成elasticsearch數(shù)據(jù)修改
3.1.3.監(jiān)聽binlog
方案三:監(jiān)聽binlog
流程如下:
- 給mysql開啟binlog功能
- mysql完成增、刪、改操作都會記錄在binlog中
- hotel-demo基于canal監(jiān)聽binlog變化,實時更新elasticsearch中的內(nèi)容
3.1.4.選擇
方式一:同步調(diào)用
- 優(yōu)點:實現(xiàn)簡單,粗暴
- 缺點:業(yè)務(wù)耦合度高
方式二:異步通知
- 優(yōu)點:低耦合,實現(xiàn)難度一般
- 缺點:依賴mq的可靠性
方式三:監(jiān)聽binlog
- 優(yōu)點:完全解除服務(wù)間耦合
- 缺點:開啟binlog增加數(shù)據(jù)庫負(fù)擔(dān)、實現(xiàn)復(fù)雜度高
3.2.實現(xiàn)數(shù)據(jù)同步
3.2.1.思路
利用課前資料提供的hotel-admin項目作為酒店管理的微服務(wù)。當(dāng)酒店數(shù)據(jù)發(fā)生增、刪、改時,要求對elasticsearch中數(shù)據(jù)也要完成相同操作。
步驟:
-
導(dǎo)入課前資料提供的hotel-admin項目,啟動并測試酒店數(shù)據(jù)的CRUD
-
聲明exchange、queue、RoutingKey
-
在hotel-admin中的增、刪、改業(yè)務(wù)中完成消息發(fā)送
-
在hotel-demo中完成消息監(jiān)聽,并更新elasticsearch中數(shù)據(jù)
-
啟動并測試數(shù)據(jù)同步功能
3.2.2.導(dǎo)入demo
導(dǎo)入課前資料提供的hotel-admin項目:
運行后,訪問 http://localhost:8099
其中包含了酒店的CRUD功能:
3.2.3.聲明交換機、隊列
MQ結(jié)構(gòu)如圖:
1)引入依賴
在hotel-admin、hotel-demo中引入rabbitmq的依賴:
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)聲明隊列交換機名稱
在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts
包下新建一個類MqConstants
:
package cn.itcast.hotel.constatnts;
public class MqConstants {
/**
* 交換機
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 監(jiān)聽新增和修改的隊列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 監(jiān)聽刪除的隊列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 刪除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
3)聲明隊列交換機
在hotel-demo中,定義配置類,聲明隊列、交換機:
package cn.itcast.hotel.config;
import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
3.2.4.發(fā)送MQ消息
在hotel-admin中的增、刪、改業(yè)務(wù)中分別發(fā)送MQ消息:
3.2.5.接收MQ消息
hotel-demo接收到MQ消息要做的事情包括:
- 新增消息:根據(jù)傳遞的hotel的id查詢hotel信息,然后新增一條數(shù)據(jù)到索引庫
- 刪除消息:根據(jù)傳遞的hotel的id刪除索引庫中的一條數(shù)據(jù)
1)首先在hotel-demo的cn.itcast.hotel.service
包下的IHotelService
中新增新增、刪除業(yè)務(wù)
void deleteById(Long id);
void insertById(Long id);
2)給hotel-demo中的cn.itcast.hotel.service.impl
包下的HotelService中實現(xiàn)業(yè)務(wù):
@Override
public void deleteById(Long id) {
try {
// 1.準(zhǔn)備Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.發(fā)送請求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
// 0.根據(jù)id查詢酒店數(shù)據(jù)
Hotel hotel = getById(id);
// 轉(zhuǎn)換為文檔類型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.準(zhǔn)備Request對象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2.準(zhǔn)備Json文檔
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.發(fā)送請求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
3)編寫監(jiān)聽器
在hotel-demo中的cn.itcast.hotel.mq
包新增一個類:
package cn.itcast.hotel.mq;
import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
* 監(jiān)聽酒店新增或修改的業(yè)務(wù)
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 監(jiān)聽酒店刪除的業(yè)務(wù)
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
4.集群
單機的elasticsearch做數(shù)據(jù)存儲,必然面臨兩個問題:海量數(shù)據(jù)存儲問題、單點故障問題。
- 海量數(shù)據(jù)存儲問題:將索引庫從邏輯上拆分為N個分片(shard),存儲到多個節(jié)點
- 單點故障問題:將分片數(shù)據(jù)在不同節(jié)點備份(replica )
ES集群相關(guān)概念:
-
集群(cluster):一組擁有共同的 cluster name 的 節(jié)點。
-
節(jié)點(node) :集群中的一個 Elasticearch 實例
-
分片(shard):索引可以被拆分為不同的部分進(jìn)行存儲,稱為分片。在集群環(huán)境下,一個索引的不同分片可以拆分到不同的節(jié)點中
解決問題:數(shù)據(jù)量太大,單點存儲量有限的問題。
此處,我們把數(shù)據(jù)分成3片:shard0、shard1、shard2
-
主分片(Primary shard):相對于副本分片的定義。
-
副本分片(Replica shard)每個主分片可以有一個或者多個副本,數(shù)據(jù)和主分片一樣。
?
數(shù)據(jù)備份可以保證高可用,但是每個分片備份一份,所需要的節(jié)點數(shù)量就會翻一倍,成本實在是太高了!
為了在高可用和成本間尋求平衡,我們可以這樣做:
- 首先對數(shù)據(jù)分片,存儲到不同節(jié)點
- 然后對每個分片進(jìn)行備份,放到對方節(jié)點,完成互相備份
這樣可以大大減少所需要的服務(wù)節(jié)點數(shù)量,如圖,我們以3分片,每個分片備份一份為例:
現(xiàn)在,每個分片都有1個備份,存儲在3個節(jié)點:
- node0:保存了分片0和1
- node1:保存了分片0和2
- node2:保存了分片1和2
4.1.搭建ES集群
參考課前資料的文檔:
其中的第四章節(jié):
4.2.集群腦裂問題
4.2.1.集群職責(zé)劃分
elasticsearch中集群節(jié)點有不同的職責(zé)劃分:
默認(rèn)情況下,集群中的任何一個節(jié)點都同時具備上述四種角色。
但是真實的集群一定要將集群職責(zé)分離:
- master節(jié)點:對CPU要求高,但是內(nèi)存要求第
- data節(jié)點:對CPU和內(nèi)存要求都高
- coordinating節(jié)點:對網(wǎng)絡(luò)帶寬、CPU要求高
職責(zé)分離可以讓我們根據(jù)不同節(jié)點的需求分配不同的硬件去部署。而且避免業(yè)務(wù)之間的互相干擾。
一個典型的es集群職責(zé)劃分如圖:
4.2.2.腦裂問題
腦裂是因為集群中的節(jié)點失聯(lián)導(dǎo)致的。
例如一個集群中,主節(jié)點與其它節(jié)點失聯(lián):
此時,node2和node3認(rèn)為node1宕機,就會重新選主:
當(dāng)node3當(dāng)選后,集群繼續(xù)對外提供服務(wù),node2和node3自成集群,node1自成集群,兩個集群數(shù)據(jù)不同步,出現(xiàn)數(shù)據(jù)差異。
當(dāng)網(wǎng)絡(luò)恢復(fù)后,因為集群中有兩個master節(jié)點,集群狀態(tài)的不一致,出現(xiàn)腦裂的情況:
解決腦裂的方案是,要求選票超過 ( eligible節(jié)點數(shù)量 + 1 )/ 2 才能當(dāng)選為主,因此eligible節(jié)點數(shù)量最好是奇數(shù)。對應(yīng)配置項是discovery.zen.minimum_master_nodes,在es7.0以后,已經(jīng)成為默認(rèn)配置,因此一般不會發(fā)生腦裂問題
例如:3個節(jié)點形成的集群,選票必須超過 (3 + 1) / 2 ,也就是2票。node3得到node2和node3的選票,當(dāng)選為主。node1只有自己1票,沒有當(dāng)選。集群中依然只有1個主節(jié)點,沒有出現(xiàn)腦裂。
4.2.3.小結(jié)
master eligible節(jié)點的作用是什么?
- 參與集群選主
- 主節(jié)點可以管理集群狀態(tài)、管理分片信息、處理創(chuàng)建和刪除索引庫的請求
data節(jié)點的作用是什么?
- 數(shù)據(jù)的CRUD
coordinator節(jié)點的作用是什么?
-
路由請求到其它節(jié)點
-
合并查詢到的結(jié)果,返回給用戶
4.3.集群分布式存儲
當(dāng)新增文檔時,應(yīng)該保存到不同分片,保證數(shù)據(jù)均衡,那么coordinating node如何確定數(shù)據(jù)該存儲到哪個分片呢?
4.3.1.分片存儲測試
插入三條數(shù)據(jù):
測試可以看到,三條數(shù)據(jù)分別在不同分片:
結(jié)果:
4.3.2.分片存儲原理
elasticsearch會通過hash算法來計算文檔應(yīng)該存儲到哪個分片:
說明:
- _routing默認(rèn)是文檔的id
- 算法與分片數(shù)量有關(guān),因此索引庫一旦創(chuàng)建,分片數(shù)量不能修改!
新增文檔的流程如下:
解讀:
- 1)新增一個id=1的文檔
- 2)對id做hash運算,假如得到的是2,則應(yīng)該存儲到shard-2
- 3)shard-2的主分片在node3節(jié)點,將數(shù)據(jù)路由到node3
- 4)保存文檔
- 5)同步給shard-2的副本replica-2,在node2節(jié)點
- 6)返回結(jié)果給coordinating-node節(jié)點
4.4.集群分布式查詢
elasticsearch的查詢分成兩個階段:
-
scatter phase:分散階段,coordinating node會把請求分發(fā)到每一個分片
-
gather phase:聚集階段,coordinating node匯總data node的搜索結(jié)果,并處理為最終結(jié)果集返回給用戶
4.5.集群故障轉(zhuǎn)移
集群的master節(jié)點會監(jiān)控集群中的節(jié)點狀態(tài),如果發(fā)現(xiàn)有節(jié)點宕機,會立即將宕機節(jié)點的分片數(shù)據(jù)遷移到其它節(jié)點,確保數(shù)據(jù)安全,這個叫做故障轉(zhuǎn)移。
1)例如一個集群結(jié)構(gòu)如圖:
現(xiàn)在,node1是主節(jié)點,其它兩個節(jié)點是從節(jié)點。
2)突然,node1發(fā)生了故障:
宕機后的第一件事,需要重新選主,例如選中了node2:
node2成為主節(jié)點后,會檢測集群監(jiān)控狀態(tài),發(fā)現(xiàn):shard-1、shard-0沒有副本節(jié)點。因此需要將node1上的數(shù)據(jù)遷移到node2、node3:文章來源:http://www.zghlxwxcb.cn/news/detail-773471.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-773471.html
到了這里,關(guān)于黑馬程序員微服務(wù) 分布式搜索引擎3的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!