模糊查詢優(yōu)化 Flink+ES
公司需要對商品名稱進行模糊模糊查詢,考慮到商品表存量數據千萬級,直接數據庫模糊查詢效率肯定極其低下,所以選擇使用ElasticSearch
對商品信息進行模糊查詢。
因為需要代替原有的查詢接口,保持原有查詢接口的入參出參,所以需要全量+增量同步MySQL數據到ES進行索引創(chuàng)建。方案選擇使用Flink
進行全量+增量同步。
原有方案是通過訂閱Kafka
的topic(現成的,結合debezium
監(jiān)聽binlog日志)+Flink CDC 實現,但是這只能針對增量數據進行監(jiān)聽。雖然修改debezium的參數可以達到全量后增量的效果,但因為有其他系統(tǒng)監(jiān)聽這個topic,不得不放棄該方案,只使用該方案做增量數據同步。
Flink CDC
因為一開始對Flink CDC不太熟悉,所以一直認為CDC只能用于增量查詢,并且我修改MySQL數據源startOption通過調試,也只會獲取最近的增量數據,或者說數據庫目前存的最早的binlog(binlog一般是會定期清理的)。
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname(FlinkParameterToolUtils.getParameterValue(env, NACOS_MYSQL_CONFIG_PREFIX + "hostname"))
.serverTimeZone("GMT+8")
.port(Integer.parseInt(FlinkParameterToolUtils.getParameterValue(env, NACOS_MYSQL_CONFIG_PREFIX + "port")))
.username(FlinkParameterToolUtils.getParameterValue(env, NACOS_MYSQL_CONFIG_PREFIX + "username"))
.password(FlinkParameterToolUtils.getParameterValue(env, NACOS_MYSQL_CONFIG_PREFIX + "password"))
.tableList(FlinkParameterToolUtils.getParameterValue(env, NACOS_MYSQL_CONFIG_PREFIX + "tableList"))
.databaseList(FlinkParameterToolUtils.getParameterValue(env, NACOS_MYSQL_CONFIG_PREFIX + "databaseList"))
.deserializer(jdd)
.startupOptions(StartupOptions.initial())
.build();
不一定會有這種情況,如果有可以加上debezium的參數試試:
就算我修改為
StartupOptions.initial()
依舊沒辦法獲取所有的歷史數據。后面請教前輩才知道還需要添加debezium的參數??梢詫崿F先全量再增量,基于debezium。
全量數據會對數據當前狀態(tài)進行快照,并劃分為多個chunk進行數據源同步,增量則是監(jiān)聽binlog日志。
//設置debezium屬性
Properties properties = new Properties();
//啟動增量快照讀取
/**
* 在執(zhí)行增量快照讀取時,MySQL CDC source 需要一個用于分片的的算法。 MySQL CDC Source 使用主鍵列將表劃分為多個分片(chunk)。 默認情況下,MySQL CDC source 會識別表的主鍵列,并使用主鍵中的第一列作為用作分片列。 如果表中沒有主鍵, 增量快照讀取將失敗,你可以禁用 scan.incremental.snapshot.enabled 來回退到舊的快照讀取機制。
* */
properties.setProperty("scan.incremental.snapshot.enabled", "true");
//先全量再增量
properties.setProperty("scan.startup.mode", "initial");
//將數字類型轉為string,不然讀出來數字值有問題
properties.setProperty("decimal.handling.mode", "string");
MySqlSource<String> source = MySqlSource.<String>builder()
//...
.debeziumProperties(properties)
.build();
但是考慮到如果flink重啟,或者flink對該任務資源分配的問題導致該任務報錯重啟那么又會重新執(zhí)行該任務,又重新全量跑數據的風險。該任務在跑完全量后還是得先cancel掉,再使用監(jiān)聽kafka的方式進行數據增量同步。(這里解釋有問題,后續(xù)還是使用的CDC的方案,因為flink有一個savepoint的機制不僅可以防止重啟后重復全量同步還可以動態(tài)新增待監(jiān)聽的數據表)
動態(tài)加表
官網介紹:MySQL CDC 連接器 — CDC Connectors for Apache Flink? documentation (ververica.github.io)
可以不使用命令的方式完成
-
停止job時不能直接在dashboard使用cancel,應該調用rest api的方式停止Job,此時會將當前job打一個快照(savepoint),用于后續(xù)重啟應用時恢復。
curl -X POST -u username:password http://host:port/xxx/jobs/"JOBID"/stop
-
執(zhí)行完成后,flink會自動savepoint并結束任務,在dashboard的CheckPoint欄->OverView->Latest Savepoint可以查看到savepoint保存的地址
-
在創(chuàng)建source時需要加上配置,才能掃描新表,不然沒用。
MySqlSource<String> source = MySqlSource.<String>builder() ... .scanNewlyAddedTableEnabled(true);
-
修改完nacos配置后,重新submit job,注意重新提交時需要附上savepoint的地址。
Flink JDBC
因為一開始,并不知道需要調整debezium的參數達到全量同步,所以考慮自定義source,分頁查詢數據表的方式達到效果。只看下優(yōu)化后的吧。
/**
* 需要繼承RichSourceFunction<String>
* 代碼采用了build創(chuàng)建的方式,方便寫入一些nacos的配置項,提高可讀性
*
**/
public class MySQLSourceFunction extends RichSourceFunction<String> {
private DruidDataSource dataSource;
private String username;
private String password;
private String drivername;
private String dbURL;
private String queryTemplate;
// 動態(tài)開始Id(始終表示當前查詢最小ID),每次SQL循環(huán)會更新
private long startId = 0L;
private String[] params;
private String[] paramsType;
private volatile Queue<String> buffer;
private ExecutorService executorService;
private volatile boolean isRunning = true;
public static MySQLSourceFunction.MysqlSourceFunctionBuilder buildMySQLSourceFunction() {
return new MySQLSourceFunction.MysqlSourceFunctionBuilder();
}
/**
* open方法進行一些參數的初始化
* 包括數據庫鏈接、數據緩存等
**/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Properties properties = new Properties();
properties.setProperty("url", dbURL);
properties.setProperty("username", username);
properties.setProperty("password", password);
properties.setProperty("initialSize", "5");
properties.setProperty("maxActive", "10");
// 這里設置了數據庫連接池
this.dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
this.buffer = new ConcurrentLinkedQueue<>();
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
public void run(SourceContext<String> sourceContext) {
executorService.execute(() -> {
while (isRunning) {
if (buffer.isEmpty()) {
try {
generateSourceData();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
});
while (isRunning) {
while (!buffer.isEmpty()) {
sourceContext.collect(buffer.poll());
}
}
}
private void generateSourceData() throws SQLException {
// 執(zhí)行SQL語句返回結果集
try (DruidPooledConnection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(queryTemplate)) {
generateParams(ps);
// 啟用流式查詢
ps.setFetchSize(Integer.MIN_VALUE);
ResultSet resultSet = ps.executeQuery();
log.info("當前執(zhí)行到最大ID為{}", startId);
// 獲取ResultSet對象的列的數量、類型和屬性。
ResultSetMetaData md = resultSet.getMetaData();
int columnCount = md.getColumnCount();
int rowCount = 0;
JSONObject jsonObject = new JSONObject();
while (resultSet.next()) {
jsonObject.clear();
rowCount++;
for (int i = 1; i <= columnCount; i++) {
jsonObject.put(md.getColumnLabel(i), resultSet.getObject(i));
// 動態(tài)計算startId
if (md.getColumnLabel(i).equals("id")) {
startId = Math.max(resultSet.getLong(i), startId);
}
}
buffer.offer(JSONObject.toJSONString(jsonObject.toJavaObject(GoodsMp.class)));
}
if (rowCount == 0 && buffer.isEmpty()) {
this.isRunning = false;
}
}
}
/**
* 插入SQL參數
*
* @param ps PreparedStatement
*/
private void generateParams(PreparedStatement ps) throws SQLException {
// 插入參數
for (int i = 0; i < params.length; i++) {
String paramType = paramsType[i];
String param = params[i];
if (paramType.startsWith("startId:")) {
// 配置了動態(tài)開始ID
paramType = paramType.split(":", 2)[1];
if (startId > 0L) {
param = String.valueOf(startId);
}
}
// 參數類型判斷
if (ParamsTypeEnum.Int.getType().equals(paramType.toLowerCase())) {
ps.setInt(i + 1, Integer.parseInt(param));
} else if (ParamsTypeEnum.Long.getType().equals(paramType.toLowerCase())) {
ps.setLong(i + 1, Long.parseLong(param));
} else if (ParamsTypeEnum.String.getType().equals(paramType.toLowerCase())) {
ps.setString(i + 1, param);
}
}
}
@Override
public void cancel() {
}
@Override
public void close() throws Exception {
super.close();
this.isRunning = false;
if (dataSource != null) {
//關閉連接和釋放資源
dataSource.close();
}
}
public static class MysqlSourceFunctionBuilder {
private final MySQLSourceFunction source = new MySQLSourceFunction();
public MysqlSourceFunctionBuilder() {
}
public MySQLSourceFunction.MysqlSourceFunctionBuilder setUsername(String username) {
this.source.username = username;
return this;
}
public MySQLSourceFunction.MysqlSourceFunctionBuilder setPassword(String password) {
this.source.password = password;
return this;
}
public MySQLSourceFunction.MysqlSourceFunctionBuilder setDrivername(String drivername) {
this.source.drivername = drivername;
return this;
}
public MySQLSourceFunction.MysqlSourceFunctionBuilder setDBUrl(String dbURL) {
this.source.dbURL = dbURL;
return this;
}
public MySQLSourceFunction.MysqlSourceFunctionBuilder setQuery(String query) {
this.source.queryTemplate = query;
return this;
}
public MySQLSourceFunction.MysqlSourceFunctionBuilder setParams(String params) {
this.source.params = params.split(",");
return this;
}
public MysqlSourceFunctionBuilder setParamsType(String parameterValue) {
this.source.paramsType = parameterValue.split(",");
return this;
}
public MySQLSourceFunction finish() {
if (this.source.username == null) {
log.info("Username was not supplied separately.");
}
if (this.source.password == null) {
log.info("Password was not supplied separately.");
}
if (this.source.dbURL == null) {
throw new IllegalArgumentException("No database URL supplied");
} else if (this.source.queryTemplate == null) {
throw new IllegalArgumentException("No query supplied");
} else if (this.source.drivername == null) {
throw new IllegalArgumentException("No driver supplied");
} else {
return this.source;
}
}
}
}
優(yōu)化點1:分頁查詢優(yōu)化
分頁查詢
select * from goods order by id limit offset,size
一開始我是使用的這種sql去循環(huán)查詢數據,但是發(fā)現隨著分頁數量的增大,上述sql的查詢速度越慢,千萬級別的數據表到后續(xù)的查詢速度就不可恭維了。雖然id是主鍵。但是隨著分頁數量的增大依舊很慢,比如offset到了200萬那么查詢速度就已經到了10幾秒了。
以為數據庫會先order by ,然后再遍歷前offset條數據,再取offset 后 數量(size)的數據,也就是說如果是 limit 200000,1000 數據庫會遍歷200000條數據,這性能就很慢了。
那么如何優(yōu)化呢?首先我們知道id主鍵是聚簇索引,MySQL InnoDB對于聚簇索引使用的是B+樹,葉子節(jié)點存的是整行的數據,所以在跳過offset時,數據庫需要處理大量結果集 selcet * 消耗內存等系統(tǒng)資源,所以可以如下優(yōu)化。
select * from goods a inner join
(select id from goods order by id limit 2000000,1000) b on a.id=b.id;
雖然說查詢性能 select id from goods order by id limit 2000000,1000
與select * from goods order by id limit 2000000,1000
是一致的但是返回的結果集的大小不一致,可能與業(yè)務有點特殊,該表字段較多,導致結果集很大,所以在跳過offset的時消耗系統(tǒng)資源較多,會導致查詢速度遠慢于第二個sql語句。
這個查詢速度差不多得2秒左右,已經優(yōu)化很多,但還能不能優(yōu)化呢?
可以看到前面分頁查詢的主要性能缺陷在于,需要遍歷前offset的數據,隨之分頁數量增大那么查詢性能越慢。那么如果我可以不讓他遍歷前offset的數據,不就加大了查詢速度嘛。下面看看第三條SQL
select * from goods where id>=xxx order by id desc limit 1000;
這條sql直接到了毫秒級的查詢速度。因為id是有索引的,只要我知道之前的查詢最大的id,那么我就可以當成where條件防止遍歷前offset的數據導致的查詢效率慢的問題。
我這里的業(yè)務是全量遍歷所以很容易記錄上一次查詢到的id,如上述代碼的startId。
優(yōu)化點2:數據庫鏈接優(yōu)化
查詢速度解決了,flink的source開始嗖嗖跑。一開始并沒有使用連接池,而是使用的DriverManager創(chuàng)建數據庫鏈接,并且循環(huán)查詢時ps一直沒有重新獲取,導致,在flink運行過程中當數據量達到300萬條時,JVM的task heap空間就已經達到2G,無法再進行同步,job宕機重啟。
并且我是逮著一個PreparedStatement ps一直用。相當于我每次循環(huán)都不會新建鏈接也不會重新獲取prepareStatement ,resultSet也沒有關閉。這個問題查了很久。
最后通過工具,jprofiler實時監(jiān)控本地運行flink發(fā)現,存在大量的buffer未被GC掉,根據溯源發(fā)現是查詢到的結果集導致的。所以想到應該是 ps及rs循環(huán)沒有關閉導致,結果集與其存在引用關系,很難GC掉,導致隨著數據查詢的增多,JVM內存占用空間加大。
所以最后使用了 數據庫連接池以及每次重新獲取ps的方式來斷掉之前處理過的結果集的引用關系讓JVM GC掉才解決內存不斷增長,job自動關閉的問題。
// 使用try限制 connection與ps的作用域
try (DruidPooledConnection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(queryTemplate)) {
generateParams(ps);
// 啟用流式查詢,提高性能
ps.setFetchSize(Integer.MIN_VALUE);
}
優(yōu)化點3:合理使用Flink反壓機制
flink的反壓機制可以看看這個文檔Flink詳解系列之九–反壓機制和處理 - 知乎 (zhihu.com)
總的來說反壓是在實時數據處理中,數據管道某個節(jié)點上游產生數據的速度大于該節(jié)點處理數據速度的一種現象。反壓會從該節(jié)點向上游傳遞,一直到數據源,并降低數據源的攝入速度。
在自定義數據源中,主要發(fā)送數據源信息的代碼在
sourceContext.collect(buffer.poll()); //buffer是本地查詢到的結果集緩存
可以說,當下游數據處理趕不上上有數據傳輸時,flink會自動限制該方法collect的速度,通過阻塞的方式限制流數據向下游傳輸。
因為我們上述優(yōu)化優(yōu)化了sql的查詢速度,所以導致sql查詢速度很快,而es的寫是很消耗性能的(空間換時間)所以明顯下游es寫索引的處理速度不及上游數據庫查詢的速度,造成反壓,可能會造成數據源內存堆積從而導致OOM。優(yōu)化如下:
// open
this.buffer = new ConcurrentLinkedQueue<>();
// run
executorService.execute(() -> {
while (isRunning) {
if (buffer.isEmpty()) {
try {
generateSourceData();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
});
while (isRunning) {
while (!buffer.isEmpty()) {
sourceContext.collect(buffer.poll());
}
}
起一個子線程進行生成數據存在緩存隊列中,限制當buffer消耗完成,也就是下游正常接收時才進入循環(huán)查詢數據。這樣當collect無法請求傳輸數據時(反壓),就會減緩對buffer數據的collect,而buffer會等到消耗完成才會進行新的查詢,這樣就解決了上下游處理時間不一致的情況。
ElasticSearch
當所有數據都跑到es了,就要開始寫查詢es的接口了。
我這里有兩個字段需要進行模糊查詢,一個是名稱,一個是編號。名稱是中文,編號是數字。而es無法對非text類型進行分詞,所以在創(chuàng)建索引是將編號字段改為了text類型并使用ngram定義了分詞器,中文使用ik分詞。
searchInfo.getParams().forEach((key, value) -> {
if (wildcardQueryParams.contains(key)) {
wildcardQueries.put(key, String.valueOf(value));
} else {
matchQueries.put(key, String.valueOf(value));
}
});
發(fā)現這種查詢并不算快,400ms左右??紤]es是否能像MySQL一樣,比如先精準查詢再模糊查詢?
答案是肯定的。
首先先摒棄matchQuery作為精準查詢的方法,改為使用termQuery,因為matchQuery
會使用分析器對查詢文本進行分析,生成相應的詞項。而termQuery
不會進行任何分析,直接在倒排索引中查找完全匹配的詞項。
同時termQuery
既可以做過濾條件、也可以做查詢條件,那么有什么區(qū)別?應該用什么?
- 過濾條件(filter context)
- 用于精確匹配的情況,如數字、日期或布爾值等。
- 不計算分數,只關注文檔是否匹配。
- 性能非常高,因為Elasticsearch可以直接從倒排索引中查找匹配的文檔。
- 查詢條件(query context)
- 用于全文搜索和模糊匹配的情況。
- 計算分數,并考慮相關性因素。
- 執(zhí)行速度通常比過濾條件慢,因為它需要進行額外的操作。
根據分析,我們業(yè)務使用過濾條件更合理。然后就執(zhí)行先后順序的問題,理論上說先執(zhí)行精準查詢再執(zhí)行模糊查詢的效率更高
// es查詢
HashMap<String, String> wildcardQueries = new HashMap<>();
HashMap<String, String> termQueries = new HashMap<>();
searchInfo.getParams().forEach((key, value) -> {
if (wildcardQueryParams.contains(key)) {
wildcardQueries.put(key, String.valueOf(value));
} else {
termQueries.put(key, String.valueOf(value));
}
});
// 先添加term過濾查詢
termQueries.forEach((key, value) -> queryBuilder.filter(QueryBuilders.termQuery(key, value)));
// 再添加wildcard查詢
wildcardQueries.forEach((key, value) -> queryBuilder.must(QueryBuilders.wildcardQuery(key, value)));
現在基本上就維持在80ms左右,還是優(yōu)化了很多。
es hits total值問題
在分頁查詢時獲取到的hits是有數據的但是總數為0
這個是個官方的bug,ES_記一次分頁查詢(getHits().getTotalHits() 獲取總條目)為0的問題-CSDN博客
需要升級client版本至6.8.4,我的版本是7.x還是有這個問題,后續(xù)發(fā)現RequestConverters這個類我全局搜有兩個,發(fā)現我有兩個版本的client,那么答案呼之欲出了,把不需要的版本去掉就行。
總數大于10000時與數據庫總數對不上
當我查詢時發(fā)現,我數據庫查詢同樣的條件總數為83000條,但是es查詢出來是10000條,很明顯不對。
原因:es 7.x版本后對分頁查詢做了限制當文檔數大于10000時,只顯示10000。
解決方案:查詢時加上參數track_total_hits:true文章來源:http://www.zghlxwxcb.cn/news/detail-753631.html
searchSourceBuilder.trackTotalHits(true);
至此,結束。文章來源地址http://www.zghlxwxcb.cn/news/detail-753631.html
到了這里,關于記一次模糊查詢踩坑 Flink+ES的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!