java實現(xiàn)MapReduce對文件進(jìn)行切分的示例代碼
比如有海量的文本文件,如訂單,頁面點擊事件的記錄,量特別大,很難搞定。
那么我們該怎樣解決海量數(shù)據(jù)的計算?
1、獲取總行數(shù)
2、計算每個文件中存多少數(shù)據(jù)
3、split切分文件
4、reduce將文件進(jìn)行匯總

例如這里有百萬條數(shù)據(jù),單個文件操作太麻煩,所以我們需要進(jìn)行切分
在切分文件的過程中會出現(xiàn)文件不能整個切分的情況,可能有剩下的數(shù)據(jù)并沒有被讀取到,所以我們每個切分128條數(shù)據(jù),不足128條再保留到一個文件中

創(chuàng)建MapTask
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MapTask extends Thread {
//用來接收具體的哪一個文件
private File file;
private int flag;
public MapTask(File file, int flag) {
this.file = file;
this.flag = flag;
}
@Override
public void run() {
try {
BufferedReader br = new BufferedReader(new FileReader(file));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
/**
* 統(tǒng)計班級人數(shù)HashMap存儲
*/
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
br.close();
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
Set<Map.Entry<String, Integer>> entries = map.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String key = entry.getKey();
Integer value = entry.getValue();
bw.write(key + ":" + value);
bw.newLine();
}
bw.flush();
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
創(chuàng)建Map
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Map {
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 多線程連接池(線程池)
ExecutorService executorService = Executors.newFixedThreadPool(8);
// 獲取文件列表
File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
File[] files = file.listFiles();
//創(chuàng)建多線程對象
int flag = 0;
for (File f : files) {
//為每一個文件啟動一個線程
MapTask mapTask = new MapTask(f, flag);
executorService.submit(mapTask);
flag++;
}
executorService.shutdown();
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
創(chuàng)建ClazzSum
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
public class ClazzSum {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
String line;
HashMap<String, Integer> map = new HashMap<String, Integer>();
while ((line = br.readLine()) != null) {
String clazz = line.split(",")[4];
if (!map.containsKey(clazz)) {
map.put(clazz, 1);
} else {
map.put(clazz, map.get(clazz) + 1);
}
}
System.out.println(map);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
創(chuàng)建split128
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;
public class Split128 {
public static void main(String[] args) throws Exception {
BufferedReader br = new BufferedReader(
new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));
//用作標(biāo)記文件,也作為文件名稱
int index = 0;
BufferedWriter bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
ArrayList<String> list = new ArrayList<String>();
String line;
//用作累計讀取了多少行數(shù)據(jù)
int flag = 0;
int row = 0;
while ((line = br.readLine()) != null) {
list.add(line);
flag++;
// flag = 140
if (flag == 140) {// 一個文件讀寫完成,生成新的文件
row = 0 + 128 * index;
for (int i = row; i <= row + 127; i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
/**
* 生成新的文件
* 計數(shù)清零
*/
index++;
flag = 12;
bw = new BufferedWriter(
new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
}
}
//文件讀取剩余128*1.1范圍之內(nèi)
for (int i = list.size() - flag; i < list.size(); i++) {
bw.write(list.get(i));
bw.newLine();
}
bw.flush();
bw.close();
}
}
創(chuàng)建Reduce
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
public class Reduce {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
HashMap<String, Integer> map = new HashMap<String, Integer>();
File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
File[] files = file.listFiles();
for (File f : files) {
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
while ((line = br.readLine()) != null) {
String clazz = line.split(":")[0];
int sum = Integer.valueOf(line.split(":")[1]);
if (!map.containsKey(clazz)) {
map.put(clazz, sum);
} else {
map.put(clazz, map.get(clazz) + sum);
}
}
}
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(map);
}
}


最后將文件切分了8份,這里采用了線程池,建立線程連接,多個線程同時啟動,比單一文件采用多線程效率更高更好使。
到此這篇關(guān)于java實現(xiàn)MapReduce對文件進(jìn)行切分的示例代碼的文章就介紹到這了,更多相關(guān)java MapReduce 文件切分內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot利用Aop捕捉注解實現(xiàn)業(yè)務(wù)異步執(zhí)行
在開發(fā)過程中,盡量會將比較耗時且并不會影響請求的響應(yīng)結(jié)果的業(yè)務(wù)放在異步線程池中進(jìn)行處理,那么到時什么任務(wù)在執(zhí)行的時候會創(chuàng)建單獨的線程進(jìn)行處理呢?這篇文章主要介紹了Springboot利用Aop捕捉注解實現(xiàn)業(yè)務(wù)異步執(zhí)行2023-04-04
springboot用controller跳轉(zhuǎn)html頁面的實現(xiàn)
這篇文章主要介紹了springboot用controller跳轉(zhuǎn)html頁面的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
java并發(fā)學(xué)習(xí)之Executor源碼解析
這篇文章主要為大家介紹了java并發(fā)學(xué)習(xí)之Executor源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
java連接mysql數(shù)據(jù)庫詳細(xì)步驟解析
以下是對java連接mysql數(shù)據(jù)庫的具體詳細(xì)步驟進(jìn)行了分析介紹,需要的朋友可以過來參考下2013-08-08
java將數(shù)據(jù)寫入內(nèi)存,磁盤的方法
下面小編就為大家分享一篇java將數(shù)據(jù)寫入內(nèi)存,磁盤的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-01-01
fastjson轉(zhuǎn)換對象實體@JsonProperty不生效問題及解決
這篇文章主要介紹了fastjson轉(zhuǎn)換對象實體@JsonProperty不生效問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-08-08

