1.背景描述
es在本公司承載三個(gè)部分的業(yè)務(wù),站內(nèi)查詢,訂單數(shù)據(jù)統(tǒng)計(jì),elk日志分析。
2020年團(tuán)隊(duì)決定對(duì)elasticsearch升級(jí)。es(elasticsearch縮寫,下同)當(dāng)前版本為1.x,升級(jí)到5.x版本。
5.x支持如下新特性:
支持lucene 6.x,磁盤空間少一半,索引時(shí)間少一半,查詢性能提升25%
Java rest client (high level api)
Painless 腳本相比groovy腳本,更安全,更簡潔,更好的性能
?
對(duì)于站內(nèi)查詢和訂單數(shù)據(jù)統(tǒng)計(jì),當(dāng)前業(yè)務(wù)架構(gòu)是
mysql -> canal -> kafka -> (es Index server) -> es
(可以考慮使用kafka connector 代替canal)
1.1 如何配置 mysql -> canal -> kafka
1.1.1. 配置mysql
開啟binlog
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
授權(quán)給canal用戶,讓其有復(fù)制權(quán)限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1.1.2 配置canal
下載 https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
修改?conf/canal.properties
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka # 由kafka消費(fèi)
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
修改?conf/example/instance.properties
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=mysql_test # 同步的數(shù)據(jù)庫
# mq config
canal.mq.topic=canal_topic # 在kafka的topic
啟動(dòng)canal
./bin/start.sh
1.1.2 啟動(dòng)zookeeper 和 kafka
brew services start zookeeper
brew services start kafka
1.1.3 測試
在db中添加數(shù)據(jù),可以使用kafka 腳本看到同步數(shù)據(jù)
INSERT INTO `mysql_test`.`user` (`id`, `name`) VALUES ('6', 'Bob');
? bin ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canal_topic
{"data":[{"id":"6","name":"Bob","age":null}],"database":"mysql_test","es":1684221427000,"id":5,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(32)","age":"int"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"user","ts":1684221427082,"type":"INSERT"}
2.難點(diǎn)
1.在升級(jí)的時(shí)候如何不影響當(dāng)前業(yè)務(wù)。
2.如果升級(jí)失敗,能夠快速進(jìn)行回滾。
3.具體步驟
主體方案采用雙寫機(jī)制
3.1.部署es新集群
下載5.x版本的es,在新的機(jī)器上部署新的集群。
配置機(jī)器:
disable swapping:swapoff -a
內(nèi)存鎖定:mlockall: true
修改文件句柄數(shù):ulimit -a
分配一半的內(nèi)存給es,留下一半的內(nèi)存給文件系統(tǒng), ES_JAVA_OPTS="-Xms16g -Xmx16g"
3.2.pull代碼,升級(jí)代碼到es新版本
由于從1.x到5.x版本跨度比較大,許多java api都發(fā)生了變化,需要修復(fù)。??
常見字段類型修改:? text/keyword 代替 string
不再支持type類型
java api alias語義變化
3.3.重建索引
??我們使用索引重建程序來新建索引。重建索引具體步驟如下,我們稱線上索引為online index, 新創(chuàng)建的索引為new index。
??3.3.1.init
????刷新索引名映射關(guān)系,檢查當(dāng)前alias只有一個(gè)物理索引。
????根據(jù)預(yù)定義的mapping,創(chuàng)建索引new index。
????設(shè)置在線索引記錄數(shù)據(jù)變更日志,即記錄線上索引消費(fèi)kafka數(shù)據(jù),并存儲(chǔ)為change log文件.
??3.3.2.全量索引數(shù)據(jù)庫上的數(shù)據(jù)到new index
????從mysql查出數(shù)據(jù)同步到es中,如果有多個(gè)分表,就按照表順序同步??梢蚤_啟多線程批量插入。
??3.3.3.對(duì)new index索引優(yōu)化
????refresh, flush 索引。調(diào)用force-merge api,進(jìn)行段合并。
??3.3.4.重放change log到new index中
????根據(jù)change log 轉(zhuǎn)換為es query,寫入到new index。????
??3.3.5.暫停線上索引的寫入
????因?yàn)閛nline index和new index 使用的是相同的kafka consumer group,所以必須停掉online index的消費(fèi)功能。
??3.3.6.關(guān)閉change log
????停止記錄在線索引記錄數(shù)據(jù)變更日志。
??3.3.7.第二階段重放change log
????根據(jù)change log 轉(zhuǎn)換為es query,寫入到new index。?
??3.3.8.刪除change log?
????刪除線索引記錄數(shù)據(jù)變更日志。
??3.3.9.設(shè)置副本數(shù)?
????new index創(chuàng)建索引的時(shí)候默認(rèn)副本數(shù)為0,現(xiàn)在動(dòng)態(tài)調(diào)整副本數(shù)為業(yè)務(wù)需要的值。比如對(duì)現(xiàn)實(shí)搜索業(yè)務(wù)設(shè)置兩個(gè)副本,對(duì)訂單統(tǒng)計(jì)類索引不需要副本。
PUT /new_index/_settings
{
"number_of_replicas": 2
}
????此階段可能會(huì)比較耗時(shí),需要等待幾分鐘才能進(jìn)行下一步操作。更好的做法是調(diào)用health api 查看分片狀態(tài)。
GET _cluster/health
{
"cluster_name" : "testcluster",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 1,
"active_shards" : 1,
"relocating_shards" : 0, // 重新定位的分片
"initializing_shards" : 0, // 初始化中的分片
"unassigned_shards" : 1, // 未分配的分片
"delayed_unassigned_shards": 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50.0
}
??3.3.10.別名切換?
POST /_aliases
{
"actions": [
{ "remove": { "index": "online_index", "alias": "my_index" }},
{ "add": { "index": "new_index", "alias": "my_index" }}
]
}
??3.3.11.運(yùn)行在線索引 (從kafka里面讀取數(shù)據(jù))
????new_index 開始從kafka里面消費(fèi)最新數(shù)據(jù)。由于之前的操作可能會(huì)有延時(shí),需要等待幾分鐘才能同步到最新數(shù)據(jù)。
??3.3.12.刪除舊的索引
????刪除old_index
詳細(xì)代碼步驟如下
// 1.init
logger.info("初始化");
ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
logger.info("刷新索引名映射關(guān)系");
if (!indexContext.refreshIndexName()) {
throw new IndexException("刷新索引映射關(guān)系失敗");
}
rebuildIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("初始化重建索引環(huán)境,當(dāng)前重建索引名:" + rebuildIndexName);
logger.info("創(chuàng)建索引,索引名:" + rebuildIndexName);
boolean isCreate = false;
try {
isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
} catch (Throwable t) {
logger.info("創(chuàng)建索引失敗,本次失敗可以不處理,將會(huì)自動(dòng)重試 ...");
}
logger.info("設(shè)置在線索引記錄數(shù)據(jù)變更日志");
indexContext.startChangeLog();
// 2. 重建索引
logger.info("全量索引數(shù)據(jù)庫上的數(shù)據(jù) ...");
long startRebulidTime = System.currentTimeMillis();
rebuild();
logger.info(" ------ 完成全量索引數(shù)據(jù)庫上的數(shù)據(jù),對(duì)應(yīng)索引" + rebuildIndexName + ",耗時(shí)" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
+ " 秒 ------ ");
// 3. 索引優(yōu)化 -- 是否調(diào)到變更重放完畢后做優(yōu)化
logger.info("優(yōu)化索引 ...");
long startOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
logger.info(" ------ 完成" + rebuildIndexName + "索引優(yōu)化,耗時(shí) " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
+ " 秒 ------ ");
// TODO 字符集設(shè)置
BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));
// 4. 重放變更日志
logger.info("重放本地?cái)?shù)據(jù)變更日志[第一階段] ...");
long startReplay1Time = System.currentTimeMillis();
int replayChangeLogCount = replayChangeLogFirst(logReader);
logger.info(" ------ 完成[第一階段]的變更日志重放,行數(shù)" + replayChangeLogCount + " 耗時(shí) "
+ ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒 ------ ");
// 5. 暫停在線索引
logger.info("暫停在線索引");
indexContext.pauseOnlineIndex();
isPauseOnline.set(true);
// 6. 設(shè)置 在線索引只做索引更新 以及 關(guān)閉 change log
logger.info("停止變更日志");
indexContext.stopChangeLog();
// 7. 繼續(xù)重放 change log
logger.info("重放本地?cái)?shù)據(jù)變更日志[第二階段] ...");
long startReplay2Time = System.currentTimeMillis();
replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
logger.error("變更日志,處于錯(cuò)誤的狀態(tài),統(tǒng)計(jì)的日志行數(shù):" + indexContext.getWriteChangeLogCount() + ", 但實(shí)際只有:" + replayChangeLogCount);
}
logger.info(" ------ 完成[第二階段]的變更日志重放,行數(shù)" + replayChangeLogCount + " 耗時(shí) "
+ ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒 ------ ");
// 8. 刪除變更日志, OnlineIndex.startChangeLog 有做環(huán)境清理,這里不執(zhí)行
logger.info("簡單優(yōu)化索引 ...");
long startSimpleOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);
logger.info(" ------ 完成" + rebuildIndexName + "索引簡單優(yōu)化,耗時(shí) " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
+ " 秒 ------ ");
// 9. 設(shè)置副本數(shù) (懷疑比較耗時(shí)~~~待確認(rèn))
logger.info("設(shè)置副本數(shù) ...");
int replicas = 3;
if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
replicas = 1;
} else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
replicas = 2;
} else {
String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
if (NumberUtils.isNumber(replicasStr)) {
replicas = NumberUtils.toInt(replicasStr);
}
}
ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);
// 執(zhí)行索引切換流程
// 預(yù)發(fā)、線上環(huán)境阻塞等待2分鐘同步數(shù)據(jù)后,再執(zhí)行索引切換和刪除舊索引邏輯
try {
if(IDCUtil.isBuildOrProduction()){
Thread.sleep(120 * 1000);
}
} catch (InterruptedException e) {
}
// 10. 別名切換
logger.info("索引切換:將" + rebuildIndexName + "設(shè)置為線上索引");
if (!indexContext.switchIndex(rebuildIndexName)) {
throw new IndexException("索引切換失?。簩? + rebuildIndexName + "設(shè)置為線上索引失敗");
}
// 11. 運(yùn)行在線索引
logger.info("運(yùn)行在線索引");
indexContext.keepRuningOnlineIndex();
isPauseOnline.set(false);
// 12. 刪除原有在線索引
String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("刪除原有在線索引,索引名:" + oldOnlineIndexName);
if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
throw new IndexException("刪除索引失敗,索引名:" + oldOnlineIndexName);
}
思考
如果只是簡單地新建索引,完全可以這樣做(使用不同的消費(fèi)組)?
??1.記錄時(shí)間戳?
??2.全量索引數(shù)據(jù)的數(shù)據(jù)
??3.根據(jù)前面的時(shí)間戳找到kafka中的下標(biāo),下標(biāo)得時(shí)間戳必須 < 記錄的時(shí)間戳
sh kafka_2.11-2.3.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092,broker2:9092 -topic topicName -time 1585186237000
??4.根據(jù)上一步的下標(biāo)開始索引數(shù)據(jù)
3.4.使用新集群進(jìn)行業(yè)務(wù)測試
部署新的客戶端服務(wù)調(diào)用新的es集群,檢查業(yè)務(wù)是否正常。對(duì)站內(nèi)查詢檢查搜索結(jié)果是否一致,對(duì)統(tǒng)計(jì)類查詢查看統(tǒng)計(jì)結(jié)果是否一致。
針對(duì)不同業(yè)務(wù)場景下,做不同的測試
1.對(duì)比新老集群,索引數(shù)據(jù)量是否一致
2.搜索業(yè)務(wù),查看熱門關(guān)鍵詞搜索結(jié)果
3.統(tǒng)計(jì)業(yè)務(wù),對(duì)比索引數(shù)據(jù)量,常用聚合統(tǒng)計(jì)查詢結(jié)果是否一致
4.對(duì)于elk業(yè)務(wù),可以單獨(dú)升級(jí)
3.5.發(fā)布線上客戶端搜索代碼,修改es地址為新集群地址
??上線,觀察業(yè)務(wù)是否穩(wěn)定。
3.6.下線舊的es集群
??釋放舊的es集群的資源。文章來源:http://www.zghlxwxcb.cn/news/detail-414294.html
4.總結(jié)
??es升級(jí)這份工作是兩年之前做的,現(xiàn)在來進(jìn)行總結(jié),部分細(xì)節(jié)可能會(huì)有疏漏。但是總結(jié)起來,依然后很多收獲,從架構(gòu),代碼細(xì)節(jié)上都有改進(jìn)的空間。es重建代碼可以做得更通用,然后開源出來。文章來源地址http://www.zghlxwxcb.cn/news/detail-414294.html
到了這里,關(guān)于elasticsearch升級(jí)和索引重建。的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!