問題
????????最近在一個(gè)大數(shù)據(jù)的項(xiàng)目開發(fā)中使用到了數(shù)據(jù)同步工具DataX,但在使用過程中發(fā)現(xiàn)了DataX對HIve分區(qū)表的支持不太友好。
????????具體體現(xiàn)在將數(shù)據(jù)庫中的數(shù)據(jù)同步到HIVE分區(qū)表時(shí),寫入目錄為HIVE表分區(qū)為dt=XXXX,如果不提前創(chuàng)建該分區(qū),會(huì)報(bào)目錄不存在的錯(cuò)誤,如下圖:
?
?文章來源地址http://www.zghlxwxcb.cn/news/detail-574573.html
原因分析?
? ? ? ? ?這個(gè)錯(cuò)誤是由于DataX不支持在HDFS上創(chuàng)建目錄導(dǎo)致的。
?文章來源:http://www.zghlxwxcb.cn/news/detail-574573.html
解決辦法
????????二次開發(fā)DataX,在寫入時(shí)檢測目錄,若目錄不存在自動(dòng)創(chuàng)建此分區(qū)目錄。
步驟:1.從GitHub下載datax源碼? 鏈接
? ? ? ? ? ?2.修改hdfswriter目錄下的HdfsWriter.java源碼,重寫里面的prepare方法:
@Override
public void prepare() {
//若路徑已經(jīng)存在,檢查path是否是目錄
if (hdfsHelper.isPathexists(path)) {
if (!hdfsHelper.isPathDir(path)) {
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("您配置的path: [%s] 不是一個(gè)合法的目錄, 請您注意文件重名, 不合法目錄名等情況.",
path));
}
//根據(jù)writeMode對目錄下文件進(jìn)行處理
Path[] existFilePaths = hdfsHelper.hdfsDirList(path, fileName);
boolean isExistFile = false;
if (existFilePaths.length > 0) {
isExistFile = true;
}
if ("append".equalsIgnoreCase(writeMode)) {
LOG.info(String.format("由于您配置了writeMode append, 寫入前不做清理工作, [%s] 目錄下寫入相應(yīng)文件名前綴 [%s] 的文件",
path, fileName));
} else if ("nonconflict".equalsIgnoreCase(writeMode) && isExistFile) {
LOG.info(String.format("由于您配置了writeMode nonConflict, 開始檢查 [%s] 下面的內(nèi)容", path));
List<String> allFiles = new ArrayList<String>();
for (Path eachFile : existFilePaths) {
allFiles.add(eachFile.toString());
}
LOG.error(String.format("沖突文件列表為: [%s]", StringUtils.join(allFiles, ",")));
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目錄不為空, 下面存在其他文件或文件夾.", path));
} else if ("truncate".equalsIgnoreCase(writeMode) && isExistFile) {
LOG.info(String.format("由于您配置了writeMode truncate, [%s] 下面的內(nèi)容將被覆蓋重寫", path));
hdfsHelper.deleteFiles(existFilePaths);
}
} else {
LOG.info(String.format("您配置的路徑: [%s] 不存在,自動(dòng)為您創(chuàng)建此路徑", path));
hdfsHelper.createPath(path);
}
}
? ? ? ? ? ?3.修改hdfswriter目錄下的HdfsHelper.java源碼,在里面添加createPath方法:
public boolean createPath(String filePath) {
Path path = new Path(filePath);
boolean exist = false;
try {
if (fileSystem.exists(path)) {
String message = String.format("文件路徑[%s]已存在,無需創(chuàng)建!",
"message:filePath =" + filePath);
LOG.info(message);
exist = true;
} else {
exist = fileSystem.mkdirs(path);
}
} catch (IOException e) {
String message = String.format("創(chuàng)建文件路徑[%s]時(shí)發(fā)生網(wǎng)絡(luò)IO異常,請檢查您的網(wǎng)絡(luò)是否正常!",
"message:filePath =" + filePath);
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return exist;
}
? ? ? ? ? ?4.打包修改后的源碼,并替換掉集群的datax安裝目錄datax/plugin/writer/hdfswriter/hdfswriter-0.0.1-SNAPSHOT.jar即可。(在DataX-master根目錄下的pom.xml文件核心組件、公共組件、reader/writer插件都以module的方式組裝到一起了,把不需要的注釋掉可加快打包速度)
?
處理結(jié)果
? ? ? ? 二次開發(fā)后完美運(yùn)行:
????????
?
?
到了這里,關(guān)于二次開發(fā)DataX以支持HIVE分區(qū)表的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!