Python高級(jí)編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool)實(shí)例詳解
本文實(shí)例講述了Python高級(jí)編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool)。分享給大家供大家參考,具體如下:
Queue消息隊(duì)列
1.創(chuàng)建
import multiprocessing queue = multiprocessing.Queue(隊(duì)列長(zhǎng)度)
2.方法
方法 | 描述 |
---|---|
put | 變量名.put(數(shù)據(jù)),放入數(shù)據(jù)(如隊(duì)列已滿,則程序進(jìn)入阻塞狀態(tài),等待隊(duì)列取出后再放入) |
put_nowait | 變量名.put_nowati(數(shù)據(jù)),放入數(shù)據(jù)(如隊(duì)列已滿,則不等待隊(duì)列信息取出后再放入,直接報(bào)錯(cuò)) |
get | 變量名.get(數(shù)據(jù)),取出數(shù)據(jù)(如隊(duì)列為空,則程序進(jìn)入阻塞狀態(tài),等待隊(duì)列防如數(shù)據(jù)后再取出) |
get_nowait | 變量名.get_nowait(數(shù)據(jù)),取出數(shù)據(jù)(如隊(duì)列為空,則不等待隊(duì)列放入信息后取出數(shù)據(jù),直接報(bào)錯(cuò)),放入數(shù)據(jù)后立馬判斷是否為空有時(shí)為True,原因是放入值和判斷同時(shí)進(jìn)行 |
qsize | 變量名.qsize(),消息數(shù)量 |
empty | 變量名.empty()(返回值為True或False),判斷是否為空 |
full | 變量名.full()(返回值為True或False),判斷是否為滿 |
3.進(jìn)程通信
因?yàn)檫M(jìn)程間不共享全局變量,所以使用Queue進(jìn)行數(shù)據(jù)通信,可以在父進(jìn)程中創(chuàng)建兩個(gè)字進(jìn)程,一個(gè)往Queue里寫數(shù)據(jù),一個(gè)從Queue里取出數(shù)據(jù)。
例:
import multiprocessing import time def write_queue(queue): # 循環(huán)寫入數(shù)據(jù) for i in range(10): if queue.full(): print("隊(duì)列已滿!") break # 向隊(duì)列中放入消息 queue.put(i) print(i) time.sleep(0.5) def read_queue(queue): # 循環(huán)讀取隊(duì)列消息 while True: # 隊(duì)列為空,停止讀取 if queue.empty(): print("---隊(duì)列已空---") break # 讀取消息并輸出 result = queue.get() print(result) if __name__ == '__main__': # 創(chuàng)建消息隊(duì)列 queue = multiprocessing.Queue(3) # 創(chuàng)建子進(jìn)程 p1 = multiprocessing.Process(target=write_queue, args=(queue,)) p1.start() # 等待p1寫數(shù)據(jù)進(jìn)程執(zhí)行結(jié)束后,再往下執(zhí)行 p1.join() p1 = multiprocessing.Process(target=read_queue, args=(queue,)) p1.start()
執(zhí)行結(jié)果:
Pool進(jìn)程池
初始化Pool時(shí),可以指定一個(gè)最大進(jìn)程數(shù),當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到指定的最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)用之前的進(jìn)程來執(zhí)行新的任務(wù)。
1.創(chuàng)建
import multiprocessing pool = multiprocessing.Pool(最大進(jìn)程數(shù))
2.方法
方法 | 描述 |
---|---|
apply() | 以同步方式添加進(jìn)程 |
apply_async() | 以異步方式添加進(jìn)程 |
close() | 關(guān)閉Pool,使其不接受新任務(wù)(還可以使用) |
terminate() | 不管任務(wù)是否完成,立即終止 |
join() | 主進(jìn)程阻塞,等待子進(jìn)程的退出,必須在close和terminate后使用 |
3.進(jìn)程池內(nèi)通信
創(chuàng)建進(jìn)程池內(nèi)Queue消息隊(duì)列通信
import multiprocessing Queue:queue = multiprocessing.Manager().Queue()
例:
import multiprocessing import time
寫入數(shù)據(jù)的方法
def write_data(queue): # for循環(huán) 向消息隊(duì)列中寫入值 for i in range(5): # 添加消息 queue.put(i) print(i) time.sleep(0.2) print("隊(duì)列已滿~")
創(chuàng)建讀取數(shù)據(jù)的方法
def read_data(queue): # 循環(huán)讀取數(shù)據(jù) while True: # 判斷隊(duì)列是否為空 if queue.qsize() == 0: print("隊(duì)列為空~") break # 從隊(duì)列中讀取數(shù)據(jù) result = queue.get() print(result) if __name__ == '__main__': # 創(chuàng)建進(jìn)程池 pool = multiprocessing.Pool(2) # 創(chuàng)建進(jìn)程池隊(duì)列 queue = multiprocessing.Manager().Queue() # 在進(jìn)程池中的進(jìn)程間進(jìn)行通信 # 使用線程池同步的方式,先寫后讀 # pool.apply(write_data, (queue, )) # pool.apply(read_data, (queue, )) # apply_async() 返回ApplyResult 對(duì)象 result = pool.apply_async(write_data, (queue, )) # ApplyResult對(duì)象的wait() 方法,表示后續(xù)進(jìn)程必須等待當(dāng)前進(jìn)程執(zhí)行完再繼續(xù) result.wait() pool.apply_async(read_data, (queue, )) pool.close() # 異步后,主線程不再等待子進(jìn)程執(zhí)行結(jié)束,再結(jié)束 # join() 后,表示主線程會(huì)等待子進(jìn)程執(zhí)行結(jié)束后,再結(jié)束 pool.join()
運(yùn)行結(jié)果:
4.案例(文件夾copy器)
代碼:
# 導(dǎo)入模塊 import os import multiprocessing # 拷貝文件函數(shù) def copy_dir(file_name, source_dir, desk_dir): # 要拷貝的文件路徑 source_path = source_dir+'/'+file_name # 目標(biāo)路徑 desk_path = desk_dir+'/'+file_name # 獲取文件大小 file_size = os.path.getsize(source_path) # 記錄拷貝次數(shù) i = 0 # 以二進(jìn)制度讀方式打開原文件 with open(source_path, "rb") as source_file: # 以二進(jìn)制寫入方式創(chuàng)建并打開目標(biāo)文件 with open(desk_path, "wb") as desk_file: # 循環(huán)寫入 while True: # 讀取1024字節(jié) file_data = source_file.read(1024) # 如果讀到的不為空,則將讀到的寫入目標(biāo)文件 if file_data: desk_file.write(file_data) # 讀取次數(shù)+1 i += 1 # 拷貝百分比進(jìn)度等于拷貝次數(shù)*1024*100/文件大小 n = i*102400/file_size if n >= 100: n = 100 print(file_name, "拷貝進(jìn)度%.2f%%" % n) else: print(file_name, "拷貝成功") break if __name__ == '__main__': # 要拷貝的文件夾 source_dir = 'test' # 要拷貝到的路徑 desk_dir = 'C:/Users/Administrator/Desktop/'+source_dir # 存在文件夾則不創(chuàng)建 try: os.mkdir(desk_dir) except: print("目標(biāo)文件夾已存在,未創(chuàng)建") # 獲取文件夾內(nèi)文件目錄,存到列表里 file_list = os.listdir(source_dir) print(file_list) # 創(chuàng)建進(jìn)程池,最多同時(shí)運(yùn)行3個(gè)子進(jìn)程 pool = multiprocessing.Pool(3) for file_name in file_list: # 異步方式添加到進(jìn)程池內(nèi) pool.apply_async(copy_dir, args=(file_name, source_dir, desk_dir)) # 關(guān)閉進(jìn)程池(停止添加,已添加的還可運(yùn)行) pool.close() # 讓主進(jìn)程阻塞,等待子進(jìn)程結(jié)束 pool.join()
運(yùn)行結(jié)果:
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進(jìn)階經(jīng)典教程》、《Python+MySQL數(shù)據(jù)庫(kù)程序設(shè)計(jì)入門教程》及《Python常見數(shù)據(jù)庫(kù)操作技巧匯總》
希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。
相關(guān)文章
Python使用urllib2模塊實(shí)現(xiàn)斷點(diǎn)續(xù)傳下載的方法
這篇文章主要介紹了Python使用urllib2模塊實(shí)現(xiàn)斷點(diǎn)續(xù)傳下載的方法,實(shí)例分析了urllib2模塊的使用及斷點(diǎn)續(xù)傳下載的實(shí)現(xiàn)技巧,需要的朋友可以參考下2015-06-06對(duì)Python _取log的幾種方式小結(jié)
今天小編就為大家分享一篇對(duì)Python _取log的幾種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-07-07Python直接使用plot()函數(shù)畫圖的方法實(shí)例
Python非常簡(jiǎn)單而又非常強(qiáng)大,它的功能之一就是畫出漂亮的圖表,實(shí)現(xiàn)數(shù)據(jù)的可視化,下面這篇文章主要給大家介紹了關(guān)于Python直接使用plot()函數(shù)畫圖的相關(guān)資料,需要的朋友可以參考下2022-05-05Python數(shù)據(jù)結(jié)構(gòu)與算法之二叉樹結(jié)構(gòu)定義與遍歷方法詳解
這篇文章主要介紹了Python數(shù)據(jù)結(jié)構(gòu)與算法之二叉樹結(jié)構(gòu)定義與遍歷方法,結(jié)合實(shí)例形式詳細(xì)分析了Python實(shí)現(xiàn)二叉樹結(jié)構(gòu)的定義、遍歷方法及相關(guān)注意事項(xiàng),需要的朋友可以參考下2017-12-12Python 字符串處理特殊空格\xc2\xa0\t\n Non-breaking space
今天遇到一個(gè)問題,使用python的find函數(shù)尋找字符串中的第一個(gè)空格時(shí)沒有找到正確的位置,下面是解決方法,需要的朋友可以參考下2020-02-02