Java/Web調(diào)用Hadoop進(jìn)行MapReduce示例代碼
Hadoop環(huán)境搭建詳見此文章http://www.dbjr.com.cn/article/33649.htm。
我們已經(jīng)知道Hadoop能夠通過Hadoop jar ***.jar input output的形式通過命令行來調(diào)用,那么如何將其封裝成一個(gè)服務(wù),讓Java/Web來調(diào)用它?使得用戶可以用方便的方式上傳文件到Hadoop并進(jìn)行處理,獲得結(jié)果。首先,***.jar是一個(gè)Hadoop任務(wù)類的封裝,我們可以在沒有jar的情況下運(yùn)行該類的main方法,將必要的參數(shù)傳遞給它。input 和output則將用戶上傳的文件使用Hadoop的JavaAPI put到Hadoop的文件系統(tǒng)中。然后再通過Hadoop的JavaAPI 從文件系統(tǒng)中取得結(jié)果文件。
搭建JavaWeb工程。本文使用Spring、SpringMVC、MyBatis框架, 當(dāng)然,這不是重點(diǎn),就算沒有使用任何框架也能實(shí)現(xiàn)。
項(xiàng)目框架如下:

項(xiàng)目中使用到的jar包如下:


在Spring的配置文件中,加入
<bean id="multipartResolver" class="org.springframework.web.multipart.commons.CommonsMultipartResolver"> <property name="defaultEncoding" value="utf-8" /> <property name="maxUploadSize" value="10485760000" /> <property name="maxInMemorySize" value="40960" /> </bean>
使得項(xiàng)目支持文件上傳。
新建一個(gè)login.jsp 點(diǎn)擊登錄后進(jìn)入user/login

user/login中處理登錄,登錄成功后,【在Hadoop文件系統(tǒng)中創(chuàng)建用戶文件夾】,然后跳轉(zhuǎn)到console.jsp
package com.chenjie.controller;
import java.io.IOException;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.chenjie.pojo.JsonResult;
import com.chenjie.pojo.User;
import com.chenjie.service.UserService;
import com.chenjie.util.AppConfig;
import com.google.gson.Gson;
/**
* 用戶請(qǐng)求控制器
*
* @author Chen
*
*/
@Controller
// 聲明當(dāng)前類為控制器
@RequestMapping("/user")
// 聲明當(dāng)前類的路徑
public class UserController {
@Resource(name = "userService")
private UserService userService;// 由Spring容器注入一個(gè)UserService實(shí)例
/**
* 登錄
*
* @param user
* 用戶
* @param request
* @param response
* @throws IOException
*/
@RequestMapping("/login")
// 聲明當(dāng)前方法的路徑
public String login(User user, HttpServletRequest request,
HttpServletResponse response) throws IOException {
response.setContentType("application/json");// 設(shè)置響應(yīng)內(nèi)容格式為json
User result = userService.login(user);// 調(diào)用UserService的登錄方法
request.getSession().setAttribute("user", result);
if (result != null) {
createHadoopFSFolder(result);
return "console";
}
return "login";
}
public void createHadoopFSFolder(User user) throws IOException {
Configuration conf = new Configuration();
conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));
conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
FileSystem fileSystem = FileSystem.get(conf);
System.out.println(fileSystem.getUri());
Path file = new Path("/user/" + user.getU_username());
if (fileSystem.exists(file)) {
System.out.println("haddop hdfs user foler exists.");
fileSystem.delete(file, true);
System.out.println("haddop hdfs user foler delete success.");
}
fileSystem.mkdirs(file);
System.out.println("haddop hdfs user foler creat success.");
}
}
console.jsp中進(jìn)行文件上傳和任務(wù)提交、

文件上傳和任務(wù)提交:
package com.chenjie.controller;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.multipart.commons.CommonsMultipartResolver;
import com.chenjie.pojo.User;
import com.chenjie.util.Utils;
@Controller
// 聲明當(dāng)前類為控制器
@RequestMapping("/hadoop")
// 聲明當(dāng)前類的路徑
public class HadoopController {
@RequestMapping("/upload")
// 聲明當(dāng)前方法的路徑
//文件上傳
public String upload(HttpServletRequest request,
HttpServletResponse response) throws IOException {
List<String> fileList = (List<String>) request.getSession()
.getAttribute("fileList");//得到用戶已上傳文件列表
if (fileList == null)
fileList = new ArrayList<String>();//如果文件列表為空,則新建
User user = (User) request.getSession().getAttribute("user");
if (user == null)
return "login";//如果用戶未登錄,則跳轉(zhuǎn)登錄頁面
CommonsMultipartResolver multipartResolver = new CommonsMultipartResolver(
request.getSession().getServletContext());//得到在Spring配置文件中注入的文件上傳組件
if (multipartResolver.isMultipart(request)) {//如果請(qǐng)求是文件請(qǐng)求
MultipartHttpServletRequest multiRequest = (MultipartHttpServletRequest) request;
Iterator<String> iter = multiRequest.getFileNames();//得到文件名迭代器
while (iter.hasNext()) {
MultipartFile file = multiRequest.getFile((String) iter.next());
if (file != null) {
String fileName = file.getOriginalFilename();
File folder = new File("/home/chenjie/CJHadoopOnline/"
+ user.getU_username());
if (!folder.exists()) {
folder.mkdir();//如果文件不目錄存在,則在服務(wù)器本地創(chuàng)建
}
String path = "/home/chenjie/CJHadoopOnline/"
+ user.getU_username() + "/" + fileName;
File localFile = new File(path);
file.transferTo(localFile);//將上傳文件拷貝到服務(wù)器本地目錄
// fileList.add(path);
}
handleUploadFiles(user, fileList);//處理上傳文件
}
}
request.getSession().setAttribute("fileList", fileList);//將上傳文件列表保存在Session中
return "console";//返回console.jsp繼續(xù)上傳文件
}
@RequestMapping("/wordcount")
//調(diào)用Hadoop進(jìn)行mapreduce
public void wordcount(HttpServletRequest request,
HttpServletResponse response) {
System.out.println("進(jìn)入controller wordcount ");
User user = (User) request.getSession().getAttribute("user");
System.out.println(user);
// if(user == null)
// return "login";
WordCount c = new WordCount();//新建單詞統(tǒng)計(jì)任務(wù)
String username = user.getU_username();
String input = "hdfs://chenjie-virtual-machine:9000/user/" + username
+ "/wordcountinput";//指定Hadoop文件系統(tǒng)的輸入文件夾
String output = "hdfs://chenjie-virtual-machine:9000/user/" + username
+ "/wordcountoutput";//指定Hadoop文件系統(tǒng)的輸出文件夾
String reslt = output + "/part-r-00000";//默認(rèn)輸出文件
try {
Thread.sleep(3*1000);
c.main(new String[] { input, output });//調(diào)用單詞統(tǒng)計(jì)任務(wù)
Configuration conf = new Configuration();//新建Hadoop配置
conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加Hadoop配置,找到Hadoop部署信息
conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//Hadoop配置,找到文件系統(tǒng)
FileSystem fileSystem = FileSystem.get(conf);//得打文件系統(tǒng)
Path file = new Path(reslt);//找到輸出結(jié)果文件
FSDataInputStream inStream = fileSystem.open(file);//打開
URI uri = file.toUri();//得到輸出文件路徑
System.out.println(uri);
String data = null;
while ((data = inStream.readLine()) != null) {
//System.out.println(data);
response.getOutputStream().println(data);//講結(jié)果文件寫回用戶網(wǎng)頁
}
// InputStream in = fileSystem.open(file);
// OutputStream out = new FileOutputStream("result.txt");
// IOUtils.copyBytes(in, out, 4096, true);
inStream.close();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
@RequestMapping("/MapReduceStates")
//得到MapReduce的狀態(tài)
public void mapreduce(HttpServletRequest request,
HttpServletResponse response) {
float[] progress=new float[2];
try {
Configuration conf1=new Configuration();
conf1.set("mapred.job.tracker", Utils.JOBTRACKER);
JobStatus jobStatus = Utils.getJobStatus(conf1);
// while(!jobStatus.isJobComplete()){
// progress = Utils.getMapReduceProgess(jobStatus);
// response.getOutputStream().println("map:" + progress[0] + "reduce:" + progress[1]);
// Thread.sleep(1000);
// }
JobConf jc = new JobConf(conf1);
JobClient jobClient = new JobClient(jc);
JobStatus[] jobsStatus = jobClient.getAllJobs();
//這樣就得到了一個(gè)JobStatus數(shù)組,隨便取出一個(gè)元素取名叫jobStatus
jobStatus = jobsStatus[0];
JobID jobID = jobStatus.getJobID(); //通過JobStatus獲取JobID
RunningJob runningJob = jobClient.getJob(jobID); //通過JobID得到RunningJob對(duì)象
runningJob.getJobState();//可以獲取作業(yè)狀態(tài),狀態(tài)有五種,為JobStatus.Failed 、JobStatus.KILLED、JobStatus.PREP、JobStatus.RUNNING、JobStatus.SUCCEEDED
jobStatus.getUsername();//可以獲取運(yùn)行作業(yè)的用戶名。
runningJob.getJobName();//可以獲取作業(yè)名。
jobStatus.getStartTime();//可以獲取作業(yè)的開始時(shí)間,為UTC毫秒數(shù)。
float map = runningJob.mapProgress();//可以獲取Map階段完成的比例,0~1,
System.out.println("map=" + map);
float reduce = runningJob.reduceProgress();//可以獲取Reduce階段完成的比例。
System.out.println("reduce="+reduce);
runningJob.getFailureInfo();//可以獲取失敗信息。
runningJob.getCounters();//可以獲取作業(yè)相關(guān)的計(jì)數(shù)器,計(jì)數(shù)器的內(nèi)容和作業(yè)監(jiān)控頁面上看到的計(jì)數(shù)器的值一樣。
} catch (IOException e) {
progress[0] = 0;
progress[1] = 0;
}
request.getSession().setAttribute("map", progress[0]);
request.getSession().setAttribute("reduce", progress[1]);
}
//處理文件上傳
public void handleUploadFiles(User user, List<String> fileList) {
File folder = new File("/home/chenjie/CJHadoopOnline/"
+ user.getU_username());
if (!folder.exists())
return;
if (folder.isDirectory()) {
File[] files = folder.listFiles();
for (File file : files) {
System.out.println(file.getName());
try {
putFileToHadoopFSFolder(user, file, fileList);//將單個(gè)文件上傳到Hadoop文件系統(tǒng)
} catch (IOException e) {
System.err.println(e.getMessage());
}
}
}
}
//將單個(gè)文件上傳到Hadoop文件系統(tǒng)
private void putFileToHadoopFSFolder(User user, File file,
List<String> fileList) throws IOException {
Configuration conf = new Configuration();
conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));
conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
FileSystem fileSystem = FileSystem.get(conf);
System.out.println(fileSystem.getUri());
Path localFile = new Path(file.getAbsolutePath());
Path foler = new Path("/user/" + user.getU_username()
+ "/wordcountinput");
if (!fileSystem.exists(foler)) {
fileSystem.mkdirs(foler);
}
Path hadoopFile = new Path("/user/" + user.getU_username()
+ "/wordcountinput/" + file.getName());
// if (fileSystem.exists(hadoopFile)) {
// System.out.println("File exists.");
// } else {
// fileSystem.mkdirs(hadoopFile);
// }
fileSystem.copyFromLocalFile(true, true, localFile, hadoopFile);
fileList.add(hadoopFile.toUri().toString());
}
}
啟動(dòng)Hadoop:

運(yùn)行結(jié)果:
可以在任意平臺(tái)下,登錄該項(xiàng)目地址,上傳文件,得到結(jié)果。






運(yùn)行成功。
源代碼:https://github.com/tudoupaisimalingshu/CJHadoopOnline
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java SE使用數(shù)組實(shí)現(xiàn)高速數(shù)字轉(zhuǎn)換功能
隨著大數(shù)據(jù)時(shí)代的到來,數(shù)字轉(zhuǎn)換功能變得越來越重要,在Java開發(fā)中,數(shù)字轉(zhuǎn)換功能也是經(jīng)常用到的,下面我們就來學(xué)習(xí)一下如何使用Java SE數(shù)組實(shí)現(xiàn)高速的數(shù)字轉(zhuǎn)換功能吧2023-11-11
idea啟動(dòng)項(xiàng)目報(bào)端口號(hào)沖突或被占用的解決方法
這篇文章主要介紹了idea啟動(dòng)項(xiàng)目報(bào)端口號(hào)沖突或被占用的解決方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-10-10
關(guān)于post請(qǐng)求內(nèi)容無法重復(fù)獲取的解決方法
這篇文章主要介紹了關(guān)于post請(qǐng)求內(nèi)容無法重復(fù)獲取的解決方法,文中通過代碼示例給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-03-03
關(guān)于JAVA8的 Stream學(xué)習(xí)
這篇文章主要介紹了JAVA8 Stream學(xué)習(xí)方法的相關(guān)資料,需要的朋友可以參考下面文章內(nèi)容2021-09-09
java怎樣動(dòng)態(tài)獲取泛型參數(shù)的類型
在Java中,泛型信息在編譯時(shí)會(huì)被擦除,但可以通過特定API獲取運(yùn)行時(shí)的泛型參數(shù)類型,主要API包括Class的getGenericSuperclass()和getGenericInterfaces()方法,以及ParameterizedType的getActualTypeArguments()方法2024-09-09
Spring實(shí)戰(zhàn)之使用注解實(shí)現(xiàn)聲明式事務(wù)操作示例
這篇文章主要介紹了Spring實(shí)戰(zhàn)之使用注解實(shí)現(xiàn)聲明式事務(wù)操作,結(jié)合實(shí)例形式詳細(xì)分析了spring使用注解實(shí)現(xiàn)聲明式事務(wù)相關(guān)配置、接口實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01
JDK動(dòng)態(tài)代理過程原理及手寫實(shí)現(xiàn)詳解
這篇文章主要為大家介紹了JDK動(dòng)態(tài)代理過程原理及手寫實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09

