Python并發(fā)之多進(jìn)程的方法實(shí)例代碼
一,進(jìn)程的理論基礎(chǔ)
一個應(yīng)用程序,歸根結(jié)底是一堆代碼,是靜態(tài)的,而進(jìn)程才是執(zhí)行中的程序,在一個程序運(yùn)行的時候會有多個進(jìn)程并發(fā)執(zhí)行。
進(jìn)程和線程的區(qū)別:
- 進(jìn)程是系統(tǒng)資源分配的基本單位。
- 一個進(jìn)程內(nèi)可以包含多個線程,屬于一對多的關(guān)系,進(jìn)程內(nèi)的資源,被其內(nèi)的線程共享
- 線程是進(jìn)程運(yùn)行的最小單位,如果說進(jìn)程是完成一個功能,那么其線程就是完成這個功能的基本單位
- 進(jìn)程間資源不共享,多進(jìn)程切換資源開銷,難度大,同一進(jìn)程內(nèi)的線程資源共享,多線程切換資源開銷,難度小
進(jìn)程與線程的共同點(diǎn):
都是為了提高程序運(yùn)行效率,都有執(zhí)行的優(yōu)先權(quán)
二,Python的多進(jìn)程( multiprocessing模塊)
創(chuàng)建一個進(jìn)程(和創(chuàng)建線程類似)
方法一:創(chuàng)建Process對象,通過對象調(diào)用start()方法啟動進(jìn)程
from multiprocessing import Process
def foo(name):
print('hello,%s'%name)
if __name__ == '__main__':
p1=Process(target=foo,args=('world',))
p2 = Process(target=foo, args=('China',))
p1.start()
p2.start()
print('=====主進(jìn)程=====')
# == == =主進(jìn)程 == == =
# hello, world
# hello, China
#主進(jìn)程和子進(jìn)程并發(fā)執(zhí)行
注意:Process對象只能在在 if __name__ == '__main__':下創(chuàng)建,不然會報錯。
方法二:自定義一個類繼承Process類,并重寫run()方法,將執(zhí)行代碼放在其內(nèi)
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print('hello,%s'%self.name)
if __name__ == '__main__':
myprocess1 = MyProcess('world')
myprocess2 = MyProcess('world')
myprocess1.start()
myprocess2.start()
Process內(nèi)置方法
實(shí)例方法:
p.start():啟動進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()
p.run():進(jìn)程啟動時運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法
p.terminate():強(qiáng)制終止進(jìn)程p,不會進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進(jìn)而導(dǎo)致死鎖
p.is_alive():如果p仍然運(yùn)行,返回True
p.join([timeout]):主線程等待p終止。timeout是可選的超時時間
Process屬性
p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時,p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置
p.name:進(jìn)程的名稱
p.pid:進(jìn)程的pid
p.exitcode:進(jìn)程在運(yùn)行時為None、如果為–N,表示被信號N結(jié)束(了解即可)
守護(hù)進(jìn)程
類似于守護(hù)線程,只不過守護(hù)線程是對象的一個方法,而守護(hù)進(jìn)程封裝成對象的屬性。
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
time.sleep(3)
print('hello,%s'%self.name)
if __name__ == '__main__':
myprocess1=MyProcess('world')
myprocess1.daemon = True
myprocess1.start()
print('結(jié)束')
#不會輸出‘hello world',因為設(shè)置為守護(hù)進(jìn)程,主進(jìn)程不會等待
也可以使用join方法,使主進(jìn)程等待
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
time.sleep(3)
print('hello,%s'%self.name)
if __name__ == '__main__':
myprocess1=MyProcess('world')
myprocess1.daemon = True
myprocess1.start()
myprocess1.join() #程序阻塞
print('結(jié)束')
join()
進(jìn)程同步和鎖
進(jìn)程雖然不像線程共享資源,但是這并不意味著進(jìn)程間不 需要加鎖,比如不同進(jìn)程會共享同一個終端 ( 屏幕),或者操作同一個文件,數(shù)據(jù)庫,那么數(shù)據(jù)安全還是很有必要的,因此我們可以加鎖,
from multiprocessing import Process,Lock
import time
def a_print(l): #需要傳入對象,因為信息不共享
l.acquire()
print('我要打印信息')
time.sleep(1)
print('我打印完了')
l.release()
if __name__ == '__main__':
l = Lock()
for i in range(20):
p = Process(target=a_print,args=(l,))
p.start()
信號量(Semaphore)
能夠并發(fā)執(zhí)行的進(jìn)程數(shù),超出的進(jìn)程阻塞,直到有進(jìn)程運(yùn)行完成。
Semaphore管理一個內(nèi)置的計數(shù)器,
每當(dāng)調(diào)用acquire()時內(nèi)置計數(shù)器-1;
調(diào)用release() 時內(nèi)置計數(shù)器+1;
計數(shù)器不能小于0;當(dāng)計數(shù)器為0時,acquire()將阻塞進(jìn)程直到其他進(jìn)程調(diào)用release()。
from multiprocessing import Process,Queue,Semaphore
import time,random
def seat(s,n):
s.acquire()
print('學(xué)生%d坐下了'%n)
time.sleep(random.randint(1,2))
s.release()
if __name__ == '__main__':
s = Semaphore(5)
for i in range(20):
p = Process(target=seat,args=(s,i))
p.start()
print('-----主進(jìn)程-------')
注意:其實(shí)信號量和鎖類似,只是限制進(jìn)程運(yùn)行某個代碼塊的數(shù)量(鎖為1個),并不是能限制并發(fā)的進(jìn)程,如上述代碼,一次性還是創(chuàng)建了20個進(jìn)程
事件(Event)
from multiprocessing import Process,Event
import time, random
def eating(event):
event.wait()
print('去吃飯的路上...')
def makeing(event):
print('做飯中')
time.sleep(random.randint(1,2))
print('做好了,快來...')
event.set()
if __name__ == '__main__':
event=Event()
t1 = Process(target=eating,args=(event,))
t2 = Process(target=makeing,args=(event,))
t1.start()
t2.start()
# 做飯中
# 做好了,快來...
# 去吃飯的路上...
和線程事件幾乎一致
進(jìn)程隊列(Queue)
進(jìn)程隊列是進(jìn)程通訊的方式之一。使用multiprocessing 下的Queue
from multiprocessing import Process,Queue
import time
def func1(queue):
while True:
info=queue.get()
if info == None:
return
print(info)
def func2(queue):
for i in range(10):
time.sleep(1)
queue.put('is %d'%i)
queue.put(None) #結(jié)束的標(biāo)志
if __name__ == '__main__':
q = Queue()
p1 = Process(target=func1,args=(q,))
p2 = Process(target=func2, args=(q,))
p1.start()
p2.start()
Queue類的方法,源碼如下:
class Queue(object):
def __init__(self, maxsize=-1): #可以傳參設(shè)置隊列最大容量
self._maxsize = maxsize
def qsize(self): #返回當(dāng)前時刻隊列中的個數(shù)
return 0
def empty(self): #是否為空
return False
def full(self): 是否滿了
return False
def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常
pass
def put_nowait(self, obj): #=put(False)
pass
def get(self, block=True, timeout=None): 獲取值,get方法有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時間內(nèi)沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
pass
def get_nowait(self): # = get(False)
pass
def close(self): #將隊列關(guān)閉
pass
def join_thread(self): #略,幾乎不用
pass
def cancel_join_thread(self):
pass
進(jìn)程隊列源碼注釋
進(jìn)程池
進(jìn)程的消耗是很大的,因此我們不能無節(jié)制的開啟新進(jìn)程,因此我們可以 通過維護(hù)一個進(jìn)程池來控制進(jìn)程的數(shù)量 。這就不同于信號量,進(jìn)程池可以從源頭控制進(jìn)程數(shù)量。在Python中可以通過如下方法使用
同步調(diào)用
from multiprocessing import Pool
import time, random, os
def func(n):
pid = os.getpid()
print('進(jìn)程%s正在處理第%d個任務(wù)'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
time.sleep(2)
res = '處理%s'%random.choice(['成功','失敗'])
return res
if __name__ == '__main__':
p = Pool(4) #創(chuàng)建4個進(jìn)程,
li = []
for i in range(10):
res = p.apply(func,args=(i,)) 交給進(jìn)程池處理,處理完成才返回值,會阻塞,即使池內(nèi)還有空余進(jìn)程,相當(dāng)于順序執(zhí)行
li.append(res)
for i in li:
print(i)
#進(jìn)程1916正在處理第0個任務(wù) 時間21-02-53
#進(jìn)程1240正在處理第1個任務(wù) 時間21-02-55
#進(jìn)程3484正在處理第2個任務(wù) 時間21-02-57
#進(jìn)程7512正在處理第3個任務(wù) 時間21-02-59
#進(jìn)程1916正在處理第4個任務(wù) 時間21-03-01
#進(jìn)程1240正在處理第5個任務(wù) 時間21-03-03
#進(jìn)程3484正在處理第6個任務(wù) 時間21-03-05
#進(jìn)程7512正在處理第7個任務(wù) 時間21-03-07
#進(jìn)程1916正在處理第8個任務(wù) 時間21-03-09
#進(jìn)程1240正在處理第9個任務(wù) 時間21-03-11
從結(jié)果可以發(fā)現(xiàn)兩點(diǎn):
- 不是并發(fā)處理
- 一直都只有四個進(jìn)程,串行執(zhí)行
因此進(jìn)程池提供了 異步處理 的方式
from multiprocessing import Pool
import time, random, os
def func(n):
pid = os.getpid()
print('進(jìn)程%s正在處理第%d個任務(wù)'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
time.sleep(2)
res = '處理%s'%random.choice(['成功','失敗'])
return res
if __name__ == '__main__':
p = Pool(4)
li = []
for i in range(10):
res = p.apply_async(func,args=(i,)) 結(jié)果不會立刻返回,遇到阻塞,開啟下一個進(jìn)程,在這,相當(dāng)于幾乎同時出現(xiàn)四個打印結(jié)果(一個線程處理一個任務(wù),處理完下個任務(wù)才能進(jìn)來)
li.append(res)
p.close() #join之前需要關(guān)閉進(jìn)程池
p.join() #因為異步,所以需要等待池內(nèi)進(jìn)程工作結(jié)束再繼續(xù)
for i in li:
print(i.get()) #i是一個對象,通過get方法獲取返回值,而同步則沒有該方法
關(guān)于回調(diào)函數(shù)
from multiprocessing import Pool
import time, random, os
def func(n):
pid = os.getpid()
print('進(jìn)程%s正在處理第%d個任務(wù)'%(pid,n),'時間%s'%time.strftime('%H-%M-%S'))
time.sleep(2)
res = '處理%s'%random.choice(['成功','失敗'])
return res
def foo(info):
print(info) #傳入值為進(jìn)程執(zhí)行結(jié)果
if __name__ == '__main__':
p = Pool(4)
li = []
for i in range(10):
res = p.apply_async(func,args=(i,),callback = foo) callback()回調(diào)函數(shù)會在進(jìn)程執(zhí)行完之后調(diào)用(主進(jìn)程調(diào)用)
li.append(res)
p.close()
p.join()
for i in li:
print(i.get())
有回調(diào)函數(shù)
總結(jié)
以上所述是小編給大家介紹的Python并發(fā)之多進(jìn)程的方法實(shí)例代碼,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
python實(shí)現(xiàn)遍歷文件夾圖片并重命名
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)遍歷文件夾圖片并重命名,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2020-03-03
Python查找兩個有序列表中位數(shù)的方法【基于歸并算法】
這篇文章主要介紹了Python查找兩個有序列表中位數(shù)的方法,結(jié)合實(shí)例形式分析了Python基于歸并算法遍歷、計算有序列表相關(guān)操作技巧,需要的朋友可以參考下2018-04-04
Python中的copy()函數(shù)詳解(list,array)
這篇文章主要介紹了Python中的copy()函數(shù)詳解(list,array),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09
小眾實(shí)用的Python 爬蟲庫RoboBrowser
這篇文章主要介紹了Python 爬蟲庫RoboBrowser的使用簡介,幫助大家更好的理解和學(xué)習(xí)使用python,感興趣的朋友可以了解下2021-03-03
Python 12306搶火車票腳本 Python京東搶手機(jī)腳本
這篇文章主要為大家詳細(xì)介紹了Python 12306搶火車票腳本和Python京東搶手機(jī)腳本,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02

