背景:使用JdbcTemplate查詢500萬(wàn)數(shù)據(jù),然后插入到數(shù)據(jù)庫(kù)。
這么多的數(shù)據(jù)按照普通的方式直接查詢?nèi)缓蟛迦?,服?wù)器肯定會(huì)掛掉,我嘗試過使用分頁(yè)查詢的方式去進(jìn)行分批查詢插入,雖然也能達(dá)到保證服務(wù)器不掛掉的效果,但是有一個(gè)嚴(yán)重的問題,每次查詢的數(shù)據(jù)很難保證順序性,第一次一查詢的數(shù)據(jù)可能又出現(xiàn)在第N次的查詢結(jié)果中,雖然可以通過在查詢sql中加上排序,可以保證多次查詢的順序不變,但是這種分頁(yè)查詢方式還是不夠嚴(yán)謹(jǐn),因?yàn)樵诙啻尾樵冞^程中,可能數(shù)據(jù)有新增或刪除,即使保證了排序唯一性,也會(huì)導(dǎo)致數(shù)據(jù)少取或取重復(fù)問題。
這個(gè)過程中需要解決的問題:
一、內(nèi)存溢出
使用jdbcTemplate.queryForList查詢一次讀取500萬(wàn)條數(shù)據(jù),會(huì)占用大量?jī)?nèi)存,一般的服務(wù)器都會(huì)內(nèi)存溢出報(bào)錯(cuò),jdbcTemplate默認(rèn)使用RowMapperResultSetExtractor來(lái)處理ResultSet結(jié)果集,會(huì)將數(shù)據(jù)全部讀取到內(nèi)存:
因此我們需要自己寫一個(gè)實(shí)現(xiàn)類繼承ResultSetExtractor,去實(shí)現(xiàn)讀取ResultSet的邏輯。
一、批量插入速度慢
我們使用jdbcTemplate的batchUpdate方法批量保存數(shù)據(jù)時(shí),要想真正進(jìn)行批量保存需要幾個(gè)條件
1.首先要數(shù)據(jù)庫(kù)本身要支持批量更新,一般主流數(shù)據(jù)庫(kù)都會(huì)支持。
2.插入的sql語(yǔ)句不要使用子查詢
插入語(yǔ)句只使用insert into table() values()這種,不要在values中使用select語(yǔ)句
3.數(shù)據(jù)源連接設(shè)置rewriteBatchedStatements=true這個(gè)參數(shù)
在oracle驅(qū)動(dòng)中rewriteBatchedStatements參數(shù)默認(rèn)是開啟的,mysql沒有開啟,需要在數(shù)據(jù)源url連接中手動(dòng)設(shè)置:
?自定義ResultSetExtractor如下:
package com.zhou.db.model;
import com.zhou.db.util.SqlUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.support.JdbcUtils;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* 查詢數(shù)據(jù)自定義處理ResultSet
* @author lang.zhou
* @since 2023/1/9 17:42
*/
@Slf4j
public abstract class DataMapCallBackExtractor implements ResultSetExtractor<List<Map<String,Object>>> {
/**
* 每次讀取10000條時(shí)開始插入
*/
@Getter
private int batchQuerySize = 1000;
@Getter
private List<DbColumn> columnList = new ArrayList<>(0);
/**
* 數(shù)據(jù)條數(shù)
*/
@Getter
private int dataCount = 0;
public DataMapCallBackExtractor() {
}
public DataMapCallBackExtractor(int batchQuerySize) {
if(batchQuerySize > 1000){
this.batchQuerySize = batchQuerySize;
}
}
@Override
public List<Map<String,Object>> extractData(ResultSet rs) throws SQLException, DataAccessException {
ResultSetMetaData resultSetMetaData = rs.getMetaData();
//結(jié)果集列數(shù)
int count = resultSetMetaData.getColumnCount();
//已經(jīng)執(zhí)行回調(diào)的次數(shù)
int times = 0;
//讀取列信息
for (int i = 1; i < count + 1; i++) {
columnList.add(SqlUtil.readResultColumn(resultSetMetaData,i));
}
//讀取列信息后回調(diào)
this.columnInfoCallback(columnList);
List<Map<String, Object>> list = new ArrayList<>();
while(rs.next()){
//總條數(shù)增加
dataCount ++;
Map<String, Object> e = new LinkedHashMap<>(count);
//讀取這一行的數(shù)據(jù)
for (int i = 1; i < count + 1; i++) {
e.putIfAbsent(JdbcUtils.lookupColumnName(resultSetMetaData, i), JdbcUtils.getResultSetValue(rs, i));
}
list.add(e);
//讀取滿10000條時(shí)開始插入數(shù)據(jù)
if(list.size() >= batchQuerySize){
times ++;
this.dataCallback(list,times,dataCount);
//處理完成清空已讀取的數(shù)據(jù),釋放內(nèi)存
list.clear();
}
}
//可能最后一次讀取不滿10000條,插入剩余的數(shù)據(jù)
if(list.size() > 0){
times ++;
this.dataCallback(list,times,dataCount);
list.clear();
}
return new ArrayList<>(0);
}
/**
* 讀取batchQuerySize條數(shù)據(jù)后自定義處理回調(diào)
*/
public abstract void dataCallback(List<Map<String, Object>> list, int times, int n);
/**
* 讀取列信息后回調(diào)
*/
public void columnInfoCallback(List<DbColumn> columnList){
}
}
?我們拿到ResultSet后,每次只讀取10000條數(shù)據(jù)存到List中,然后將這些數(shù)據(jù)插入數(shù)據(jù)庫(kù),在插入結(jié)束之后清空這個(gè)List,jvm會(huì)回收這些數(shù)據(jù)釋放內(nèi)存,一直重復(fù)這個(gè)過程直到結(jié)果集讀取完畢,這樣能保證內(nèi)存中只流程10000條數(shù)據(jù),就避免了內(nèi)存泄漏的情況產(chǎn)生。
分批插入代碼,提升插入速度:
/**
* 數(shù)據(jù)分批插入
*/
public void batchSizeUpdate(List<Map<String,Object>> list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate, int batchSize){
int size = list.size();
int n = size / batchSize;
int l = size % batchSize;
if(l > 0){
n++;
}
log.info("總共分"+n+"次插入");
for (int i = 0; i < n; i++) {
int start = i*batchSize;
int end = (i+1)*batchSize;
if(end > size){
end = size;
}
batchUpdate(list.subList(start,end),sql, namedParameterJdbcTemplate);
log.info("第"+(i+1)+"次插入完畢");
}
}
private void batchUpdate(List<Map<String,Object>> list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate){
Map<String,?> [] param = new Map[list.size()];
for(int c= 0;c<list.size();c++){
param[c] = list.get(c);
}
namedParameterJdbcTemplate.batchUpdate(sql,param);
}
?最終調(diào)用方式:
//一次讀取10000條后進(jìn)行回調(diào)
DataMapCallBackExtractor extractor = new DataMapCallBackExtractor(10000){
@Override
public void dataCallback(List<Map<String, Object>> list, int times, int n) {
log.info("第{}次讀取{}條,共{}條",times,list.size(),n);
//分批插入,一次1000條
batchSizeUpdate(list,insertSql,insertJdbcTemplate);
}
@Override
public void columnInfoCallback(List<DbColumn> columnList) {
//讀取結(jié)果集之前回調(diào),拿到列信息進(jìn)行一些處理
//比如拼接插入sql
}
};
jdbcTemplate.query(sql, new HashMap<>(0),extractor);
?SqlUtil中讀取列信息的代碼:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-768619.html
/**
* 從ResultSet中讀取sql列信息
* @param rs 結(jié)果集
* @param i 列位置
*/
@SneakyThrows
public static DbColumn readResultColumn(ResultSetMetaData rs,int i){
DbColumn c = new DbColumn();
c.setName(rs.getColumnName(i));
c.setComments(rs.getColumnLabel(i));
int type = rs.getColumnType(i);
c.setDataType(rs.getColumnTypeName(i));
c.setNullable(rs.isNullable(i) == ResultSetMetaData.columnNoNulls ? "N" : "Y");
if(type == Types.VARCHAR || type == Types.CHAR || type == Types.LONGVARCHAR || type == Types.CLOB){
c.setDataLength(rs.getColumnDisplaySize(i));
}else if(type == Types.NUMERIC || type == Types.INTEGER || type == Types.BIGINT || type == Types.DECIMAL
|| type == Types.DOUBLE || type == Types.FLOAT || type == Types.REAL || type == Types.SMALLINT || type == Types.TINYINT){
c.setDataLength(rs.getPrecision(i));
}else if(type == Types.DATE || type == Types.TIMESTAMP){
c.setDataLength(rs.getPrecision(i));
}
c.setDataScale(rs.getScale(i));
return c;
}
字段列信息實(shí)體:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-768619.html
import lombok.Data;
import java.util.Objects;
/**
* 數(shù)據(jù)庫(kù)表字段基本信息
* @author lang.zhou
* @since 2022/10/17 14:31
*/
@Data
public class DbColumn {
private String tableName;
/**
* 字段名
*/
private String name;
/**
* 字段描述
*/
private String comments;
/**
* 可為空
*/
private String nullable = "Y";
private String dataType = null;
private Integer isPk = 0;
private Integer dataLength = 0;
private Integer dataScale = 0;
public boolean isPk(){
return name != null && isPk > 0;
}
public boolean isDate(){
return name != null && ("DATE".equalsIgnoreCase(dataType) || "TIMESTAMP".equalsIgnoreCase(dataType) || "DATETIME".equalsIgnoreCase(dataType));
}
public boolean isNumber(){
return name != null && ("NUMBER".equalsIgnoreCase(dataType) || "DECIMAL".equalsIgnoreCase(dataType) || "INTEGER".equalsIgnoreCase(dataType)
|| "INT".equalsIgnoreCase(dataType)|| "BIGINT".equalsIgnoreCase(dataType)|| "DOUBLE".equalsIgnoreCase(dataType)|| "LONG".equalsIgnoreCase(dataType)));
}
public boolean isChar(){
return name != null && "CHAR".equalsIgnoreCase(dataType);
}
public boolean allowNull(){
return !isPk() && Objects.equals(nullable,"Y");
}
}
到了這里,關(guān)于java使用jdbcTemplate查詢并插入百萬(wàn)級(jí)數(shù)據(jù)解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!