Thread pools in Python: threadpool
Python thread pool: threadpool
While studying processes and threads in Python today, I came across the third-party threadpool module.
The module is very easy to use, provided you understand how a thread pool works.
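When a system processes tasks, it has to create and destroy objects for every request. With a large number of concurrent tasks, the traditional approach of starting a new thread for each task spends much of its time creating and destroying resources, which lowers server throughput.
This is where a thread pool helps. Thread pooling addresses both the overhead of creating and destroying threads and the problem of running out of system resources.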
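Advantages
(1) It bounds the number of threads. By creating a fixed number of worker threads up front and capping that number, the pool limits the memory used by thread objects.
(2) It lowers system overhead and resource consumption. Because threads are reused across many requests, the cost of creating and destroying them is spread over those requests. Limiting the number of threads also reduces the virtual machine's garbage-collection overhead.
(3) It improves response time. The threads already exist when a request arrives, so the request can be handled immediately without waiting for a thread to be created, and several threads can work on requests concurrently.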
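To make advantage (1) concrete, the sketch below (an illustration, not from the original article) runs the same 20 short tasks twice: once with a new threading.Thread per task, and once on the standard library's ThreadPoolExecutor with 4 workers. It then counts how many distinct threads did the work. The helper names work, thread_per_task and pooled are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def work(n):
    """Hypothetical short task: sleep briefly, then report which thread ran it."""
    time.sleep(0.01)
    return threading.current_thread().name


def thread_per_task(num_tasks):
    """Traditional approach: create and destroy one thread per task."""
    names = []
    lock = threading.Lock()

    def run(n):
        name = work(n)
        with lock:
            names.append(name)

    threads = [threading.Thread(target=run, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return set(names)


def pooled(num_tasks, pool_size):
    """Thread pool: a fixed set of worker threads is reused for every task."""
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        return set(pool.map(work, range(num_tasks)))


if __name__ == "__main__":
    print("distinct threads, one per task:", len(thread_per_task(20)))  # 20
    print("distinct threads, pool of 4:   ", len(pooled(20, 4)))         # at most 4
```

The pooled run reuses its workers, so it never needs more than max_workers threads no matter how many tasks are submitted.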
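Basic components of a thread pool
(1) Thread pool manager: creates and maintains the pool, resizes it as needed, and watches for thread leaks.
(2) Worker thread: a thread that executes tasks in a loop. When there is no task it sits in the Wait state, and it is woken up when a new task arrives.
(3) Task queue: a buffer that temporarily holds pending tasks and also serves as the monitor object (lock plus wait list) shared by the concurrent threads.
(4) Task interface: the interface every task must implement; worker threads run tasks through it.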
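A minimal sketch of how the four components could fit together, assuming the standard library's queue.Queue as the task queue (it already does the locking and waiting internally). Task, Worker, PoolManager and the None stop sentinel are hypothetical names chosen for this illustration; they are not the threadpool module's API, and resizing the pool or detecting leaks is left out.

```python
import queue
import threading


class Task:
    """Task interface: workers only ever call run()."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def run(self):
        return self.func(*self.args)


class Worker(threading.Thread):
    """Worker thread: loops, taking tasks from the shared task queue."""

    def __init__(self, tasks, results):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.results = results

    def run(self):
        while True:
            task = self.tasks.get()       # waits while the queue is empty
            if task is None:              # hypothetical stop sentinel
                break
            try:
                self.results.put(task.run())
            except Exception as exc:      # keep the worker alive if a task fails
                self.results.put(exc)


class PoolManager:
    """Pool manager: owns the task queue and the list of worker threads."""

    def __init__(self, num_workers):
        self.tasks = queue.Queue()        # task queue: buffer plus synchronization
        self.results = queue.Queue()
        self.workers = [Worker(self.tasks, self.results) for _ in range(num_workers)]
        for w in self.workers:
            w.start()

    def add_task(self, task):
        self.tasks.put(task)

    def shutdown(self):
        for _ in self.workers:
            self.tasks.put(None)          # one sentinel per worker
        for w in self.workers:
            w.join()


if __name__ == "__main__":
    pool = PoolManager(num_workers=3)
    for i in range(5):
        pool.add_task(Task(pow, i, 2))
    pool.shutdown()
    print(sorted(pool.results.get() for _ in range(5)))  # [0, 1, 4, 9, 16]
```

The sentinels are queued after the real tasks, so the FIFO order guarantees every task is taken before any worker sees its stop signal.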
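When the pool manager is built, it first initializes the task queue (Queue); at runtime, tasks are added to this queue by calling an add-task method.
It then creates and starts a number of worker threads and keeps them in a thread list. While running, the manager can add or remove worker threads as needed.
A running worker first locks the task queue so that the threads access it safely. If a task is waiting, the worker takes it and releases the lock so that other threads can access the queue.
Having taken a task, the worker calls the task interface to process it. When the queue is empty, the worker joins the queue's list of waiting threads and stays in the Wait state, using almost no CPU.
When a new task arrives, calling notify on the task queue object wakes one worker from the waiting list to process it.
This cooperation avoids the cost of repeatedly creating and destroying threads while still processing tasks concurrently, which improves the system's response time.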
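The lock, wait and notify cycle described above can be spelled out with threading.Condition, which combines a lock with a list of waiting threads. This is a sketch under the assumption that a task is any zero-argument callable and that None is used as a stop signal; TaskQueue and worker are illustrative names. In everyday code, queue.Queue implements this same pattern internally.

```python
import threading
from collections import deque


class TaskQueue:
    """Task queue guarded by a Condition, mirroring the lock/wait/notify steps."""

    def __init__(self):
        self._tasks = deque()
        self._cond = threading.Condition()   # a lock plus a wait list in one object

    def put(self, task):
        with self._cond:                     # lock the queue
            self._tasks.append(task)
            self._cond.notify()              # wake one waiting worker, if any

    def get(self):
        with self._cond:                     # lock the queue
            while not self._tasks:           # empty: join the wait list
                self._cond.wait()            # releases the lock while waiting
            return self._tasks.popleft()     # lock released on leaving the with-block


def worker(q, out, out_lock):
    while True:
        task = q.get()
        if task is None:                     # stop sentinel
            return
        result = task()                      # task interface: a plain callable here
        with out_lock:
            out.append(result)


if __name__ == "__main__":
    q, out, out_lock = TaskQueue(), [], threading.Lock()
    workers = [threading.Thread(target=worker, args=(q, out, out_lock)) for _ in range(3)]
    for w in workers:
        w.start()
    for i in range(6):
        q.put(lambda i=i: i * 10)
    for _ in workers:
        q.put(None)
    for w in workers:
        w.join()
    print(sorted(out))  # [0, 10, 20, 30, 40, 50]
```

The task itself runs after get() has returned, that is, outside the lock, so other workers can take tasks while it executes.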
In short:
Instead of starting a new thread for every concurrent task, you hand the tasks to a thread pool. Whenever the pool has an idle thread, the next task is assigned to it.
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
for req in requests: pool.putRequest(req)
pool.wait()
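- The first line creates a thread pool with poolsize worker threads.
- The second line calls makeRequests to build the work requests. some_callable is the function to run in multiple threads, list_of_args holds the arguments for each call, and callback is an optional function (default None) that receives each request together with its result.
- The third line puts each request into the thread pool's request queue.
- The last line blocks until all requests have been processed and their results handed to the callbacks.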
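The threadpool package has to be installed separately. For comparison, here is a sketch of the same four steps written with the standard library's concurrent.futures, assuming list_of_args uses the same shapes that makeRequests accepts (an (args, kwds) tuple or a single non-tuple argument). some_callable and run_all are hypothetical names; the comments mark which threadpool call each line roughly corresponds to.

```python
from concurrent.futures import ThreadPoolExecutor


def some_callable(x, y=1):
    """Hypothetical stand-in for the function to run in many threads."""
    return x * y


def run_all(list_of_args, poolsize=4):
    results = []

    def callback(future):
        # Unlike threadpool, this may run in a worker thread, not the caller
        results.append(future.result())

    with ThreadPoolExecutor(max_workers=poolsize) as pool:   # ~ ThreadPool(poolsize)
        for item in list_of_args:                             # ~ makeRequests + putRequest
            if isinstance(item, tuple):                       # (args, kwds) pair
                args, kwds = item
                future = pool.submit(some_callable, *(args or []), **(kwds or {}))
            else:                                             # single positional argument
                future = pool.submit(some_callable, item)
            future.add_done_callback(callback)
    # Leaving the with-block calls shutdown(wait=True), roughly like pool.wait()
    return sorted(results)


if __name__ == "__main__":
    print(run_all([([1], {"y": 2}), 3, ([4], None)]))  # [2, 3, 4]
```

One difference worth knowing: threadpool runs callbacks in the thread that calls poll() or wait(), whereas Future.add_done_callback runs the callback in the worker thread that finished the task (or immediately in the calling thread if the future is already done).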
Reading the source code shows that there is really not much to it:
import sys
import threading
try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3
import traceback


# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass


class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass


# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    """
    traceback.print_exception(*exc_info)


# utility functions
# Creates several work requests at once: callback processes each result,
# exc_callback handles any exception raised by callable_
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            requests.append(
                WorkRequest(callable_, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
            )
        else:
            requests.append(
                WorkRequest(callable_, [item], None, callback=callback,
                    exc_callback=exc_callback)
            )
    return requests


# classes
class WorkerThread(threading.Thread):  # worker thread
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
        worker thread.

        """
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self._requests_queue = requests_queue  # task (request) queue
        self._results_queue = results_queue    # results queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.is_set():  # dismiss flag is set: the thread should exit
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                # block=True: wait for a request, but give up after poll_timeout
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                # check again: the thread may have been dismissed while waiting
                if self._dismissed.is_set():
                    # we are dismissed, put back request in queue and exit loop
                    self._requests_queue.put(request)  # hand the request to another worker
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()


class WorkRequest:  # a single work request
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments, a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.

        """
        if requestID is None:
            self.requestID = id(self)  # id() returns the object's identity (its memory address in CPython)
        else:
            try:
                self.requestID = hash(requestID)  # the ID must be hashable
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)


class ThreadPool:  # thread pool manager
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)    # task (request) queue
        self._results_queue = Queue.Queue(resq_size)  # results queue
        self.workers = []           # active worker threads
        self.dismissedWorkers = []  # dismissed workers not yet joined
        self.workRequests = {}      # dict mapping requestID -> request
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting for
        requests.

        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request  # one-to-one mapping: requestID -> request

    def poll(self, block=False):  # collect results and run the callbacks
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:  # no outstanding requests
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:  # no worker threads left
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                       (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break
The module has three main classes: ThreadPool, WorkRequest and WorkerThread.
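First, create a ThreadPool instance, which starts num_workers worker threads. Then use makeRequests to create WorkRequest objects, one per set of arguments, and place them in the pool's request queue with putRequest. A WorkerThread takes a request from that queue, calls its callable, and puts the result in the results queue. When poll() or wait() picks up the result, it calls the request's callback on it, if one was given.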
Note: each element of the results queue is a (request, result) tuple, so every result stays paired with the request that produced it.
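To see why the (request, result) pairing matters, here is a stripped-down sketch of the same idea, modeled on the source above but not taken from it: workers put (request, result) or (request, sys.exc_info()) into a results queue, and the main thread uses a dict keyed by request ID, like workRequests, to know when every request has come back. Request, worker and run are hypothetical names.

```python
import queue
import sys
import threading


class Request:
    """Hypothetical minimal stand-in for threadpool's WorkRequest."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.request_id = id(self)    # same default as WorkRequest.requestID
        self.exception = False


def worker(requests, results):
    while True:
        req = requests.get()
        if req is None:               # stop sentinel
            break
        try:
            results.put((req, req.func(*req.args)))   # result paired with its request
        except Exception:
            req.exception = True
            results.put((req, sys.exc_info()))        # error info paired with its request


def run(calls, num_workers=3):
    """calls: list of (func, *args) tuples. Returns {args: result or exception}."""
    requests, results = queue.Queue(), queue.Queue()
    pending = {}                                      # request_id -> request, like workRequests
    threads = [threading.Thread(target=worker, args=(requests, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for func, *args in calls:
        req = Request(func, *args)
        pending[req.request_id] = req
        requests.put(req)
    collected = {}
    while pending:                                    # similar to ThreadPool.wait()
        req, result = results.get()
        # On failure, result is sys.exc_info(); index 1 is the exception object
        collected[req.args] = result[1] if req.exception else result
        del pending[req.request_id]
    for _ in threads:
        requests.put(None)
    for t in threads:
        t.join()
    return collected


if __name__ == "__main__":
    print(run([(pow, 2, 3), (divmod, 7, 2), (int, "x")]))
```

Because each result travels together with its request, the main thread never has to guess which call produced which value, even though workers finish in arbitrary order.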
Python thread pool example (using concurrent.futures.ThreadPoolExecutor from the standard library)
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Pool of at most 5 worker threads; their names start with "test_"
threadPool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="test_")


def test(v1, v2):
    print(threading.current_thread().name, v1, v2)
    time.sleep(2)


if __name__ == '__main__':
    for i in range(0, 10):
        threadPool.submit(test, i, i + 1)  # queue 10 tasks; at most 5 run at once
    threadPool.shutdown(wait=True)         # block until every task has finished
The above is based on my personal experience. I hope it serves as a useful reference, and thank you for supporting Script Home (腳本之家).