簡單學習Python多進程Multiprocessing
1.1 什么是 Multiprocessing
多線程在同一時間只能處理一個任務。
可把任務平均分配給每個核,而每個核具有自己的運算空間。
1.2 添加進程 Process
與線程類似,如下所示,但是該程序直接運行無結果,因為IDLE不支持多進程,在命令行終端運行才有結果顯示
import multiprocessing as mp def job(a,b): print('abc') if __name__=='__main__': p1=mp.Process(target=job,args=(1,2)) p1.start() p1.join()
1.3 存儲進程輸出 Queue
不知道為什么下面的這個程序可以在IDLE中正常運行。首先定義了一個job函數(shù)作系列數(shù)學運算,然后將結果放到res中,在main函數(shù)運行,取出queue中存儲的結果再進行一次加法運算。
import multiprocessing as mp def job(q): res=0 for i in range(1000): res+=i+i**2+i**3 q.put(res) if __name__ == '__main__': q=mp.Queue() p1 = mp.Process(target=job,args=(q,))#注意當參數(shù)只有一個時,應加上逗號 p2 = mp.Process(target=job,args=(q,)) p1.start() p2.start() p1.join() p2.join() res1=q.get() res2=q.get() print(res1+res2)
結果如下所示:
1.4 效率比對 threading & multiprocessing
在job函數(shù)中定義了數(shù)學運算,比較正常情況、多線程和多進程分別的運行時間。
import multiprocessing as mp import threading as td import time def job(q): res = 0 for i in range(10000000): res += i+i**2+i**3 q.put(res) # queue def multicore(): q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print('multicore:' , res1+res2) def normal(): res = 0 for _ in range(2):#線程或進程都構造了兩個,進行了兩次運算,所以這里循環(huán)兩次 for i in range(10000000): res += i+i**2+i**3 print('normal:', res) def multithread(): q = mp.Queue() t1 = td.Thread(target=job, args=(q,)) t2 = td.Thread(target=job, args=(q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print('multithread:', res1+res2) if __name__ == '__main__': st = time.time() normal() st1= time.time() print('normal time:', st1 - st) multithread() st2 = time.time() print('multithread time:', st2 - st1) multicore() print('multicore time:', time.time()-st2)
在視頻中的運行結果是多進程<正常<多線程,而我的運行結果為下圖所示:
綜上,多核/多進程運行最快,說明在同時間運行了多個任務,而多線程卻不一定會比正常情況下的運行來的快,這和多線程中的GIL有關。
1.5 進程池
進程池Pool,就是我們將所要運行的東西,放到池子里,Python會自行解決多進程的問題。
import multiprocessing as mp def job(x): return x*x def multicore(): pool=mp.Pool(processes=2)#定義一個Pool,并定義CPU核數(shù)量為2 res=pool.map(job,range(10)) print(res) res=pool.apply_async(job,(2,)) print(res.get()) multi_res=[pool.apply_async(job,(i,)) for i in range(10)] print([res.get()for res in multi_res]) if __name__=='__main__': multicore()
運行結果如下所示:
首先定義一個池子,有了池子之后,就可以讓池子對應某一個函數(shù),在上述代碼中定義的pool對應job函數(shù)。我們向池子里丟數(shù)據(jù),池子就會返回函數(shù)返回的值。 Pool和之前的Process的不同點是丟向Pool的函數(shù)有返回值,而Process的沒有返回值。
接下來用map()獲取結果,在map()中需要放入函數(shù)和需要迭代運算的值,然后它會自動分配給CPU核,返回結果
我們怎么知道Pool是否真的調(diào)用了多個核呢?我們可以把迭代次數(shù)增大些,然后打開CPU負載看下CPU運行情況
打開CPU負載(Mac):活動監(jiān)視器 > CPU > CPU負載(單擊一下即可)
Pool默認大小是CPU的核數(shù),我們也可以通過在Pool中傳入processes參數(shù)即可自定義需要的核數(shù)量。
Pool除了可以用map來返回結果之外,還可以用apply_async(),與map不同的是,只能傳遞一個值,只會放入一個核進行計算,但是傳入值時要注意是可迭代的,所以在傳入值后需要加逗號, 同時需要用get()方法獲取返回值。所對應的代碼為:
res=pool.apply_async(job,(2,)) print(res.get())
運行結果為4。
由于傳入值是可以迭代的,則我們同樣可以使用apply_async()來輸出多個結果。如果在apply_async()中輸入多個傳入值:
res = pool.apply_async(job, (2,3,4,))
結果會報錯:
TypeError: job() takes exactly 1 argument (3 given)
即apply_async()只能輸入一組參數(shù)。
在此我們將apply_async()放入迭代器中,定義一個新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
同樣在取出值時需要一個一個取出來
print([res.get() for res in multi_res])
apply用迭代器的運行結果與map取出的結果相同。
note:
(1)Pool默認調(diào)用是CPU的核數(shù),傳入processes參數(shù)可自定義CPU核數(shù)
(2)map() 放入迭代參數(shù),返回多個結果
(3)apply_async()只能放入一組參數(shù),并返回一個結果,如果想得到map()的效果需要通過迭代
1.6 共享內(nèi)存 shared memory
只有通過共享內(nèi)存才能讓CPU之間進行交流。
通過Value將數(shù)據(jù)存儲在一個共享的內(nèi)存表中。
import multiprocessing as mp value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14)
其中,i和d表示數(shù)據(jù)類型。i為帶符號的整型,d為雙精浮點類型。更多數(shù)據(jù)類型可參考網(wǎng)址:https://docs.python.org/3/library/array.html
在多進程中有一個Array類,可以和共享內(nèi)存交互,來實現(xiàn)進程之間共享數(shù)據(jù)。
和numpy中的不同,這里的Array只能是一維的,并且需要定義數(shù)據(jù)類型否則會報錯。
array = mp.Array('i', [1, 2, 3, 4])
1.7 進程鎖 Lock
首先是不加進程鎖的運行情況,在下述代碼中定義了共享變量v,定義了兩個進程,均可對v進行操作。job函數(shù)的作用是每隔0.1s輸出一次累加num的值,累加值num在兩個進程中分別為1和3。
import multiprocessing as mp import time def job(v,num): for _ in range(10): time.sleep(0.1)#暫停0.1s,讓輸出效果更明顯 v.value+=num #v.value獲取共享變量值 print(v.value) def multicore(): v=mp.Value('i',0)#定義共享變量 p1=mp.Process(target=job,args=(v,1)) p2=mp.Process(target=job,args=(v,3)) p1.start() p2.start() p1.join() p2.join() if __name__=='__main__': multicore()
運行結果如下所示:
可以看到兩個進程互相搶占共享內(nèi)存v。
為了解決上述不同進程搶共享資源的問題,我們可以用加進程鎖來解決。
首先需要定義一個進程鎖:
l = mp.Lock() # 定義一個進程鎖
然后將進程鎖的信息傳入各個進程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要將Lock傳入 p2 = mp.Process(target=job, args=(v,3,l))
在job()中設置進程鎖的使用,保證運行時一個進程的對鎖內(nèi)內(nèi)容的獨占
def job(v, num, l): l.acquire() # 鎖住 for _ in range(5): time.sleep(0.1) v.value += num # v.value獲取共享內(nèi)存 print(v.value) l.release() # 釋放
完整代碼:
def job(v, num, l): l.acquire() # 鎖住 for _ in range(5): time.sleep(0.1) v.value += num # 獲取共享內(nèi)存 print(v.value) l.release() # 釋放 def multicore(): l = mp.Lock() # 定義一個進程鎖 v = mp.Value('i', 0) # 定義共享內(nèi)存 p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入 p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
運行結果如下所示:
可以看到進程1運行完之后才運行進程2。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
python爬取”頂點小說網(wǎng)“《純陽劍尊》的示例代碼
這篇文章主要介紹了python爬取”頂點小說網(wǎng)“《純陽劍尊》的示例代碼,幫助大家更好的利用python 爬蟲爬取數(shù)據(jù),感興趣的朋友可以了解下2020-10-10Python 檢查數(shù)組元素是否存在類似PHP isset()方法
isset方法來檢查數(shù)組元素是否存在,在Python中無對應函數(shù),在Python中一般可以通過異常來處理數(shù)組元素不存在的情況,而無須事先檢查2014-10-10python實現(xiàn)讀取Excel內(nèi)容并展示成json
這篇文章主要為大家詳細介紹了如何使用python實現(xiàn)讀取Excel內(nèi)容并展示成json功能,文中的示例代碼講解詳細,感興趣的小伙伴可以參考一下2023-12-12像線程一樣管理進程的Python multiprocessing庫
multiprocessing庫是基于threading API,它可以把工作劃分為多個進程.有些情況下,multiprocessing可以作為臨時替換取代threading來利用多個CPU內(nèi)核,相應地避免Python全局解釋器鎖所帶來的計算瓶頸.本文詳細介紹了Python multiprocessing庫,需要的朋友可以參考下2021-05-05