Python多線程如何同時處理多個文件
Python多線程同時處理多個文件
在需要對大量文件進(jìn)行相同的操作時,逐個遍歷是非常耗費(fèi)時間的。這時,我們可以借助于Python的多線程操作來大大提高處理效率,減少處理時間。
問題背景
比如說,我們現(xiàn)在需要從一個文件夾下面讀取出所有的視頻,然后對每個視頻進(jìn)行逐幀處理。
由于對視頻逐幀處理本身就是比較耗時的任務(wù),如果按照串行的方式順序處理每個視頻文件是效率非常低的,此時,一種比較容易的解決方案是使用Python的多線程進(jìn)行處理。
定義通用處理函數(shù)
并發(fā)適合于處理相似的任務(wù)。例如我們需要對視頻中的每一幀進(jìn)行處理,處理函數(shù)接受的是一個視頻名稱的列表,對列表內(nèi)的視頻順序處理,那么我們可以構(gòu)建以下處理函數(shù):
import cv2 def func(video_names): ? ? for video_name in video_names: ? ? ? ? cap = cv2.VideoCapture(video_name) ? ? ? ? while True: ? ? ? ? ? ? ret, frame = cap.read() ? ? ? ? ? ? if ret: ? ? ? ? ? ? ? ? # process temp frame ? ? ? ? ? ? else: ? ? ? ? ? ? ? ? break
這樣,我們只需要將待處理的視頻名稱按照開啟的線程數(shù),劃分成多個子列表,就可以進(jìn)行并發(fā)批量處理了。
多線程 Thread
Python中的多線程是通過threading庫實(shí)現(xiàn)的,除此之外,multiprocessing也可以實(shí)現(xiàn)并發(fā)處理,二者之間是存在一定差異的。這里我們使用threading來實(shí)現(xiàn)同時處理多個不同的文件。
import threading # video_names_list = [part_names_1_list, part_names_2_list, ..., part_names_k_list] for part_video in video_names_list: ? ? thread = threading.Thread(target=func, args=([part_video])) ? ? thread.start()
在這里,首先將要處理的文件名稱列表劃分成若干個子列表,然后對每一個子列表開啟一個線程進(jìn)行處理。
Python多線程文件操作
使用python 將在csv文件中的一百萬條網(wǎng)址,寫入mongo數(shù)據(jù)庫中,這里使用多線程進(jìn)行操作。
直接貼上代碼,如下:
import os import threading ?#導(dǎo)入進(jìn)程 import csv import time from Mongo_cache import MongoCache? import win32com.client import winsound NUM_THREAD = 5 COUNT = 0 lock = threading.Lock() cache = MongoCache() ? ? #數(shù)據(jù)庫連接初始化 def worker(): """ func: 從csv文件中讀取數(shù)據(jù),并將數(shù)據(jù)返回 """ ? ? for path in os.listdir(os.getcwd()): ? ? ? ? #print("當(dāng)前工作目錄", path) ? ? ? ? file_name = path.split('.') ? ? ? ? #print(file_name) ? ? ? ? if file_name[-1] == 'csv': ? ? ? ? ? ? #print("地址是:",path) ? ? ? ? ? ? file = open(path) ? ? ? ? ? ? data = csv.reader(file) ? ? ? ? ? ? return data ? ? ? ? else: ? ? ? ? ? ? pass def save_info(data,i, num_retries=2): """ func: 將數(shù)據(jù)保存 """ ? ? global COUNT ? ? global lock ? ? global cache ? ? for _, website in data: ? ? ? ? try: ? ? ? ? ? ? lock.acquire() ? ? ? ? ? ? #print("線程{}》》》{}正在運(yùn)行".format(threading.current_thread().name, i)) ? ? ? ? ? ? item = {'website':website} ? ? ? ? ? ? cache(item) ? ? ? ? ? ? COUNT += 1 ? ? ? ? except: ? ? ? ? ? ? if num_retries > 0: ? ? ? ? ? ? ? ? save_info(data, i, num_retries-1) ? ? ? ? finally: ? ? ? ? ? ? lock.release() def main(): """ 啟動線程 """ ? ? print("start working") ? ? print("working...") ? ? data = worker() ? ? threads = [] ? #設(shè)置主線程 ? ? for i in range(NUM_THREAD): ? ? ? ? t = threading.Thread(target=save_info, args=(data, i)) ? ? ? ? threads.append(t) ? ? for i in range(NUM_THREAD): ? ? ? ? threads[i].start() ? ? for i in range(NUM_THREAD): ? ? ? ? threads[i].join() ? ? print("all was done!") if __name__ == '__main__': ? ? s_time = time.time() ? ? main() ? ? e_time = time.time() ? ? print("總的信息條數(shù):", COUNT) ? ? print("總耗時:", e_time-s_time) ? ? speak = win32com.client.Dispatch('SAPI.SPVOICE') ? ? speak.Speak("早上好,eric,程序結(jié)束!")
數(shù)據(jù)存儲模塊
import pickle import zlib from bson.binary import Binary? from datetime import datetime, timedelta from pymongo import MongoClient import time class MongoCache(object): ? ? def __init__(self, client=None, expires=timedelta(days=30)): ? ? ? ? self.client = MongoClient('localhost', 27017) if client is None else client ? ? ? ? self.db = self.client.cache ? ? ? ? #self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds()) ?#設(shè)置自動刪除時間 ? ? def __call__(self,url): ? ? ? ? self.db.webpage.insert(url) ? ? ? ? #print("保存成功") ? ? def __contains__(self,url): ? ? ? ? try: ? ? ? ? ? ? self[url] ? ? ? ? except KeyError: ? ? ? ? ? ? return False ? ? ? ? else: ? ? ? ? ? ? return True ? ? def __getitem__(self, url): ? ? ? ? record = self.db.webpage.find_one({'_id':url}) ? ? ? ? if record: ? ? ? ? ? ? return pickle.loads(zlib.decompress(record['result'])) ? ? ? ? else: ? ? ? ? ? ? raise KeyError(url + 'dose not exist') ? ? def __setitem__(self, url, result): ? ? ? ? record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp':datetime.utcnow()} ? ? ? ? self.db.webpage.update({'_id':url},{'$set':record},upsert=True) ? ? def clear(self): ? ? ? ? self.db.webpage.drop()
將一百萬條網(wǎng)址從csv文件保存到數(shù)據(jù)庫所花費(fèi)的時間為
start working working... all was done! 總的信息條數(shù): 1000000 總耗時: 427.4034459590912
總結(jié)
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
pycharm訪問mysql數(shù)據(jù)庫的方法步驟
這篇文章主要介紹了pycharm訪問mysql數(shù)據(jù)庫的方法步驟。文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06python中concurrent.futures的具體使用
concurrent.futures是Python標(biāo)準(zhǔn)庫的一部分,提供了ThreadPoolExecutor和ProcessPoolExecutor兩種執(zhí)行器,用于管理線程池和進(jìn)程池,通過這些執(zhí)行器,可以簡化多線程和多進(jìn)程任務(wù)的管理,提高程序執(zhí)行效率2024-09-09分享7個 Python 實(shí)戰(zhàn)項(xiàng)目練習(xí)
這篇文章主要介紹了分享7個 Python 實(shí)戰(zhàn)項(xiàng)目代碼,經(jīng)過Python3.6.4調(diào)試通過的代碼,就具一點(diǎn)的參考價值,需要的小伙伴可以參考一下2022-03-03