Python讀取文件的四種方式的實例詳解
故事背景:最近在處理Wikipedia的數(shù)據(jù)時發(fā)現(xiàn)由于數(shù)據(jù)量過大,之前的文件讀取和數(shù)據(jù)處理方法幾乎不可用,或耗時非常久。今天學(xué)校安排統(tǒng)一核酸檢查,剛好和文件讀取的過程非常相似。正好借此機(jī)會和大家一起從頭梳理一下幾種文件讀取方法。
故事設(shè)定:現(xiàn)在學(xué)校要求對所有同學(xué)進(jìn)行核酸采集,每位同學(xué)先在宿舍內(nèi)等候防護(hù)人員(以下簡稱“大白”)叫號,叫到自己時去停車場排隊等候大白對自己進(jìn)行采集,采集完之后的樣本由大白統(tǒng)一有序收集并儲存。
名詞解釋:
- 學(xué)生:所有的學(xué)生是一個大文件,每個學(xué)生是其中的一行數(shù)據(jù)
- 宿舍:硬盤
- 停車場:內(nèi)存
- 核酸采集:數(shù)據(jù)處理
- 樣本:處理后的數(shù)據(jù)
- 大白:程序
學(xué)生數(shù)量特別少的情況
當(dāng)學(xué)生數(shù)量特別少時,可以考慮將所有學(xué)生統(tǒng)一叫到停車場等候,再依次進(jìn)行核酸采集。
方法一:簡單情況
此時的程序可以模擬為:
import time from typing import List def pick_all_students(dorm: str) -> List[str]: with open(dorm, "rt", encoding="utf8") as fin: students = fin.readlines() return students def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(dorm: str, sample_storeroom: str) -> None: with open(sample_storeroom, "wt", encoding="utf8") as fout: students = pick_all_students(dorm) for student in students: sample = pick_sample(student) fout.write(f"{sample}\n") fout.flush() if __name__ == "__main__": process( "student_names.txt", "sample_storeroom.txt" )
注意,在第19行中,大白一次性把所有同學(xué)都叫到了停車場中。這種做法在學(xué)生比較少時做起來很快,但是如果學(xué)生特別多,停車場裝不下怎么辦?
停車場空間不夠時怎么辦?
方法二:邊讀邊處理
一般來說,由于停車場空間有限,我們不會采用一次性把所有學(xué)生都叫到停車場中,而是會一個一個地處理,這樣可以節(jié)約內(nèi)存空間。
import time from typing import Iterator def pick_one_student(dorm: str) -> Iterator[str]: with open(dorm, "rt", encoding="utf8") as fin: for student in fin: yield student def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(dorm: str, sample_storeroom: str) -> None: with open(sample_storeroom, "wt", encoding="utf8") as fout: for student in pick_one_student(dorm): sample = pick_sample(student) fout.write(f"{sample}\n") fout.flush() if __name__ == "__main__": process( "student_names.txt", "sample_storeroom.txt" )
這里pick_one_student函數(shù)中的返回值是用yield返回的,一次只會返回一名同學(xué)。
不過,這種做法雖然確保了停車場不會滿員,但是這種做法在人數(shù)特別多的時候就不再適合了。雖然可以保證完成任務(wù),但由于每次只能采集一個同學(xué),程序的執(zhí)行并不高。特別是當(dāng)你的CPU有多個核時,會浪費(fèi)機(jī)器性能,出現(xiàn)一核有難,其它圍觀的現(xiàn)象。
怎么加快執(zhí)行效率?
大家可能也已經(jīng)注意到了,剛剛我們的場景中,不論采用哪種方法,都只有一名大白在工作。那我們能不能加派人手,從而提高效率呢?
答案當(dāng)然是可行的。我們現(xiàn)在先考慮增加兩名大白,使得一名大白專注于叫號,安排學(xué)生進(jìn)入停車場,另外一名大白專注于采集核酸,最后一名大白用于存儲核酸樣本。
方法三
import time from multiprocessing import Queue, Process from typing import Iterator def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]: print("pick_student: started") picked_num = 0 with open(dorm, "rt", encoding="utf8") as fin: for student in fin: stu_queue.put(student) picked_num += 1 if picked_num % 500 == 0: print(f"pick_student: {picked_num}") # end signal stu_queue.put(None) print("pick_student: finished") def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(stu_queue: Queue, store_queue: Queue) -> None: print("process: started") process_num = 0 while True: student = stu_queue.get() if student is not None: sample = pick_sample(student) store_queue.put(sample) process_num += 1 if process_num % 500 == 0: print(f"process: {process_num}") else: break # end signal store_queue.put(None) print("process: finished") def store_sample(store_queue: Queue, sample_storeroom: str) -> None: print("store_sample: started") store_num = 0 with open(sample_storeroom, "wt", encoding="utf8") as fout: while True: sample = store_queue.get() if sample is not None: fout.write(f"{sample}\n") fout.flush() store_num += 1 if store_num % 500 == 0: print(f"store_sample: {store_num}") else: break print("store_sample: finished") if __name__ == "__main__": dorm = "student_names.txt" sample_storeroom = "sample_storeroom.txt" stu_queue = Queue() store_queue = Queue() store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True) store_p.start() process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True) process_p.start() read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True) read_p.start() store_p.join()
這份代碼中,我們引入了多進(jìn)程的思路,將每個大白看作一個進(jìn)程,并使用了隊列Queue作為進(jìn)程間通信的媒介。stu_queue表示學(xué)生叫號進(jìn)停車場的隊列,store_queue表示已經(jīng)采集過的待存儲核酸樣本的隊列。
此外,為了控制進(jìn)程的停止,我們在pick_student和 process函數(shù)的最后都向各自隊列中添加了None作為結(jié)束標(biāo)志符。
假設(shè)有1w名學(xué)生(student_names.txt文件有1w行),經(jīng)過測試后發(fā)現(xiàn)上述方法的時間如下:
- 方法一:1m40.716s
- 方法二:1m40.717s
- 方法三:1m41.097s
咦?不是做了分工嗎?怎么速度還變慢了?經(jīng)筆者觀察,這是因為叫號的大白速度太快了(文件讀取速度快)通常是TA已經(jīng)齊活了,另外倆人還在吭哧吭哧干活呢,體現(xiàn)不出來分工的優(yōu)勢。如果這個時候我們對法二和法三的叫號做延時操作,每個學(xué)生叫號之后停滯10ms再叫下一位學(xué)生,則方法三的處理時間幾乎不變,而方法二的時間則會延長至3m21.345s。
怎么加快處理速度?
上面提到,大白采核酸的時間較長,往往上一個人的核酸還沒采完,下一個人就已經(jīng)在后面等著了。我們能不能提高核酸采集這個動作(數(shù)據(jù)處理)的速度呢?其實一名大白執(zhí)行一次核酸采集的時間我們幾乎無法再縮短了,但是我們可以通過增加人手的方式,來達(dá)到這個目的。就像去銀行辦業(yè)務(wù),如果開放的窗口越多,那么每個人等待的時間就會越短。這里我們也采取類似的策略,增加核酸采集的窗口。
import time from multiprocessing import Queue, Process, cpu_count from typing import Iterator def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]: print("pick_student: started") picked_num = 0 with open(dorm, "rt", encoding="utf8") as fin: for student in fin: stu_queue.put(student) picked_num += 1 if picked_num % 500 == 0: print(f"pick_student: {picked_num}") # end signal for _ in range(num_workers): stu_queue.put(None) print("pick_student: finished") def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(stu_queue: Queue, store_queue: Queue) -> None: print("process: started") process_num = 0 while True: student = stu_queue.get() if student is not None: sample = pick_sample(student) store_queue.put(sample) process_num += 1 if process_num % 500 == 0: print(f"process: {process_num}") else: break print("process: finished") def store_sample(store_queue: Queue, sample_storeroom: str) -> None: print("store_sample: started") store_num = 0 with open(sample_storeroom, "wt", encoding="utf8") as fout: while True: sample = store_queue.get() if sample is not None: fout.write(f"{sample}\n") fout.flush() store_num += 1 if store_num % 500 == 0: print(f"store_sample: {store_num}") else: break print("store_sample: finished") if __name__ == "__main__": dorm = "student_names.txt" sample_storeroom = "sample_storeroom.txt" num_process = max(1, cpu_count() - 1) maxsize = 10 * num_process stu_queue = Queue(maxsize=maxsize) store_queue = Queue(maxsize=maxsize) store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True) store_p.start() process_workers = [] for _ in range(num_process): process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True) process_p.start() process_workers.append(process_p) read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True) read_p.start() for worker in process_workers: worker.join() # end signal store_queue.put(None) store_p.join()
總耗時 0m4.160s !我們來具體看看其中的細(xì)節(jié)部分:
首先我們將CPU核數(shù) - 3作為采核酸的大白數(shù)量。這里減3是為其它工作進(jìn)程保留了一些資源,你也可以根據(jù)自己的具體情況做調(diào)整
這次我們在 Queue中增加了 maxsize參數(shù),這個參數(shù)是限制隊列的最大長度,這個參數(shù)通常與你的實際內(nèi)存情況有關(guān)。如果數(shù)據(jù)特別多時要考慮做些調(diào)整。這里我采用10倍的工作進(jìn)程數(shù)目作為隊列的長度
注意這里pick_student函數(shù)中要為每個后續(xù)的工作進(jìn)程都添加一個結(jié)束標(biāo)志,因此最后會有個for循環(huán)
我們把之前放在process函數(shù)中的結(jié)束標(biāo)志提取出來,放在了最外側(cè),使得所有工作進(jìn)程均結(jié)束之后再關(guān)閉最后的store_p進(jìn)程
結(jié)語
總結(jié)來說,如果你的數(shù)據(jù)集特別小,用法一;通常情況下用法二;數(shù)據(jù)集特別大時用法四。
以上就是Python讀取文件的四種方式的實例詳解的詳細(xì)內(nèi)容,更多關(guān)于Python讀取文件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實現(xiàn)的生產(chǎn)者、消費(fèi)者問題完整實例
這篇文章主要介紹了Python實現(xiàn)的生產(chǎn)者、消費(fèi)者問題,簡單描述了生產(chǎn)者、消費(fèi)者問題的概念、原理,并結(jié)合完整實例形式分析了Python實現(xiàn)生產(chǎn)者、消費(fèi)者問題的相關(guān)操作技巧,需要的朋友可以參考下2018-05-05點(diǎn)云地面點(diǎn)濾波(Cloth Simulation Filter, CSF)
這篇文章主要介紹了點(diǎn)云地面點(diǎn)濾波(Cloth Simulation Filter, CSF)“布料”濾波算法介紹,本文從基本思想到實現(xiàn)思路一步步給大家講解的非常詳細(xì),需要的朋友可以參考下2021-08-08Python數(shù)據(jù)分析庫pandas高級接口dt的使用詳解
這篇文章主要介紹了Python數(shù)據(jù)分析庫pandas高級接口dt的使用詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12