Detailed Examples of Operating HDFS with the Java API
1. Traversing all files and directories under a directory
The listStatus method can be used for this. Its signature is as follows:
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f given path
* @return the statuses of the files/directories in the given path
* @throws FileNotFoundException when the path does not exist;
* IOException see specific implementation
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException;
As the signature shows, listStatus only needs a Path argument and returns an array of FileStatus.
FileStatus contains the following information:
/** Interface that represents the client side information for a file.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileStatus implements Writable, Comparable {
private Path path;
private long length;
private boolean isdir;
private short block_replication;
private long blocksize;
private long modification_time;
private long access_time;
private FsPermission permission;
private String owner;
private String group;
private Path symlink;
....
As the fields show, FileStatus carries the file path, length, whether it is a directory, block_replication, blocksize, and other metadata.
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HdfsOperation {
  val logger = LoggerFactory.getLogger(this.getClass)

  // Recursively walk the directory tree rooted at `path`, logging the
  // metadata of every file and directory encountered.
  def tree(sc: SparkContext, path: String): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val fsPath = new Path(path)
    val status = fs.listStatus(fsPath)
    for (filestatus: FileStatus <- status) {
      logger.error("getPermission is: {}", filestatus.getPermission)
      logger.error("getOwner is: {}", filestatus.getOwner)
      logger.error("getGroup is: {}", filestatus.getGroup)
      logger.error("getLen is: {}", filestatus.getLen)
      logger.error("getModificationTime is: {}", filestatus.getModificationTime)
      logger.error("getReplication is: {}", filestatus.getReplication)
      logger.error("getBlockSize is: {}", filestatus.getBlockSize)
      if (filestatus.isDirectory) {
        val dirpath = filestatus.getPath.toString
        logger.error("directory path is: {}", dirpath)
        tree(sc, dirpath)
      } else {
        val fullname = filestatus.getPath.toString
        val filename = filestatus.getPath.getName
        logger.error("full file path is: {}", fullname)
        logger.error("file name is: {}", filename)
      }
    }
  }
}
If the FileStatus turns out to be a directory, tree calls itself recursively on that directory, so the entire subtree is traversed.
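For completeness, a small driver could be added to the HdfsOperation object above to run the traversal. This is only a minimal sketch; the application name and HDFS path are placeholders, not values from the original code:

def main(args: Array[String]): Unit = {
  // Placeholder app name and HDFS path; adjust to your environment.
  val sc = new SparkContext(new SparkConf().setAppName("HdfsOperation"))
  tree(sc, "/tmp/demo")
  sc.stop()
}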
2. Traversing files only
The method above traverses both files and directories. To traverse only files, use the listFiles method.
def findFiles(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fsPath = new Path(path)
  // The second argument `true` makes the listing recursive.
  val files = fs.listFiles(fsPath, true)
  while (files.hasNext) {
    val filestatus = files.next()
    val fullname = filestatus.getPath.toString
    val filename = filestatus.getPath.getName
    logger.error("full file path is: {}", fullname)
    logger.error("file name is: {}", filename)
    logger.error("file size is: {}", filestatus.getLen)
  }
}
/**
* List the statuses and block locations of the files in the given path.
*
* If the path is a directory,
* if recursive is false, returns files in the directory;
* if recursive is true, return files in the subtree rooted at the path.
* If the path is a file, return the file's status and block locations.
*
* @param f is the path
* @param recursive if the subdirectories need to be traversed recursively
*
* @return an iterator that traverses statuses of the files
*
* @throws FileNotFoundException when the path does not exist;
* IOException see specific implementation
*/
public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
...
As the source shows, listFiles returns an iterable RemoteIterator<LocatedFileStatus>, whereas listStatus returns an array. In addition, listFiles returns only files, never directories.
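Because the iterator yields LocatedFileStatus objects, each entry also exposes the block locations of the file. The following is a minimal sketch of reading them (the method name showBlockLocations is illustrative, not part of the original article):

def showBlockLocations(sc: SparkContext, path: String): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val files = fs.listFiles(new Path(path), true)
  while (files.hasNext) {
    val status = files.next()
    // Each BlockLocation lists the datanodes holding one block of the file.
    for (loc <- status.getBlockLocations) {
      logger.error("{} block hosts: {}", status.getPath.getName, loc.getHosts.mkString(","))
    }
  }
}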
3. Creating a directory
def mkdirToHdfs(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val result = fs.mkdirs(new Path(path))
  if (result) {
    logger.error("mkdirs succeeded!")
  } else {
    logger.error("mkdirs failed!")
  }
}
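If specific permissions are required, FileSystem.mkdirs also has an overload that takes an FsPermission. A minimal sketch, reusing the fs and path values from mkdirToHdfs above (the octal mode 755 is only an example):

import org.apache.hadoop.fs.permission.FsPermission

// rwxr-xr-x (octal 755); adjust to your needs.
val perm = new FsPermission(Integer.parseInt("755", 8).toShort)
val created = fs.mkdirs(new Path(path), perm)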
4. Deleting a directory
def deleteOnHdfs(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // The second argument `true` makes the delete recursive.
  val result = fs.delete(new Path(path), true)
  if (result) {
    logger.error("delete succeeded!")
  } else {
    logger.error("delete failed!")
  }
}
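Since the delete is recursive, it can be worth guarding it with an existence check first. A minimal sketch (the method name safeDeleteOnHdfs is illustrative, not from the original article):

def safeDeleteOnHdfs(sc: SparkContext, path: String): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fsPath = new Path(path)
  if (fs.exists(fsPath)) {
    // fs.delete returns true when the path and its contents were removed.
    val deleted = fs.delete(fsPath, true)
    logger.error("delete result: {}", deleted)
  } else {
    logger.error("path does not exist: {}", path)
  }
}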
5. Uploading a file
def uploadToHdfs(sc: SparkContext, localPath: String, hdfsPath: String): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath))
  // Note: FileSystem.get returns a cached, shared instance; closing it here also
  // closes it for any other code in the same JVM that obtained it the same way.
  fs.close()
}
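copyFromLocalFile also has an overload with delSrc and overwrite flags, which may be useful when the target already exists; a minimal sketch of substituting it for the call above:

// delSrc = false keeps the local source file; overwrite = true replaces an existing HDFS file.
fs.copyFromLocalFile(false, true, new Path(localPath), new Path(hdfsPath))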
6. Downloading a file
def downloadFromHdfs(sc: SparkContext, localPath: String, hdfsPath: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // Note the argument order: the source is the HDFS path, the destination is the local path.
  fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath))
  fs.close()
}
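Similarly, copyToLocalFile has an overload with a useRawLocalFileSystem flag that skips writing the local .crc checksum files; a minimal sketch of substituting it for the call above:

// delSrc = false keeps the HDFS source; useRawLocalFileSystem = true avoids the local .crc files.
fs.copyToLocalFile(false, new Path(hdfsPath), new Path(localPath), true)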
This concludes the detailed examples of operating HDFS through the Java API.