一篇文章帶你搞定Python多進(jìn)程
1.Python多進(jìn)程模塊
Python中的多進(jìn)程是通過multiprocessing包來實(shí)現(xiàn)的,和多線程的threading.Thread差不多,它可以利用multiprocessing.Process對(duì)象來創(chuàng)建一個(gè)進(jìn)程對(duì)象。這個(gè)進(jìn)程對(duì)象的方法和線程對(duì)象的方法差不多也有start(), run(), join()等方法,其中有一個(gè)方法不同Thread線程對(duì)象中的守護(hù)線程方法是setDeamon,而Process進(jìn)程對(duì)象的守護(hù)進(jìn)程是通過設(shè)置daemon屬性來完成的。
下面說說Python多進(jìn)程的實(shí)現(xiàn)方法,和多線程類似
2.Python多進(jìn)程實(shí)現(xiàn)方法一
from multiprocessing import Process
def fun1(name):
print('測(cè)試%s多進(jìn)程' %name)
if __name__ == '__main__':
process_list = []
for i in range(5): #開啟5個(gè)子進(jìn)程執(zhí)行fun1函數(shù)
p = Process(target=fun1,args=('Python',)) #實(shí)例化進(jìn)程對(duì)象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('結(jié)束測(cè)試')結(jié)果
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
結(jié)束測(cè)試
Process finished with exit code 0
上面的代碼開啟了5個(gè)子進(jìn)程去執(zhí)行函數(shù),我們可以觀察結(jié)果,是同時(shí)打印的,這里實(shí)現(xiàn)了真正的并行操作,就是多個(gè)CPU同時(shí)執(zhí)行任務(wù)。我們知道進(jìn)程是python中最小的資源分配單元,也就是進(jìn)程中間的數(shù)據(jù),內(nèi)存是不共享的,每啟動(dòng)一個(gè)進(jìn)程,都要獨(dú)立分配資源和拷貝訪問的數(shù)據(jù),所以進(jìn)程的啟動(dòng)和銷毀的代價(jià)是比較大了,所以在實(shí)際中使用多進(jìn)程,要根據(jù)服務(wù)器的配置來設(shè)定。
3.Python多進(jìn)程實(shí)現(xiàn)方法二
還記得python多線程的第二種實(shí)現(xiàn)方法嗎?是通過類繼承的方法來實(shí)現(xiàn)的,python多進(jìn)程的第二種實(shí)現(xiàn)方式也是一樣的
from multiprocessing import Process
class MyProcess(Process): #繼承Process類
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name
def run(self):
print('測(cè)試%s多進(jìn)程' % self.name)
if __name__ == '__main__':
process_list = []
for i in range(5): #開啟5個(gè)子進(jìn)程執(zhí)行fun1函數(shù)
p = MyProcess('Python') #實(shí)例化進(jìn)程對(duì)象
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('結(jié)束測(cè)試')結(jié)果
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
測(cè)試Python多進(jìn)程
結(jié)束測(cè)試
Process finished with exit code 0
效果和第一種方式一樣。
我們可以看到Python多進(jìn)程的實(shí)現(xiàn)方式和多線程的實(shí)現(xiàn)方式幾乎一樣。
Process類的其他方法
構(gòu)造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組
target: 要執(zhí)行的方法
name: 進(jìn)程名
args/kwargs: 要傳入方法的參數(shù)
實(shí)例方法:
is_alive():返回進(jìn)程是否在運(yùn)行,bool類型。
join([timeout]):阻塞當(dāng)前上下文環(huán)境的進(jìn)程程,直到調(diào)用此方法的進(jìn)程終止或到達(dá)指定的timeout(可選參數(shù))。
start():進(jìn)程準(zhǔn)備就緒,等待CPU調(diào)度
run():strat()調(diào)用run方法,如果實(shí)例進(jìn)程時(shí)未制定傳入target,這star執(zhí)行t默認(rèn)run()方法。
terminate():不管任務(wù)是否完成,立即停止工作進(jìn)程
屬性:
daemon:和線程的setDeamon功能一樣
name:進(jìn)程名字
pid:進(jìn)程號(hào)
關(guān)于join,daemon的使用和python多線程一樣,這里就不在復(fù)述了。
4.Python多線程的通信
進(jìn)程是系統(tǒng)獨(dú)立調(diào)度核分配系統(tǒng)資源(CPU、內(nèi)存)的基本單位,進(jìn)程之間是相互獨(dú)立的,每啟動(dòng)一個(gè)新的進(jìn)程相當(dāng)于把數(shù)據(jù)進(jìn)行了一次克隆,子進(jìn)程里的數(shù)據(jù)修改無法影響到主進(jìn)程中的數(shù)據(jù),不同子進(jìn)程之間的數(shù)據(jù)也不能共享,這是多進(jìn)程在使用中與多線程最明顯的區(qū)別。但是難道Python多進(jìn)程中間難道就是孤立的嗎?當(dāng)然不是,python也提供了多種方法實(shí)現(xiàn)了多進(jìn)程中間的通信和數(shù)據(jù)共享(可以修改一份數(shù)據(jù))
進(jìn)程對(duì)列Queue
Queue在多線程中也說到過,在生成者消費(fèi)者模式中使用,是線程安全的,是生產(chǎn)者和消費(fèi)者中間的數(shù)據(jù)管道,那在python多進(jìn)程中,它其實(shí)就是進(jìn)程之間的數(shù)據(jù)管道,實(shí)現(xiàn)進(jìn)程通信。
from multiprocessing import Process,Queue
def fun1(q,i):
print('子進(jìn)程%s 開始put數(shù)據(jù)' %i)
q.put('我是%s 通過Queue通信' %i)
if __name__ == '__main__':
q = Queue()
process_list = []
for i in range(3):
p = Process(target=fun1,args=(q,i,)) #注意args里面要把q對(duì)象傳給我們要執(zhí)行的方法,這樣子進(jìn)程才能和主進(jìn)程用Queue來通信
p.start()
process_list.append(p)
for i in process_list:
p.join()
print('主進(jìn)程獲取Queue數(shù)據(jù)')
print(q.get())
print(q.get())
print(q.get())
print('結(jié)束測(cè)試')結(jié)果
子進(jìn)程0 開始put數(shù)據(jù)
子進(jìn)程1 開始put數(shù)據(jù)
子進(jìn)程2 開始put數(shù)據(jù)
主進(jìn)程獲取Queue數(shù)據(jù)
我是0 通過Queue通信
我是1 通過Queue通信
我是2 通過Queue通信
結(jié)束測(cè)試
Process finished with exit code 0
上面的代碼結(jié)果可以看到我們主進(jìn)程中可以通過Queue獲取子進(jìn)程中put的數(shù)據(jù),實(shí)現(xiàn)進(jìn)程間的通信。
管道Pipe
管道Pipe和Queue的作用大致差不多,也是實(shí)現(xiàn)進(jìn)程間的通信,下面之間看怎么使用吧
from multiprocessing import Process, Pipe
def fun1(conn):
print('子進(jìn)程發(fā)送消息:')
conn.send('你好主進(jìn)程')
print('子進(jìn)程接受消息:')
print(conn.recv())
conn.close()
if __name__ == '__main__':
conn1, conn2 = Pipe() #關(guān)鍵點(diǎn),pipe實(shí)例化生成一個(gè)雙向管
p = Process(target=fun1, args=(conn2,)) #conn2傳給子進(jìn)程
p.start()
print('主進(jìn)程接受消息:')
print(conn1.recv())
print('主進(jìn)程發(fā)送消息:')
conn1.send("你好子進(jìn)程")
p.join()
print('結(jié)束測(cè)試')結(jié)果
主進(jìn)程接受消息:
子進(jìn)程發(fā)送消息:
子進(jìn)程接受消息:
你好主進(jìn)程
主進(jìn)程發(fā)送消息:
你好子進(jìn)程
結(jié)束測(cè)試
Process finished with exit code 0
上面可以看到主進(jìn)程和子進(jìn)程可以相互發(fā)送消息
Managers
Queue和Pipe只是實(shí)現(xiàn)了數(shù)據(jù)交互,并沒實(shí)現(xiàn)數(shù)據(jù)共享,即一個(gè)進(jìn)程去更改另一個(gè)進(jìn)程的數(shù)據(jù)。那么久要用到Managers
from multiprocessing import Process, Manager
def fun1(dic,lis,index):
dic[index] = 'a'
dic['2'] = 'b'
lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
#print(l)
if __name__ == '__main__':
with Manager() as manager:
dic = manager.dict()#注意字典的聲明方式,不能直接通過{}來定義
l = manager.list(range(5))#[0,1,2,3,4]
process_list = []
for i in range(10):
p = Process(target=fun1, args=(dic,l,i))
p.start()
process_list.append(p)
for res in process_list:
res.join()
print(dic)
print(l)結(jié)果:
{0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'}
[0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]
可以看到主進(jìn)程定義了一個(gè)字典和一個(gè)列表,在子進(jìn)程中,可以添加和修改字典的內(nèi)容,在列表中插入新的數(shù)據(jù),實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,即可以共同修改同一份數(shù)據(jù)
5.進(jìn)程池
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí),則去進(jìn)程池中獲取一個(gè)進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)進(jìn)程,那么程序就會(huì)等待,直到進(jìn)程池中有可用進(jìn)程為止。就是固定有幾個(gè)進(jìn)程可以使用。
進(jìn)程池中有兩個(gè)方法:
- apply:同步,一般不使用
- apply_async:異步
from multiprocessing import Process,Pool
import os, time, random
def fun1(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
pool = Pool(5) #創(chuàng)建一個(gè)5個(gè)進(jìn)程的進(jìn)程池
for i in range(10):
pool.apply_async(func=fun1, args=(i,))
pool.close()
pool.join()
print('結(jié)束測(cè)試')結(jié)果
Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
結(jié)束測(cè)試
對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。
進(jìn)程池map方法
案例來源于網(wǎng)絡(luò),侵權(quán)請(qǐng)告知,謝謝
因?yàn)榫W(wǎng)上看到這個(gè)例子覺得不錯(cuò),所以這里就不自己寫案例,這個(gè)案例比較有說服力
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if \'jpeg\' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == \'__main__\':
folder = os.path.abspath(
\'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(creat_thumbnail, images) #關(guān)鍵點(diǎn),images是一個(gè)可迭代對(duì)象
pool.close()
pool.join()上邊這段代碼的主要工作就是將遍歷傳入的文件夾中的圖片文件,一一生成縮略圖,并將這些縮略圖保存到特定文件夾中。這我的機(jī)器上,用這一程序處理 6000 張圖片需要花費(fèi) 27.9 秒。 map 函數(shù)并不支持手動(dòng)線程管理,反而使得相關(guān)的 debug 工作也變得異常簡單。
map在爬蟲的領(lǐng)域里也可以使用,比如多個(gè)URL的內(nèi)容爬取,可以把URL放入元祖里,然后傳給執(zhí)行函數(shù)。
以上就是一篇文章帶你搞定Python多進(jìn)程的詳細(xì)內(nèi)容,更多關(guān)于Python多進(jìn)程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
pandas和spark dataframe互相轉(zhuǎn)換實(shí)例詳解
這篇文章主要介紹了pandas和spark dataframe互相轉(zhuǎn)換實(shí)例詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
一篇文章帶你搞定Ubuntu中打開Pycharm總是卡頓崩潰
這篇文章主要介紹了一篇文章帶你搞定Ubuntu中打開Pycharm總是卡頓崩潰,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
淺談python requests 的put, post 請(qǐng)求參數(shù)的問題
今天小編就為大家分享一篇淺談python requests 的put, post 請(qǐng)求參數(shù)的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-01-01
opencv-python 提取sift特征并匹配的實(shí)例
今天小編就為大家分享一篇opencv-python 提取sift特征并匹配的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-12-12

