?? 博主 "開著拖拉機(jī)回家"帶您 Go to New World.???
?? 個(gè)人主頁——??開著拖拉機(jī)回家_Linux,大數(shù)據(jù)運(yùn)維-CSDN博客 ?????
???? 希望本文能夠給您帶來一定的幫助??文章粗淺,敬請批評指正!????
???????????????? ?????????????? ????????????????????????
?????? ????????感謝點(diǎn)贊和關(guān)注 ,每天進(jìn)步一點(diǎn)點(diǎn)!加油!?????? ????????
目錄
?? 博主 "開著拖拉機(jī)回家"帶您 Go to New World.???
一、FileSystem文件抽象類
1.1文件讀取API
1.2文件操作API
1.3抽象FileSystem類的具體實(shí)現(xiàn)子類
1.4FileSystem IO輸入系統(tǒng)相關(guān)類
1.5FileSystem IO輸出系統(tǒng)相關(guān)類
二、HDFS的API操作
2.1測試集群版本信息
2.2文件上傳下載和移動
2.3文件讀寫操作
2.4文件狀態(tài)信息獲取
2.5實(shí)戰(zhàn)案例
一、FileSystem文件抽象類
為了提供對不同數(shù)據(jù)訪問的一致接口,Hadoop借鑒了Linux虛擬文件系統(tǒng)的概念,為此Hadopo提供了一個(gè)抽象的文件系統(tǒng)模型FileSystem,HDFS 是其中的一個(gè)實(shí)現(xiàn)。
FileSystem是Hadoop中所有文件系統(tǒng)的抽象父類,它定義了文件系統(tǒng)所具有的基本特征和基本操作。
1.1文件讀取API
HadoopFileSystem操作 |
Java操作 |
Linux操作 |
描述 |
URL.openStream FileSystem.open FileSystem.create FileSystem.append |
URL.openStream |
open |
打開一個(gè)文件 |
FSDataInputStream.read |
InputStream.read |
read |
讀取文件中的數(shù)據(jù) |
FSDataInputStream.write |
OutputStream.write |
write |
向文件中寫入數(shù)據(jù) |
FSDataInputStream.close FSDataOutputStream.close |
InputStream.close OutputStream.close |
close |
關(guān)閉一個(gè)文件 |
FSDataInputStream.seek |
RandomAccessFile.seek |
lseek |
改變文件讀寫位置 |
FileSystem.getContentSummary |
du/wc |
獲取文件存儲信息 |
1.2文件操作API
HadoopFileSystem操作 |
Java操作 |
Linux操作 |
描述 |
FileSystem.getFileStatus FileSystem.get* |
File.get* |
stat |
獲取文件/目錄的屬性 |
FileSystem.set* |
File.set* |
chomd |
修改文件屬性 |
FileSystem.createNewFile |
File.createNewFile |
create |
創(chuàng)建一個(gè)文件 |
FileSystem.delete |
File.delete |
remove |
刪除一個(gè)文件 |
FileSystem.rename |
File.renameTo |
rename |
移動或先修改文件/目錄名 |
FileSystem.mkdirs |
File.mkdir |
mkdir |
創(chuàng)建目錄 |
FileSystem.delete |
File.delete |
rmdir |
從一個(gè)目錄下刪除一個(gè)子目錄 |
FileSystem.listStatus |
File.list |
readdir |
讀取一個(gè)目錄下的項(xiàng)目 |
FileSystem.setWorkingDirectory |
getcwd/getwd |
返回當(dāng)前工作目錄 |
|
FileSystem.setWorkingDirectory |
chdir |
更改當(dāng)前的工作目錄 |
1.3抽象FileSystem類的具體實(shí)現(xiàn)子類
1.4FileSystem IO輸入系統(tǒng)相關(guān)類
1.5FileSystem IO輸出系統(tǒng)相關(guān)類
二、HDFS的API操作
2.1測試集群版本信息
2.2文件上傳下載和移動
/**
* 本地文件上傳到 HDFS
*
* @param srcPath 本地路徑 + 文件名
* @param dstPath Hadoop路徑
* @param fileName 文件名
*/
def copyToHDFS(srcPath: String, dstPath: String, fileName: String): Boolean = {
var path = new Path(dstPath)
val fileSystem: FileSystem = path.getFileSystem(conf)
val isFile = new File(srcPath).isFile
// 判斷路徑是否存在
val existDstPath: Boolean = fileSystem.exists(path)
if (!existDstPath) {
fileSystem.mkdirs(path)
}
// 本地文件存在
if (isFile) {
// HDFS 采用 路徑+ 文件名
path = new Path(dstPath + File.separator + fileName)
// false: 是否刪除 目標(biāo)文件,false: 不覆蓋
fileSystem.copyFromLocalFile(false, false, new Path(srcPath), path)
return true
}
false
}
/**
* Hadoop文件下載到本地
*
* @param srcPath hadoop 源文件
* @param dstPath 目標(biāo)文件
* @param fs 文件訪問對象
*/
def downLoadFromHDFS(srcPath: String, dstPath: String, fs: FileSystem): Unit = {
val srcPathHDFS = new Path(srcPath)
val dstPathLocal = new Path(dstPath)
// false: 不刪除源文件
fs.copyToLocalFile(false, srcPathHDFS, dstPathLocal)
}
/**
* 檢查Hadoop文件是否存在并刪除
*
* @param path HDFS文件
*/
def checkFileAndDelete(path: String, fs: FileSystem) = {
val dstPath: Path = new Path(path)
if (fs.exists(dstPath)) {
// false: 是否遞歸刪除,否
fs.delete(dstPath, false)
}
}
/**
* 獲取指定目錄下,正則匹配后的文件列表
*
* @param dirPath hdfs路徑
* @param regexRule 正則表達(dá)式 ,如:"^(?!.*[.]tmp$).*$" ,匹配非 .tmp結(jié)尾的文件
*/
def listStatusHDFS(dirPath: String, regexRule: String, fs: FileSystem): util.ArrayList[Path] = {
val path = new Path(dirPath)
val pattern: Pattern = Pattern.compile(regexRule)
// 匹配的文件
val fileList = new util.ArrayList[Path]()
val fileStatusArray: Array[FileStatus] = fs.listStatus(path)
for (fileStatus <- fileStatusArray) {
// 文件 全路徑
val filePath: Path = fileStatus.getPath()
val fileName: String = filePath.getName.toLowerCase
if (regexRule.equals("")) {
// 如果匹配規(guī)則為空 則獲取目錄下的全部文件
fileList.add(filePath)
log.info("match file : " + fileName)
} else {
// 正則匹配文件
if (pattern.matcher(fileName).matches()) {
fileList.add(filePath)
log.info("match file : " + fileName)
}
}
}
fileList
}
/**
* 文件移動或重命名到指定目錄, 如:文件00000 重命名為00001
*
* @param srcPath 源文件路徑
* @param dstPath 源文件路徑
* @param fs 文件操作對象
*/
def renameToHDFS(srcPath: String, dstPath: String, fs: FileSystem): Boolean = {
var renameFlag = false
val targetPath = new Path(dstPath)
// 目標(biāo)文件存在先刪除
if (fs.exists(targetPath)) {
fs.delete(targetPath, false)
}
renameFlag = fs.rename(new Path(srcPath), targetPath)
if (renameFlag) {
log.info("renamed file " + srcPath + " to " + targetPath + " success!")
} else {
log.info("renamed file " + srcPath + " to " + targetPath + " failed!")
}
renameFlag
}
2.3文件讀寫操作
Hadoop抽象文件系統(tǒng)也是使用流機(jī)制進(jìn)行文件的讀寫。Hadoop抽象文件系統(tǒng)中,用于讀文件數(shù)據(jù)的流是FSDataInputStream,對應(yīng)地,寫文件通過抽象類FSDataOutputStream實(shí)現(xiàn)。
/**
* 讀取HDFS文件
*
* @param inPutFilePath 源文件路徑
* @param fs 文件操作對象
*/
def readFromHDFS(inPutFilePath: String, OutputFilePath: String, fs: FileSystem) = {
var fSDataInputStream: FSDataInputStream = null
var bufferedReader: BufferedReader = null
val srcPath = new Path(inPutFilePath)
if (fs.exists(srcPath)) {
val fileStatuses: Array[FileStatus] = fs.listStatus(srcPath)
for (fileStatus <- fileStatuses) {
val filePath: Path = fileStatus.getPath
// 判斷文件大小
if (fs.getContentSummary(filePath).getLength > 0) {
fSDataInputStream = fs.open(filePath)
bufferedReader = new BufferedReader(new InputStreamReader(fSDataInputStream))
var line = bufferedReader.readLine()
while (line != null) {
print(line + "\n") // 打印
line = bufferedReader.readLine()
}
}
}
}
fSDataInputStream.close()
bufferedReader.close()
}
/**
* 讀取HDFS文件, 處理完成 重新寫入
*
* @param inPutFilePath 源文件路徑
* @param OutputFilePath 輸出文件到新路徑
* @param fs 文件操作對象
*/
def writeToHDFS(inPutFilePath: String, OutputFilePath: String, fs: FileSystem) = {
var fSDataInputStream: FSDataInputStream = null
var fSDataOutputStream: FSDataOutputStream = null
var bufferedReader: BufferedReader = null
var bufferedWriter: BufferedWriter = null
val srcPath = new Path(inPutFilePath)
var count = 0
if (fs.exists(srcPath)) {
val fileStatuses: Array[FileStatus] = fs.listStatus(srcPath)
for (fileStatus <- fileStatuses) {
val filePath: Path = fileStatus.getPath
// 判斷文件大小
if (fs.getContentSummary(filePath).getLength > 0) {
fSDataInputStream = fs.open(filePath)
bufferedReader = new BufferedReader(new InputStreamReader(fSDataInputStream))
val outputFilePath = new Path(OutputFilePath + count)
fSDataOutputStream = fs.create(outputFilePath)
bufferedWriter = new BufferedWriter(new OutputStreamWriter(fSDataOutputStream, "UTF-8"))
var line = bufferedReader.readLine()
while (line != null) {
val bytes: Array[Byte] = line.getBytes("UTF-8")
bufferedWriter.write(new String(bytes) + "\n")
line = bufferedReader.readLine()
}
bufferedWriter.flush()
count += 1
}
}
}
fSDataInputStream.close()
bufferedReader.close()
bufferedWriter.close()
}
測試結(jié)果如下:
2.4文件狀態(tài)信息獲取
FileSystem. getContentSummary()提供了類似Linux命令du、df提供的功能。du表示"disk usage",它會報(bào)告特定的文件和每個(gè)子目錄所使用的磁盤空間大??;命令df則是"diskfree"的縮寫,用于顯示文件系統(tǒng)上已用的和可用的磁盤空間的大小。du、df是Linux中查看磁盤和文件系統(tǒng)狀態(tài)的重要工具。
getContentSummary()方法的輸入是一個(gè)文件或目錄的路徑,輸出是該文件或目錄的一些存儲空間信息,這些信息定義在ContentSummary,包括文件大小、文件數(shù)、目錄數(shù)、文件配額,已使用空間和已使用文件配額等。
/**
* HDFS路徑下文件信息統(tǒng)計(jì)
*
* @param dirPath hdfs路徑
**/
def listHDFSStatus(dirPath: String, fs: FileSystem) = {
val path = new Path(dirPath)
// 匹配的文件
val contentSummary: ContentSummary = fs.getContentSummary(path)
println("/tmp/kangll 目錄下子目錄個(gè)數(shù): ", contentSummary.getDirectoryCount)
println("/tmp/kangll 目錄下文件個(gè)數(shù): ", contentSummary.getFileCount)
println("/tmp/kangll 目錄下文件大?。?", contentSummary.getLength)
println("/tmp/kangll 目錄下文件和子目錄個(gè)數(shù): ", contentSummary.getFileAndDirectoryCount)
}
/tmp/kangll目錄信息獲取結(jié)果:
2.5實(shí)戰(zhàn)案例
案例說明: HDFS 文件清理, 根據(jù)文件大小、個(gè)數(shù)、程序休眠時(shí)間控制 勻速 批量刪除 HDFS 文件,當(dāng)文件越大 ,需要配置 刪除個(gè)數(shù)更少,休眠時(shí)間更長,防止 NameNode 負(fù)載過大,減輕DataNode磁盤讀寫壓力,從而不影響線上業(yè)務(wù)情況下清理過期數(shù)據(jù)。
package com.kangll.common.utils
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.{Calendar, Date, Properties}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ContentSummary, FileStatus, FileSystem, Path}
import org.apache.log4j.Logger
import scala.collection.mutable.ListBuffer
/** ***************************************************************************************
*
* @auther kangll
* @date 2023/09/12 12:10
* @desc HDFS 文件清理, 根據(jù)文件大小、個(gè)數(shù)、程序休眠時(shí)間控制 勻速 批量刪除
* HDFS 文件,當(dāng)文件越大 ,需要配置 刪除個(gè)數(shù)更少,休眠時(shí)間更長,防止
* NameNode 負(fù)載過大,減輕DataNode磁盤讀寫壓力,從而不影響線上業(yè)務(wù)下刪除
*
*
* 1.遍歷文件夾下的文件個(gè)數(shù)據(jù), 當(dāng)遍歷的文件夾下的文件個(gè)數(shù)到達(dá)閾值時(shí) 將
* 文件所述的 父路徑直接刪除
*
* ****************************************************************************************/
object CleanHDFSFileUtil {
// 刪除文件總數(shù)統(tǒng)計(jì)
var HDFS_FILE_SUM = 0
// 批次刪除文件個(gè)數(shù)顯示
var HDFS_FILE_BATCH_DEL_NUM = 0
val start = System.currentTimeMillis()
/**
*
* @param fs 文件操作對象
* @param pathName 文件根路徑
* @param fileList 批次清理的 buffer
* @param saveDay 根據(jù)文件屬性 獲取文件創(chuàng)建時(shí)間 選擇文件保留最近的天數(shù)
* @param sleepTime 休眠時(shí)間,防止一次性刪除太多文件 導(dǎo)致 datanode 文件負(fù)載太大
* @param fileBatchCount 批次刪除文件的個(gè)數(shù), 相當(dāng)于是 上報(bào)到 namenode 文件清理隊(duì)列的大小,參數(shù)越大 隊(duì)列越大,datanode 磁盤負(fù)載相對來說就高
* @return
*/
def listPath(fs: FileSystem, pathName: String, fileList: ListBuffer[String], saveDay: Int, sleepTime: Long, fileBatchCount: Int): ListBuffer[String] = {
val fm = new SimpleDateFormat("yyyy-MM-dd")
// 獲取當(dāng)前時(shí)間
val currentDay = fm.format(new Date())
val dnow = fm.parse(currentDay)
val call = Calendar.getInstance()
call.setTime(dnow)
call.add(Calendar.DATE, -saveDay)
// 獲取保留天前的時(shí)期
val saveDayDate = call.getTime
// 遍歷文件
val fileStatuses = fs.listStatus(new Path(pathName))
for (status <- fileStatuses) {
// 獲取到文件名
val filePath = status.getPath
if (status.isFile) {
// 獲取到文件修改時(shí)間
val time: Long = status.getModificationTime
val hdfsFileDate = fm.parse(fm.format(new Date(time)))
if (saveDayDate.after(hdfsFileDate)) {
fileList += filePath.toString
// 獲取文件個(gè)數(shù)
val cs: ContentSummary = fs.getContentSummary(filePath)
HDFS_FILE_SUM += cs.getFileCount.toInt
HDFS_FILE_BATCH_DEL_NUM += cs.getFileCount.toInt
if (HDFS_FILE_BATCH_DEL_NUM >= fileBatchCount) {
val end = System.currentTimeMillis()
println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
println("++++++++++++++++ 遍歷文件數(shù)量達(dá)到 " + HDFS_FILE_BATCH_DEL_NUM + " 個(gè),刪除HDFS文件 ++++++++++++++++")
println("++++++++++++++++++++++++++++ 休眠 " + sleepTime + " S ++++++++++++++++++++++++++++")
println("++++++++++++++++++++++++ 刪除文件總數(shù):" + HDFS_FILE_SUM + " ++++++++++++++++++++++++++")
println("++++++++++++++++++++++++ 程序運(yùn)行時(shí)間:" + (end - start) / 1000 + " s ++++++++++++++++++++++++")
println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
HDFS_FILE_BATCH_DEL_NUM = 0
TimeUnit.MILLISECONDS.sleep(sleepTime)
}
// 文件刪除根據(jù)絕對路徑刪除
println("+++++ 刪除文件: " + filePath + "+++++")
// 遞歸刪除
fs.delete(filePath, true)
}
} else {
// 遞歸文件夾
listPath(fs, filePath.toString, fileList, saveDay, sleepTime, fileBatchCount)
}
}
println("+++++++++++++++++++++++++ 刪除文件總數(shù):" + HDFS_FILE_SUM + " +++++++++++++++++++++++++")
fileList
}
/**
* 刪除空文件夾
*
* @param fs 文件操作對象
* @param pathName 路徑
* @param pathSplitLength 文件按照"/"拆分后的長度
*/
def delEmptyDirectory(fs: FileSystem, pathName: String, pathSplitLength: Int) = {
// 遍歷文件
val fileStatuses = fs.listStatus(new Path(pathName))
for (status <- fileStatuses) {
if (status.isDirectory) {
val path: Path = status.getPath
// /kangll/winhadoop/temp/wmall_batch_inout/day/1660878372 = 7
val delPathSplitLength = path.toString.substring(6, path.toString.length).split("/").length
// filePath /kangll/winhadoop/temp/wmall_batch_inout/day 子時(shí)間戳文件夾兩個(gè)
// val hdfsPathListCount = fileStatuses.length
val hdfsPathListCount = fs.listStatus(path).length
if (delPathSplitLength == pathSplitLength && hdfsPathListCount == 0) {
println("+++++++++++++++++ 刪除空文件夾 : " + path + " +++++++++++++++++++")
fs.delete(path, true)
}
}
}
}
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger("CleanHDFSFileUtil")
val conf = new Configuration()
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
val fs = FileSystem.get(conf)
val fileList = new ListBuffer[String]
val hdfsDir = if (args.size > 0) args(0).toString else System.exit(0).toString
val saveDay = if (args.size > 1) args(1).toInt else 2
val sleepTime = if (args.size > 2) args(2).toLong else 10
val fileBatchCount = if (args.size > 3) args(3).toInt else 5
/*
默認(rèn)不啟用文件夾刪除,參數(shù)為 文件夾絕對路徑Split后的數(shù)組長度
如 路徑 /winhadoop/temp/wmall_batch_inout/thirty" 配置為 7
*/
val pathSplitLength = if (args.size > 4) args(4).toInt else 20
// 刪除文件
listPath(fs, hdfsDir, fileList, saveDay, sleepTime, fileBatchCount)
// 刪除空文件夾
delEmptyDirectory(fs, hdfsDir, pathSplitLength)
fs.close()
}
}
調(diào)用腳本
#
# 腳本功能: 過期文件清理
# 作 者: kangll
# 創(chuàng)建時(shí)間: 2023-09-14
# 修改內(nèi)容: 控制刪除文件的批次個(gè)數(shù),程序休眠時(shí)間傳入
# 當(dāng)前版本: 1.0v
# 調(diào)度周期: 一天一次
# 腳本參數(shù): 刪除文件夾、文件保留天數(shù)、程序休眠時(shí)間、批次刪除個(gè)數(shù)
# 1.文件根路徑,子文件夾遞歸遍歷
# 2.文件保留天數(shù)
# 3.程序休眠時(shí)間 防止 DataNode 刪除文件負(fù)載過大,單位 秒
# 4.批次刪除文件個(gè)數(shù) ,如配置 100,當(dāng)滿足文件個(gè)數(shù)100時(shí), 整批執(zhí)行 delete,緊接著程序休眠
# 5.默認(rèn)不啟用文件夾刪除,也就是不傳參,參數(shù)為 文件夾絕對路徑Split后的數(shù)組長度
# /winhadoop/temp/wmall_batch_inout/thirty/時(shí)間戳/ Split后 長度為7,默認(rèn)刪除時(shí)間戳文件夾
#
### 對應(yīng)的新刪除程序
jarPath=/hadoop/project/del_spark2-1.0-SNAPSHOT.jar
### 集群日志
java -classpath $jarPath com.kangll.common.utils.CleanHDFSFileUtil /spark2-history 3 10 100
參考 :
hadoop抽象文件系統(tǒng)filesystem框架介紹_org.apache.hadoop.fs.filesystem_souy_c的博客-CSDN博客文章來源地址http://www.zghlxwxcb.cn/news/detail-722942.html
Hadoop FileSystem文件系統(tǒng)的概要學(xué)習(xí) - 回眸,境界 - 博客園文章來源:http://www.zghlxwxcb.cn/news/detail-722942.html
hadoop抽象文件系統(tǒng)filesystem框架介紹_org.apache.hadoop.fs.filesystem_souy_c的博客-CSDN博客
到了這里,關(guān)于【Hadoop】HDFS API 操作大全的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!