SpringBoot Integration with Hadoop: HDFS File Operations
I. The main classes involved in HDFS operations:
Configuration: encapsulates the configuration of the client or the server.
FileSystem: an instance of this class represents a file system, and its methods are used to operate on files; it is obtained through the static FileSystem.get method, e.g. FileSystem hdfs = FileSystem.get(conf);
FSDataInputStream: the HDFS input stream, obtained from FileSystem's open method.
FSDataOutputStream: the HDFS output stream, obtained from FileSystem's create method.
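Putting the four classes together, the following is a minimal end-to-end sketch (not part of the original project) that writes a small file and reads it back; it assumes a NameNode at hdfs://localhost:9000 and the user name linhaiy that appear later in application.properties:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsQuickStart {
    public static void main(String[] args) throws Exception {
        // Configuration holds client-side settings such as the default file system
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000"); // assumed NameNode address
        FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "linhaiy");

        // FSDataOutputStream: write a small file through FileSystem.create
        Path file = new Path("/demo/hello.txt");
        try (FSDataOutputStream out = hdfs.create(file, true)) {
            out.write("hello hdfs".getBytes("UTF-8"));
        }

        // FSDataInputStream: read the file back through FileSystem.open
        try (FSDataInputStream in = hdfs.open(file)) {
            byte[] buf = new byte[(int) hdfs.getFileStatus(file).getLen()];
            in.readFully(buf);
            System.out.println(new String(buf, "UTF-8"));
        }
        hdfs.close();
    }
}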
II. Dependency configuration
Maven
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hdfs</groupId>
    <artifactId>HadoopTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>HadoopTest</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>cn.bestwu</groupId>
            <artifactId>ik-analyzers</artifactId>
            <version>5.1.0</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Application.properties
# Tomcat worker thread pool size
server.tomcat.max-threads=1000
# Tomcat port
server.port=8900
# session timeout
server.session-timeout=60
spring.application.name=hadoop
spring.servlet.multipart.max-file-size=50MB
spring.servlet.multipart.max-request-size=50MB
hdfs.path=hdfs://localhost:9000
hdfs.username=linhaiy
logging.config=classpath:logback.xml
III. Developing the HDFS file operation interfaces
Config
package com.hadoop.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * HDFS configuration class
 * @author linhaiy
 * @date 2019.05.18
 */
@Configuration
public class HdfsConfig {

    @Value("${hdfs.path}")
    private String path;

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
Entity
package com.hadoop.hdfs.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * User entity class
 * @author linhaiy
 * @date 2019.05.18
 */
public class User implements Writable {

    private String username;
    private Integer age;
    private String address;

    public User() {
        super();
    }

    public User(String username, Integer age, String address) {
        super();
        this.username = username;
        this.age = age;
        this.address = address;
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialize the object; writeUTF must be paired with readUTF in readFields
        output.writeUTF(username);
        output.writeInt(age);
        output.writeUTF(address);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialize the object back into memory, in the same field order as write
        username = input.readUTF();
        age = input.readInt();
        address = input.readUTF();
    }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    @Override
    public String toString() {
        return "User [username=" + username + ", age=" + age + ", address=" + address + "]";
    }
}
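As a quick sanity check on the Writable contract, the sketch below (an assumption for illustration, not part of the original project, with made-up field values) serializes a User to a byte array and reads it back; write and readFields must use matching methods (writeUTF/readUTF) and the same field order.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.hadoop.hdfs.entity.User;

public class UserWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        User original = new User("linhaiy", 25, "Shenzhen");

        // Serialize with the Writable write method
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields and compare
        User restored = new User();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored); // User [username=linhaiy, age=25, address=Shenzhen]
    }
}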
Service
package com.hadoop.hdfs.service;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import com.hadoop.util.JsonUtil;

@Component
public class HdfsService {

    @Value("${hdfs.path}")
    private String path;

    @Value("${hdfs.username}")
    private String username;

    private static String hdfsPath;
    private static String hdfsName;
    private static final int bufferSize = 1024 * 1024 * 64;

    /**
     * Get the HDFS configuration
     */
    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * Get the HDFS FileSystem object
     */
    public static FileSystem getFileSystem() throws Exception {
        // The HDFS client operates under a user identity; by default the client API reads it
        // from the JVM setting HADOOP_USER_NAME=hadoop, but it can also be passed in
        // explicitly when constructing the client FileSystem object, as done here.
        FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
        return fileSystem;
    }

    /**
     * Create a directory on HDFS
     */
    public static boolean mkdir(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        boolean isOk = fs.mkdirs(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Check whether a file exists on HDFS
     */
    public static boolean existFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        boolean isExists = fs.exists(srcPath);
        return isExists;
    }

    /**
     * Read information about an HDFS directory
     */
    public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path newPath = new Path(path);
        FileStatus[] statusList = fs.listStatus(newPath);
        List<Map<String, Object>> list = new ArrayList<>();
        if (null != statusList && statusList.length > 0) {
            for (FileStatus fileStatus : statusList) {
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath());
                map.put("fileStatus", fileStatus.toString());
                list.add(map);
            }
            return list;
        } else {
            return null;
        }
    }

    /**
     * Create a file on HDFS from an uploaded file
     */
    public static void createFile(String path, MultipartFile file) throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return;
        }
        String fileName = file.getOriginalFilename();
        FileSystem fs = getFileSystem();
        // the upload targets the given directory; the original file name is appended to it
        Path newPath = new Path(path + "/" + fileName);
        // open an output stream
        FSDataOutputStream outputStream = fs.create(newPath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
    }

    /**
     * Read the content of an HDFS file
     */
    public static String readFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(srcPath);
            // wrap in a reader to avoid garbled multi-byte characters
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String lineTxt = "";
            StringBuffer sb = new StringBuffer();
            while ((lineTxt = reader.readLine()) != null) {
                sb.append(lineTxt);
            }
            return sb.toString();
        } finally {
            if (inputStream != null) {
                inputStream.close();
            }
            fs.close();
        }
    }

    /**
     * List the files under an HDFS path
     */
    public static List<Map<String, String>> listFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        // recursively find all files
        RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
        List<Map<String, String>> returnList = new ArrayList<>();
        while (filesList.hasNext()) {
            LocatedFileStatus next = filesList.next();
            String fileName = next.getPath().getName();
            Path filePath = next.getPath();
            Map<String, String> map = new HashMap<>();
            map.put("fileName", fileName);
            map.put("filePath", filePath.toString());
            returnList.add(map);
        }
        fs.close();
        return returnList;
    }

    /**
     * Rename an HDFS file
     */
    public static boolean renameFile(String oldName, String newName) throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        // path of the original file
        Path oldPath = new Path(oldName);
        // new path after renaming
        Path newPath = new Path(newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        return isOk;
    }

    /**
     * Delete an HDFS file
     */
    public static boolean deleteFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        boolean isOk = fs.deleteOnExit(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Upload a local file to HDFS
     */
    public static void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // local source path
        Path clientPath = new Path(path);
        // target path on HDFS
        Path serverPath = new Path(uploadPath);
        // copy the file; the first argument controls whether the source is deleted (default false)
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Download an HDFS file to the local file system
     */
    public static void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // source path on HDFS
        Path clientPath = new Path(path);
        // local target path
        Path serverPath = new Path(downloadPath);
        // copy the file; the first argument controls whether the source is deleted (default false)
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Copy a file within HDFS
     */
    public static void copyFile(String sourcePath, String targetPath) throws Exception {
        if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // source path
        Path oldPath = new Path(sourcePath);
        // target path
        Path newPath = new Path(targetPath);
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            inputStream = fs.open(oldPath);
            outputStream = fs.create(newPath);
            IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
        } finally {
            if (inputStream != null) {
                inputStream.close();
            }
            if (outputStream != null) {
                outputStream.close();
            }
            fs.close();
        }
    }

    /**
     * Open a file on HDFS and return its content as a byte array
     */
    public static byte[] openFileToBytes(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        try {
            FSDataInputStream inputStream = fs.open(srcPath);
            return IOUtils.readFullyToByteArray(inputStream);
        } finally {
            fs.close();
        }
    }

    /**
     * Open a file on HDFS and deserialize it into a Java object
     */
    public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        String jsonStr = readFile(path);
        return JsonUtil.fromObject(jsonStr, clazz);
    }

    /**
     * Get the block locations of a file in the HDFS cluster
     */
    public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // target path
        Path srcPath = new Path(path);
        FileStatus fileStatus = fs.getFileStatus(srcPath);
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    @PostConstruct
    public void getName() {
        hdfsName = this.username;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }

    public String getUsername() {
        return username;
    }
}
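openFileToObject relies on a com.hadoop.util.JsonUtil helper that the article does not show. A minimal sketch of what such a helper might look like, assuming Jackson (already on the classpath through spring-boot-starter-web), is:

package com.hadoop.util;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper assumed by HdfsService.openFileToObject; the original article omits it.
public class JsonUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Deserialize a JSON string into an instance of the given class. */
    public static <T> T fromObject(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + e.getMessage(), e);
        }
    }

    /** Serialize an object to a JSON string (useful when writing objects into HDFS files). */
    public static String toJson(Object value) {
        try {
            return MAPPER.writeValueAsString(value);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize to JSON: " + e.getMessage(), e);
        }
    }
}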
Controller
package com.hadoop.hdfs.controller;

import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import com.hadoop.hdfs.entity.User;
import com.hadoop.hdfs.service.HdfsService;
import com.hadoop.util.Result;

@RestController
@RequestMapping("/hadoop/hdfs")
public class HdfsAction {

    private static Logger LOGGER = LoggerFactory.getLogger(HdfsAction.class);

    /**
     * Create a directory
     */
    @RequestMapping(value = "mkdir", method = RequestMethod.POST)
    @ResponseBody
    public Result mkdir(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            LOGGER.debug("request parameter is empty");
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        // create an empty directory
        boolean isOk = HdfsService.mkdir(path);
        if (isOk) {
            LOGGER.debug("directory created successfully");
            return new Result(Result.SUCCESS, "directory created successfully");
        } else {
            LOGGER.debug("failed to create directory");
            return new Result(Result.FAILURE, "failed to create directory");
        }
    }

    /**
     * Read information about an HDFS directory
     */
    @PostMapping("/readPathInfo")
    public Result readPathInfo(@RequestParam("path") String path) throws Exception {
        List<Map<String, Object>> list = HdfsService.readPathInfo(path);
        return new Result(Result.SUCCESS, "read HDFS directory info success", list);
    }

    /**
     * Get the block locations of an HDFS file in the cluster
     */
    @PostMapping("/getFileBlockLocations")
    public Result getFileBlockLocations(@RequestParam("path") String path) throws Exception {
        BlockLocation[] blockLocations = HdfsService.getFileBlockLocations(path);
        return new Result(Result.SUCCESS, "get HDFS file block locations success", blockLocations);
    }

    /**
     * Create a file
     */
    @PostMapping("/createFile")
    public Result createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file)
            throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        HdfsService.createFile(path, file);
        return new Result(Result.SUCCESS, "create file success");
    }

    /**
     * Read the content of an HDFS file
     */
    @PostMapping("/readFile")
    public Result readFile(@RequestParam("path") String path) throws Exception {
        String targetPath = HdfsService.readFile(path);
        return new Result(Result.SUCCESS, "read HDFS file content success", targetPath);
    }

    /**
     * Read an HDFS file as a byte array
     */
    @PostMapping("/openFileToBytes")
    public Result openFileToBytes(@RequestParam("path") String path) throws Exception {
        byte[] files = HdfsService.openFileToBytes(path);
        return new Result(Result.SUCCESS, "read HDFS file as bytes success", files);
    }

    /**
     * Read an HDFS file and convert it into a User object
     */
    @PostMapping("/openFileToUser")
    public Result openFileToUser(@RequestParam("path") String path) throws Exception {
        User user = HdfsService.openFileToObject(path, User.class);
        return new Result(Result.SUCCESS, "read HDFS file as User object success", user);
    }

    /**
     * List files
     */
    @PostMapping("/listFile")
    public Result listFile(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        List<Map<String, String>> returnList = HdfsService.listFile(path);
        return new Result(Result.SUCCESS, "list files success", returnList);
    }

    /**
     * Rename a file
     */
    @PostMapping("/renameFile")
    public Result renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName)
            throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return new Result(Result.FAILURE, "request parameter is empty");
        }
        boolean isOk = HdfsService.renameFile(oldName, newName);
        if (isOk) {
            return new Result(Result.SUCCESS, "rename file success");
        } else {
            return new Result(Result.FAILURE, "rename file fail");
        }
    }

    /**
     * Delete a file
     */
    @PostMapping("/deleteFile")
    public Result deleteFile(@RequestParam("path") String path) throws Exception {
        boolean isOk = HdfsService.deleteFile(path);
        if (isOk) {
            return new Result(Result.SUCCESS, "delete file success");
        } else {
            return new Result(Result.FAILURE, "delete file fail");
        }
    }

    /**
     * Upload a file
     */
    @PostMapping("/uploadFile")
    public Result uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath)
            throws Exception {
        HdfsService.uploadFile(path, uploadPath);
        return new Result(Result.SUCCESS, "upload file success");
    }

    /**
     * Download a file
     */
    @PostMapping("/downloadFile")
    public Result downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath)
            throws Exception {
        HdfsService.downloadFile(path, downloadPath);
        return new Result(Result.SUCCESS, "download file success");
    }

    /**
     * Copy a file within HDFS
     */
    @PostMapping("/copyFile")
    public Result copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath)
            throws Exception {
        HdfsService.copyFile(sourcePath, targetPath);
        return new Result(Result.SUCCESS, "copy file success");
    }

    /**
     * Check whether a file already exists
     */
    @PostMapping("/existFile")
    public Result existFile(@RequestParam("path") String path) throws Exception {
        boolean isExist = HdfsService.existFile(path);
        return new Result(Result.SUCCESS, "file isExist: " + isExist);
    }
}
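The controller wraps every response in a com.hadoop.util.Result object that the article also omits. A minimal sketch of such a wrapper, assuming a plain code/message/data structure (the actual field names and constant types in the original project are unknown), is:

package com.hadoop.util;

// Hypothetical response wrapper assumed by HdfsAction; the original article does not include it.
public class Result {

    public static final int SUCCESS = 200;
    public static final int FAILURE = 500;

    private int code;
    private String message;
    private Object data;

    public Result(int code, String message) {
        this(code, message, null);
    }

    public Result(int code, String message, Object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    public int getCode() { return code; }
    public String getMessage() { return message; }
    public Object getData() { return data; }
}

With the application running on port 8900 as configured above, a request such as POST http://localhost:8900/hadoop/hdfs/mkdir with a path form parameter should return this wrapper serialized as JSON.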
IV. Some screenshots of the test results
This concludes this article on SpringBoot integration with Hadoop for HDFS file operations; hopefully it serves as a useful reference for working with HDFS from Spring Boot.