解決windows下python3使用multiprocessing.Pool出現(xiàn)的問題
例如:
from multiprocessing import Pool def f(x): return x*x pool = Pool(processes=4) r=pool.map(f, range(100)) pool.close() pool.join()
在spyder里運(yùn)行直接沒反應(yīng);在shell窗口里,直接報(bào)錯(cuò),如下:
Process SpawnPoolWorker-15: Traceback (most recent call last): File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr self.run() File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker task = get() File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get return ForkingPickler.loads(res) AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
解決:
Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地變量都復(fù)制一份,因此可以使用任意的全局變量;在Windows下面,多進(jìn)程是通過啟動(dòng)新進(jìn)程完成的,所有的全局變量都是重新初始化的,在運(yùn)行過程中動(dòng)態(tài)生成、修改過的全局變量是不能使用的。
multiprocessing內(nèi)部使用pickling傳遞map的參數(shù)到不同的進(jìn)程,當(dāng)傳遞一個(gè)函數(shù)或類時(shí),pickling將函數(shù)或者類用所在模塊+函數(shù)/類名的方式表示,如果對(duì)端的Python進(jìn)程無法在對(duì)應(yīng)的模塊中找到相應(yīng)的函數(shù)或者類,就會(huì)出錯(cuò)。
當(dāng)你在Interactive Console當(dāng)中創(chuàng)建函數(shù)的時(shí)候,這個(gè)函數(shù)是動(dòng)態(tài)添加到__main__模塊中的,在重新啟動(dòng)的新進(jìn)程當(dāng)中不存在,所以會(huì)出錯(cuò)。
當(dāng)不在Console中,而是在獨(dú)立Python文件中運(yùn)行時(shí),你會(huì)遇到另一個(gè)問題:由于你下面調(diào)用multiprocessing的代碼沒有保護(hù),在新進(jìn)程加載這個(gè)模塊的時(shí)候會(huì)重新執(zhí)行這段代碼,創(chuàng)建出新的multiprocessing池,無限調(diào)用下去。
解決這個(gè)問題的方法是永遠(yuǎn)把實(shí)際執(zhí)行功能的代碼加入到帶保護(hù)的區(qū)域中:if __name__ == '__mian__':
補(bǔ)充知識(shí):multiprocessing Pool的異常處理問題
multiprocessing.Pool開發(fā)多進(jìn)程程序時(shí),在某個(gè)子進(jìn)程執(zhí)行函數(shù)使用了mysql-python連接數(shù)據(jù)庫(kù),
由于程序設(shè)計(jì)問題,沒有捕獲到所有異常,導(dǎo)致某個(gè)異常錯(cuò)誤直接拋到Pool中,導(dǎo)致整個(gè)Pool掛了,其異常錯(cuò)誤如下所示:
Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 765, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results task = get() File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__ 'msg': self.msg.encode('utf8') if PY2 else self.msg AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>, (2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))
本文檔基于以上問題對(duì)multiprocessing.Pool以及python-mysql-connector的源碼實(shí)現(xiàn)進(jìn)行分析,以定位具體的錯(cuò)誤原因。解決方法其實(shí)很簡(jiǎn)單,不要讓異常拋到Pool里就行。
問題產(chǎn)生場(chǎng)景
python 版本centos7.3自帶的2.7.5版本,或者最新的python-2.7.14
mysql-connector庫(kù),版本是2.0及以上,可到官網(wǎng)下載最新版:mysql-connector
問題發(fā)生的code其實(shí)可以簡(jiǎn)化為如下所示:
from multiprocessing import Pool, log_to_stderr import logging import mysql.connector # open multiprocessing lib log log_to_stderr(level=logging.DEBUG) def func(): raise mysql.connector.Error("demo test", 100) if __name__ == "__main__": p = Pool(3) res = p.apply_async(func) res.get()
所以解決問題很簡(jiǎn)單,在func里加個(gè)try-except就可以了。但是如果你好奇為什么為出現(xiàn)AttributeError的異常,那么可以繼續(xù)往下看。
Multiprocessing.Pool的實(shí)現(xiàn)
通過查看源碼,大致上multiprocess.Pool的實(shí)現(xiàn)如下圖所示:
當(dāng)我們執(zhí)行以下語句時(shí),主進(jìn)程會(huì)創(chuàng)建三個(gè)子線程:_handle_workers、_handle_results、_handle_tasks;同時(shí)會(huì)創(chuàng)建Pool(n)個(gè)數(shù)的worker子進(jìn)程。主進(jìn)程與各個(gè)worker子進(jìn)程間的通信使用內(nèi)部定義的Queue,其實(shí)就是Pipe管道通信,如上圖的_taskqueue、_inqueue和_outqueue。
p = Pool(3) res = p.apply_async(func) res.get()
這三個(gè)子線程的作用是:
1. handle_workers線程管理worker進(jìn)程,使進(jìn)程池維持Pool(n)個(gè)worker進(jìn)程數(shù);
2. handle_tasks線程將用戶的任務(wù)(包括job_id, 處理函數(shù)func等信息)傳遞到_inqueue中,子進(jìn)程們競(jìng)爭(zhēng)獲取任務(wù),然后運(yùn)行相關(guān)函數(shù),將結(jié)果放在_outqueue中,然后繼續(xù)監(jiān)聽tasksqueue的任務(wù)列表。其實(shí)就是典型的生產(chǎn)消費(fèi)問題。
3. handle_results線程監(jiān)聽_outQqueue的內(nèi)容,有就拿到,通過字典_cache找到對(duì)應(yīng)的job,將結(jié)果存儲(chǔ)在*Result對(duì)象中,釋放該job的信號(hào)量,表明job執(zhí)行完畢。此后,就可以通過*Result.get()函數(shù)獲取執(zhí)行結(jié)果。
當(dāng)我們調(diào)用p.apply_async 或者p.map時(shí),其實(shí)就是創(chuàng)建了AsyncResult或者M(jìn)apResult對(duì)象,然后將task放到_taskqueue中;調(diào)用*Result.get()方法等待task被worker子進(jìn)程執(zhí)行完成,獲取執(zhí)行結(jié)果。
在知道了multprocess.Pool的實(shí)現(xiàn)邏輯后,現(xiàn)在我們來探索下,當(dāng)func將異常拋出時(shí),Pool的worker是怎么處理的。下面的代碼是pool.worker工作子進(jìn)程的核心執(zhí)行函數(shù)的簡(jiǎn)化版。
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): ... while xxx: try: task = get() except: ... job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) ... try: put((job, i, result)) except Exception, e: ...
從代碼中可以看到,在執(zhí)行func時(shí),如果func拋出異常,那么worker會(huì)將異常對(duì)象直接放入到_outqueue中,然后等待下一個(gè)task。也就是說,worker是可以處理異常的。
那么接下來看看_handle_result線程是怎么處理worker發(fā)過來的結(jié)果的。如下所示:
@staticmethod def _handle_results(outqueue, get, cache): while 1: try: task = get() except (IOError, EOFError): return ...
上述代碼為_handle_result的主要處理邏輯,可以看到,它只對(duì) IOError, EOFError進(jìn)行了處理,也就是說,如果在get()時(shí)發(fā)生了其它異常錯(cuò)誤,將導(dǎo)致_handle_result這個(gè)線程直接退出(而事實(shí)上的確如此)。既然_handle_result退出了,那么就沒有動(dòng)作來觸發(fā)_cache中*Result對(duì)象釋放信號(hào)量,則用戶的執(zhí)行流程就一直處于wait狀態(tài)。這樣,用戶主進(jìn)程就會(huì)一直卡在get()中,導(dǎo)致主流程執(zhí)行不下去。
我們通過打開multiprocessing庫(kù)的日志(log_to_stderr(level=logging.DEBUG)),然后修改multiprocessing.Pool中_handel_result的代碼,加上一個(gè)except Exception,然后運(yùn)行文章一開始的的異常代碼,如下所示:
# multiprocessing : pool.py # class Pool(object): @staticmethod def _handle_results(outqueue, get, cache): while 1: try: task = get() except (IOError, EOFError): return except Exception: debug("handle_result not catch Exceptions.") return ...
控制臺(tái)如果輸出"handle_result not catch Exceptions.",表明_handle_results沒有catch到所有的異常。而實(shí)際上,真的是由于task = get()這句話拋異常了。
那么,_outqueue.get()方法做了什么。深入查看源碼,發(fā)現(xiàn)get()方法其實(shí)就是os.pipe的read/write方法,但是做了一些處理吧。其內(nèi)部實(shí)現(xiàn)大致如下:
def Pipe(duplex=True): ... fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) # get c2 = _multiprocessing.Connection(fd2, readable=False) # put return c1, c2
_multiprocessing.Connection內(nèi)部使用了C的實(shí)現(xiàn),就不再深入了,否則會(huì)就越來越復(fù)雜了。它內(nèi)部應(yīng)該使用了pickle庫(kù),在put時(shí)將對(duì)象實(shí)例pickle(也就是序列化吧),然后在get時(shí)將實(shí)例unpikcle,重新生成實(shí)例對(duì)象。具體可查看python官方文檔關(guān)于pickle的介紹(包括object可pickle的條件以及在unpickle時(shí)調(diào)用的方法等)。不管如何,就是實(shí)例在get,即unpickle的過程出錯(cuò)了。
'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: 'int' object has no attribute 'encode'
從上述錯(cuò)誤日志中可以看到,表明在重構(gòu)時(shí)msg參數(shù)傳入了int類型變量。就是說在unpickle階段,Mysql Error重新實(shí)例化時(shí)執(zhí)行了__init__()方法,但是傳參錯(cuò)誤了。為了驗(yàn)證這一現(xiàn)象,我將MySql Error的__init__()進(jìn)行簡(jiǎn)化,最終確認(rèn)到self.args的賦值上,即Exception及其子類在unpickle時(shí)會(huì)調(diào)用__init__()方法,并將self.args作為參數(shù)列表傳遞給__init__()。
通過以下代碼可以簡(jiǎn)單的驗(yàn)證問題:
import os from multiprocessing import Pipe class DemoError(Exception): def __init__(msg, errno): print "msg: %s, errno: %s" % (msg, errno) self.args = ("aa", "bb") def func(): raise DemoError("demo test", 100) r, w = Pipe(duplex=False) try: result = (True, func(1)) except Exception, e: result = (False, e) print "send result" w.send(result) print "get result" res = r.recv() print "finished."
日志會(huì)在recv調(diào)用時(shí)打印 msg: aa, errno: bb,表明recv異常類Exception時(shí)會(huì)將self.args作為參數(shù)傳入init()函數(shù)中。而Mysql的Error類重寫self.args變量,而且順序不對(duì),導(dǎo)致msg在執(zhí)行編碼時(shí)出錯(cuò)。MySql Error的實(shí)現(xiàn)簡(jiǎn)化如下:
class Error(Exception): def __init__(self, msg=None, errno=None, values=None, sqlstate=None): super(Error, self).__init__() ... if self.msg and self.errno != -1: fields = { 'errno': self.errno, 'msg': self.msg.encode('utf-8') if PY2 else self.msg } ... self.args = (self.errno, self._full_msg, self.sqlstate)
可以看到,mysql Error中的self.args與__init__(msg, errno, values, sqlstate)的順序不一,因此self.args第一個(gè)參數(shù)errno傳給了msg,導(dǎo)致AttributeError。至于self.args是什么,簡(jiǎn)單查了下,是Exception類中定義的,一般用__str__或者_(dá)_repr__方法的輸出,python官方文檔不建議overwrite。
總結(jié)
好吧,說了這么多,通過問題的追蹤,我們也基本上了解清楚multiprocessing.Pool庫(kù)的實(shí)現(xiàn)了。事實(shí)上,也很難說是誰的bug,是兩者共同作用下出現(xiàn)的。不管如何,希望在用到multiprocessing庫(kù)時(shí),特別與Pipe相關(guān)時(shí),謹(jǐn)慎點(diǎn)使用,最好的不要讓異常跑到multiprocess中處理,應(yīng)該在func中將所有的異常處理掉,如果有自己定于的異常類,請(qǐng)最好保證self.args的順序與__init__()的順序一致。同時(shí),網(wǎng)上好像也聽說使用multprocessing和subprocess庫(kù)出現(xiàn)問題,或許也是這個(gè)異常拋出的問題,畢竟suprocessError定義與Exception好像有些區(qū)別。
以上這篇解決windows下python3使用multiprocessing.Pool出現(xiàn)的問題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python3實(shí)現(xiàn)將文件樹中所有文件和子目錄歸檔到tar壓縮文件的方法
這篇文章主要介紹了Python3實(shí)現(xiàn)將文件樹中所有文件和子目錄歸檔到tar壓縮文件的方法,涉及Python3使用tarfile模塊實(shí)現(xiàn)tar壓縮文件的技巧,需要的朋友可以參考下2015-05-05Python實(shí)現(xiàn)的連接mssql數(shù)據(jù)庫(kù)操作示例
這篇文章主要介紹了Python實(shí)現(xiàn)的連接mssql數(shù)據(jù)庫(kù)操作,結(jié)合實(shí)例形式分析了Python安裝pymssql模塊以及基于pymssql模塊連接sql2008 R2數(shù)據(jù)庫(kù)的具體操作技巧,需要的朋友可以參考下2018-08-08C# DataGridView行列轉(zhuǎn)換的具體實(shí)現(xiàn)
本文主要介紹了C# DataGridView行列轉(zhuǎn)換的具體實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池
這篇文章主要介紹了用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池,通過這個(gè)小程序可以幫助更好地理解Python中線程的運(yùn)行機(jī)制,需要的朋友可以參考下2015-04-04使用python+requests+pytest實(shí)現(xiàn)接口自動(dòng)化
這篇文章主要介紹了使用python+requests+pytest實(shí)現(xiàn)接口自動(dòng)化,在當(dāng)前互聯(lián)網(wǎng)產(chǎn)品迭代頻繁的背景下,回歸測(cè)試的時(shí)間越來越少,但接口自動(dòng)化測(cè)試因其實(shí)現(xiàn)簡(jiǎn)單、維護(hù)成本低,容易提高覆蓋率等特點(diǎn),越來越受重視,需要的朋友可以參考下2023-08-08Python實(shí)現(xiàn)求最大公約數(shù)及判斷素?cái)?shù)的方法
這篇文章主要介紹了Python實(shí)現(xiàn)求最大公約數(shù)及判斷素?cái)?shù)的方法,涉及Python算數(shù)運(yùn)算的相關(guān)技巧,需要的朋友可以參考下2015-05-05