Hadoop系列文章目錄
1、hadoop3.1.4簡單介紹及部署、簡單驗(yàn)證
2、HDFS操作 - shell客戶端
3、HDFS的使用(讀寫、上傳、下載、遍歷、查找文件、整個目錄拷貝、只拷貝文件、列出文件夾下文件、刪除文件及目錄、獲取文件及文件夾屬性等)-java
4、HDFS-java操作類HDFSUtil及junit測試(HDFS的常見操作以及HA環(huán)境的配置)
5、HDFS API的RESTful風(fēng)格–WebHDFS
6、HDFS的HttpFS-代理服務(wù)
7、大數(shù)據(jù)中常見的文件存儲格式以及hadoop中支持的壓縮算法
8、HDFS內(nèi)存存儲策略支持和“冷熱溫”存儲
9、hadoop高可用HA集群部署及三種方式驗(yàn)證
10、HDFS小文件解決方案–Archive
11、hadoop環(huán)境下的Sequence File的讀寫與合并
12、HDFS Trash垃圾桶回收介紹與示例
13、HDFS Snapshot快照
14、HDFS 透明加密KMS
15、MapReduce介紹及wordcount
16、MapReduce的基本用法示例-自定義序列化、排序、分區(qū)、分組和topN
17、MapReduce的分區(qū)Partition介紹
18、MapReduce的計(jì)數(shù)器與通過MapReduce讀取/寫入數(shù)據(jù)庫示例
19、Join操作map side join 和 reduce side join
20、MapReduce 工作流介紹
21、MapReduce讀寫SequenceFile、MapFile、ORCFile和ParquetFile文件
22、MapReduce使用Gzip壓縮、Snappy壓縮和Lzo壓縮算法寫文件和讀取相應(yīng)的文件
23、hadoop集群中yarn運(yùn)行mapreduce的內(nèi)存、CPU分配調(diào)度計(jì)算與優(yōu)化
本示例介紹java通過api操作hdfs。
主要包含HDFS的讀寫、上傳、下載、遍歷、查找文件、整個目錄拷貝、只拷貝文件、列出文件夾下文件、刪除文件及目錄、獲取文件及文件夾屬性等。
本文分為2個部分,即環(huán)境準(zhǔn)備和示例。
一、配置Windows下Hadoop環(huán)境
在windows上做HDFS客戶端應(yīng)用開發(fā),需要設(shè)置Hadoop環(huán)境,而且要求是windows平臺編譯的Hadoop,不然會報(bào)以下的錯誤:
#缺少winutils.exe
Could not locate executable null \bin\winutils.exe in the hadoop binaries
#缺少hadoop.dll
Unable to load native-hadoop library for your platform… using builtin-Java classes where applicable
1、解壓hadoop-3.1.4_winutils.zip文件
將已經(jīng)編譯好的Windows版本Hadoop解壓到到一個沒有中文、沒有空格的路徑下面
該文件由于不能上傳,可以參考我的筆記:https://note.youdao.com/s/Tp6Y92QO
2、配置環(huán)境變量
在windows上面配置hadoop的環(huán)境變量: HADOOP_HOME,并將%HADOOP_HOME%\bin添加到path中。
3、復(fù)制hadoop.dll文件
把hadoop3.1.4文件夾中bin目錄下的hadoop.dll文件放到系統(tǒng)盤: C:\Windows\System32 目錄
以上,完成了windows環(huán)境的配置。
二、示例
核心是從HDFS提供的api中構(gòu)造一個HDFS的訪問客戶端對象,然后通過該客戶端對象操作(增刪改查)HDFS上的文件。
1、客戶端核心類
- Configuration 配置對象類,用于加載或設(shè)置參數(shù)屬性
- FileSystem 文件系統(tǒng)對象基類。針對不同文件系統(tǒng)有不同具體實(shí)現(xiàn)。該類封裝了文件系統(tǒng)的相關(guān)操作方法。
在Java中操作HDFS,主要涉及以下Class:文章來源:http://www.zghlxwxcb.cn/news/detail-575800.html
- Configuration:該類的對象封轉(zhuǎn)了客戶端或者服務(wù)器的配置
- FileSystem:該類的對象是一個文件系統(tǒng)對象,可以用該對象的一些方法來對文件進(jìn)行操作,通過FileSystem的靜態(tài)方法get獲得該對象。
FileSystem fs = FileSystem.get(conf);
- get方法
從conf中的一個參數(shù) fs.defaultFS的配置值判斷具體是什么類型的文件系統(tǒng)。如果我們的代碼中沒有指定fs.defaultFS,并且工程classpath下也沒有給定相應(yīng)的配置,conf中的默認(rèn)值就來自于hadoop的jar包中的core-default.xml,默認(rèn)值為: file:///,則獲取的將不是一個DistributedFileSystem的實(shí)例,而是一個本地文件系統(tǒng)的客戶端對象。文章來源地址http://www.zghlxwxcb.cn/news/detail-575800.html
#獲取FileSystem方式,有2中方式
#第一種
public void getFileSystem1() throws IOException {
Configuration configuration = new Configuration();
//指定我們使用的文件系統(tǒng)類型:
configuration.set("fs.defaultFS", "hdfs://server1:8020/");
//獲取指定的文件系統(tǒng)
FileSystem fileSystem = FileSystem.get(configuration);
System.out.println(fileSystem.toString());
}
#第二種
public void getFileSystem2() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://server1:8020"), new Configuration());
System.out.println("fileSystem:"+fileSystem);
}
2、創(chuàng)建工程及示例
1)、pom.xml導(dǎo)入Maven依賴
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2)、創(chuàng)建java測試類
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.hadoop.hdfs.sentiment.dfs.impl.MgrHdfsImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HdfsTest {
private static Configuration conf = null;
private static FileSystem fileSystem = null;
private static final String HADOOP_USER_NAME = "alanchan";
private static final String DEFAULTFS = "hdfs://server1:8020";
private static final String BASICPATH = "/test_hadoop_client_java";
private static final String BASICFILEPATH = "/test_hadoop_client_java/java.txt";
private static final String LOCALFILEPATH = "D:/workspace/bigdata-component/hadoop/testhadoopclient_java.txt";
private static final String RENAMEILEPATH = "/test_hadoop_client_java_NEW/bigdata_rename.txt";
private static final String COPYEILEPATH = "/test_hadoop_client_java/bigdata.txt";
private static final String COPYEILEPATHTO = "/test_hadoop_client_java_Copy/bigdata.txt";
private static final String COPYEILEPATHTO2 = "/test_hadoop_client_java_Copy2/bigdata.txt";
// 初始化方法 用于和hdfs集群建立連接
@Before
public void connect2HDFS() throws IOException {
// 設(shè)置客戶端身份 以具備權(quán)限在hdfs上進(jìn)行操作
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
// 創(chuàng)建配置對象實(shí)例
conf = new Configuration();
// 設(shè)置操作的文件系統(tǒng)是HDFS 并且指定HDFS操作地址
conf.set("fs.defaultFS", DEFAULTFS);
// 創(chuàng)建FileSystem對象實(shí)例
fileSystem = FileSystem.get(conf);
}
@Test
public void mkdir() throws IOException {
// FileSystem.exists指定HDFS文件/文件夾是否存在
// Path file = new Path("hdfsPath");
// boolean isExists = fs.exists(file);
// 首先判斷文件夾是否存在,如果不存在再創(chuàng)建
if (!fileSystem.exists(new Path(BASICPATH))) {
// 創(chuàng)建文件夾
fileSystem.mkdirs(new Path(BASICPATH));
}
}
//
@Test
public void putFile2HDFS() throws IOException {
// 創(chuàng)建本地文件路徑
Path src = new Path(LOCALFILEPATH);
// hdfs上傳路徑
Path dst = new Path(BASICFILEPATH);
// 文件上傳動作(local--->hdfs)
fileSystem.copyFromLocalFile(src, dst);
}
@Test
public void testHdfs() throws Exception {
MgrHdfsImpl hdfs = new MgrHdfsImpl();
Path path = new Path(BASICPATH + "/hdfs.txt");
String content = "hdfs://server1:8020/sentiment/data_p/task_20220830163507/weibo_data_2.txt";
hdfs.writeFile(path, content);
log.info(" readFile={}", hdfs.readFileToString(path));
}
@Test
public void writeFile() throws Exception {
FSDataOutputStream in = fileSystem.create(new Path(BASICPATH + "/a.txt"));
in.write("hdfs://server1:8020/sentiment/data_p/task_20220830163507/weibo_data_2.txt".getBytes());
in.flush();
in.close();
}
@Test
public void readFile() throws Exception {
// FSDataInputStream out = fileSystem.open(new Path(BASICFILEPATH));
FSDataInputStream out = fileSystem
.open(new Path("hdfs://server1:8020/sentiment/data_p/willDoing_20220830170414"));
// IOUtils.copyBytes(out,System.out,1024);
BufferedReader br = new BufferedReader(new InputStreamReader(out));
String line;
String result = "";
while ((line = br.readLine()) != null) {
// 遍歷抓取到的每一行并將其存儲到result里面
result += line + "\n";
}
// String content = out.readUTF();
System.out.println("讀文件: " + result);
// System.out.println("讀文件: " + content);
out.close();
}
//
// // 獲取文件夾下文件大小
@Test
public void getFileSize() throws IllegalArgumentException, IOException {
log.info("summary={}", fileSystem.getContentSummary(new Path(BASICPATH)).getLength());
}
//
@Test
public void getFile2Local() throws IOException {
// 源路徑:hdfs的路徑
Path src = new Path(BASICFILEPATH);
// 目標(biāo)路徑:local本地路徑
Path dst = new Path(LOCALFILEPATH);
// 文件下載動作(hdfs--->local)
fileSystem.copyToLocalFile(src, dst);
}
//
@Test
public void rename() throws Exception {
Path srcPath = new Path(BASICFILEPATH);
Path destpath = new Path(RENAMEILEPATH);
if (fileSystem.exists(srcPath)) {
fileSystem.rename(srcPath, destpath);
}
}
//沒有遞歸
@Test
public void listFiles() throws Exception {
Path destPath = new Path(BASICPATH);
if (fileSystem.exists(destPath)) {
FileStatus[] listStatus = fileSystem.listStatus(destPath);
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()) {
log.info("file is ={}", fileStatus.getPath().getName());
} else {
log.info("dir is = {}", fileStatus.getPath().getName());
}
}
}
}
//
@Test
public void copyFiles() throws Exception {
Path srcPath = new Path(COPYEILEPATH);
Path destpath = new Path(COPYEILEPATHTO);
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
//
public void copyFiles(String src, String dest) throws Exception {
Path srcPath = new Path(src);
Path destpath = new Path(dest);
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
//
@Test
public void copyFiles2() throws Exception {
Path srcPath = new Path(COPYEILEPATH);
Path destpath = new Path(COPYEILEPATHTO2);
FSDataInputStream in = fileSystem.open(srcPath);
FSDataOutputStream out = fileSystem.create(destpath);
byte[] b = new byte[1024];
int hasRead = 0;
while ((hasRead = in.read(b)) > 0) {
out.write(b, 0, hasRead);
}
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
private static final String BASICPATH_COPYDIR_SRC = "/testhdfs_copyDir/src";
private static final String BASICPATH_COPYDIR_DEST = "/testhdfs_copyDir/dest";
// 遞歸遍歷文件夾
@Test
public void listDir() throws Exception {
Path path = new Path(BASICPATH_COPYDIR_SRC);
listDir(path);
}
public void listDir(Path path) throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
listDir(fileStatus.getPath());
log.info("目錄 = {}", fileStatus.getPath());
} else {
log.info("文件完整路徑 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
}
}
}
// HDFS API 遍歷文件夾中的文件
@Test
public void listDir2() throws Exception {
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem
.listFiles(new Path(BASICPATH_COPYDIR_SRC), true);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
log.info("2 --------- 文件完整路徑 = {},文件名={}", next.getPath(), next.getPath().getName());
}
}
// 文件夾拷貝,包含文件夾
@Test
public void copyDir() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR_SRC);
Path destpath = new Path(BASICPATH_COPYDIR_DEST);
// public static boolean copy(FileSystem srcFS, Path src,FileSystem dstFS, Path dst,boolean deleteSource,Configuration conf) throws IOException {
FileUtil.copy(fileSystem, srcPath, fileSystem, destpath, false, conf);
listDir(destpath);
}
// 拷貝文件及目錄,但不包含BASICPATH_COPYDIR_SRC的第一層目錄
@Test
public void copyFilesIncludeDir() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR_SRC);
Path destpath = new Path(BASICPATH_COPYDIR_DEST);
FileStatus[] fileStatuses = fileSystem.listStatus(srcPath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, false, conf);
log.info("目錄 = {}", fileStatus.getPath());
} else {
FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, false, conf);
log.info("文件完整路徑 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
}
}
listDir(destpath);
}
// 拷貝源文件夾下的所有文件到目標(biāo)文件夾,不含源文件夾下的文件夾
@Test
public void copyDirOnlyFiles() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR_SRC);
Path destpath = new Path(BASICPATH_COPYDIR_DEST);
RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
while (sourceFiles.hasNext()) {
FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, destpath, false, conf);
}
listDir(destpath);
}
private static final String BASICPATH_COPYDIR = "/testhdfs_copyDir";
// 查找文件
@Test
public void search() throws Exception {
Path srcPath = new Path(BASICPATH_COPYDIR);
String searchFileName = "2022年度本市工程系列計(jì)算機(jī)技術(shù)及應(yīng)用專業(yè)高級職稱評審工作已啟動.docx";
RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
while (sourceFiles.hasNext()) {
Path srcFile = sourceFiles.next().getPath();
String srcFileName = srcFile.getName();
if (searchFileName.equals(srcFileName)) {
log.info("文件路徑={},查找文件名={}", srcFile, searchFileName);
}
}
}
private static final String TODELETEFILE = "/test_hadoop_client_java/bigdata.txt";
@Test
public void delete() throws Exception {
// 判斷文件是否存在
if (fileSystem.exists(new Path(TODELETEFILE))) {
fileSystem.delete(new Path(TODELETEFILE), true);
}
}
@After
public void close() {
// 首先判斷文件系統(tǒng)實(shí)例是否為null 如果不為null 進(jìn)行關(guān)閉
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
到了這里,關(guān)于3、HDFS的使用(讀寫、上傳、下載、遍歷、查找文件、整個目錄拷貝、只拷貝文件、列出文件夾下文件、刪除文件及目錄、獲取文件及文件夾屬性等)-java的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!