Python多進程分塊讀取超大文件的方法
更新時間:2016年04月13日 09:52:10 作者:asdfsx
這篇文章主要介紹了Python多進程分塊讀取超大文件的方法,涉及Python多進程操作與文件分塊讀取的相關(guān)技巧,需要的朋友可以參考下
本文實例講述了Python多進程分塊讀取超大文件的方法。分享給大家供大家參考,具體如下:
讀取超大的文本文件,使用多進程分塊讀取,將每一塊單獨輸出成文件
# -*- coding: GBK -*-
import urlparse
import datetime
import os
from multiprocessing import Process,Queue,Array,RLock
"""
多進程分塊讀取文件
"""
WORKERS = 4
BLOCKSIZE = 100000000
FILE_SIZE = 0
def getFilesize(file):
"""
獲取要讀取文件的大小
"""
global FILE_SIZE
fstream = open(file,'r')
fstream.seek(0,os.SEEK_END)
FILE_SIZE = fstream.tell()
fstream.close()
def process_found(pid,array,file,rlock):
global FILE_SIZE
global JOB
global PREFIX
"""
進程處理
Args:
pid:進程編號
array:進程間共享隊列,用于標記各進程所讀的文件塊結(jié)束位置
file:所讀文件名稱
各個進程先從array中獲取當前最大的值為起始位置startpossition
結(jié)束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
if startpossition==FILE_SIZE則進程結(jié)束
if startpossition==0則從0開始讀取
if startpossition!=0為防止行被block截斷的情況,先讀一行不處理,從下一行開始正式處理
if 當前位置 <=endpossition 就readline
否則越過邊界,就從新查找array中的最大值
"""
fstream = open(file,'r')
while True:
rlock.acquire()
print 'pid%s'%pid,','.join([str(v) for v in array])
startpossition = max(array)
endpossition = array[pid] = (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
rlock.release()
if startpossition == FILE_SIZE:#end of the file
print 'pid%s end'%(pid)
break
elif startpossition !=0:
fstream.seek(startpossition)
fstream.readline()
pos = ss = fstream.tell()
ostream = open('/data/download/tmp_pid'+str(pid)+'_jobs'+str(endpossition),'w')
while pos<endpossition:
#處理line
line = fstream.readline()
ostream.write(line)
pos = fstream.tell()
print 'pid:%s,startposition:%s,endposition:%s,pos:%s'%(pid,ss,pos,pos)
ostream.flush()
ostream.close()
ee = fstream.tell()
fstream.close()
def main():
global FILE_SIZE
print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")
file = "/data/pds/download/scmcc_log/tmp_format_2011004.log"
getFilesize(file)
print FILE_SIZE
rlock = RLock()
array = Array('l',WORKERS,lock=rlock)
threads=[]
for i in range(WORKERS):
p=Process(target=process_found, args=[i,array,file,rlock])
threads.append(p)
for i in range(WORKERS):
threads[i].start()
for i in range(WORKERS):
threads[i].join()
print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")
if __name__ == '__main__':
main()
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python字符串操作技巧匯總》、《Python入門與進階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設(shè)計有所幫助。
您可能感興趣的文章:
- Python多進程并發(fā)(multiprocessing)用法實例詳解
- Python中使用多進程來實現(xiàn)并行處理的方法小結(jié)
- 淺析Python中的多進程與多線程的使用
- Python多進程通信Queue、Pipe、Value、Array實例
- Python多進程庫multiprocessing中進程池Pool類的使用詳解
- Python控制多進程與多線程并發(fā)數(shù)總結(jié)
- 探究Python多進程編程下線程之間變量的共享問題
- Python多進程同步Lock、Semaphore、Event實例
- Python多進程multiprocessing.Pool類詳解
- Python多進程與多線程的使用場景詳解
相關(guān)文章
python如何實現(xiàn)全角半角的相互轉(zhuǎn)換
這篇文章主要介紹了python如何實現(xiàn)全角半角的相互轉(zhuǎn)換方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11

