欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解決windows下python3使用multiprocessing.Pool出現(xiàn)的問題

 更新時(shí)間:2020年04月08日 10:38:16   作者:一吱大懶蟲  
這篇文章主要介紹了解決windows下python3使用multiprocessing.Pool出現(xiàn)的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧

例如:

from multiprocessing import Pool

def f(x):
return x*x
pool = Pool(processes=4)
r=pool.map(f, range(100)) 
pool.close() 
pool.join() 

在spyder里運(yùn)行直接沒反應(yīng);在shell窗口里,直接報(bào)錯(cuò),如下:

Process SpawnPoolWorker-15:
Traceback (most recent call last):
File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr
self.run()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get
return ForkingPickler.loads(res)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

解決:

Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地變量都復(fù)制一份,因此可以使用任意的全局變量;在Windows下面,多進(jìn)程是通過啟動(dòng)新進(jìn)程完成的,所有的全局變量都是重新初始化的,在運(yùn)行過程中動(dòng)態(tài)生成、修改過的全局變量是不能使用的。

multiprocessing內(nèi)部使用pickling傳遞map的參數(shù)到不同的進(jìn)程,當(dāng)傳遞一個(gè)函數(shù)或類時(shí),pickling將函數(shù)或者類用所在模塊+函數(shù)/類名的方式表示,如果對(duì)端的Python進(jìn)程無法在對(duì)應(yīng)的模塊中找到相應(yīng)的函數(shù)或者類,就會(huì)出錯(cuò)。

當(dāng)你在Interactive Console當(dāng)中創(chuàng)建函數(shù)的時(shí)候,這個(gè)函數(shù)是動(dòng)態(tài)添加到__main__模塊中的,在重新啟動(dòng)的新進(jìn)程當(dāng)中不存在,所以會(huì)出錯(cuò)。

當(dāng)不在Console中,而是在獨(dú)立Python文件中運(yùn)行時(shí),你會(huì)遇到另一個(gè)問題:由于你下面調(diào)用multiprocessing的代碼沒有保護(hù),在新進(jìn)程加載這個(gè)模塊的時(shí)候會(huì)重新執(zhí)行這段代碼,創(chuàng)建出新的multiprocessing池,無限調(diào)用下去。

解決這個(gè)問題的方法是永遠(yuǎn)把實(shí)際執(zhí)行功能的代碼加入到帶保護(hù)的區(qū)域中:if __name__ == '__mian__':

補(bǔ)充知識(shí):multiprocessing Pool的異常處理問題

multiprocessing.Pool開發(fā)多進(jìn)程程序時(shí),在某個(gè)子進(jìn)程執(zhí)行函數(shù)使用了mysql-python連接數(shù)據(jù)庫(kù),

由于程序設(shè)計(jì)問題,沒有捕獲到所有異常,導(dǎo)致某個(gè)異常錯(cuò)誤直接拋到Pool中,導(dǎo)致整個(gè)Pool掛了,其異常錯(cuò)誤如下所示:

Exception in thread Thread-3:
Traceback (most recent call last):
 File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
 self.run()
 File "/usr/lib64/python2.7/threading.py", line 765, in run
 self.__target(*self.__args, **self.__kwargs)
 File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results
 task = get()
 File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__
 'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>, 
(2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))

本文檔基于以上問題對(duì)multiprocessing.Pool以及python-mysql-connector的源碼實(shí)現(xiàn)進(jìn)行分析,以定位具體的錯(cuò)誤原因。解決方法其實(shí)很簡(jiǎn)單,不要讓異常拋到Pool里就行。

問題產(chǎn)生場(chǎng)景

python 版本centos7.3自帶的2.7.5版本,或者最新的python-2.7.14

mysql-connector庫(kù),版本是2.0及以上,可到官網(wǎng)下載最新版:mysql-connector

問題發(fā)生的code其實(shí)可以簡(jiǎn)化為如下所示:

from multiprocessing import Pool, log_to_stderr
import logging
import mysql.connector

# open multiprocessing lib log
log_to_stderr(level=logging.DEBUG)

def func():
 raise mysql.connector.Error("demo test", 100)

if __name__ == "__main__":
 p = Pool(3)
 res = p.apply_async(func)
 res.get()

所以解決問題很簡(jiǎn)單,在func里加個(gè)try-except就可以了。但是如果你好奇為什么為出現(xiàn)AttributeError的異常,那么可以繼續(xù)往下看。

Multiprocessing.Pool的實(shí)現(xiàn)

通過查看源碼,大致上multiprocess.Pool的實(shí)現(xiàn)如下圖所示:

當(dāng)我們執(zhí)行以下語句時(shí),主進(jìn)程會(huì)創(chuàng)建三個(gè)子線程:_handle_workers、_handle_results、_handle_tasks;同時(shí)會(huì)創(chuàng)建Pool(n)個(gè)數(shù)的worker子進(jìn)程。主進(jìn)程與各個(gè)worker子進(jìn)程間的通信使用內(nèi)部定義的Queue,其實(shí)就是Pipe管道通信,如上圖的_taskqueue、_inqueue和_outqueue。

p = Pool(3)
res = p.apply_async(func)
res.get()

這三個(gè)子線程的作用是:

1. handle_workers線程管理worker進(jìn)程,使進(jìn)程池維持Pool(n)個(gè)worker進(jìn)程數(shù);

2. handle_tasks線程將用戶的任務(wù)(包括job_id, 處理函數(shù)func等信息)傳遞到_inqueue中,子進(jìn)程們競(jìng)爭(zhēng)獲取任務(wù),然后運(yùn)行相關(guān)函數(shù),將結(jié)果放在_outqueue中,然后繼續(xù)監(jiān)聽tasksqueue的任務(wù)列表。其實(shí)就是典型的生產(chǎn)消費(fèi)問題。

3. handle_results線程監(jiān)聽_outQqueue的內(nèi)容,有就拿到,通過字典_cache找到對(duì)應(yīng)的job,將結(jié)果存儲(chǔ)在*Result對(duì)象中,釋放該job的信號(hào)量,表明job執(zhí)行完畢。此后,就可以通過*Result.get()函數(shù)獲取執(zhí)行結(jié)果。

當(dāng)我們調(diào)用p.apply_async 或者p.map時(shí),其實(shí)就是創(chuàng)建了AsyncResult或者M(jìn)apResult對(duì)象,然后將task放到_taskqueue中;調(diào)用*Result.get()方法等待task被worker子進(jìn)程執(zhí)行完成,獲取執(zhí)行結(jié)果。

在知道了multprocess.Pool的實(shí)現(xiàn)邏輯后,現(xiàn)在我們來探索下,當(dāng)func將異常拋出時(shí),Pool的worker是怎么處理的。下面的代碼是pool.worker工作子進(jìn)程的核心執(zhí)行函數(shù)的簡(jiǎn)化版。

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
 ...
 while xxx:
  try:
   task = get()
  except:
   ...

  job, i, func, args, kwds = task
  try:
   result = (True, func(*args, **kwds))
  except Exception, e:
   result = (False, e)
  ...
  try:
   put((job, i, result))
  except Exception, e:
   ...

從代碼中可以看到,在執(zhí)行func時(shí),如果func拋出異常,那么worker會(huì)將異常對(duì)象直接放入到_outqueue中,然后等待下一個(gè)task。也就是說,worker是可以處理異常的。

那么接下來看看_handle_result線程是怎么處理worker發(fā)過來的結(jié)果的。如下所示:

@staticmethod
def _handle_results(outqueue, get, cache):
 while 1:
  try:
   task = get()
  except (IOError, EOFError):
   return
  ...

上述代碼為_handle_result的主要處理邏輯,可以看到,它只對(duì) IOError, EOFError進(jìn)行了處理,也就是說,如果在get()時(shí)發(fā)生了其它異常錯(cuò)誤,將導(dǎo)致_handle_result這個(gè)線程直接退出(而事實(shí)上的確如此)。既然_handle_result退出了,那么就沒有動(dòng)作來觸發(fā)_cache中*Result對(duì)象釋放信號(hào)量,則用戶的執(zhí)行流程就一直處于wait狀態(tài)。這樣,用戶主進(jìn)程就會(huì)一直卡在get()中,導(dǎo)致主流程執(zhí)行不下去。

我們通過打開multiprocessing庫(kù)的日志(log_to_stderr(level=logging.DEBUG)),然后修改multiprocessing.Pool中_handel_result的代碼,加上一個(gè)except Exception,然后運(yùn)行文章一開始的的異常代碼,如下所示:

# multiprocessing : pool.py
#
class Pool(object):
 @staticmethod
 def _handle_results(outqueue, get, cache):
  while 1:
   try:
    task = get()
   except (IOError, EOFError):
    return
   except Exception:
    debug("handle_result not catch Exceptions.")
    return
  ...

控制臺(tái)如果輸出"handle_result not catch Exceptions.",表明_handle_results沒有catch到所有的異常。而實(shí)際上,真的是由于task = get()這句話拋異常了。

那么,_outqueue.get()方法做了什么。深入查看源碼,發(fā)現(xiàn)get()方法其實(shí)就是os.pipe的read/write方法,但是做了一些處理吧。其內(nèi)部實(shí)現(xiàn)大致如下:

def Pipe(duplex=True):
 ...
 fd1, fd2 = os.pipe()
 c1 = _multiprocessing.Connection(fd1, writable=False) # get
 c2 = _multiprocessing.Connection(fd2, readable=False) # put
 return c1, c2

_multiprocessing.Connection內(nèi)部使用了C的實(shí)現(xiàn),就不再深入了,否則會(huì)就越來越復(fù)雜了。它內(nèi)部應(yīng)該使用了pickle庫(kù),在put時(shí)將對(duì)象實(shí)例pickle(也就是序列化吧),然后在get時(shí)將實(shí)例unpikcle,重新生成實(shí)例對(duì)象。具體可查看python官方文檔關(guān)于pickle的介紹(包括object可pickle的條件以及在unpickle時(shí)調(diào)用的方法等)。不管如何,就是實(shí)例在get,即unpickle的過程出錯(cuò)了。

'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: 'int' object has no attribute 'encode'

從上述錯(cuò)誤日志中可以看到,表明在重構(gòu)時(shí)msg參數(shù)傳入了int類型變量。就是說在unpickle階段,Mysql Error重新實(shí)例化時(shí)執(zhí)行了__init__()方法,但是傳參錯(cuò)誤了。為了驗(yàn)證這一現(xiàn)象,我將MySql Error的__init__()進(jìn)行簡(jiǎn)化,最終確認(rèn)到self.args的賦值上,即Exception及其子類在unpickle時(shí)會(huì)調(diào)用__init__()方法,并將self.args作為參數(shù)列表傳遞給__init__()。

通過以下代碼可以簡(jiǎn)單的驗(yàn)證問題:

import os
from multiprocessing import Pipe

class DemoError(Exception):

 def __init__(msg, errno):
  print "msg: %s, errno: %s" % (msg, errno)
  self.args = ("aa", "bb")

def func():
 raise DemoError("demo test", 100)

r, w = Pipe(duplex=False)
try:
 result = (True, func(1))
except Exception, e:
 result = (False, e)

print "send result"
w.send(result)
print "get result"
res = r.recv()
print "finished."

日志會(huì)在recv調(diào)用時(shí)打印 msg: aa, errno: bb,表明recv異常類Exception時(shí)會(huì)將self.args作為參數(shù)傳入init()函數(shù)中。而Mysql的Error類重寫self.args變量,而且順序不對(duì),導(dǎo)致msg在執(zhí)行編碼時(shí)出錯(cuò)。MySql Error的實(shí)現(xiàn)簡(jiǎn)化如下:

class Error(Exception):
 def __init__(self, msg=None, errno=None, values=None, sqlstate=None):
  super(Error, self).__init__()
  ...
  if self.msg and self.errno != -1:
   fields = {
    'errno': self.errno,
    'msg': self.msg.encode('utf-8') if PY2 else self.msg
   }
  ...
  self.args = (self.errno, self._full_msg, self.sqlstate)

可以看到,mysql Error中的self.args與__init__(msg, errno, values, sqlstate)的順序不一,因此self.args第一個(gè)參數(shù)errno傳給了msg,導(dǎo)致AttributeError。至于self.args是什么,簡(jiǎn)單查了下,是Exception類中定義的,一般用__str__或者_(dá)_repr__方法的輸出,python官方文檔不建議overwrite。

總結(jié)

好吧,說了這么多,通過問題的追蹤,我們也基本上了解清楚multiprocessing.Pool庫(kù)的實(shí)現(xiàn)了。事實(shí)上,也很難說是誰的bug,是兩者共同作用下出現(xiàn)的。不管如何,希望在用到multiprocessing庫(kù)時(shí),特別與Pipe相關(guān)時(shí),謹(jǐn)慎點(diǎn)使用,最好的不要讓異常跑到multiprocess中處理,應(yīng)該在func中將所有的異常處理掉,如果有自己定于的異常類,請(qǐng)最好保證self.args的順序與__init__()的順序一致。同時(shí),網(wǎng)上好像也聽說使用multprocessing和subprocess庫(kù)出現(xiàn)問題,或許也是這個(gè)異常拋出的問題,畢竟suprocessError定義與Exception好像有些區(qū)別。

以上這篇解決windows下python3使用multiprocessing.Pool出現(xiàn)的問題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Python3實(shí)現(xiàn)將文件樹中所有文件和子目錄歸檔到tar壓縮文件的方法

    Python3實(shí)現(xiàn)將文件樹中所有文件和子目錄歸檔到tar壓縮文件的方法

    這篇文章主要介紹了Python3實(shí)現(xiàn)將文件樹中所有文件和子目錄歸檔到tar壓縮文件的方法,涉及Python3使用tarfile模塊實(shí)現(xiàn)tar壓縮文件的技巧,需要的朋友可以參考下
    2015-05-05
  • Python實(shí)現(xiàn)的連接mssql數(shù)據(jù)庫(kù)操作示例

    Python實(shí)現(xiàn)的連接mssql數(shù)據(jù)庫(kù)操作示例

    這篇文章主要介紹了Python實(shí)現(xiàn)的連接mssql數(shù)據(jù)庫(kù)操作,結(jié)合實(shí)例形式分析了Python安裝pymssql模塊以及基于pymssql模塊連接sql2008 R2數(shù)據(jù)庫(kù)的具體操作技巧,需要的朋友可以參考下
    2018-08-08
  • C# DataGridView行列轉(zhuǎn)換的具體實(shí)現(xiàn)

    C# DataGridView行列轉(zhuǎn)換的具體實(shí)現(xiàn)

    本文主要介紹了C# DataGridView行列轉(zhuǎn)換的具體實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • numpy.sum()的使用詳解

    numpy.sum()的使用詳解

    這篇文章主要介紹了numpy.sum()的使用詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • 用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池

    用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池

    這篇文章主要介紹了用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程池,通過這個(gè)小程序可以幫助更好地理解Python中線程的運(yùn)行機(jī)制,需要的朋友可以參考下
    2015-04-04
  • 使用python+requests+pytest實(shí)現(xiàn)接口自動(dòng)化

    使用python+requests+pytest實(shí)現(xiàn)接口自動(dòng)化

    這篇文章主要介紹了使用python+requests+pytest實(shí)現(xiàn)接口自動(dòng)化,在當(dāng)前互聯(lián)網(wǎng)產(chǎn)品迭代頻繁的背景下,回歸測(cè)試的時(shí)間越來越少,但接口自動(dòng)化測(cè)試因其實(shí)現(xiàn)簡(jiǎn)單、維護(hù)成本低,容易提高覆蓋率等特點(diǎn),越來越受重視,需要的朋友可以參考下
    2023-08-08
  • Python使用jsonpath_ng的方法

    Python使用jsonpath_ng的方法

    json path_ng 是 Python 中一款解析和操作 JSON 數(shù)據(jù)的工具,它可以通過 JSONPath 語法來對(duì) JSON 數(shù)據(jù)進(jìn)行定位和提取,其用法類似于 XPath 語法對(duì) XML 數(shù)據(jù)進(jìn)行定位,這篇文章主要介紹了Python使用jsonpath_ng的方法,需要的朋友可以參考下
    2023-12-12
  • Python實(shí)現(xiàn)求最大公約數(shù)及判斷素?cái)?shù)的方法

    Python實(shí)現(xiàn)求最大公約數(shù)及判斷素?cái)?shù)的方法

    這篇文章主要介紹了Python實(shí)現(xiàn)求最大公約數(shù)及判斷素?cái)?shù)的方法,涉及Python算數(shù)運(yùn)算的相關(guān)技巧,需要的朋友可以參考下
    2015-05-05
  • 使用python 進(jìn)行區(qū)間取值的方法

    使用python 進(jìn)行區(qū)間取值的方法

    這篇文章主要介紹了使用python 進(jìn)行區(qū)間取值的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-07-07
  • python 字典的打印實(shí)現(xiàn)

    python 字典的打印實(shí)現(xiàn)

    這篇文章主要介紹了python 字典的打印實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09

最新評(píng)論