Java多線程實現快速切分文件的程序
前段時間需要進行大批量數據導入,DBA給提供的是CSV文件,但是每個CSV文件都好幾個GB大小,直接進行load,數據庫很慢還會產生內存不足的問題,為了實現這個功能,寫了個快速切分文件的程序。
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * Splits a large delimited text file into fixed-size part files, writing the
 * parts concurrently, and then patches each part so that a row cut by a part
 * boundary is completed at the end of the part in which it started.
 *
 * <p>Text is decoded and encoded as GBK and fields are separated by {@code '^'}.
 *
 * <p>NOTE(review): parts are cut at plain byte offsets, so a boundary can fall
 * inside a multi-byte GBK character; the row fragment that is re-encoded during
 * the merge step may then differ from the original bytes. Confirm whether the
 * input can contain multi-byte characters at part boundaries.
 */
public class FileSplitUtil {
    private static final Logger log = LogManager.getLogger(FileSplitUtil.class);

    /** Files smaller than this many bytes (100 MB) are not split by {@link #main}. */
    private static final long ORIGIN_FILE_SIZE = 1024L * 1024 * 100;

    /**
     * Part size in bytes used by {@link #main} (64 MB). The original author notes
     * that the value must be a power of two to avoid garbled Chinese text.
     */
    private static final int BLOCK_FILE_SIZE = 1024 * 1024 * 64;

    /** Field separator of the delimited file. */
    private static final char CSV_SEPARATOR = '^';

    /** Charset used to decode rows and to encode the text appended to a part. */
    private static final String TEXT_CHARSET = "gbk";

    /**
     * Charset used to turn the string returned by
     * {@link RandomAccessFile#readLine()} back into the bytes that were read.
     */
    private static final String RAW_CHARSET = "8859_1";

    /** Upper bound, in bytes, of the copy buffer held by each worker. */
    private static final int COPY_BUFFER_SIZE = 1024 * 1024;

    /** Source file used by {@link #main} when no argument is supplied. */
    private static final String DEFAULT_SOURCE_FILE = "D:\\csvtest\\aa.csv";

    /**
     * Command-line entry point: splits the file named by the first argument (or
     * the built-in default path when no argument is given) if it is at least
     * 100 MB, then prints the part names and the elapsed time.
     *
     * @param args optional; {@code args[0]} is the path of the file to split
     */
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        try {
            String fileName = (args != null && args.length > 0) ? args[0] : DEFAULT_SOURCE_FILE;
            File sourceFile = new File(fileName);
            if (sourceFile.length() >= ORIGIN_FILE_SIZE) {
                String csvFileName = fileName.replace('\\', '/');
                FileSplitUtil fileSplitUtil = new FileSplitUtil();
                List<String> parts = fileSplitUtil.splitBySize(csvFileName, BLOCK_FILE_SIZE);
                for (String part : parts) {
                    System.out.println("partName is:" + part);
                }
            }
            System.out.println("總文件長度" + sourceFile.length() + ",拆分文件耗時:"
                    + (System.currentTimeMillis() - start) + "ms.");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while splitting file", e);
        } catch (Exception e) {
            // Pass the throwable itself so the logger prints the full stack trace.
            log.error("Failed to split file", e);
        }
    }

    /**
     * Splits a file into consecutive parts of {@code byteSize} bytes (the last
     * part holds the remainder) and then merges rows that were cut by a part
     * boundary.
     *
     * <p>Part {@code i} (1-based) is written next to the source file as
     * {@code <source path>.<i zero-padded>.cvs}. After all parts are written, the
     * first row of each following part is appended to the end of the part before
     * it; the following part itself is left unchanged, so it still starts with
     * that row.
     *
     * @param fileName full name of the file to split
     * @param byteSize size of each part in bytes; must be positive
     * @return names of the part files, in order; empty when the source is empty
     * @throws IOException              if the source cannot be read or a part
     *                                  cannot be written or patched
     * @throws InterruptedException     if the calling thread is interrupted while
     *                                  waiting for the parts to be written
     * @throws IllegalArgumentException if {@code byteSize} is not positive or would
     *                                  produce more than {@code Integer.MAX_VALUE}
     *                                  parts
     */
    public List<String> splitBySize(String fileName, int byteSize)
            throws IOException, InterruptedException {
        if (byteSize <= 0) {
            throw new IllegalArgumentException("byteSize must be positive: " + byteSize);
        }
        File file = new File(fileName);
        long totalLen;
        // Opening the file up front keeps the FileNotFoundException for a missing source.
        RandomAccessFile source = new RandomAccessFile(fileName, "r");
        try {
            totalLen = source.length();
        } finally {
            source.close();
        }

        long partCount = (totalLen + byteSize - 1) / byteSize;
        if (partCount > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("byteSize " + byteSize
                    + " is too small for a file of " + totalLen + " bytes");
        }
        int count = (int) partCount;
        List<String> parts = new ArrayList<String>(count);
        if (count == 0) {
            return parts;
        }

        int countLen = String.valueOf(count).length();
        List<SplitTask> tasks = new ArrayList<SplitTask>(count);
        for (int i = 0; i < count; i++) {
            String partFileName = file.getPath() + "."
                    + leftPad(String.valueOf(i + 1), countLen, '0') + ".cvs";
            long startPos = (long) i * byteSize;
            long readSize = Math.min((long) byteSize, totalLen - startPos);
            tasks.add(new SplitTask(readSize, startPos, partFileName, file));
            parts.add(partFileName);
        }

        int threads = Math.min(count, Math.max(1, Runtime.getRuntime().availableProcessors()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // invokeAll returns only after every part has been written or has failed.
            for (Future<Void> result : pool.invokeAll(tasks)) {
                awaitPart(result);
            }
        } finally {
            pool.shutdown();
        }

        // A row may have been cut by a part boundary; complete such rows.
        mergeRow(parts);
        return parts;
    }

    /**
     * Waits for one part to finish and rethrows the failure of its worker, if any.
     *
     * @param result future of a submitted {@link SplitTask}
     * @throws IOException          if the worker failed
     * @throws InterruptedException if interrupted while waiting
     */
    private static void awaitPart(Future<Void> result) throws IOException, InterruptedException {
        try {
            result.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new IOException("Failed to write part file", cause);
        }
    }

    /**
     * Worker that copies one byte range of the source file into its part file.
     */
    private static final class SplitTask implements Callable<Void> {
        private final long byteSize;
        private final long startPos;
        private final String partFileName;
        private final File originFile;

        /**
         * @param byteSize     number of bytes to copy
         * @param startPos     offset in the source file at which the part starts
         * @param partFileName name of the part file to create
         * @param originFile   file being split
         */
        SplitTask(long byteSize, long startPos, String partFileName, File originFile) {
            this.byteSize = byteSize;
            this.startPos = startPos;
            this.partFileName = partFileName;
            this.originFile = originFile;
        }

        /**
         * Copies the byte range through a bounded buffer.
         *
         * @return always {@code null}
         * @throws IOException if reading or writing fails, or the source ends
         *                     before the whole range has been copied
         */
        public Void call() throws IOException {
            RandomAccessFile in = new RandomAccessFile(originFile, "r");
            try {
                OutputStream out = new FileOutputStream(partFileName);
                try {
                    in.seek(startPos); // move to the start of this part
                    byte[] buffer = new byte[(int) Math.min(byteSize, (long) COPY_BUFFER_SIZE)];
                    long remaining = byteSize;
                    // A single read may return fewer bytes than requested, so loop.
                    while (remaining > 0) {
                        int wanted = (int) Math.min((long) buffer.length, remaining);
                        int read = in.read(buffer, 0, wanted);
                        if (read < 0) {
                            throw new EOFException("Unexpected end of " + originFile
                                    + " while writing " + partFileName);
                        }
                        out.write(buffer, 0, read);
                        remaining -= read;
                    }
                } finally {
                    out.close();
                }
            } finally {
                in.close();
            }
            return null;
        }
    }

    /**
     * Completes rows that were cut by a part boundary.
     *
     * <p>For every existing part the first two rows and the last row are read.
     * The first row of a part counts as a complete row when the separators in it
     * plus those in the last row of the preceding part outnumber the separators
     * in the part's second row. The first row of each following part is then
     * appended to the part before it, preceded by a line break when it is a
     * complete row. The following part is not modified.
     *
     * @param parts names of the part files, in order
     * @throws IOException if a part cannot be read or written
     */
    private void mergeRow(List<String> parts) throws IOException {
        List<PartFile> partFiles = new ArrayList<PartFile>();
        // Collect the boundary rows of every part.
        for (int i = 0; i < parts.size(); i++) {
            String partFileName = parts.get(i);
            File splitFileTemp = new File(partFileName);
            if (!splitFileTemp.exists()) {
                continue;
            }
            String firstRow;
            String secondRow;
            InputStream in = new FileInputStream(splitFileTemp);
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, TEXT_CHARSET));
                firstRow = reader.readLine();
                secondRow = reader.readLine();
            } finally {
                in.close();
            }

            PartFile partFile = new PartFile();
            partFile.setPartFileName(partFileName);
            partFile.setFirstRow(firstRow);
            partFile.setEndRow(readLastLine(partFileName));
            if (i >= 1) {
                String preEndRow = readLastLine(parts.get(i - 1));
                int joinedSeparators = getCharCount(preEndRow) + getCharCount(firstRow);
                // Without a second row there is nothing to compare against, so the
                // first row is treated as the continuation of the previous part.
                partFile.setFirstIsFull(secondRow != null
                        && joinedSeparators > getCharCount(secondRow));
            }
            partFiles.add(partFile);
        }

        // Append the rows that have to be merged.
        for (int i = 0; i < partFiles.size() - 1; i++) {
            PartFile next = partFiles.get(i + 1);
            String nextFirstRow = next.getFirstRow();
            if (nextFirstRow == null) {
                continue; // empty part: nothing to append
            }
            String tail = next.getFirstIsFull() ? "\r\n" + nextFirstRow : nextFirstRow;
            writeLastLine(partFiles.get(i).getPartFileName(), tail);
        }
    }

    /**
     * Counts the occurrences of the field separator in a string.
     *
     * @param s text to scan; {@code null} counts as no separators
     * @return number of separator characters in {@code s}
     */
    private int getCharCount(String s) {
        if (s == null) {
            return 0;
        }
        int count = 0;
        for (int i = 0; i < s.length(); i++) {
            if (s.charAt(i) == CSV_SEPARATOR) {
                count++;
            }
        }
        return count;
    }

    /**
     * Counts the {@code '\n'} bytes in a file using a buffered stream.
     *
     * @param filename file to scan
     * @return number of {@code '\n'} bytes found
     * @throws IOException if the file cannot be read
     */
    public int getFileRow(String filename) throws IOException {
        InputStream is = new BufferedInputStream(new FileInputStream(filename));
        try {
            byte[] c = new byte[1024];
            int count = 0;
            int readChars;
            while ((readChars = is.read(c)) != -1) {
                for (int i = 0; i < readChars; ++i) {
                    if (c[i] == '\n') {
                        ++count;
                    }
                }
            }
            return count;
        } finally {
            is.close();
        }
    }

    /**
     * Reads the last line of a file by scanning backwards for a line feed.
     *
     * <p>The final byte of the file is not examined, so a trailing line feed does
     * not produce an empty last line. When no line feed precedes the final byte
     * the file is read from its beginning.
     *
     * @param filename file to read
     * @return the last line decoded as GBK, or an empty string for an empty file
     * @throws IOException if the file cannot be read
     */
    private String readLastLine(String filename) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(filename, "r");
        try {
            long len = raf.length();
            if (len == 0L) {
                return "";
            }
            long lineStart = 0L;
            for (long pos = len - 2; pos >= 0; pos--) {
                raf.seek(pos);
                if (raf.readByte() == '\n') {
                    lineStart = pos + 1;
                    break;
                }
            }
            raf.seek(lineStart);
            String raw = raf.readLine();
            if (raw == null) {
                return "";
            }
            // readLine() maps each byte to one char; recover the bytes and decode them.
            return new String(raw.getBytes(RAW_CHARSET), TEXT_CHARSET);
        } finally {
            raf.close();
        }
    }

    /**
     * Appends text to the end of a file.
     *
     * @param fileName   file to append to
     * @param lastString text to append; encoded as GBK
     * @throws IOException if the file cannot be opened or written
     */
    private void writeLastLine(String fileName, String lastString) throws IOException {
        RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
        try {
            // Move the file pointer to the end of the file.
            randomFile.seek(randomFile.length());
            // Encode explicitly as GBK; the original author notes that the written
            // text is garbled otherwise.
            randomFile.write(lastString.getBytes(TEXT_CHARSET));
        } finally {
            randomFile.close();
        }
    }

    /**
     * Left-pads a string with a fill character up to a given length.
     *
     * @param str    string to pad
     * @param length target length
     * @param ch     fill character
     * @return {@code str} itself when it is already at least {@code length} long,
     *         otherwise a new string of exactly {@code length} characters
     */
    public static String leftPad(String str, int length, char ch) {
        if (str.length() >= length) {
            return str;
        }
        char[] chs = new char[length];
        Arrays.fill(chs, ch);
        char[] src = str.toCharArray();
        System.arraycopy(src, 0, chs, length - src.length, src.length);
        return new String(chs);
    }

    /**
     * Boundary rows of one part file, collected for the row-merge step.
     */
    class PartFile {
        /** Name of the part file. */
        private String partFileName;
        /** First row of the part, or {@code null} when the part is empty. */
        private String firstRow;
        /** Last row of the part. */
        private String endRow;
        /** Whether the first row was judged to be a complete row. */
        private boolean firstIsFull;

        public String getPartFileName() {
            return partFileName;
        }

        public void setPartFileName(String partFileName) {
            this.partFileName = partFileName;
        }

        public String getFirstRow() {
            return firstRow;
        }

        public void setFirstRow(String firstRow) {
            this.firstRow = firstRow;
        }

        public String getEndRow() {
            return endRow;
        }

        public void setEndRow(String endRow) {
            this.endRow = endRow;
        }

        public boolean getFirstIsFull() {
            return firstIsFull;
        }

        public void setFirstIsFull(boolean firstIsFull) {
            this.firstIsFull = firstIsFull;
        }
    }
}
以上就是本文的全部內容,希望對大家學習Java程序設計有所幫助。
相關文章
JAVA 實現磁盤文件加解密操作的示例代碼
這篇文章主要介紹了JAVA 實現磁盤文件加解密操作的示例代碼,幫助大家利用Java實現文件的加解密,感興趣的朋友可以了解下 2020-09-09
java編程經典案例之基于斐波那契數列解決兔子問題實例
這篇文章主要介紹了java編程經典案例之基于斐波那契數列解決兔子問題,結合完整實例形式分析了斐波那契數列的原理及java解決兔子問題的相關操作技巧,需要的朋友可以參考下 2017-10-10
springboot配置多個數據源兩種方式實現
在我們的實際業務中可能會遇到:在一個項目里面讀取多個數據庫的數據來進行展示,spring對同時配置多個數據源是支持的,本文主要介紹了springboot配置多個數據源兩種方式實現,感興趣的可以了解一下 2022-03-03
Java實現Excel轉PDF的兩種方法詳解
使用工具將Excel轉為PDF的方法有很多,在這里我給大家介紹兩種常用的方法:使用spire轉化PDF、使用jacob實現Excel轉PDF,分別應對兩種不一樣的使用場景,需要的可以參考一下 2022-01-01
Spring運行環境Environment的解析
本文主要介紹了Spring運行環境Environment的解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2023-08-08

