Handling exceptions raised inside Python's ThreadPoolExecutor thread pool
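Problem
I recently wrote a Python script that uses both a thread pool and plain threads, and ran into something interesting while running it: a worker in the thread pool hit a problem and raised an exception, but the main thread never caught it. Until I found the bug, I assumed the thread pool code was returning normally.
Key point first
The main thing to know is that when a worker in Python's concurrent.futures.ThreadPoolExecutor raises an exception, the exception is not propagated upward on its own. The main thread has to fetch it explicitly by calling concurrent.futures.Future.exception(timeout=None) on the future returned by submit().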
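To make the point concrete, here is a minimal sketch (the task name failing_task and its error message are made up for illustration). submit() itself raises nothing; the exception only shows up when the future is asked for it:

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task():
    # Hypothetical task that always fails, for illustration only.
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(failing_task)  # nothing is raised here
    # exception() blocks until the task finishes, then returns the
    # exception object (or None if the task succeeded).
    error = future.exception()

print(type(error).__name__, error)
```

If the future is never inspected, the ValueError above is simply stored on it and the program carries on as if nothing happened.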
Reproducing and solving the problem
Background
The problem started with code like this:
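import logging
import threading
from time import sleep

# Logging setup is assumed here; the original snippet does not show it.
logging.basicConfig(level=logging.INFO,
                    format="thread: %(threadName)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    logger.info("I am slave. I am working. I am going to sleep 3s")
    sleep(3)
    logger.info("Exit thread executor")


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")
        try:
            # The worker thread ended because of some exception. We want to
            # restart its work without creating a new thread.
            thread_obj.start()  # raises on the second pass: a thread cannot be started twice
        except Exception as e:
            logger.error("Master start thread error", exc_info=True)
            raise e
        logger.info("Master is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    main()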
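As the comment explains, this code is meant to implement something like a producer/consumer setup: the worker thread keeps producing resources and the main thread consumes them. The worker thread exited because of an exception, and the goal was to restart production. Obviously, this code raises an error.
Output: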
thread: MainThread [INFO] Master starts thread worker
thread: Thread-1 [INFO] I am slave. I am working. I am going to sleep 3s
thread: MainThread [INFO] Master is going to sleep 5s
thread: Thread-1 [INFO] Exit thread executor because of some exception
thread: MainThread [INFO] Master starts thread worker
thread: MainThread [ERROR] Master start thread error
Traceback (most recent call last):
  File "xxx.py", line 47, in main
    thread_obj.start()
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
Traceback (most recent call last):
  File "xxx.py", line 56, in <module>
    main()
  File "xxx.py", line 50, in main
    raise e
  File "xxx.py", line 47, in main
    thread_obj.start()
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
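The RuntimeError above can be reproduced in isolation. The following is a small sketch, with a placeholder work() function standing in for the real worker; it also shows that a fresh Thread object for the same target starts without complaint:

```python
import threading


def work():
    pass  # placeholder worker body, for illustration only


t = threading.Thread(target=work)
t.start()
t.join()

restart_error = None
try:
    t.start()  # second start() on the same Thread object
except RuntimeError as exc:
    restart_error = str(exc)

# A new Thread object for the same target can be started normally.
t2 = threading.Thread(target=work)
t2.start()
t2.join()

print(restart_error)
```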
Getting to the point
However, the script has other business code to run, so the production and consumption code above had to run in a thread of its own. I introduced a thread pool to execute it:
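import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Logging setup is assumed here; the original snippet does not show it.
logging.basicConfig(level=logging.INFO,
                    format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")
        # The worker thread ended because of some exception. We want to
        # restart its work without creating a new thread.
        # I did not expect an exception here.
        thread_obj.start()  # raises on the second pass: a thread cannot be started twice
        logger.info("Master is going to sleep 5s")
        sleep(5)


def thread_pool_main():
    thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
    logger.info("Master ThreadPool Executor starts thread worker")
    thread_obj.submit(main)  # the returned future is discarded, so any exception is lost
    while True:
        logger.info("Master ThreadPool Executor is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    thread_pool_main()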
The output is as follows:
INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...
Clearly, from the output above, an exception must occur when the pool worker reaches INFO [thread: WorkExecutor_0] Master starts thread worker for the second time, yet the program as a whole does not raise anything.
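One way to surface such a failure from the main thread's own loop is to keep the future and poll it. The sketch below is an illustration under assumptions, not the article's code: the worker() function, the short sleeps, and the bounded loop (standing in for `while True`) are all hypothetical. Future.result() re-raises the worker's exception in the calling thread:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def worker():
    # Hypothetical worker that fails shortly after starting.
    time.sleep(0.1)
    raise RuntimeError("worker failed")


pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(worker)

caught = None
for _ in range(50):  # bounded stand-in for the article's `while True`
    if future.done():
        try:
            future.result()  # re-raises the worker's exception here
        except RuntimeError as exc:
            caught = exc
        break
    time.sleep(0.05)  # the main thread's other work would go here

pool.shutdown()
print(repr(caught))
```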
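Solution
After finding the bug, I wanted the exception to be written to the log whenever a pool worker fails. According to the documentation, getting the exception from a thread pool task requires calling concurrent.futures.Future.exception(timeout=None). To log it, I added a callback that runs when the submitted task finishes. The exception details are written to the log with logging.exception().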
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Logging setup is assumed here; the original snippet does not show it.
logging.basicConfig(level=logging.INFO,
                    format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def main():
    thread_obj = threading.Thread(target=thread_executor)
    while True:
        logger.info("Master starts thread worker")
        # The worker thread ended because of some exception. We want to
        # restart its work without creating a new thread.
        # I did not expect an exception here.
        thread_obj.start()  # raises on the second pass: a thread cannot be started twice
        logger.info("Master is going to sleep 5s")
        sleep(5)


def thread_pool_callback(worker):
    # `worker` is the Future of the finished task.
    logger.info("called thread pool executor callback function")
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker return exception: {}".format(worker_exception))


def thread_pool_main():
    thread_obj = ThreadPoolExecutor(max_workers=1, thread_name_prefix="WorkExecutor")
    logger.info("Master ThreadPool Executor starts thread worker")
    thread_pool_exc = thread_obj.submit(main)
    thread_pool_exc.add_done_callback(thread_pool_callback)
    # logger.info("thread pool exception: {}".format(thread_pool_exc.exception()))
    while True:
        logger.info("Master ThreadPool Executor is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    thread_pool_main()
Output:
INFO [thread: MainThread] Master ThreadPool Executor starts thread worker
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: Thread-1] I am slave. I am working. I am going to sleep 3s
INFO [thread: WorkExecutor_0] Master is going to sleep 5s
INFO [thread: Thread-1] Exit thread executor because of some exception
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: WorkExecutor_0] Master starts thread worker
INFO [thread: WorkExecutor_0] called thread pool executor callback function
ERROR [thread: WorkExecutor_0] Worker return exception: threads can only be started once
Traceback (most recent call last):
  File "E:\anaconda\lib\concurrent\futures\thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "xxxx.py", line 46, in main
    thread_obj.start()  # raises on the second pass: a thread cannot be started twice
  File "E:\anaconda\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
INFO [thread: MainThread] Master ThreadPool Executor is going to sleep 5s
... ...
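A possible variation on the callback, offered as a sketch rather than as the article's method: logging.exception() takes its traceback from the exception currently being handled, so an alternative is to pass the exception object explicitly through exc_info, which does not depend on where the callback happens to run. The logger name pool_demo, the seen list, and failing_task are hypothetical names used only for this example. The sketch also checks cancelled() first, because exception() raises CancelledError on a cancelled future:

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("pool_demo")  # hypothetical logger name
seen = []  # collects exceptions so the caller can inspect them


def log_failure(future):
    if future.cancelled():
        # exception() would raise CancelledError on a cancelled future.
        logger.warning("task was cancelled")
        return
    exc = future.exception()
    if exc is not None:
        seen.append(exc)
        # Passing the exception object as exc_info attaches its traceback
        # even when no exception is currently being handled.
        logger.error("Worker return exception: %s", exc, exc_info=exc)


def failing_task():
    # Hypothetical stand-in for the failing main() in the article.
    raise RuntimeError("threads can only be started once")


with ThreadPoolExecutor(max_workers=1) as pool:
    pool.submit(failing_task).add_done_callback(log_failure)
# Leaving the with-block waits for the task, so the callback has run by now.
```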
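The final version
In fact, trying to reuse a single thread object for the producer, as in the code above, is flawed. Normally, once a thread finishes running, its resources are reclaimed by the operating system, so start() cannot be called on it again.
One workable alternative is to use a thread pool instead. Of course, doing so means watching out for the exception handling issue described above.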
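import logging
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# Logging setup is assumed here; the original snippet does not show it.
logging.basicConfig(level=logging.INFO,
                    format="%(levelname)s [thread: %(threadName)s] %(message)s")
logger = logging.getLogger(__name__)


def thread_executor():
    while True:
        logger.info("I am slave. I am working. I am going to sleep 3s")
        sleep(3)
        logger.info("Exit thread executor because of some exception")
        break


def executor_callback(worker):
    # `worker` is the Future of the finished task.
    logger.info("called worker callback function")
    worker_exception = worker.exception()
    if worker_exception:
        logger.exception("Worker return exception: {}".format(worker_exception))
        # raise worker_exception


def main():
    slave_thread_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor")
    restart_flag = False
    while True:
        logger.info("Master starts thread worker")
        # restart_flag is never reset in this snippet, so the worker is
        # submitted only on the first pass through the loop.
        if not restart_flag:
            restart_flag = not restart_flag
            logger.info("Restart Slave work")
            slave_thread_pool.submit(thread_executor).add_done_callback(executor_callback)
        logger.info("Master is going to sleep 5s")
        sleep(5)


if __name__ == '__main__':
    main()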
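For the restart behaviour itself, one option is to keep the future and resubmit the task once it has finished. The following is only a sketch under assumptions: the producer() function, the runs list, the supervise() helper, and the bounded number of rounds (standing in for `while True`) are all hypothetical and not part of the original script.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

runs = []  # records each producer run, for illustration


def producer():
    # Hypothetical producer that exits with an error every time.
    runs.append(time.monotonic())
    raise RuntimeError("producer stopped")


def supervise(rounds=3, pause=1.0):
    """Resubmit the producer whenever the previous run has finished."""
    errors = []
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="SlaveExecutor") as pool:
        future = pool.submit(producer)
        for _ in range(rounds):  # bounded stand-in for `while True`
            # Wait up to `pause` seconds; returns early once the run finishes.
            wait([future], timeout=pause)
            if future.done():
                exc = future.exception()
                if exc is not None:
                    errors.append(exc)
                future = pool.submit(producer)  # restart the producer
    return errors


errors = supervise()
print(len(errors), "failures recorded,", len(runs), "runs started")
```

Because the pool reuses its worker thread, no Thread object is ever started twice, and each failure is recorded before the next run is submitted.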
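Summary
This problem came down to not knowing Python's concurrent.futures.ThreadPoolExecutor well enough. I first met this package in a book, but the book did not cover its full API and usage, so when the code misbehaved I spent a long time debugging before finding the real cause. Only after reading the Python docs did I understand how it is meant to be used. So when learning a new Python package, it is worth checking the official documentation.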
References
English: Python docs, concurrent.futures
Chinese: Python docs, concurrent.futures: Launching parallel tasks
exception(timeout=None)
Return the exception raised by the call. If the call hasn't yet completed, this method will wait up to timeout seconds. If the call hasn't completed in timeout seconds, a concurrent.futures.TimeoutError will be raised. timeout can be an int or a float. If timeout is not specified or is None, there is no limit to the wait time.
If the future is cancelled before completing, CancelledError will be raised.
If the call completed without raising, None is returned.
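The timeout behaviour can be seen in a short sketch. The slow_failure() task and the specific delays are made up for illustration:

```python
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor


def slow_failure():
    # Hypothetical task that fails only after a delay.
    time.sleep(0.5)
    raise ValueError("late failure")


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_failure)
    try:
        future.exception(timeout=0.05)  # the task is still running
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True
    final_error = future.exception()  # no timeout: wait until it finishes

print(timed_out, repr(final_error))
```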
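add_done_callback(fn)
Attaches the callable fn to the future. fn will be called, with the future as its only argument, when the future is cancelled or finishes running.
Added callables are called in the order that they were added, and are always called in a thread belonging to the process that added them. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.
If the future has already completed or been cancelled, fn will be called immediately.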
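A small sketch of the last two points, assuming a trivial task that returns a string: callbacks added to an already finished future run immediately, in the calling thread, and in the order they were added. The calls list is a hypothetical helper used only to record what happened.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []  # (label, thread name) for each callback invocation

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(lambda: "done")

# The with-block has waited for the task, so the future is finished here
# and each callback below runs immediately in the current thread.
future.add_done_callback(
    lambda f: calls.append(("first", threading.current_thread().name)))
future.add_done_callback(
    lambda f: calls.append(("second", threading.current_thread().name)))

print(calls)
```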