一、ElasticSearch
1 基本概念
全文檢索工具:快速儲存、搜索和分析海量數(shù)據(jù)。
- Index (索引) → Mysql的庫
- 動詞,相當于MySQL中的insert;
- 名詞,相當于MySQL中的Database。
- Type (類型) → Mysql的表(過時)
- 在Index中,可以定義一個或多個類型。類似于MySQL中的Table;每一種類型的數(shù)據(jù)放在一起。
- Document (文檔) → Mysql的記錄
- 保存在某個Index下,某種Type的一個數(shù)據(jù) (Document),文檔是JSON格式的,Document就像是MySQL中的某個Table里面的內(nèi)容。
- 倒排索引
-
詞 記錄 紅海 1,2,3,4,5 行動 1,2,3 探索 2,5 特別 3,5 記錄篇 4 特工 5 分詞:將整句分拆為單詞
保存的記錄
1 - 紅海行動
2 - 探索紅海行動
3 - 紅海特別行動
4 - 紅海紀錄篇
5 - 特工紅海特別探索檢索
紅海特工行動?
紅海行動?相關(guān)性得分
比如搜索“紅海特別行動”,找到詞“紅?!?,“特工”和“行動”,共涉及到記錄1, 2, 3, 4, 5。
對于記錄1和5,都命中了兩個詞。但是記錄1只拆分出了2個詞,記錄5拆分出了4個詞,所以記錄1的相關(guān)性得分會更高。
2 安裝
docker pull elasticsearch:7.4.2 # 存儲和檢索數(shù)據(jù)
docker pull kibana:7.4.2 # 可視化檢索數(shù)據(jù)
mkdir -p /mydata/elasticsearch/config # 用于掛載
mkdir -p /mydata/elasticsearch/data # 用于掛載
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml # 允許任何IP訪問
2.1 安裝ElasticSearch
docker run命令:
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \ # 9200-API端口 9300-集群通信端口
-e "discovery.type=single-node" \ # 指定參數(shù)
-e ES_JAVA_OPTS="-Xms64m -Xmx128m" \ # 指定參數(shù)
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2 # 指定后臺啟動鏡像
下面供復(fù)制使用:
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx128m" -v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /mydata/elasticsearch/data:/usr/share/elasticsearch/data -v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins -d elasticsearch:7.4.2
問題1 運行后
docker ps
沒有運行,使用docker logs elasticsearch
查看報錯原因如下:... "org.elasticsearch.bootstrap.StartupException: ElasticsearchException[failed to bind service]; nested: AccessDeniedException[/usr/share/elasticsearch/data/nodes];" ... "Caused by: java.nio.file.AccessDeniedException: /usr/share/elasticsearch/data/nodes"
解決 宿主機掛載目錄權(quán)限問題,給要被掛載的目錄所有用戶的讀寫權(quán)限:
chmod -R 777 /mydata
-R是給目錄下所有文件賦予權(quán)限
2.2 安裝Kibana
docker run --name kibana -e ELASTICSEARCH_HOSTS=http://172.16.212.10:9200 -p 5601:5601 -d kibana:7.4.2
3 初步檢索
3.1 _cat 查看ES的節(jié)點信息
請求 | 說明 |
---|---|
GET /_cat/nodes | 查看所有節(jié)點 |
GET /_cat/health | 查看ES健康狀況 |
GET /_cat/master | 查看主節(jié)點 |
GET /_cat/indices | 查看所有索引 → show databases; |
3.2 索引一個文檔
_
開頭的是元數(shù)據(jù)?,F(xiàn)在type已不推薦使用,使用 _doc
請求 | 說明 |
---|---|
PUT customer/external/1 body {“name”: “John”} |
在customer索引下的external類型下保存1號數(shù)據(jù)為請求體中的JSON內(nèi)容。如果之前存在對應(yīng)ID的數(shù)據(jù),則會進行更新;否則新增。必須攜帶ID。 |
POST customer/external/1body {“name”: “John”} |
同上,但是可以不帶ID,表示新增。 |
GET customer/external/1 |
查詢一條數(shù)據(jù)。返回信息中:_seq_no: 6 , 并發(fā)控制字段,每次更新就會+1,用來做樂觀鎖_primary_term: 1 , 同上,主分片重新分配、重啟就會變化更新時攜帶下面的參數(shù): ?if_seq_no=0&if_primary_term=1 ,比如只想修改序列號為1時的數(shù)據(jù),別人如果中途修改過就不再修改,返回409錯誤并提供新的版本號。 |
POST customer/external/1/_updatebody {“doc”: {“name”: “John”}} |
更新文檔。會對比原來數(shù)據(jù),如果更新前后沒有變化,那么序列號和版本都不會增加。前面兩種更新都會修改。 |
DELETE customer/external/1 |
刪除一條數(shù)據(jù)。 |
DELETE customer |
刪除索引。ES不支持刪除類型。 |
# 在Kibana中執(zhí)行POST customer/external/_bulk{“index”:{“_id”:“1”}} {“name”:{“name”:“John”}} {“index”:{“_id”:“2”}} {“name”:{“name”:“Jack”}} |
批量保存。上一條失敗不會影響下一條。語法格式: {action: {metadata}} {request body} actions: delete create title index
|
4 進階檢索
4.1 SearchAPI
select * from bank order by account_number asc
GET bank/_search?q=*&sort=account_number:asc
4.2 QueryDSL
4.2.1 基本查詢
select balance, firstname from bank order by account_number asc limit 0, 5
GET bank/_search
{
"query": {
"match_all": {}
},
"sort": [{
"account_number": "asc"
},
{
"balance": "desc"
}],
"from": 0,
"size": 5,
"_source": ["balance", "firstname"]
}
- match
select * from bank where balance = 16418
GET bank/_search { "query": { "match": { "balance": 16418 } } }
- match_phrase
select * from bank where address like '%mill lane%'
(MySQL和ES都忽略大小寫)GET bank/_search { "query": { "match_phrase": { "address": "mill lane" } } }
- multi_match
select * from bank where address like '%mill%' or city like '%mill%' or address like '%movico%' or city like '%movico%'
GET bank/_search { "query": { "multi_match": { "query": "mill movico", "fields": ["address", "city"] } } }
- bool
可以使用must
must not
should
,均為字面意思,符合查詢值的會貢獻相關(guān)性得分。
也可以使用filter
進行過濾,但不會計算相關(guān)性得分。GET bank/_search { "query": { "bool": { "must": [ { "match": { "gender": "F" } } ]}}}
- term
精確值查詢。但是查詢text
字段由于存在分詞分析的原因,會查詢不到,還是需要使用match
。
實踐GET bank/_search { "query": { "term": { "age": "28" } } }
全文檢索字段使用match
,其他非text
字段匹配用term
。
4.2.2 聚合
- terms聚合
- 搜索address中包含mill的所有人的年齡分布以及平均年齡
-
select * from bank
+select age, count(age) from age group by age
GET bank/_search { "query": { "match_all": {} }, "aggs": { "ageAgg": { "terms": { "field": "age", "size": 10 # 只看10條聚合結(jié)果 } } }, "size": 0 # 不看具體記錄只看聚合結(jié)果 }
-
- 對text類型字段做聚合需要添加后綴
.keyword
。 - 嵌套聚合:
with (select age, count(age) from bank group by age) as t1
select avg(balance) from bank, t1 on bank.age = t1.age group by balance
GET bank/_search { "query": { "match_all": {} }, "aggs": { "ageAgg": { "terms": { "field": "age", "size": 10 # 只看10條聚合結(jié)果 }, "aggs": { "balanceAvg": { "avg": { "field": "balance" } } } } }, "size": 0 # 不看具體記錄只看聚合結(jié)果 }
- avg聚合
- 求平均值
select avg(age) from bank
GET bank/_search { "query": { "match_all": {} }, "aggs": { "ageAvg": { "avg": { "field": "age", } } }, "size": 0 # 不看具體記錄只看聚合結(jié)果 }
其他聚合類型參考官方文檔。
4.3 Mapping
- 指定字段類型。
PUT /my_index
{
"mappings": {
"properties": {
"age": {"type": "integer"},
"email": {"type": "keyword"},
"name": {"type": "text"} # text類型會進行分詞分析
}
}
}
- 給已有映射添加字段
employee-id
:
PUT /my_index/_mapping
{
"properties": {
"employee-id": {
"type": "keyword",
"index": false # 控制這個字段是否可以被查詢(false為冗余字段)
}
}
}
- 對于已存在的索引,不能進行更新。只能創(chuàng)建新的索引,進行數(shù)據(jù)遷移。
POST _reindex
{
"source": {
"index": "bank",
"type": "account" # 過時,遷移后默認變成_doc
},
"dest": {
"index": "newbank"
}
}
4.4 分詞
默認按空格進行分詞,忽略句尾句號。但是這樣無法對中文進行分詞(被拆分為單字)。
POST _analyze
{
"analyzer": "standard",
"text": "中文之間是沒有空格的"
}
在GitHub上下載對應(yīng)ES版本的ik分詞器https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.4.2,使用wget下載到掛載插件目錄下
cd /mydata/elasticsearch/plugins
wget https://www.notion.so/1-ElasticSearch-a3b1bbe49f404ee8898d1b99b76d812a#061e5af6f9f940509ca3e89197754abc
unzip -d ik elasticsearch-analysis-ik-7.4.2.zip
rm elasticsearch-analysis-ik-7.4.2.zip
然后進入docker容器使用elasticsearch-plugin list
查看安裝好的插件。
使用ik分詞器
POST _analyze
{
"analyzer": "ik_max_word",
"text": "我是中國人"
}
我們經(jīng)常需要自定義詞庫,可以將自定義詞庫部署到nginx讓ES來訪問。
附錄——調(diào)整ES占用內(nèi)存最大為512M
刪除原來的容器,新建一個。
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx512m" -v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /mydata/elasticsearch/data:/usr/share/elasticsearch/data -v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins -d elasticsearch:7.4.2
附錄——安裝Nginx
- 隨便啟動一個nginx實例,為了復(fù)制其中配置文件
docker run -p 80:80 --name nginx -d nginx:1.10
- 將容器內(nèi)的配置文件拷貝到當前目錄
docker container cp nginx:/etc/nginx .
- 修改文件名,并移動到/mydata/nginx下
mv nginx conf
mkdir /mydata/nginx
mv conf /mydata/nginx
- 終止原容器并刪除
docker stop nginx
docker rm nginx
- 創(chuàng)建新的nginx
docker run -p 80:80 --name nginx -v /mydata/nginx/html:/usr/share/nginx/html -v /mydata/nginx/logs:/var/log/nginx -v /mydata/nginx/conf:/etc/nginx -d nginx:1.10
Nginx可以直接通過路徑訪問html目錄下的文件。
在nginx的html目錄下創(chuàng)建一個txt文件存儲自定義詞庫
cd /mydata/nginx/html
mkdir es
cd es
vim segmentation.txt # 詞匯換行存儲
然后修改ES中ik插件的配置文件
cd /mydata/elasticsearch/plugins/ik/config
vim IKAnalyzer.cfg.xml
# 將下面這條配置取消注釋,并配置為自己的自定義詞庫文件
<!--用戶可以在這里配置遠程擴展字典 -->
<entry key="remote_ext_dict">http://172.16.212.10/es/segmentation.txt</entry>
重啟ES就可以生效了docker restart elasticsearch
5 整合Java
使用官方提供的Java High Level REST Client
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-getting-started-maven.html
Maven導入:
<properties>
<java.version>1.8</java.version>
<!-- springdata對es版本做了管理,在這里覆蓋掉 -->
<elasticsearch.version>7.4.2</elasticsearch.version>
</properties>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
問題1 按照視頻中導入上方陪配置后報錯
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'configurationPropertiesBeans' defined in class path resource
解決 nacos和springboot版本沖突,引入依賴管理:
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
@Test
void indexData() throws IOException {
IndexRequest indexRequest = new IndexRequest("users");
indexRequest.id("1");
// indexRequest.source("userName", "zhangsan", "age", 18, "gender", "男");
User user = new User();
String jsonStr = JSON.toJSONString(user);
indexRequest.source(jsonStr, XContentType.JSON);
IndexResponse index = client.index(indexRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
System.out.println(index);
}
@Test
void searchData() throws IOException {
SearchRequest searchRequest = new SearchRequest();
// 指定在哪里檢索
searchRequest.indices("bank");
// 指定DSL,檢索條件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 構(gòu)造檢索條件
sourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
sourceBuilder.aggregation(AggregationBuilders.terms("ageAgg").field("age").size(10));
sourceBuilder.aggregation(AggregationBuilders.avg("balanceAvg").field("balance"));
searchRequest.source(sourceBuilder);
// 執(zhí)行檢索
SearchResponse response = client.search(searchRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
// 分析結(jié)果
SearchHits hits = response.getHits();
SearchHit[] hits1 = hits.getHits();
for (SearchHit hit : hits) {
String hitStr = hit.getSourceAsString();
System.out.println("hit: " + hitStr);
}
Aggregations aggregations = response.getAggregations();
Terms ageAgg = aggregations.get("ageAgg");
for (Terms.Bucket bucket : ageAgg.getBuckets()) {
String keyStr = bucket.getKeyAsString();
System.out.println("age: " + keyStr);
}
Avg balanceAvg = aggregations.get("balanceAvg");
System.out.println("balance avg: " + balanceAvg.getValue());
}
關(guān)于nested字段類型
二、Nginx
讓Nginx幫我們進行反向代理,所有來自gulimall.com的請求,都轉(zhuǎn)到商品服務(wù)。
nginx.conf
??|— 全局塊
配置影響nginx全局的指令。如:用戶組,nginx進程pid存放路徑,日志存放路徑。配置文件引入,允許生成worker process數(shù)等
??|— events塊
配置影響nginx服務(wù)器或與用戶的網(wǎng)絡(luò)連接。如:每個進程的最大連接數(shù),選取哪種事件驅(qū)動模型處理連接請求,是否允許同時接受多個網(wǎng)路連接,開啟多個網(wǎng)絡(luò)連接序列化等
??|— http塊
可以嵌套多個server,配置代理,緩存,日志定義等絕大多數(shù)功能和第三方模塊的配置,如文件引入,mime-type定義,日志自定義,是否使用sendile傳掩文件,連接超時時間,單連接請求數(shù)等
????|— http全局塊
如upstream,錯誤頁面,連接超時等
????|— server塊
配置虛擬主機的相關(guān)參數(shù)?!獋€http中可以有多個server
????????|— location
配置請求的路由,以及各種頁面的處理情況
????????|— location
????????|— ...
- 在nginx.conf總配置中,有一條
include /etc/nginx/conf.d/*.conf;
表示會讀取這conf.d目錄下所有的conf配置文件 - 在conf.d下創(chuàng)建一個gulimall.conf來配置代理轉(zhuǎn)發(fā),修改其中的server_name為gulimall.com,表示將所有請求頭中host為gulimall.com的進行監(jiān)聽
- 再修改location中的內(nèi)容配置代理路徑
listen 80; server_name gulimall.com; #charset koi8-r; #access_log /var/log/nginx/log/host.access.log main; location / { proxy_pass http://172.16.212.1:10000; }
缺點 如果后期微服務(wù)實例增多,需要再進入配置文件進行修改。
解決方式 給Nginx的上游服務(wù)器配置為網(wǎng)關(guān),將所有匹配請求轉(zhuǎn)發(fā)到網(wǎng)關(guān),由網(wǎng)關(guān)再進行轉(zhuǎn)發(fā)。
- 修改nginx.conf,添加上游服務(wù)器配置:
# 配置上游服務(wù)器 upstream gulimall { server 172.16.212.1:88; # 這個是網(wǎng)關(guān)URL }
- 再在gulimall.conf中將proxy_pass轉(zhuǎn)發(fā)目的地址改為gulimall
注意 Nginx代理給網(wǎng)關(guān)的時候,會丟失請求頭的host信息
解決方式 配置proxy_set_header Host $host
location / { proxy_set_header Host $host; # 添加header Host,內(nèi)容為原來的host內(nèi)容 proxy_pass http://gulimall; }
- 在網(wǎng)關(guān)服務(wù)的application.yaml中添加域名轉(zhuǎn)發(fā)配置:
- id: host_route uri: lb://product predicates: - Host=**.gulimall.com
動靜分離
將靜態(tài)資源如js, css和圖片放到Nginx來處理,減輕Tomcat的負擔。
- 首先將所有的靜態(tài)資源(index目錄)拷貝到nginx目錄中:
/mydata/nginx/html/static
- 修改gulimall.conf配置,添加static訪問路徑直接訪問nginx目錄中內(nèi)容的配置:
location /static/ { root /usr/share/nginx/html; }
三、壓力測試
1 性能指標
- 響應(yīng)時間 (Response Time: ST)
響應(yīng)時間指用戶從客戶端發(fā)起一個請求開始,到客戶端接收到從服務(wù)器端返回的響應(yīng)結(jié)束,整個過程所耗費的時間。 - HPS(Hits Per Second):每秒點擊次數(shù),單位是次秘。
- TPS(Transaction per Second):系統(tǒng)每秒處理交易數(shù),單位是筆/秒。
- QPS(Query per Second):系統(tǒng)每秒處理查詢次數(shù),單位是次/秒。
對于互聯(lián)網(wǎng)業(yè)務(wù)中,如果某些業(yè)務(wù)有且僅有一個請求連接,那么TPS=QPS=HPS,一般情記下用 TPS 來衡量整個業(yè)務(wù)流程,用 QPS 來衡量接口查詢次數(shù),用 HPS 來表示對服務(wù)器單擊請求。 - 無論TPS、QPS、HPS,此指標是衡量系統(tǒng)處理能力非常重要的指標,越大越好,根據(jù)經(jīng)驗,一般情況下:
- 金融行業(yè):1000TPS~50000TPS,不包括互聯(lián)網(wǎng)化的活動
- 保險行業(yè):100TPS~100000TPS,不包括互聯(lián)網(wǎng)化的活動
- 制造行業(yè):10TPS~5000TPS
- 互聯(lián)網(wǎng)電子商務(wù):10000TPS~1000000TPS
- 互聯(lián)網(wǎng)中型網(wǎng)站:1000TPS~5000OTPS
- 互聯(lián)網(wǎng)小型網(wǎng)站:500TPS~10000TPS
- 最大響應(yīng)時間(Max Response Time):指用戶發(fā)出請求或者指令到系統(tǒng)做出反應(yīng)(響應(yīng))的最大時間。
- 最少響應(yīng)時間(Minimum Response Time):指用戶發(fā)出請求或者指令到系統(tǒng)作出反應(yīng)(響應(yīng))的最少時間。
- 90%響應(yīng)時間(90% Response Time):是指所有用戶的響應(yīng)時間進行排序,第90%的響應(yīng)時間。
- 從外部看,性能測試主要關(guān)注如下三個指標
- 吞吐量:每秒鐘系統(tǒng)能夠處理的請求數(shù)、任務(wù)數(shù)
- 響應(yīng)時間:服務(wù)處理一個請求或一個任務(wù)的耗時
- 錯誤率:一批請求種結(jié)果出錯的請求所占比例
2 JMeter
brew install jmeter
-
在Options中可以將語言設(shè)置為簡體中文。
-
添加測試配置:
線程數(shù)
來模擬用戶數(shù)量Ramp-Up
設(shè)置執(zhí)行這些請求的時間,這里為200次循環(huán)次數(shù)
重復(fù)多少次 -
HTTP請求配置中:
影響性能考慮點數(shù)據(jù)庫、應(yīng)用程序、中間件(Tomcat、Nginx)、網(wǎng)絡(luò)和操作系統(tǒng)等方面
首先考慮自己的應(yīng)用屬于CPU密集型還是IO密集型
3 jvisualvm(VisualVM)
升級版的jconsole
問題1
在終端中執(zhí)行jvisualvm
后報錯如下:The operation couldn’t be completed. Unable to locate a Java Runtime that supports jvisualvm. Please visit http://www.java.com for information on installing Java.
并且環(huán)境變量中已有$JAVA_HOME
解決
高版本JDK不再自帶jvisualvm。從官網(wǎng) https://visualvm.github.io/download.html 下載VisualVM使用。下載后打開又報錯需要JDK來運行而不是JRE,修改程序目錄下
/etc/visualvm.conf
添加javahome的配置:
visualvm_jdkhome="/Library/Java/JavaVirtualMachines/jdk1.8.0_311.jdk/Contents/Home”
插件中心不需要修改,是用默認的就可以安裝。按照JDK修改后出現(xiàn)安裝插件缺失依賴的問題。
VisualVM中可以看到的線程狀態(tài)
運行Running | 正在運行的 |
---|---|
休眠Sleeping | sleep |
等待Wait | wait |
駐留Park | 線程池里面的空閑線程 |
監(jiān)視Monitor | 阻塞的線程,正在等待鎖 |
4 壓力測試實驗
- 對于nginx測試,可以使用
docker stats
查看容器的CPU和內(nèi)存占用 - 對于網(wǎng)關(guān)測試,直接發(fā)請求到localhost:88,忽略404
壓測內(nèi)容 | 壓測線程數(shù) | 吞吐量/s (耗時原因) | 90%響應(yīng)時間 | 99%響應(yīng)時間 |
---|---|---|---|---|
Nginx | 50 | 5165.8 | 13 | 52 |
Gateway | 50 | 20500.2 | 4 | 16 |
簡單服務(wù) | 50 | 24898.7 | 3 | 5 |
首級一級菜單渲染 | 50 | 634.9 (db, thymeleaf) | 119 | 200 |
首級渲染(開緩存) | 50 | 761.9 | 94 | 151 |
首級渲染(數(shù)據(jù)庫加索引,關(guān)日志) | 50 | 1856.6 | 44 | 81 |
三級分類數(shù)據(jù)獲取 | 50 | 7.7 (db) | 6680 | 7625 |
三級分類數(shù)據(jù)獲?。〝?shù)據(jù)庫加索引) | 50 | 13.7 | 4223 | 4910 |
三級分類數(shù)據(jù)獲取(關(guān)日志) | 50 | 23.7 | 2670 | 2802 |
三級分類數(shù)據(jù)獲?。▋?yōu)化業(yè)務(wù)減少DB查詢次數(shù)) | 50 | 126.8 | 338 | 599 |
三級分類數(shù)據(jù)獲取(使用Redis緩存) | 50 | 511.3 | 124 | 254 |
首頁全量數(shù)據(jù)獲取 | 50 | 360.4 (靜態(tài)資源) | 0 | 701 |
Nginx+Gateway | ||||
Gateway+簡單服務(wù) | 50 | 7267.1 | 12 | 49 |
全鏈路 | 50 | 354.9 | 40 | 61 |
結(jié)論
- 中間件越多,性能損失越大,大多都損失在網(wǎng)絡(luò)交互了
- 業(yè)務(wù)
- DB(MySQL優(yōu)化)
- 模版的渲染速度(緩存)
- 靜態(tài)資源
- Nginx
- 以后將所有項目的靜態(tài)資源都應(yīng)該放在Nginx里面
- 規(guī)則:/static/**所有請求都由Nginx直接返回
四、緩存
1 緩存使用
為了系統(tǒng)性能的提升,我們一般都會將部分的數(shù)據(jù)放入緩存中,加速訪問。而DB承擔數(shù)據(jù)落盤工作。
1.1 哪些數(shù)據(jù)適合放到緩存中?
- 即時性、數(shù)據(jù)一致性要求不高的
- 訪問量大且更新頻率不高的數(shù)據(jù)(讀多,寫少)
舉例
電商類應(yīng)用,商品分類,商品列表等適合緩存并加一個失效時間(根據(jù)更新頻率來定),后臺如果發(fā)布一個商品,買家需要5分鐘才能看到新的商品一般還是可以接受的。
1.2 整合Redis
- 引入
spring-boot-data-redis-starter
- 簡單配置redis的host等信息
spring: redis: host: 172.16.212.10 port: 6379
- 使用SpringBoot自動配置好的StringRedisTemplate來操作Redis
Redis客戶端可能會出現(xiàn)堆外內(nèi)存溢出OutOfDirectMemoryError問題的原因
- SpringBoot2.0以后默認使用lettuce作為操作Redis的客戶端,它使用netty進行網(wǎng)絡(luò)通信
- lettuce的bug導致堆外內(nèi)存溢出
在服務(wù)中設(shè)置了-Xmx300m
最大堆內(nèi)存,如果netty沒有指定堆外內(nèi)存,默認使用這個配置
解決方案
首先,不能使用 -Dio.netty.maxDirectMemory
去調(diào)大堆外內(nèi)存
- 升級lettuce客戶端
- 切換使用Jedis,修改POM文件如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
lettuce 和 jedis是什么關(guān)系
lettuce和Jedis都是操作Redis的底層客戶端,Spring再次封裝成RedisTemplate,同時引入了兩者可以根據(jù)需要選用
2 緩存失效問題
2.1 緩存穿透
(查詢不存在數(shù)據(jù))
出現(xiàn)原因
查詢一個一定不存在的數(shù)據(jù),由于緩存沒命中,去查數(shù)據(jù)庫,數(shù)據(jù)庫也不存在這條記錄,不會寫入緩存,之后再請求這個不存在數(shù)據(jù)都會到數(shù)據(jù)庫去查詢。利用不存在的數(shù)據(jù)進行攻擊,數(shù)據(jù)庫瞬時壓力增大,導致崩潰
解決
null結(jié)果緩存,并加入短暫過期時間
2.2 緩存雪崩
(大面積key同時失效)
出現(xiàn)原因
設(shè)置的key采用了相同的過期時間,導致某一時刻同時失效,請求全部轉(zhuǎn)發(fā)到數(shù)據(jù)庫,數(shù)據(jù)庫瞬時壓力過重雪崩
解決
原有的失效時間基礎(chǔ)上增就一個隨機值,比如1-5分鐘隨機,這樣每一個緩存的過期時間的重復(fù)率就會降低,就很難引發(fā)集體失效的事件
2.3 緩存擊穿
(某一個高頻熱點key失效)
出現(xiàn)原因
對于一些設(shè)置了過期時間的key,可能會在某些時間點被超高并發(fā)訪問,是一種熱點數(shù)據(jù)。如果這個key再大量請求同時進來之前正好失效,所有查詢進入到數(shù)據(jù)庫,數(shù)據(jù)庫崩潰
解決
加鎖:大量并發(fā)只讓一個人去查,其他人等待。查到以后釋放鎖,其他人獲取到鎖,先查緩存,就會有數(shù)據(jù),不用去數(shù)據(jù)庫
3 緩存數(shù)據(jù)一致性
3.1 雙寫模式
數(shù)據(jù)更新時同時修改數(shù)據(jù)庫和緩存。
數(shù)據(jù)更新
→ 寫數(shù)據(jù)庫
→ 寫緩存
讀到的最新數(shù)據(jù)有延遲:最終一致性。
臟數(shù)據(jù)問題
由于卡頓等原因,導致寫緩存2在最前,寫緩存1在后面就出現(xiàn)了不一致。
這是暫時的臟數(shù)據(jù)問題,但是在數(shù)據(jù)穩(wěn)定,緩存過期以后,又能得到最新的正確數(shù)據(jù)。
3.2 失效模式
數(shù)據(jù)更新時更新數(shù)據(jù)庫,刪掉緩存中的舊數(shù)據(jù)。數(shù)據(jù)更新
→ 寫數(shù)據(jù)庫
→ 刪緩存
問題
3.3 小結(jié)
- 無論是雙寫模式還是失效模式,都會導致緩存的不一致問題。即多個實例同時更新會出事。怎么辦?
- 如果是用戶緯度數(shù)據(jù)(訂單數(shù)據(jù)、用戶數(shù)據(jù)),這種并發(fā)幾率非常小,不用考慮這個問題,緩存數(shù)據(jù)加上過期時間,每隔一段時間觸發(fā)讀的主動更新即可
- 如果是菜單,商品介紹等基礎(chǔ)數(shù)據(jù),也可以去使用canal訂閱binlog的方式。
- 緩存數(shù)據(jù)+過期時間也足夠解決大部分業(yè)務(wù)對于緩存的要求。
- 通過加鎖保證并發(fā)讀寫,寫寫的時候按順序排好隊。讀讀無所謂。所以適合使用讀寫鎖。(業(yè)務(wù)不關(guān)心臟數(shù)據(jù),允許臨時臟數(shù)據(jù)可忽略)。
- 總結(jié)
- 我們能放入緩存的數(shù)據(jù)本就不應(yīng)該是實時性、一致性要求超高的。所以緩存數(shù)據(jù)的時候加上過期時間,保證每天拿到當前最新數(shù)據(jù)即可。
- 我們不應(yīng)該過度設(shè)計,增加系統(tǒng)的復(fù)雜性
- 遇到實時性、一致性要求高的數(shù)據(jù),就應(yīng)該查數(shù)據(jù)庫,即使慢點。
3.4 解決
使用Canal更新緩存
)
使用Canal解決數(shù)據(jù)異構(gòu)問題
我們系統(tǒng)的一致性解決方案
- 緩存的所有數(shù)據(jù)都有過期時間,數(shù)據(jù)過期下一次查詢觸發(fā)主動更新
- 讀寫數(shù)據(jù)的時候,加上分布式的讀寫鎖
4 SpringCache
4.1 配置
- 首先引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>
- 寫配置
- 自動配置了哪些
CacheAutoConfiguration會導入RedisCacheConfiguration
自動配好了緩存管理器RedisCacheManager - 配置使用redis作為緩存
spring: cache: type: redis
- 測試使用緩存
在要緩存結(jié)果的方法上面添加相應(yīng)的注解就可以實現(xiàn)@Cacheable
觸發(fā)將數(shù)據(jù)保存到緩存的操作@CacheEvict
觸發(fā)將數(shù)據(jù)從緩存刪除的操作@CachePut
不影響方法執(zhí)行更新緩存@Caching
組合以上多個操作@CacheConfig
在類級別共享緩存的相同配置@EnableCaching
開啟緩存功能,能夠掃描到這個注解就能生效
- 自動配置了哪些
4.2 @Cacheable
默認行為
- 如果緩存存在,方法不用調(diào)用
- key默認自動生成:
緩存名字::SimpleKey []
- 緩存的value的值,默認使用jdk序列化機制,將序列化后的數(shù)據(jù)存到Redis
自定義
- 指定生成的緩存使用的key:key屬性指定,接收一個SpEL
// key識別為表達式,字符串注意加單引號 @Cacheable(value = {"category"}, key = "'level1Categories'") // 使用方法名作為key @Cacheable(value = {"category"}, key = "#root.method.name")
- 指定緩存的數(shù)據(jù)的存活時間:在application.yaml中配置
spring: cache: type: redis redis: time-to-live: 3600000 #一小時
- 將數(shù)據(jù)保存為Json格式
@Configuration @EnableCaching // 視頻中在這啟用cache有關(guān)的配置類,在容器中注入CacheProperties // 實測不用,因為原來也會注入到容器中 // @EnableConfigurationProperties(CacheProperties.class) public class MyCacheConfig { @Bean public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); CacheProperties.Redis redisProperties = cacheProperties.getRedis(); // 將配置文件中的所有配置都生效(因為如果使用自己的配置,在org.springframework.boot.autoconfigure.cache.RedisCacheConfiguration中讀取配置文件設(shè)置的config就會被取代,所以在這里重新執(zhí)行讀取配置的操作) if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }
原理CacheAutoConfiguration
→ RedisCacheConfiguration
→ 自動配置了 RedisCacheManager
→ 初始化所有的緩存
→ 每個緩存決定是用什么配置
→ 如果 redisCacheConfiguration
有就用已有的,沒有就使用默認配置
→ 想改緩存的配置,只需要給容器中放一個 RedisCacheConfiguration
即可
→ 就會應(yīng)用到當前 RedisCacheManager
管理的所有緩存分區(qū)中
其他的配置
spring:
cache:
type: redis
redis:
time-to-live: 3600000
# 如果指定了前綴,就用我們指定的前綴加上緩存名字作為前綴(視頻中的版本為替換掉緩存名字)
key-prefix: cache_
use-key-prefix: true
# 是否緩存空值,防止緩存穿透
cache-null-values: true
4.3 @CacheEvict
可以用來實現(xiàn)失效模式。
在更新方法上添加這個注解,調(diào)用時就會刪除掉指定的緩存。
@Override
@Transactional
// 一定要記得加單引號
@CacheEvict(value = "category", key = "'getLevel1Categories'")
public void updateCascade(CategoryEntity category) ...
批量刪除
同一個注解不能重復(fù)添加,想要批量刪除可以使用 @Caching
@Caching(evict = {
@CacheEvict(value = "category", key = "'getLevel1Categories'"),
@CacheEvict(value = "category", key = "'getCatalogJson'")
})
或者
// 這樣會刪除category分區(qū)下的所有key
@Caching(value = "category", allEntries = true)
實踐
存儲同一類型的數(shù)據(jù),都可以指定成同一個分區(qū)。分區(qū)名默認就是緩存的前綴。
4.4 @CachePut
可以用來實現(xiàn)雙寫模式,方法執(zhí)行后會將結(jié)果放入緩存,方法需要有返回值。
4.5 不足
- 讀模式:
緩存問題 描述 通用解決 SpringCache解決 緩存穿透 查詢一個null數(shù)據(jù) 緩存空數(shù)據(jù) 配置文件中設(shè)置緩存空數(shù)據(jù) 緩存擊穿 大量并發(fā)進來同時查詢一個正好過期的數(shù)據(jù) 加鎖 在 @Cacheable 參數(shù)中添加 sync = true 緩存雪崩 大量的key同時過期 加隨機時間 配置文件中設(shè)置過期時間 - 寫模式:
- 讀寫加鎖
- 引入Canal,感知到MySQL的更新去更新緩存
- 讀多寫多,直接去數(shù)據(jù)庫查詢就行
總結(jié)
- 常規(guī)數(shù)據(jù)(讀多寫少,即時性、一致性要求不高的數(shù)據(jù)),完全可以使用SpringCache
寫模式(只要緩存的數(shù)據(jù)有過期時間就足夠了) - 特殊數(shù)據(jù):特殊設(shè)計
五、分布式鎖
本地鎖如synchronized和ReentrantLock不適用于分布式微服務(wù)的情況,所以要引入分布式鎖。
1 Redis實現(xiàn)
private Map<String, List<Catalog2Vo>> getCatalogJsonWithRedisLock() {
// 獲取UUID用于刪除鎖時的驗證
String uuid = UUID.randomUUID().toString();
while (true) {
// 加鎖(要設(shè)定一個鎖的有效時間,防止設(shè)置鎖所在的機器斷電不能釋放鎖)
Boolean locked = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
// 判斷是否加鎖成功
if (Boolean.TRUE.equals(locked)) {
try {
// 加鎖成功
} finally {
// lua腳本解鎖
String script = "if redis.call(\"get\", KEYS[1]) == ARGV[1]" +
"\nthen" +
"\n return redis.call(\"del\", KEYS[1])" +
"\nelse" +
"\n return 0" +
"\nend";
redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class), Collections.singletonList("lock"), uuid);
}
}
}
}
使用lua腳本保證 【獲取UUID和刪除鎖】 操作原子性的意義
為了避免出現(xiàn)這樣的情況
2 Redisson實現(xiàn)
2.1 配置與使用
- 首先引入依賴:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.0</version> </dependency>
- 然后配置Bean:
@Configuration public class RedissonConfig { @Bean(destroyMethod = "shutdown") public RedissonClient redisson() { // 1. 創(chuàng)建配置 Config config = new Config(); config.useSingleServer().setAddress("redis://172.16.212.10:6379"); // 2. 根據(jù)Config創(chuàng)建出RedissonClient實例 return Redisson.create(config); } }
2.2 加解鎖操作
@ResponseBody
@GetMapping("/hello")
public String hello() {
// 1. 獲取一把鎖,只要鎖的名字一樣,就是同一把鎖
RLock lock = redisson.getLock("my-lock");
// 2. 加鎖
lock.lock();
或者lock.lock(10, TimeUnit.SECONDS); // 10s自動解鎖,但是一定要大于業(yè)務(wù)的執(zhí)行時間
// 1) 鎖的自動續(xù)期,如果業(yè)務(wù)超長,運行期間自動給鎖續(xù)到30s。不用擔心業(yè)務(wù)時間長,鎖自動過期被刪掉
// 2) 加鎖的業(yè)務(wù)只要運行完成,就不會給當前鎖續(xù)期,即使不手動解鎖,鎖默認在30s以后自動刪除
try {
Thread.sleep(30000);
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
return "hello";
}
lock.lock()與自動續(xù)期
- 如果我們傳遞了鎖的超時時間,就發(fā)送給redis執(zhí)行腳本,進行占鎖,默認超時就是我們指定的時間
- 如果我們未指定鎖的超時時間,就使用
30 * 1000
(LockWatchdogTimeout
看門狗的默認時間)。
只要占鎖成功,就會啟動一個定時任務(wù)(重新給鎖設(shè)置過期時間,新的過期時間就是看門狗的默認時間),定時任務(wù)會每隔LockWatchdogTimeout / 3
(10s)重新執(zhí)行一遍續(xù)期 - 在鎖已經(jīng)設(shè)置的情況下,另一個嘗試獲取鎖的線程會在
tryAquire
方法得到當前被占用鎖的剩余有效時間ttl
,然后通過future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS)
阻塞ttl
這么長的時間,之后會重新嘗試獲取鎖;如果ttl
為null說明獲取到了鎖就不需要阻塞了 - 在
RedissonLock
的unlockAsync
方法中,調(diào)用了cancelExpirationRenewal(threadId)
來結(jié)束續(xù)期
最佳實戰(zhàn)
建議使用 lock.lock(10, TimeUnit.SECONDS)
省掉了整個續(xù)期操作。設(shè)置一個較大的過期時間比如30s,即使是業(yè)務(wù)超時了也說明這個業(yè)務(wù)出現(xiàn)問題了。
問題
使用Redisson分布式鎖時出現(xiàn)這個報錯:
... Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.redisson.api.RedissonClient] ... Caused by: org.redisson.client.RedisConnectionException: Unable to init enough connections amount! Only 10 of 24 were initialized. ...
解決
最低連接數(shù)要求過高,在Redisson配置類中添加
setConnectionMinimumIdleSize(1)
(數(shù)值根據(jù)情況設(shè)置)@Bean(destroyMethod = "shutdown") public RedissonClient redisson() { // 1. 創(chuàng)建配置 Config config = new Config(); config.useSingleServer().setAddress("redis://172.16.212.10:6379").setConnectionMinimumIdleSize(1); // 2. 根據(jù)Config創(chuàng)建出RedissonClient實例 return Redisson.create(config); }
鎖的粒度越細越快
具體緩存的是某個數(shù)據(jù),如11號商品: product-11-lock
2.3 讀寫鎖
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
// 獲取寫鎖
RLock wLock = lock.writeLock();
// 獲取讀鎖 RLock rLock = lock.readLock();
lock.lock();
lock.unlock();
- 保證一定能讀到最新的數(shù)據(jù),修改期間,寫鎖是一個排他鎖(互斥鎖)。讀鎖是一個共享鎖。
?????????????后 先 |
讀 | 寫 |
---|---|---|
讀 | 相當于無鎖,并發(fā)讀,只會在redis中記錄好,所有當前的讀鎖,他們都會同時加鎖成功 | 有讀鎖,寫也需要等待 |
寫 | 等待寫鎖釋放 | 阻塞 |
2.4 信號量
注意要在使用之前在Redis中添加這個key和數(shù)值。
set park 3
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.acquire(); // 阻塞式獲取一個信號,獲取一個值,占一個車位
park.tryAcquire(); // 非阻塞式
return "ok";
}
@GetMapping("/go")
@ResponseBody
public String go() {
RSemaphore park = redisson.getSemaphore("park");
park.release(); // 釋放一個車位
return "ok";
}
六、CompletableFuture
1 創(chuàng)建一個異步操作
-
runAsync
不能獲得返回值,但可以調(diào)用返回的CompletableFuture對象的get()
阻塞等待執(zhí)行完成。public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
-
supplyAsync
可以獲得返回值。public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2 計算完成回調(diào)
whenComplete
- BiConsumer的第一個參數(shù)為結(jié)果,第二個參數(shù)為異常。
- 能夠得到異常信息,但是無法修改飯回信息。
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)
-
exceptionally
可以感知異常,同時返回默認值。
示例public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<Integer> done = CompletableFuture.supplyAsync(() -> { System.out.println("bef"); // 會打印 int a = 1 / 0; System.out.println("aft"); // 不會打印 return a; }, executor).whenComplete((result, exception) -> { System.out.println("done"); }).exceptionally((exception) -> { return 1; }); System.out.println(done.get()); // 打印1
3 方法執(zhí)行完成后的處理handle
二合一。
CompletableFuture<Integer> done = CompletableFuture.supplyAsync(() -> {
return 1 / 0;
}, executor).handle((result, exception) -> {
System.out.println("done");
return 1;
});
System.out.println(done.get()); // 打印1
4 線程串行化方法
方法 | 感知上一步結(jié)果 | 有返回值 |
---|---|---|
thenApply | ? | ? |
thenAccept | ? | ? |
thenRun | ? | ? |
帶有Async默認是異步執(zhí)行的。同之前。 | ||
以上都要前置任務(wù)成功完成。 |
5 兩個任務(wù)都必須完成再執(zhí)行future1.combine(future2, (result1, result2) -> …)
方法 | 感知上一步結(jié)果 | 有返回值 |
---|---|---|
thenCombine | ? | ? |
thenAcceptBoth | ? | ? |
runAfterBoth | ? | ? |
6 一個完成就執(zhí)行
方法 | 感知上一步結(jié)果 | 有返回值 |
---|---|---|
applyToEither | ? | ? |
acceptEither | ? | ? |
runAfterEither | ? | ? |
7 多任務(wù)組合allOf
所有的事都做完再執(zhí)行anyOf
有一個成功就執(zhí)行
七、SpringSession
1 分布式下Session共享問題解決
- session復(fù)制
- 優(yōu)點
- Tomcat原生支持,只需要修改配置文件
- 缺點
- 同步需要數(shù)據(jù)傳輸,占用大量網(wǎng)絡(luò)帶寬
- 每一臺服務(wù)器保存所有session,受內(nèi)存限制
- 大型分布式下所有服務(wù)器保存全量數(shù)據(jù)不可取
- 客戶端存儲
- 優(yōu)點
- 節(jié)省服務(wù)端資源
- 缺點
- 都是缺點(不要使用這種方式!)
- 每次http請求都要攜帶完整信息,浪費帶寬
- cookie長度限制4K,不能保存大量信息
- 泄漏、篡改、竊取等安全隱患
- hash一致性
- 優(yōu)點
- 只需要修改nginx配置
- 負載均衡,只要hash值分布均勻
- 支持水平擴展
- 缺點
- 服務(wù)器重啟導致部分session丟失
- 水平擴展rehash后,session重新分布,部分用戶路由不到正確session
但是本來session就是有有效期的,所以兩種反向代理的方式可以使用
- 統(tǒng)一存儲
- 優(yōu)點
- 支持水平擴展,數(shù)據(jù)庫/緩存水平切分即可
- 服務(wù)器重啟不會導致session丟失
- 缺點
- 增加了一次網(wǎng)絡(luò)調(diào)用
2 使用SpringSession實現(xiàn)統(tǒng)一存儲
可以配置session在瀏覽器上的cookie存儲的Domain屬性,從而讓多個子域名共享session id。
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
spring:
session:
store-type: redis
timeout: PT30M
@Configuration
public class MySessionConfig {
@Bean
public CookieSerializer cookieSerializer() {
DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer();
cookieSerializer.setDomainName(".gulimall.com");
cookieSerializer.setCookieName("GULISESSION");
return cookieSerializer;
}
@Bean
public RedisSerializer<Object> springSessionDefaultRedisSerializer() {
return new GenericJackson2JsonRedisSerializer();
}
}
八、RabbitMQ
1 工作流程
2 安裝
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker update rabbitmq --restart=always
4369, 25672 (Erlang發(fā)現(xiàn)&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后臺端口)
61613, 61614(STOMP協(xié)議端口)
1883, 8883 (MQTT協(xié)議端口)
https://www.rabbitmq.com/networking.html
3 Exchange
3.1 Exchange類型
- exchange分發(fā)消息時根據(jù)類型的不同分發(fā)策略有區(qū)別,目前共四種類型: direct, fanout, topic, headers。
- headers 匹配AMQP消息的header而不是路由鍵,headers交換器和direct交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接著另外三種類型。
3.1.1 Direct Exchange
- 消息中的路由鍵(routing key)如果和Binding中的binding key一致,交換器就將消息發(fā)到對應(yīng)的隊列中。
- 如果一個隊列綁定到交換機要求路由鍵為 “dog”,則只轉(zhuǎn)發(fā)routing key標記為 “dog”的消息,不會轉(zhuǎn)發(fā)“dog.puppy”,也不會轉(zhuǎn)發(fā)“dog.guard”等等。
- 它是完全匹配、單播的模式。點對點
3.1.2 Fanout Exchange
- 每個發(fā)到fanout交換器的消息都會分到所有綁定的隊列上去。
- 像子網(wǎng)廣播,每臺子網(wǎng)的主機都獲得了一份復(fù)制的消息。
- fanout類型轉(zhuǎn)發(fā)消息是最快的。廣播
3.1.3 Topic Exchange
- 將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。
- 它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:
#
(匹配0個或多個單詞)和*
(匹配一個單詞)。主題
4 整合Spring Boot
4.1 配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
引入生效大致原理
- 引入amqp場景,
RabbitAutoConfiguration
就會自動生效 - 給容器自動配置:
RabbitTemplate
AmqpAdmin
CachingConnectionFactory
RabbitMessagingTemplate
Spring配置
- 配置文件
spring: rabbitmq: host: 172.16.212.10 port: 5672 virtual-host: /
-
@EnableRabbit
開啟rabbit
4.2 AmqpAdmin
4.2.1 創(chuàng)建
@Autowired
private AmqpAdmin amqpAdmin
public void createExchange() {
DirectExchange directExchange = new DirectExchange(
name: "hello-java-exchange",
durable: true,
autoDelete: false);
amqpAdmin.declareExchange(directExchange);
}
public void createQueue() {
Queue queue = new Queue(
name: "hello-java-queue",
durable: true,
autoDelete: false
);
amqpAdmin.declareQueue(queue);
}
public void createBinding() {
// 將exchange指定的交換機和destination目的地進行綁定,使用routingKey作為指定的路由鍵
Binding binding = new Binding(
destination: "hello-java-queue",
Binding.DestinationType.QUEUE,
exchange: "hello-java-exchange",
routingKey: "hello.java"
);
amqpAdmin.declareBinding(binding);
}
4.2.2 收發(fā)消息
- 發(fā)送消息
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend(
exchange: "hello-java-exchange",
routingKey: "hello.java",
new Object() // 如果發(fā)送的消息是一個對象,就會使用序列化機制,對象必須實現(xiàn)Serializable
);
}
不使用序列化而使用JSON格式:
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- 監(jiān)聽消息
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(
Message message,
T content // 當時發(fā)送的消息類型T在接收時Spring會自動轉(zhuǎn)化
Channel channel // 當前傳輸數(shù)據(jù)的通道
) {
byte[] body = message.getBody();
}
- 同一個消息,只能有一個客戶端收到
- 當一個消息被處理完,才可以接收下一個消息
@RabbitListener 和 @RabbitHandler@RabbitListener
可以標注在方法或類上@RabbitHandler
可以標注在方法上
二者配合使用實現(xiàn)對重載區(qū)分不同的消息
@RabbitListener(queues = ...)
public class RabbitTest {
@RabbitHandler
public void handle(Car car) {
...
}
@RabbitHandler
public void handle(Plane plane) {
...
}
}
5 可靠抵達
保證消息不丟失,可靠抵達,可以使用事務(wù)消息,性能下降250倍,為此引入確認機制
- publisher confirmCallback 確認模式
- publisher returnCallback 未投遞到 queue 退回模式
- consumer ack機制
5.1 ConfirmCallback 服務(wù)器收到消息
在application.properties中設(shè)置spring.rabbitmq.publisher-confirms=true
- 在建 connectionFactory的時候沒置
PublisherConfirms(true)
選項, 開啟 confirmCallback。 - CorrelationData: 用來表示當前消息唯一性。
- 消息只要被 broker 接收到就會執(zhí)行 confirmCallback,如果是 cluster模 式,需要所有 broker 接收到オ會凋用 confirmCallback。
- 被broker接收到只能表示 message 已經(jīng)到達服務(wù)器,并不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的
returnCallback
。
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // @Configuration對象創(chuàng)建完成以后執(zhí)行的方法
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(
CorrelationData correlationData, // 當前消息的唯一關(guān)聯(lián)數(shù)據(jù)(這個是消息的唯一id)
boolean ack, // 消息是否成功收到
String cause // 失敗的原因
) {
// callback
}
});
}
5.2 ReturnCallback 消息未能抵達隊列
在application.properties中配置
- 開啟發(fā)送端消息抵達隊列的確認
spring.rabbitmq.publisher-returns=true
- 只要抵達隊列,以異步發(fā)送優(yōu)先回調(diào)我們這個return confirm
spring.rabbitmq.template.mandatory=true
@PostConstruct // @Configuration對象創(chuàng)建完成以后執(zhí)行的方法
public void initRabbitTemplate() {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void confirm(
Message message, // 投遞失敗的消息詳細信息
int replyCode, // 回復(fù)的狀態(tài)碼
String replyText, // 回復(fù)的文本內(nèi)容
String exchange, // 當時這個消息發(fā)送給哪個交換機
String routingKey // 當時這個消息用哪個路由鍵
) {
// callback
}
});
}
5.3 Ack 客戶端消息確認
默認是自動確認的,只要消息收到,客戶端會自動確認,服務(wù)端會移除這個消息。
問題
我們收到很多消息,自動回復(fù)給服務(wù)器ack,只有一個消息處理成功,宕機了,發(fā)生消息丟失。
開啟手動確認模式spring.rabbitmq.listener.simple.acknowledge-mode=manual
只要沒明確告訴mq貨物被簽收(沒有ack),消息一直是unacked狀態(tài)。即使Consumer宕機,消息也不會丟失,會重新變?yōu)镽eady,下一次有新的Consumer連接進來就發(fā)給他。文章來源:http://www.zghlxwxcb.cn/news/detail-432019.html
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(
Message message,
T content, // 當時發(fā)送的消息類型T在接收時Spring會自動轉(zhuǎn)化
Channel channel // 當前傳輸數(shù)據(jù)的通道
) {
// deliveryTag是channel內(nèi)按順序自增的
channel.basicAck(
message.getMessageProperties().getDeliveryTag(),
multiple: false
requeue: true
);
}
Ack消息確認機制總結(jié)文章來源地址http://www.zghlxwxcb.cn/news/detail-432019.html
- 消費者獲取到消息,成功處理,可以回復(fù)Ack給Broker
- basic.ack用于肯定確認;broker將移除此消息
- basic.nack用于否定確認;可以指定broker是否丟棄此消息,可以批量
- basic.reject用于否定確認;同上,但不能批量
- 默認自動ack,消息被消費者收到,就會從broker的queue中移除
- queue無消費者,消息依然會被存儲,直到消費者消費
- 消費者收到消息,默認會自動ack。但是如果無法確定此消息是否被處理完成,或者成功處理。我們可以開啟手動ack模式
- 消息處理成功,
ack()
,接受下一個消息,此消息broker就會移除 - 消息處理失敗,
nack()
/reject()
,重新發(fā)送給其他人進行處理,或者容錯處理后ack - 消息一直沒有調(diào)用
ack()
/nack()
方法,broker認為此消息正在被處理,不會投遞給別人。此時客戶端斷開,消息不會被broker移除,會投遞給別人
- 消息處理成功,
到了這里,關(guān)于【筆記/后端】谷粒商城高級篇的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!