簡介
ETL是英文Extract-Transform-Load的縮寫,用來描述將數據從源端經過抽取(extract)、轉換(transform)、加載(load)至目的端的過程,它能夠對各種分布的、異構的源數據(如關係數據)進行抽取,按照預先設計的規則將不完整數據、重複數據以及錯誤數據等“髒”數據內容進行清洗,得到符合要求的“乾淨”數據,並加載到數據倉庫中進行存儲,這些“乾淨”數據就成為了數據分析、數據挖掘的基石。
kettle是一個開源ETL工具。kettle提供了基於Java的圖形化界面,使用很方便。kettle還提供了基於Java的腳本編寫功能,可以靈活地自定義ETL過程,使自行定制、批量處理等成為可能,這才是一個程序員需要做的工作,而不僅是像使用Word一樣操作kettle用戶界面。
環境集成:
參考:java集成kettle教程(附示例代碼)_kettle java_成偉平2022的博客-CSDN博客
代碼:
pom.xml添加:
<!-- MySQL JDBC driver and Druid connection pool -->
<!-- NOTE(review): mysql-connector-java declares no version here; presumably managed by a parent POM/BOM - confirm -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.11</version>
</dependency>
<!-- Kettle jars resolved from the project's local lib/ directory via system scope (see each systemPath) -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>8.2.0.7-719</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/kettle-core-8.2.0.7-719.jar</systemPath>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>8.2.0.7-719</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/kettle-engine-8.2.0.7-719.jar</systemPath>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>metastore</artifactId>
<version>8.2.0.7-719</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/metastore-8.2.0.7-719.jar</systemPath>
</dependency>
<!-- Other dependencies required by Kettle -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.54</version>
</dependency>
<dependency>
<groupId>net.sourceforge.jexcelapi</groupId>
<artifactId>jxl</artifactId>
<version>2.6.12</version>
</dependency>
/**
 * Demo REST endpoints that run a Kettle transformation (.ktr) or job (.kjb)
 * identified by file name, delegating to {@link KettleService}.
 */
@RestController
@RequestMapping("${application.admin-path}/etl-kettl")
// Swagger annotation left disabled: @Api(tags = "ETL-Kettle demo API")
public class KettleDemoContrllor {

    @Resource
    KettleService kettleService;

    /**
     * Runs the transformation stored in the given .ktr file with no parameters.
     *
     * <p>Handler methods are public: a private handler on a CGLIB-proxied controller
     * would execute on the proxy instance, where {@code kettleService} is not injected.
     *
     * @param filename name of the .ktr file to execute
     * @return the service result rendered as text and wrapped by {@code R.buildOkData}
     * @throws Exception propagated from the service
     */
    @GetMapping("/execKtr")
    // Swagger annotation left disabled: @ApiOperation("execute a ktr file")
    public Object runKtr(String filename) throws Exception {
        // String.valueOf tolerates a null result, where .toString() would throw a NullPointerException.
        return R.buildOkData(String.valueOf(kettleService.runTaskKtr(filename, null)));
    }

    /**
     * Runs the job stored in the given .kjb file with no parameters.
     *
     * @param filename name of the .kjb file to execute
     * @return the service result rendered as text and wrapped by {@code R.buildOkData}
     * @throws Exception propagated from the service
     */
    @GetMapping("/execKjb")
    // Swagger annotation left disabled: @ApiOperation("execute a kjb file")
    public Object runKjb(String filename) throws Exception {
        return R.buildOkData(String.valueOf(kettleService.runTaskKjb(filename, null)));
    }
}
/**
* Contract for running Kettle ETL tasks identified by script file name.
*/
public interface KettleService {
/**
* Starts an ETL task defined by a transformation file (ktr).
*
* @param taskFileName name of the task file to execute (ktr)
* @param params input parameters for the task
* @return the execution result
* @throws Exception if the configuration file is not found; Kettle runtime errors are not thrown
*/
Object runTaskKtr(String taskFileName, Map<String, String> params) throws Exception;
/**
* Starts an ETL task defined by a job file (kjb).
*
* @param taskFileName name of the task file to execute (kjb)
* @param params input parameters for the task
* @return the execution result
* @throws Exception if the configuration file is not found; Kettle runtime errors are not thrown
*/
Object runTaskKjb(String taskFileName, Map<String, String> params) throws Exception;
}
/**
 * Kettle-backed implementation of {@link KettleService}.
 *
 * <p>Script files are looked up in the directory configured by {@code kettle.script.path}.
 * Metadata loaded by {@link #init()} is cached by file name; when a requested file is not
 * cached it is loaded on demand from the script directory.
 *
 * <p>NOTE(review): the cache lists are plain {@code ArrayList}s and cached metadata objects are
 * shared between calls; confirm whether concurrent requests need additional protection.
 */
@Service
public class KettleServiceImpl implements KettleService {

    private static final Logger logger = LoggerFactory.getLogger("kettle-service-log");

    /** Directory containing the .ktr / .kjb script files. */
    @Value("${kettle.script.path}")
    private String kettleScriptPath;

    /** Transformation metadata cached by {@link #init()}. */
    private final List<KtrMeta> ktrMetas = new ArrayList<>();

    /** Job metadata cached by {@link #init()}. */
    private final List<KjbMeta> kjbMetas = new ArrayList<>();

    /**
     * Lists the names of the regular files directly under {@code path} that carry the
     * given extension.
     *
     * @param path    directory to scan
     * @param subName file extension without the leading dot, e.g. {@code "ktr"}
     * @return matching file names (name only, no directory); empty when the directory
     *         cannot be listed
     */
    private List<String> getFiles(String path, String subName) {
        List<String> files = new ArrayList<>();
        File[] entries = new File(path).listFiles();
        if (entries == null) {
            return files;
        }
        // Match on the file name's extension instead of the last three characters of the
        // full path, so the dot is required and extensions of any length work.
        String suffix = "." + subName;
        for (File entry : entries) {
            if (entry.isFile() && entry.getName().endsWith(suffix)) {
                files.add(entry.getName());
            }
        }
        return files;
    }

    /**
     * Builds the path of a script inside the configured script directory.
     *
     * <p>The file name originates from an HTTP request, so anything that is not a plain
     * file name (contains a directory component, or is "." / "..") is rejected to keep
     * the lookup inside the script directory.
     *
     * @param fileName plain script file name
     * @return path of the script under {@code kettleScriptPath}
     * @throws IllegalArgumentException if {@code fileName} is null, empty or not a plain file name
     */
    private String resolveScriptPath(String fileName) {
        boolean plainName = fileName != null
                && !fileName.isEmpty()
                && !".".equals(fileName)
                && !"..".equals(fileName)
                && new File(fileName).getName().equals(fileName);
        if (!plainName) {
            throw new IllegalArgumentException("Invalid Kettle script file name: " + fileName);
        }
        return new File(kettleScriptPath, fileName).getPath();
    }

    /** Returns the exception message, falling back to {@code toString()} when it has none. */
    private static String describeFailure(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.toString();
    }

    /**
     * Initializes the Kettle environment and caches the metadata of every .ktr and .kjb
     * file found in the script directory, so later executions can skip loading.
     *
     * <p>Not invoked automatically: the {@code @PostConstruct} annotation below is
     * commented out, so callers must invoke this method themselves.
     *
     * @throws KettleException if the environment or any script fails to load
     */
    //@PostConstruct
    public void init() throws KettleException {
        logger.info("----------------------開始初始化ETL配置------------------------");
        KettleEnvironment.init();
        List<String> ktrFiles = getFiles(kettleScriptPath, "ktr");
        List<String> kjbFiles = getFiles(kettleScriptPath, "kjb");
        logger.info("需要加載的轉(zhuǎn)換為:{}", ktrFiles);
        logger.info("需要加載的任務(wù)為:{}", kjbFiles);
        logger.info("----------------------開始加載ETL配置--------------------------");
        for (String ktrFile : ktrFiles) {
            KtrMeta ktrMeta = new KtrMeta();
            ktrMeta.setName(ktrFile);
            // new File(dir, name) inserts the separator when the configured directory has
            // no trailing one, matching the on-demand loading path.
            ktrMeta.setTransMeta(new TransMeta(new File(kettleScriptPath, ktrFile).getPath()));
            ktrMetas.add(ktrMeta);
            logger.info("成功加載轉(zhuǎn)換配置:{}", ktrFile);
        }
        for (String kjbFile : kjbFiles) {
            KjbMeta kjbMeta = new KjbMeta();
            kjbMeta.setName(kjbFile);
            kjbMeta.setJobMeta(new JobMeta(new File(kettleScriptPath, kjbFile).getPath(), null));
            kjbMetas.add(kjbMeta);
            logger.info("成功加載任務(wù)配置:{}", kjbFile);
        }
        logger.info("----------------------全部ETL配置加載完畢-----------------------");
    }

    /**
     * Runs a transformation and blocks until it finishes.
     *
     * @param ktrFileName name of the .ktr file
     * @param params      parameter values to set on the transformation; may be null
     * @return the transformation's result object, or a failure description string when
     *         loading or execution throws
     */
    @Override
    public Object runTaskKtr(String ktrFileName, Map<String, String> params) {
        logger.info("開始執(zhí)行轉(zhuǎn)換:{}", ktrFileName);
        TransMeta transMeta = null;
        for (KtrMeta ktrMeta : ktrMetas) {
            if (Objects.equals(ktrFileName, ktrMeta.getName())) {
                transMeta = ktrMeta.getTransMeta();
                break;
            }
        }
        try {
            // Not cached: load the script on demand from the script directory.
            if (transMeta == null) {
                logger.warn("資源池沒有找到配置文件:{} 嘗試二次加載!", ktrFileName);
                KettleEnvironment.init();
                transMeta = new TransMeta(resolveScriptPath(ktrFileName));
            }
            Trans trans = new Trans(transMeta);
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    trans.setParameterValue(entry.getKey(), entry.getValue());
                }
            }
            trans.execute(null);
            trans.waitUntilFinished();
            return trans.getResult();
        } catch (Exception e) {
            // Failures are reported to the caller as text rather than rethrown.
            logger.error("Kettle transformation failed: {}", ktrFileName, e);
            return describeFailure(e);
        }
    }

    /**
     * Runs a job and blocks until it finishes.
     *
     * @param objFileName name of the .kjb file
     * @param params      parameter values to set on the job; may be null
     * @return the job's result object, or a failure description string when loading or
     *         execution throws
     * @throws Exception declared for interface compatibility; failures inside the body
     *                   are caught and returned as text
     */
    @Override
    public Object runTaskKjb(String objFileName, Map<String, String> params) throws Exception {
        logger.info("開始執(zhí)行任務(wù):{}", objFileName);
        JobMeta jobMeta = null;
        for (KjbMeta kjbMeta : kjbMetas) {
            if (Objects.equals(objFileName, kjbMeta.getName())) {
                jobMeta = kjbMeta.getJobMeta();
                break;
            }
        }
        try {
            // Not cached: load the script on demand from the script directory.
            if (jobMeta == null) {
                logger.warn("資源池沒有找到配置文件:{} 嘗試二次加載!", objFileName);
                KettleEnvironment.init();
                jobMeta = new JobMeta(resolveScriptPath(objFileName), null);
            }
            Job job = new Job(null, jobMeta);
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    job.setParameterValue(entry.getKey(), entry.getValue());
                }
            }
            job.start();
            job.waitUntilFinished();
            return job.getResult();
        } catch (Exception e) {
            // Failures are reported to the caller as text rather than rethrown.
            logger.error("Kettle job failed: {}", objFileName, e);
            return describeFailure(e);
        }
    }
}
/**
* Holder pairing a loaded Kettle transformation definition with the file name it was loaded from.
* Accessors are generated by Lombok's {@code @Data}.
*/
@Data
public class KtrMeta {
// Parsed transformation metadata.
private TransMeta transMeta;
// Name used to look this entry up.
private String name;
}
/**
* Holder pairing a loaded Kettle job definition with the file name it was loaded from.
* Accessors are generated by Lombok's {@code @Data}.
*/
@Data
public class KjbMeta {
// Parsed job metadata.
private JobMeta jobMeta;
// Name used to look this entry up.
private String name;
}
總結:
集成後感覺沒什麼必要集成到項目裡面去。關鍵還是需要學會工具的使用,以便進行數據收集與治理。
參考:1_ETL和Kettle概述_嗶哩嗶哩_bilibili文章來源:http://www.zghlxwxcb.cn/news/detail-629334.html
下載: kettle工具下載文章來源地址http://www.zghlxwxcb.cn/news/detail-629334.html
到了這裡,關於Springboot整合ETL引擎Kettle的使用的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持TOY模板網!