java使用多線程讀取超大文件
接上次寫的“JAVA讀取超大文件”。在讀取超過10G的文件時(shí)會發(fā)現(xiàn)一次讀一行的速度實(shí)在是不能接受,想到使用多線程+FileChannel來做一個使用多線程版本。
基本思路如下:
1.計(jì)算出文件總大小
2.分段處理,計(jì)算出每個線程讀取文件的開始與結(jié)束位置
(文件大小/線程數(shù))*N,N是指第幾個線程,這樣能得到每個線程在讀該文件的大概起始位置
使用"大概起始位置",作為讀文件的開始偏移量(fileChannel.position("大概起始位置")),來讀取該文件,直到讀到第一個換行符,記錄下這個換行符的位置,作為該線程的準(zhǔn)確起 始位置.同時(shí)它也是上一個線程的結(jié)束位置.最后一個線程的結(jié)束位置也直接設(shè)置為-1
3.啟動線程,每個線程從開始位置讀取到結(jié)束位置為止
代碼如下:
讀文件工具類
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Observable;
/**
* Created with IntelliJ IDEA.
* User: okey
* Date: 14-4-2
* Time: 下午3:12
* 讀取文件
*/
public class ReadFile extends Observable {
private int bufSize = 1024;
// 換行符
private byte key = "\n".getBytes()[0];
// 當(dāng)前行數(shù)
private long lineNum = 0;
// 文件編碼,默認(rèn)為gb2312
private String encode = "gb2312";
// 具體業(yè)務(wù)邏輯監(jiān)聽器
private ReaderFileListener readerListener;
public void setEncode(String encode) {
this.encode = encode;
}
public void setReaderListener(ReaderFileListener readerListener) {
this.readerListener = readerListener;
}
/**
* 獲取準(zhǔn)確開始位置
* @param file
* @param position
* @return
* @throws Exception
*/
public long getStartNum(File file, long position) throws Exception {
long startNum = position;
FileChannel fcin = new RandomAccessFile(file, "r").getChannel();
fcin.position(position);
try {
int cache = 1024;
ByteBuffer rBuffer = ByteBuffer.allocate(cache);
// 每次讀取的內(nèi)容
byte[] bs = new byte[cache];
// 緩存
byte[] tempBs = new byte[0];
String line = "";
while (fcin.read(rBuffer) != -1) {
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果發(fā)現(xiàn)有上次未讀完的緩存,則將它加到當(dāng)前讀取的內(nèi)容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 獲取開始位置之后的第一個換行符
int endIndex = indexOf(newStrByte, 0);
if (endIndex != -1) {
return startNum + endIndex;
}
tempBs = substring(newStrByte, 0, newStrByte.length);
startNum += 1024;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
fcin.close();
}
return position;
}
/**
* 從設(shè)置的開始位置讀取文件,一直到結(jié)束為止。如果 end設(shè)置為負(fù)數(shù),剛讀取到文件末尾
* @param fullPath
* @param start
* @param end
* @throws Exception
*/
public void readFileByLine(String fullPath, long start, long end) throws Exception {
File fin = new File(fullPath);
if (fin.exists()) {
FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();
fcin.position(start);
try {
ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
// 每次讀取的內(nèi)容
byte[] bs = new byte[bufSize];
// 緩存
byte[] tempBs = new byte[0];
String line = "";
// 當(dāng)前讀取文件位置
long nowCur = start;
while (fcin.read(rBuffer) != -1) {
nowCur += bufSize;
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果發(fā)現(xiàn)有上次未讀完的緩存,則將它加到當(dāng)前讀取的內(nèi)容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 是否已經(jīng)讀到最后一位
boolean isEnd = false;
// 如果當(dāng)前讀取的位數(shù)已經(jīng)比設(shè)置的結(jié)束位置大的時(shí)候,將讀取的內(nèi)容截取到設(shè)置的結(jié)束位置
if (end > 0 && nowCur > end) {
// 緩存長度 - 當(dāng)前已經(jīng)讀取位數(shù) - 最后位數(shù)
int l = newStrByte.length - (int) (nowCur - end);
newStrByte = substring(newStrByte, 0, l);
isEnd = true;
}
int fromIndex = 0;
int endIndex = 0;
// 每次讀一行內(nèi)容,以 key(默認(rèn)為\n) 作為結(jié)束符
while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
byte[] bLine = substring(newStrByte, fromIndex, endIndex);
line = new String(bLine, 0, bLine.length, encode);
lineNum++;
// 輸出一行內(nèi)容,處理方式由調(diào)用方提供
readerListener.outLine(line.trim(), lineNum, false);
fromIndex = endIndex + 1;
}
// 將未讀取完成的內(nèi)容放到緩存中
tempBs = substring(newStrByte, fromIndex, newStrByte.length);
if (isEnd) {
break;
}
}
// 將剩下的最后內(nèi)容作為一行,輸出,并指明這是最后一行
String lineStr = new String(tempBs, 0, tempBs.length, encode);
readerListener.outLine(lineStr.trim(), lineNum, true);
} catch (Exception e) {
e.printStackTrace();
} finally {
fcin.close();
}
} else {
throw new FileNotFoundException("沒有找到文件:" + fullPath);
}
// 通知觀察者,當(dāng)前工作已經(jīng)完成
setChanged();
notifyObservers(start+"-"+end);
}
/**
* 查找一個byte[]從指定位置之后的一個換行符位置
*
* @param src
* @param fromIndex
* @return
* @throws Exception
*/
private int indexOf(byte[] src, int fromIndex) throws Exception {
for (int i = fromIndex; i < src.length; i++) {
if (src[i] == key) {
return i;
}
}
return -1;
}
/**
* 從指定開始位置讀取一個byte[]直到指定結(jié)束位置為止生成一個全新的byte[]
*
* @param src
* @param fromIndex
* @param endIndex
* @return
* @throws Exception
*/
private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {
int size = endIndex - fromIndex;
byte[] ret = new byte[size];
System.arraycopy(src, fromIndex, ret, 0, size);
return ret;
}
}
讀文件線程
/**
* Created with IntelliJ IDEA.
* User: okey
* Date: 14-4-2
* Time: 下午4:50
* To change this template use File | Settings | File Templates.
*/
public class ReadFileThread extends Thread {
private ReaderFileListener processPoiDataListeners;
private String filePath;
private long start;
private long end;
public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {
this.setName(this.getName()+"-ReadFileThread");
this.start = start;
this.end = end;
this.filePath = file;
this.processPoiDataListeners = processPoiDataListeners;
}
@Override
public void run() {
ReadFile readFile = new ReadFile();
readFile.setReaderListener(processPoiDataListeners);
readFile.setEncode(processPoiDataListeners.getEncode());
// readFile.addObserver();
try {
readFile.readFileByLine(filePath, start, end + 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
具體業(yè)務(wù)邏輯監(jiān)聽
/**
* Created with Okey
* User: Okey
* Date: 13-3-14
* Time: 下午3:19
* NIO逐行讀數(shù)據(jù)回調(diào)方法
*/
public abstract class ReaderFileListener {
// 一次讀取行數(shù),默認(rèn)為500
private int readColNum = 500;
private String encode;
private List<String> list = new ArrayList<String>();
/**
* 設(shè)置一次讀取行數(shù)
* @param readColNum
*/
protected void setReadColNum(int readColNum) {
this.readColNum = readColNum;
}
public String getEncode() {
return encode;
}
public void setEncode(String encode) {
this.encode = encode;
}
/**
* 每讀取到一行數(shù)據(jù),添加到緩存中
* @param lineStr 讀取到的數(shù)據(jù)
* @param lineNum 行號
* @param over 是否讀取完成
* @throws Exception
*/
public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
if(null != lineStr)
list.add(lineStr);
if (!over && (lineNum % readColNum == 0)) {
output(list);
list.clear();
} else if (over) {
output(list);
list.clear();
}
}
/**
* 批量輸出
*
* @param stringList
* @throws Exception
*/
public abstract void output(List<String> stringList) throws Exception;
}
線程調(diào)度
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* User: okey
* Date: 14-4-1
* Time: 下午6:03
* To change this template use File | Settings | File Templates.
*/
public class BuildData {
public static void main(String[] args) throws Exception {
File file = new File("E:\\1396341974289.csv");
FileInputStream fis = null;
try {
ReadFile readFile = new ReadFile();
fis = new FileInputStream(file);
int available = fis.available();
int maxThreadNum = 50;
// 線程粗略開始位置
int i = available / maxThreadNum;
for (int j = 0; j < maxThreadNum; j++) {
// 計(jì)算精確開始位置
long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;
// 具體監(jiān)聽實(shí)現(xiàn)
ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");
new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
現(xiàn)在就可以盡情的調(diào)整 maxThreadNum來享受風(fēng)一般的速度吧!
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot啟動執(zhí)行sql腳本的3種方法實(shí)例
在應(yīng)用程序啟動后,可以自動執(zhí)行建庫、建表等SQL腳本,下面這篇文章主要給大家介紹了關(guān)于SpringBoot啟動執(zhí)行sql腳本的3種方法,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-01-01
Spring詳細(xì)講解FactoryBean接口的使用
這篇文章主要為大家介紹了Spring容器FactoryBean工廠實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
Java的線程池ThreadPoolExecutor及多種線程池實(shí)現(xiàn)詳解
這篇文章主要介紹了Java的線程池ThreadPoolExecutor及多種線程池實(shí)現(xiàn)詳解,ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀態(tài),低 29 位表示線程數(shù)量,之所以將信息存儲在一個變量中,是為了保證原子性,需要的朋友可以參考下2024-01-01
學(xué)習(xí)Java之自定義異常與NullPointerException的處理
有時(shí)候Java自身提供的異常類并不能很好地表達(dá)我們的需求,所以這時(shí)候我們就可以自定義異常,也就是說,我們可以制造出一個自己的異常類,這樣就可以拋出或捕獲自己的異常了,本文就給大家詳細(xì)講講Java自定義異常與NullPointerException的處理2023-08-08
解決idea啟動報(bào)錯javax.imageio.IIOException的問題
這篇文章主要介紹了idea啟動報(bào)錯javax.imageio.IIOException,解決打不開idea問題,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09
在java中使用SPI創(chuàng)建可擴(kuò)展的應(yīng)用程序操作
這篇文章主要介紹了在java中使用SPI創(chuàng)建可擴(kuò)展的應(yīng)用程序操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09

