python threading和multiprocessing模塊基本用法實(shí)例分析
本文實(shí)例講述了python threading和multiprocessing模塊基本用法。分享給大家供大家參考,具體如下:
前言
這兩天為了做一個(gè)小項(xiàng)目,研究了一下python的并發(fā)編程,所謂并發(fā)無(wú)非多線程和多進(jìn)程,最初找到的是threading模塊,因?yàn)橛∠笾芯€程“輕量...”,“切換快...”,“可共享進(jìn)程資源...”等等,但是沒(méi)想到這里水很深,進(jìn)而找到了更好的替代品multiprocessing模塊。下面會(huì)講一些使用中的經(jīng)驗(yàn)。
后面出現(xiàn)的代碼都在ubuntu10.04 + python2.6.5的環(huán)境下測(cè)試通過(guò)。
一、使用threading模塊創(chuàng)建線程
1、三種線程創(chuàng)建方式
(1)傳入一個(gè)函數(shù)
這種方式是最基本的,即調(diào)用threading中的Thread類的構(gòu)造函數(shù),然后指定參數(shù)target=func,再使用返回的Thread的實(shí)例調(diào)用start()方法,即開(kāi)始運(yùn)行該線程,該線程將執(zhí)行函數(shù)func,當(dāng)然,如果func需要參數(shù),可以在Thread的構(gòu)造函數(shù)中傳入?yún)?shù)args=(...)。示例代碼如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
#用于線程執(zhí)行的函數(shù)
def counter(n):
cnt = 0;
for i in xrange(n):
for j in xrange(i):
cnt += j;
print cnt;
if __name__ == '__main__':
#初始化一個(gè)線程對(duì)象,傳入函數(shù)counter,及其參數(shù)1000
th = threading.Thread(target=counter, args=(1000,));
#啟動(dòng)線程
th.start();
#主線程阻塞等待子線程結(jié)束
th.join();
這段代碼很直觀,counter函數(shù)是一個(gè)很無(wú)聊的雙重循環(huán),需要注意的是th.join()這句,這句的意思是主線程將自我阻塞,然后等待th表示的線程執(zhí)行完畢再結(jié)束,如果沒(méi)有這句,運(yùn)行代碼會(huì)立即結(jié)束。join的意思比較晦澀,其實(shí)將這句理解成這樣會(huì)好理解些“while th.is_alive(): time.sleep(1)”。雖然意思相同,但是后面將看到,使用join也有陷阱。
(2)傳入一個(gè)可調(diào)用的對(duì)象
許多的python 對(duì)象都是我們所說(shuō)的可調(diào)用的,即是任何能通過(guò)函數(shù)操作符“()”來(lái)調(diào)用的對(duì)象(見(jiàn)《python核心編程》第14章)。類的對(duì)象也是可以調(diào)用的,當(dāng)被調(diào)用時(shí)會(huì)自動(dòng)調(diào)用對(duì)象的內(nèi)建方法__call__(),因此這種新建線程的方法就是給線程指定一個(gè)__call__方法被重載了的對(duì)象。示例代碼如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
#可調(diào)用的類
class Callable(object):
def __init__(self, func, args):
self.func = func;
self.args = args;
def __call__(self):
apply(self.func, self.args);
#用于線程執(zhí)行的函數(shù)
def counter(n):
cnt = 0;
for i in xrange(n):
for j in xrange(i):
cnt += j;
print cnt;
if __name__ == '__main__':
#初始化一個(gè)線程對(duì)象,傳入可調(diào)用的Callable對(duì)象,并用函數(shù)counter及其參數(shù)1000初始化這個(gè)對(duì)象
th = threading.Thread(target=Callable(counter, (1000,)));
#啟動(dòng)線程
th.start();
#主線程阻塞等待子線程結(jié)束
th.join();
這個(gè)例子關(guān)鍵的一句是apply(self.func, self.args); 這里使用初始化時(shí)傳入的函數(shù)對(duì)象及其參數(shù)來(lái)進(jìn)行一次調(diào)用。
(3)繼承Thread類
這種方式通過(guò)繼承Thread類,并重載其run方法,來(lái)實(shí)現(xiàn)自定義的線程行為,示例代碼如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import threading, time, random
def counter():
cnt = 0;
for i in xrange(10000):
for j in xrange(i):
cnt += j;
class SubThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self, name=name);
def run(self):
i = 0;
while i < 4:
print self.name,'counting...\n';
counter();
print self.name,'finish\n';
i += 1;
if __name__ == '__main__':
th = SubThread('thread-1');
th.start();
th.join();
print 'all done';
這個(gè)例子定義了一個(gè)SubThread類,它繼承了Thread類,并重載了run方法,在方法中調(diào)用counter4次并打印一些信息,可以看到這種方式比較直觀。在構(gòu)造函數(shù)中要記得先調(diào)用父類的構(gòu)造函數(shù)進(jìn)行初始化。
2、python多線程的限制
python多線程有個(gè)討厭的限制,全局解釋器鎖(global interpreter lock),這個(gè)鎖的意思是任一時(shí)間只能有一個(gè)線程使用解釋器,跟單cpu跑多個(gè)程序一個(gè)意思,大家都是輪著用的,這叫“并發(fā)”,不是“并行”。手冊(cè)上的解釋是為了保證對(duì)象模型的正確性!這個(gè)鎖造成的困擾是如果有一個(gè)計(jì)算密集型的線程占著cpu,其他的線程都得等著....,試想你的多個(gè)線程中有這么一個(gè)線程,得多悲劇,多線程生生被搞成串行;當(dāng)然這個(gè)模塊也不是毫無(wú)用處,手冊(cè)上又說(shuō)了:當(dāng)用于IO密集型任務(wù)時(shí),IO期間線程會(huì)釋放解釋器,這樣別的線程就有機(jī)會(huì)使用解釋器了!所以是否使用這個(gè)模塊需要考慮面對(duì)的任務(wù)類型。
二、使用multiprocessing創(chuàng)建進(jìn)程
1、三種創(chuàng)建方式
進(jìn)程的創(chuàng)建方式跟線程完全一致,只不過(guò)要將threading.Thread換成multiprocessing.Process。multiprocessing模塊盡力保持了與threading模塊在方法名上的一致性,示例代碼可參考上面線程部分的。這里只給出第一種使用函數(shù)的方式:
#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, time
def run():
i = 0;
while i<10000:
print 'running';
time.sleep(2);
i += 1;
if __name__ == '__main__':
p = multiprocessing.Process(target=run);
p.start();
#p.join();
print p.pid;
print 'master gone';
2、創(chuàng)建進(jìn)程池
該模塊還允許一次創(chuàng)建一組進(jìn)程,然后再給他們分配任務(wù)。詳細(xì)內(nèi)容可參考手冊(cè),這部分研究不多,不敢亂寫。
pool = multiprocessing.Pool(processes=4) pool.apply_async(func, args...)
3、使用進(jìn)程的好處
完全并行,無(wú)GIL的限制,可充分利用多cpu多核的環(huán)境;可以接受linux信號(hào),后面將看到,這個(gè)功能非常好用。
三、實(shí)例研究
該實(shí)例假想的任務(wù)是:一個(gè)主進(jìn)程會(huì)啟動(dòng)多個(gè)子進(jìn)程分別處理不同的任務(wù),各個(gè)子進(jìn)程可能又有自己的線程用于不同的IO處理(前面說(shuō)過(guò),線程在IO方面還是不錯(cuò)的),要實(shí)現(xiàn)的功能是,對(duì)這些子進(jìn)程發(fā)送信號(hào),能被正確的處理,例如發(fā)生SIGTERM,子進(jìn)程能通知其線程收工,然后“優(yōu)雅”的退出?,F(xiàn)在要解決的問(wèn)題有:(1)在子類化的Process對(duì)象中如何捕捉信號(hào);(2)如何“優(yōu)雅的退出”。下面分別說(shuō)明。
1、子類化Process并捕捉信號(hào)
如果是使用第一種進(jìn)程創(chuàng)建方式(傳入函數(shù)),那么捕捉信號(hào)很容易,假設(shè)給進(jìn)程運(yùn)行的函數(shù)叫func,代碼示例如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, signal,time
def handler(signum, frame):
print 'signal', signum;
def run():
signal.signal(signal.SIGTERM, handler);
signal.signal(signal.SIGINT, handler);
i = 0;
while i<10000:
print 'running';
time.sleep(2);
i += 1;
if __name__ == '__main__':
p = multiprocessing.Process(target=run);
p.start();
#p.join();
print p.pid;
print 'master gone';
這段代碼是在第一種創(chuàng)建方式的基礎(chǔ)上修改而來(lái)的,增加了兩行signal.signal(...)調(diào)用,這是說(shuō)這個(gè)函數(shù)要捕捉SIGTERM和SIGINT兩個(gè)信號(hào),另外增加了一個(gè)handler函數(shù),該函數(shù)用于捕捉到信號(hào)時(shí)進(jìn)行相應(yīng)的處理,我們這里只是簡(jiǎn)單的打印出信號(hào)值。
注意p.join()被注釋掉了,這里跟線程的情況有點(diǎn)區(qū)別,新的進(jìn)程啟動(dòng)后就開(kāi)始運(yùn)行了,主進(jìn)程也不用等待它運(yùn)行完,可以該干嘛干嘛去。這段代碼運(yùn)行后會(huì)打印出子進(jìn)程的進(jìn)程id,根據(jù)這個(gè)id,在另一個(gè)終端輸入kill -TERM id,會(huì)發(fā)現(xiàn)剛才的終端打印出了"signal 15"。
但是使用傳入函數(shù)的方式有一點(diǎn)不好的是封裝性太差,如果功能稍微復(fù)雜點(diǎn),將會(huì)有很多的全局變量暴露在外,最好還是將功能封裝成類,那么使用類又怎么注冊(cè)信號(hào)相應(yīng)函數(shù)呢?上面的例子貌似只能使用一個(gè)全局的函數(shù),手冊(cè)也沒(méi)有給出在類中處理信號(hào)的例子,其實(shí)解決方法大同小異,也很容易,這個(gè)帖子http://stackoverflow.com/questions/6204443/python-signal-reading-return-from-signal-handler-function給了我靈感:
class Master(multiprocessing.Process):
def __init__(self):
super(Master,self).__init__();
signal.signal(signal.SIGTERM, self.handler); #注冊(cè)信號(hào)處理函數(shù)
self.live = 1;
#信號(hào)處理函數(shù)
def handler(self, signum, frame):
print 'signal:',signum;
self.live = 0;
def run(self):
print 'PID:',self.pid;
while self.live:
print 'living...'
time.sleep(2);
方法很直觀,首先在構(gòu)造函數(shù)中注冊(cè)信號(hào)處理函數(shù),然后定義了一個(gè)方法handler作為處理函數(shù)。這個(gè)進(jìn)程類會(huì)每隔2秒打印一個(gè)“l(fā)iving...”,當(dāng)接收到SIGTERM后,改變self.live的值,run方法的循環(huán)檢測(cè)到這個(gè)值為0后就結(jié)束了,進(jìn)程也結(jié)束了。
2、讓進(jìn)程優(yōu)雅的退出
下面放出這次的假想任務(wù)的全部代碼,我在主進(jìn)程中啟動(dòng)了一個(gè)子進(jìn)程(通過(guò)子類化Process類),然后子進(jìn)程啟動(dòng)后又產(chǎn)生兩個(gè)子線程,用來(lái)模擬“生產(chǎn)者-消費(fèi)者”模型,兩個(gè)線程通過(guò)一個(gè)隊(duì)列進(jìn)行交流,為了互斥訪問(wèn)這個(gè)隊(duì)列,自然要加一把鎖(condition對(duì)象跟Lock對(duì)象差不多,不過(guò)多了等待和通知的功能);生產(chǎn)者每次產(chǎn)生一個(gè)隨機(jī)數(shù)并扔進(jìn)隊(duì)列,然后休息一個(gè)隨機(jī)時(shí)間,消費(fèi)者每次從隊(duì)列取一個(gè)數(shù);而子進(jìn)程中的主線程要負(fù)責(zé)接收信號(hào),以便讓整個(gè)過(guò)程優(yōu)雅的結(jié)束。代碼如下:
#!/usr/bin/python
#-*-coding:utf-8-*-
import time, multiprocessing, signal, threading, random, time, Queue
class Master(multiprocessing.Process):
def __init__(self):
super(Master,self).__init__();
signal.signal(signal.SIGTERM, self.handler);
#這個(gè)變量要傳入線程用于控制線程運(yùn)行,為什么用dict?充分利用線程間共享資源的特點(diǎn)
#因?yàn)榭勺儗?duì)象按引用傳遞,標(biāo)量是傳值的,不信寫成self.live = true試試
self.live = {'stat':True};
def handler(self, signum, frame):
print 'signal:',signum;
self.live['stat'] = 0; #置這個(gè)變量為0,通知子線程可以“收工”了
def run(self):
print 'PID:',self.pid;
cond = threading.Condition(threading.Lock()); #創(chuàng)建一個(gè)condition對(duì)象,用于子線程交互
q = Queue.Queue(); #一個(gè)隊(duì)列
sender = Sender(cond, self.live, q); #傳入共享資源
geter = Geter(cond, self.live, q);
sender.start(); #啟動(dòng)線程
geter.start();
signal.pause(); #主線程睡眠并等待信號(hào)
while threading.activeCount()-1: #主線程收到信號(hào)并被喚醒后,檢查還有多少線程活著(除掉自己)
time.sleep(2); #再睡眠等待,確保子線程都安全的結(jié)束
print 'checking live', threading.activeCount();
print 'mater gone';
class Sender(threading.Thread):
def __init__(self, cond, live, queue):
super(Sender, self).__init__(name='sender');
self.cond = cond;
self.queue = queue;
self.live = live
def run(self):
cond = self.cond;
while self.live['stat']: #檢查這個(gè)進(jìn)程內(nèi)的“全局”變量,為真就繼續(xù)運(yùn)行
cond.acquire(); #獲得鎖,以便控制隊(duì)列
i = random.randint(0,100);
self.queue.put(i,False);
if not self.queue.full():
print 'sender add:',i;
cond.notify(); #喚醒等待鎖的其他線程
cond.release(); #釋放鎖
time.sleep(random.randint(1,3));
print 'sender done'
class Geter(threading.Thread):
def __init__(self, cond, live, queue):
super(Geter, self).__init__(name='geter');
self.cond = cond;
self.queue = queue;
self.live = live
def run(self):
cond = self.cond;
while self.live['stat']:
cond.acquire();
if not self.queue.empty():
i = self.queue.get();
print 'geter get:',i;
cond.wait(3);
cond.release();
time.sleep(random.randint(1,3));
print 'geter done'
if __name__ == '__main__':
master = Master();
master.start(); #啟動(dòng)子進(jìn)程
需要注意的地方是,在Master的run方法中sender.start()和geter.start()之后,按常理應(yīng)該接著調(diào)用sender.join()和geter.join(),讓主線程等待子線程結(jié)束,前面說(shuō)的join的陷阱就在這里,join將主線程阻塞(blocking)住了,主線程無(wú)法再捕捉信號(hào),剛開(kāi)始研究這塊時(shí)還以為信號(hào)處理函數(shù)寫錯(cuò)了。網(wǎng)上討論比較少,這里說(shuō)的比較清楚http://stackoverflow.com/questions/631441/interruptible-thread-join-in-python,http://www.gossamer-threads.com/lists/python/python/541403
參考:
《python核心編程》
《python manual》
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進(jìn)階經(jīng)典教程》、《Python+MySQL數(shù)據(jù)庫(kù)程序設(shè)計(jì)入門教程》及《Python常見(jiàn)數(shù)據(jù)庫(kù)操作技巧匯總》
希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。
- 在Python中通過(guò)threading模塊定義和調(diào)用線程的方法
- Python+threading模塊對(duì)單個(gè)接口進(jìn)行并發(fā)測(cè)試
- Python 多線程,threading模塊,創(chuàng)建子線程的兩種方式示例
- Python多線程模塊Threading用法示例小結(jié)
- Python線程threading模塊用法詳解
- Python threading模塊condition原理及運(yùn)行流程詳解
- Python 多線程之threading 模塊的使用
- Python多線程編程之threading模塊詳解
- python threading模塊的使用指南
- Python常用模塊之threading和Thread模塊及線程通信
相關(guān)文章
Python3.2中的字符串函數(shù)學(xué)習(xí)總結(jié)
這篇文章主要介紹了Python3.2中的字符串函數(shù)學(xué)習(xí)總結(jié),本文講解了格式化類方法、查找 & 替換類方法、拆分 & 組合類方法等內(nèi)容,需要的朋友可以參考下2015-04-04
Python數(shù)據(jù)結(jié)構(gòu)之優(yōu)先級(jí)隊(duì)列queue用法詳解
queue庫(kù)提供了一個(gè)適用于多線程編程的先進(jìn)先出(FIFO)數(shù)據(jù)結(jié)構(gòu),可以用來(lái)在生產(chǎn)者與消費(fèi)者線程之間安全地傳遞消息或其他數(shù)據(jù),它會(huì)為調(diào)用者處理鎖定,使多個(gè)線程可以安全而更容易地處理同一個(gè)Queue實(shí)例.Queue的大小可能受限,以限制內(nèi)存使用或處理,需要的朋友可以參考下2021-05-05
Python提取PDF中的圖片的實(shí)現(xiàn)示例
本文主要介紹了Python提取PDF中的圖片的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
tensorflow: 查看 tensor詳細(xì)數(shù)值方法
今天小編就為大家分享一篇tensorflow: 查看 tensor詳細(xì)數(shù)值方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06
python 自動(dòng)刷新網(wǎng)頁(yè)的兩種方法
這篇文章主要介紹了python 自動(dòng)刷新網(wǎng)頁(yè)的兩種方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
Python登錄QQ郵箱發(fā)送郵件的實(shí)現(xiàn)示例
本文主要介紹了Python登錄QQ郵箱發(fā)送郵件的實(shí)現(xiàn)示例,主要就是三步,登錄郵件、寫郵件內(nèi)容、發(fā)送,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧<BR>2023-08-08
Python 使用pandas實(shí)現(xiàn)查詢和統(tǒng)計(jì)示例詳解
這篇文章主要為大家介紹了Python 使用pandas實(shí)現(xiàn)查詢和統(tǒng)計(jì)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
python+django+mysql開(kāi)發(fā)實(shí)戰(zhàn)(附demo)
本文主要介紹了python+django+mysql開(kāi)發(fā)實(shí)戰(zhàn)(附demo),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01

