Python自定義線程池實(shí)現(xiàn)方法分析
本文實(shí)例講述了Python自定義線程池實(shí)現(xiàn)方法。分享給大家供大家參考,具體如下:
關(guān)于python的多線程,由與GIL的存在被廣大群主所詬病,說python的多線程不是真正的多線程。但多線程處理IO密集的任務(wù)效率還是可以杠杠的。
我實(shí)現(xiàn)的這個(gè)線程池其實(shí)是根據(jù)銀角的思路來實(shí)現(xiàn)的。
主要思路:
任務(wù)獲取和執(zhí)行:
1、任務(wù)加入隊(duì)列,等待線程來獲取并執(zhí)行。
2、按需生成線程,每個(gè)線程循環(huán)取任務(wù)。
線程銷毀:
1、獲取任務(wù)是終止符時(shí),線程停止。
2、線程池close()時(shí),向任務(wù)隊(duì)列加入和已生成線程等量的終止符。
3、線程池terminate()時(shí),設(shè)置線程下次任務(wù)取到為終止符。
流程概要設(shè)計(jì):

詳細(xì)代碼:
import threading
import contextlib
from Queue import Queue
import time
class ThreadPool(object):
def __init__(self, max_num):
self.StopEvent = 0#線程任務(wù)終止符,當(dāng)線程從隊(duì)列獲取到StopEvent時(shí),代表此線程可以銷毀。可設(shè)置為任意與任務(wù)有區(qū)別的值。
self.q = Queue()
self.max_num = max_num #最大線程數(shù)
self.terminal = False #是否設(shè)置線程池強(qiáng)制終止
self.created_list = [] #已創(chuàng)建線程的線程列表
self.free_list = [] #空閑線程的線程列表
self.Deamon=False #線程是否是后臺(tái)線程
def run(self, func, args, callback=None):
"""
線程池執(zhí)行一個(gè)任務(wù)
:param func: 任務(wù)函數(shù)
:param args: 任務(wù)函數(shù)所需參數(shù)
:param callback:
:return: 如果線程池已經(jīng)終止,則返回True否則None
"""
if len(self.free_list) == 0 and len(self.created_list) < self.max_num:
self.create_thread()
task = (func, args, callback,)
self.q.put(task)
def create_thread(self):
"""
創(chuàng)建一個(gè)線程
"""
t = threading.Thread(target=self.call)
t.setDaemon(self.Deamon)
t.start()
self.created_list.append(t)#將當(dāng)前線程加入已創(chuàng)建線程列表created_list
def call(self):
"""
循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)
"""
current_thread = threading.current_thread() #獲取當(dāng)前線程對(duì)象·
event = self.q.get() #從任務(wù)隊(duì)列獲取任務(wù)
while event != self.StopEvent: #判斷獲取到的任務(wù)是否是終止符
func, arguments, callback = event#從任務(wù)中獲取函數(shù)名、參數(shù)、和回調(diào)函數(shù)名
try:
result = func(*arguments)
func_excute_status =True#func執(zhí)行成功狀態(tài)
except Exception as e:
func_excute_status = False
result =None
print '函數(shù)執(zhí)行產(chǎn)生錯(cuò)誤', e#打印錯(cuò)誤信息
if func_excute_status:#func執(zhí)行成功后才能執(zhí)行回調(diào)函數(shù)
if callback is not None:#判斷回調(diào)函數(shù)是否是空的
try:
callback(result)
except Exception as e:
print '回調(diào)函數(shù)執(zhí)行產(chǎn)生錯(cuò)誤', e # 打印錯(cuò)誤信息
with self.worker_state(self.free_list,current_thread):
#執(zhí)行完一次任務(wù)后,將線程加入空閑列表。然后繼續(xù)去取任務(wù),如果取到任務(wù)就將線程從空閑列表移除
if self.terminal:#判斷線程池終止命令,如果需要終止,則使下次取到的任務(wù)為StopEvent。
event = self.StopEvent
else: #否則繼續(xù)獲取任務(wù)
event = self.q.get() # 當(dāng)線程等待任務(wù)時(shí),q.get()方法阻塞住線程,使其持續(xù)等待
else:#若線程取到的任務(wù)是終止符,就銷毀線程
#將當(dāng)前線程從已創(chuàng)建線程列表created_list移除
self.created_list.remove(current_thread)
def close(self):
"""
執(zhí)行完所有的任務(wù)后,所有線程停止
"""
full_size = len(self.created_list)#按已創(chuàng)建的線程數(shù)量往線程隊(duì)列加入終止符。
while full_size:
self.q.put(self.StopEvent)
full_size -= 1
def terminate(self):
"""
無論是否還有任務(wù),終止線程
"""
self.terminal = True
while self.created_list:
self.q.put(self.StopEvent)
self.q.queue.clear()#清空任務(wù)隊(duì)列
def join(self):
"""
阻塞線程池上下文,使所有線程執(zhí)行完后才能繼續(xù)
"""
for t in self.created_list:
t.join()
@contextlib.contextmanager#上下文處理器,使其可以使用with語句修飾
def worker_state(self, state_list, worker_thread):
"""
用于記錄線程中正在等待的線程數(shù)
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
if __name__ == '__main__':
def Foo(arg):
return arg
# time.sleep(0.1)
def Bar(res):
print res
pool=ThreadPool(5)
# pool.Deamon=True#需在pool.run之前設(shè)置
for i in range(1000):
pool.run(func=Foo,args=(i,),callback=Bar)
pool.close()
pool.join()
# pool.terminate()
print "任務(wù)隊(duì)列里任務(wù)數(shù)%s" %pool.q.qsize()
print "當(dāng)前存活子線程數(shù)量:%d" % threading.activeCount()
print "當(dāng)前線程創(chuàng)建列表:%s" %pool.created_list
print "當(dāng)前線程創(chuàng)建列表:%s" %pool.free_list
關(guān)于上下文處理:
來個(gè)簡單例子說明:
下面的代碼手動(dòng)自定義了一個(gè)myopen方法,模擬我們常見的with open() as f:語句。具體的contextlib模塊使用,會(huì)單獨(dú)開章來將。
# coding:utf-8
import contextlib
@contextlib.contextmanager#定義該函數(shù)支持上下文with語句
def myopen(filename,mode):
f=open(filename,mode)
try:
yield f.readlines()#正常執(zhí)行返回f.readlines()
except Exception as e:
print e
finally:
f.close()#最后在with代碼快執(zhí)行完畢后返回執(zhí)行finally下的f.close()實(shí)現(xiàn)關(guān)閉文件
if __name__ == '__main__':
with myopen(r'c:\ip1.txt','r') as f:
for line in f:
print line
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進(jìn)階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。
相關(guān)文章
一行代碼python實(shí)現(xiàn)文件共享服務(wù)器
這篇文章主要介紹了一行代碼python實(shí)現(xiàn)文件共享服務(wù)器,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
Tornado協(xié)程在python2.7如何返回值(實(shí)現(xiàn)方法)
下面小編就為大家?guī)硪黄猅ornado協(xié)程在python2.7如何返回值(實(shí)現(xiàn)方法)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-06-06
Python實(shí)現(xiàn)自動(dòng)識(shí)別并填加驗(yàn)證碼的示例代碼
實(shí)現(xiàn)自動(dòng)識(shí)別網(wǎng)頁中的驗(yàn)證碼并填寫,需要結(jié)合使用網(wǎng)絡(luò)爬蟲技術(shù)、圖像識(shí)別(OCR),以及可能的瀏覽器自動(dòng)化工具(如Selenium),本文給大家介紹了Python實(shí)現(xiàn)自動(dòng)識(shí)別并填加驗(yàn)證碼的示例,需要的朋友可以參考下2024-06-06
運(yùn)行獨(dú)立 pyspark 時(shí)出現(xiàn) Windows 錯(cuò)誤解決辦法
在本篇文章里小編給大家分享的是一篇關(guān)于運(yùn)行獨(dú)立 pyspark 時(shí)出現(xiàn) Windows 錯(cuò)誤解決辦法,對(duì)此有需求的方法可以參考下。2021-12-12
python 下劃線的多種應(yīng)用場景總結(jié)
Python有很多地方使用下劃線,在不同場合下,有不同含義。本文總結(jié)Python語言編程中常用下劃線的地方,力圖一次搞懂下劃線的常見用法,感興趣的朋友快來一起看看吧2021-05-05
python刪除文件夾中具有相同后綴類型文件的實(shí)戰(zhàn)演練
在平時(shí)卸載軟件的時(shí)候會(huì)殘留許多文件和空文件夾,下面這篇文章主要給大家介紹了關(guān)于python刪除文件夾中具有相同后綴類型文件的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-03-03

