Python多線程如何同時(shí)處理多個(gè)文件
Python多線程同時(shí)處理多個(gè)文件
在需要對(duì)大量文件進(jìn)行相同的操作時(shí),逐個(gè)遍歷是非常耗費(fèi)時(shí)間的。這時(shí),我們可以借助于Python的多線程操作來(lái)大大提高處理效率,減少處理時(shí)間。
問(wèn)題背景
比如說(shuō),我們現(xiàn)在需要從一個(gè)文件夾下面讀取出所有的視頻,然后對(duì)每個(gè)視頻進(jìn)行逐幀處理。
由于對(duì)視頻逐幀處理本身就是比較耗時(shí)的任務(wù),如果按照串行的方式順序處理每個(gè)視頻文件是效率非常低的,此時(shí),一種比較容易的解決方案是使用Python的多線程進(jìn)行處理。
定義通用處理函數(shù)
并發(fā)適合于處理相似的任務(wù)。例如我們需要對(duì)視頻中的每一幀進(jìn)行處理,處理函數(shù)接受的是一個(gè)視頻名稱(chēng)的列表,對(duì)列表內(nèi)的視頻順序處理,那么我們可以構(gòu)建以下處理函數(shù):
import cv2 def func(video_names): ? ? for video_name in video_names: ? ? ? ? cap = cv2.VideoCapture(video_name) ? ? ? ? while True: ? ? ? ? ? ? ret, frame = cap.read() ? ? ? ? ? ? if ret: ? ? ? ? ? ? ? ? # process temp frame ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? break
這樣,我們只需要將待處理的視頻名稱(chēng)按照開(kāi)啟的線程數(shù),劃分成多個(gè)子列表,就可以進(jìn)行并發(fā)批量處理了。
多線程 Thread
Python中的多線程是通過(guò)threading庫(kù)實(shí)現(xiàn)的,除此之外,multiprocessing也可以實(shí)現(xiàn)并發(fā)處理,二者之間是存在一定差異的。這里我們使用threading來(lái)實(shí)現(xiàn)同時(shí)處理多個(gè)不同的文件。
import threading # video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list] for part_video in video_names_list: ? ? thread = threading.Thread(target=func, args=([part_video])) ? ? thread.start()
在這里,首先將要處理的文件名稱(chēng)列表劃分成若干個(gè)子列表,然后對(duì)每一個(gè)子列表開(kāi)啟一個(gè)線程進(jìn)行處理。
Python多線程文件操作
使用python 將在csv文件中的一百萬(wàn)條網(wǎng)址,寫(xiě)入mongo數(shù)據(jù)庫(kù)中,這里使用多線程進(jìn)行操作。
直接貼上代碼,如下:
import os import threading ?#導(dǎo)入進(jìn)程 import csv import time from Mongo_cache import MongoCache? import win32com.client import winsound NUM_THREAD = 5 COUNT = 0 lock = threading.Lock() cache = MongoCache() ? ? #數(shù)據(jù)庫(kù)連接初始化 def worker(): """ func: 從csv文件中讀取數(shù)據(jù),并將數(shù)據(jù)返回 """ ? ? for path in os.listdir(os.getcwd()): ? ? ? ? #print("當(dāng)前工作目錄", path) ? ? ? ? file_name = path.split('.') ? ? ? ? #print(file_name) ? ? ? ? if file_name[-1] == 'csv': ? ? ? ? ? ? #print("地址是:",path) ? ? ? ? ? ? file = open(path) ? ? ? ? ? ? data = csv.reader(file) ? ? ? ? ? ? return data ? ? ? ? else: ? ? ? ? ? ? pass def save_info(data,i, num_retries=2): """ func: 將數(shù)據(jù)保存 """ ? ? global COUNT ? ? global lock ? ? global cache ? ? for _, website in data: ? ? ? ? try: ? ? ? ? ? ? lock.acquire() ? ? ? ? ? ? #print("線程{}》》》{}正在運(yùn)行".format(threading.current_thread().name, i)) ? ? ? ? ? ? item = {'website':website} ? ? ? ? ? ? cache(item) ? ? ? ? ? ? COUNT += 1 ? ? ? ? except: ? ? ? ? ? ? if num_retries > 0: ? ? ? ? ? ? ? ? save_info(data, i, num_retries-1) ? ? ? ? finally: ? ? ? ? ? ? lock.release() def main(): """ 啟動(dòng)線程 """ ? ? print("start working") ? ? print("working...") ? ? data = worker() ? ? threads = [] ? #設(shè)置主線程 ? ? for i in range(NUM_THREAD): ? ? ? ? t = threading.Thread(target=save_info, args=(data, i)) ? ? ? ? threads.append(t) ? ? for i in range(NUM_THREAD): ? ? ? ? threads[i].start() ? ? for i in range(NUM_THREAD): ? ? ? ? threads[i].join() ? ? print("all was done!") if __name__ == '__main__': ? ? s_time = time.time() ? ? main() ? ? e_time = time.time() ? ? print("總的信息條數(shù):", COUNT) ? ? print("總耗時(shí):", e_time-s_time) ? ? speak = win32com.client.Dispatch('SAPI.SPVOICE') ? ? speak.Speak("早上好,eric,程序結(jié)束!")
數(shù)據(jù)存儲(chǔ)模塊
import pickle import zlib from bson.binary import Binary? from datetime import datetime, timedelta from pymongo import MongoClient import time class MongoCache(object): ? ? def __init__(self, client=None, expires=timedelta(days=30)): ? ? ? ? self.client = MongoClient('localhost', 27017) if client is None else client ? ? ? ? self.db = self.client.cache ? ? ? ? #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds()) ?#設(shè)置自動(dòng)刪除時(shí)間 ? ? def __call__(self,url): ? ? ? ? self.db.webpage.insert(url) ? ? ? ? #print("保存成功") ? ? def __contains__(self,url): ? ? ? ? try: ? ? ? ? ? ? self[url] ? ? ? ? except KeyError: ? ? ? ? ? ? return False ? ? ? ? else: ? ? ? ? ? ? return True ? ? def __getitem__(self, url): ? ? ? ? record = self.db.webpage.find_one({'_id':url}) ? ? ? ? if record: ? ? ? ? ? ? return pickle.loads(zlib.decompress(record['result'])) ? ? ? ? else: ? ? ? ? ? ? raise KeyError(url + 'dose not exist') ? ? def __setitem__(self, url, result): ? ? ? ? record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()} ? ? ? ? self.db.webpage.update({'_id':url},{'$set':record},upsert=True) ? ? def clear(self): ? ? ? ? self.db.webpage.drop()
將一百萬(wàn)條網(wǎng)址從csv文件保存到數(shù)據(jù)庫(kù)所花費(fèi)的時(shí)間為
start working working... all was done! 總的信息條數(shù): 1000000 總耗時(shí): 427.4034459590912
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
pycharm訪問(wèn)mysql數(shù)據(jù)庫(kù)的方法步驟
這篇文章主要介紹了pycharm訪問(wèn)mysql數(shù)據(jù)庫(kù)的方法步驟。文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06Python數(shù)據(jù)類(lèi)型中的元組Tuple
這篇文章主要介紹了Python數(shù)據(jù)類(lèi)型中的元組Tuple,元組可以理解為一個(gè)只讀列表,用()來(lái)標(biāo)識(shí),下文圍繞元組展開(kāi)詳細(xì)資料,需要的小伙伴可以參考一下2022-02-02python中concurrent.futures的具體使用
concurrent.futures是Python標(biāo)準(zhǔn)庫(kù)的一部分,提供了ThreadPoolExecutor和ProcessPoolExecutor兩種執(zhí)行器,用于管理線程池和進(jìn)程池,通過(guò)這些執(zhí)行器,可以簡(jiǎn)化多線程和多進(jìn)程任務(wù)的管理,提高程序執(zhí)行效率2024-09-09分享7個(gè) Python 實(shí)戰(zhàn)項(xiàng)目練習(xí)
這篇文章主要介紹了分享7個(gè) Python 實(shí)戰(zhàn)項(xiàng)目代碼,經(jīng)過(guò)Python3.6.4調(diào)試通過(guò)的代碼,就具一點(diǎn)的參考價(jià)值,需要的小伙伴可以參考一下2022-03-03Python?中的json常見(jiàn)用法實(shí)例詳解
這篇文章主要介紹了Python?中的json常見(jiàn)用法,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-12-12簡(jiǎn)單介紹Python2.x版本中的cmp()方法的使用
這篇文章主要介紹了簡(jiǎn)單介紹Python2.x版本中的cmp()方法的使用,然而該方法在Python3.x版本中已并不再內(nèi)置...需要的朋友可以參考下2015-05-05