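The threadpool thread pool in Python
Python thread pool: threadpool
While studying processes and threads in Python today, I stumbled upon the threadpool module, a thread pool implementation.
The module is very simple to use, provided you are familiar with how a thread pool works.
When a system handles tasks, it has to create and destroy objects for every request. With a large number of concurrent tasks, the traditional approach of one thread per task creates and destroys so many resources that server efficiency drops.
This is where a thread pool comes in. Thread pooling is a good answer both to the cost of creating and destroying threads and to the problem of running out of system resources.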
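Advantages
(1) The number of threads can be controlled. A fixed number of worker threads is created in advance and capped, which bounds the memory consumed by thread objects.
(2) System overhead and resource consumption go down. Because threads are reused across requests, the cost of creating and destroying a thread is spread over many requests. Capping the number of threads also reduces the virtual machine's garbage-collection overhead.
(3) The system responds faster. The threads already exist, so a request can be processed as soon as it arrives, without the delay of creating a thread, and several threads can work concurrently.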
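The reuse and the cap described above can be observed with a small experiment. This is only a sketch using the standard library's concurrent.futures.ThreadPoolExecutor rather than the threadpool module; the helper names record_thread and distinct_threads are made up for the illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def record_thread(_):
    # Simulate a short piece of work and report which thread ran it.
    time.sleep(0.01)
    return threading.current_thread().name


def distinct_threads(num_tasks, max_workers):
    # Run num_tasks tasks on a pool capped at max_workers threads and
    # return the set of thread names that actually did the work.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        names = list(pool.map(record_thread, range(num_tasks)))
    return set(names)


if __name__ == "__main__":
    used = distinct_threads(num_tasks=20, max_workers=3)
    print(len(used), "threads handled 20 tasks")
```

However many tasks are submitted, the number of distinct worker threads never exceeds max_workers, which is the point of advantages (1) and (2).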
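Basic implementation of a thread pool
(1) Thread pool manager. It creates and maintains the pool, resizes it as needed, and watches for thread leaks.
(2) Worker thread. A thread that executes tasks in a loop; it sits in the Wait state when there is nothing to do and is woken up when a new task arrives.
(3) Task queue. A buffer that temporarily holds tasks waiting to be processed; it also serves as the monitor object for the concurrent threads.
(4) Task interface. The interface every task must implement; worker threads run tasks through it.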
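When the thread pool manager is constructed, it first initializes the task queue (Queue); at run time, tasks are added to this queue by calling the add-task method.
It then creates and starts a fixed number of worker threads and keeps them in a thread list. The manager can increase or decrease the number of worker threads at run time as needed.
A running worker thread first locks the task queue so that concurrent access from several threads stays correct. If the queue holds a pending task, the worker takes one and releases the lock so that other threads can access the queue.
Once it has a task, the worker calls the task interface to process it. When the task queue is empty, the worker joins the queue's list of waiting threads; it is then in the Wait state and uses almost no CPU.
As soon as a new task arrives, the notify method of the task queue object is called to wake one worker from the list of waiting threads so that it can process the task.
This cooperation saves the cost of creating and destroying threads while still processing tasks concurrently, which improves the system's response time.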
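The lock, wait, and notify cycle described above can be sketched in a few lines. This is a hypothetical illustration, not the threadpool module's code (which relies on Queue.Queue instead); the class name TinyPool and its methods are invented for the example, and it assumes tasks do not raise exceptions.

```python
import threading
from collections import deque


class TinyPool:
    """Illustrative pool: a task list guarded by a Condition (lock + wait/notify)."""

    def __init__(self, num_workers):
        self._tasks = deque()
        self._cond = threading.Condition()
        self._closed = False
        self._workers = [threading.Thread(target=self._work)
                         for _ in range(num_workers)]
        for w in self._workers:
            w.start()

    def add_task(self, func, *args):
        with self._cond:                  # lock the task queue
            self._tasks.append((func, args))
            self._cond.notify()           # wake one waiting worker

    def _work(self):
        while True:
            with self._cond:
                while not self._tasks and not self._closed:
                    self._cond.wait()     # idle: release the lock and sleep
                if not self._tasks:       # pool closed and nothing left to do
                    return
                func, args = self._tasks.popleft()
            func(*args)                   # run the task outside the lock

    def close(self):
        # Let the workers drain the remaining tasks, then wait for them to exit.
        with self._cond:
            self._closed = True
            self._cond.notify_all()
        for w in self._workers:
            w.join()


if __name__ == "__main__":
    results = []
    pool = TinyPool(3)
    for n in range(5):
        pool.add_task(lambda x: results.append(x * x), n)
    pool.close()
    print(sorted(results))
```

Running the task outside the with block matters: the lock protects only the task list, so other workers can fetch tasks while one of them is busy.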
In short:
Concurrent tasks are handed to a thread pool instead of starting a new thread for each one. As long as the pool has an idle thread, a task is assigned to it for execution.
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
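- The first line creates a thread pool that holds poolsize threads.
- The second line calls makeRequests to create the work requests. some_callable is the function to run on the pool's threads, list_of_args holds the arguments for each call, and callback is an optional callback that defaults to None.
- The third line puts the requests into the thread pool.
- The last line waits until all requests have been processed before moving on.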
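For comparison, the same four steps can be written with the standard library. This is a sketch under assumptions, not a drop-in replacement: it uses concurrent.futures instead of threadpool, the names square, run_squares, and on_done are invented, and the callback here receives a Future object rather than the (request, result) pair that threadpool passes.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


def run_squares(values, poolsize=4):
    collected = []

    def on_done(future):
        # Runs when a task finishes; plays the role of `callback`.
        collected.append(future.result())

    pool = ThreadPoolExecutor(max_workers=poolsize)   # step 1: create the pool
    for value in values:                              # steps 2-3: create and submit the requests
        pool.submit(square, value).add_done_callback(on_done)
    pool.shutdown(wait=True)                          # step 4: wait until everything is done
    return sorted(collected)


if __name__ == "__main__":
    print(run_squares([1, 2, 3]))
```

The results are sorted before being returned because tasks may finish in any order.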
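Reading through the source code shows that it is actually quite simple. Note that the listing is Python 2 code: it imports the Queue module, which is named queue in Python 3.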
import sys
import threading
import Queue
import traceback


# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass


class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass


# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.
    """
    traceback.print_exception(*exc_info)


# utility functions
# makeRequests creates several work requests at once: callback handles each
# result, and exc_callback handles any exception that is raised.
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.
    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            requests.append(
                WorkRequest(callable_, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
            )
        else:
            requests.append(
                WorkRequest(callable_, [item], None, callback=callback,
                    exc_callback=exc_callback)
            )
    return requests


# classes
class WorkerThread(threading.Thread):  # worker thread
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
        worker thread.
        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self._requests_queue = requests_queue  # request (task) queue
        self._results_queue = results_queue    # results queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():  # the flag is set once dismiss() has been called
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                # fetch the next pending request; block=True makes the call
                # wait, for at most self._poll_timeout seconds
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                # check again: the worker may have been dismissed while it
                # was waiting on the queue
                if self._dismissed.isSet():
                    # we are dismissed, put back request in queue and exit loop
                    self._requests_queue.put(request)  # return the request to the queue
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()


class WorkRequest:  # a single work request
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments, and a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.
        """
        if requestID is None:
            self.requestID = id(self)  # id() returns the object's identity (its memory address in CPython)
        else:
            try:
                self.requestID = hash(requestID)  # store the hash of the given ID
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)


class ThreadPool:  # thread pool manager
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
        """
        self._requests_queue = Queue.Queue(q_size)    # request (task) queue
        self._results_queue = Queue.Queue(resq_size)  # results queue
        self.workers = []           # active worker threads
        self.dismissedWorkers = []  # workers that were dismissed but not yet joined
        self.workRequests = {}      # dict mapping request ID to request
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting for
        requests.
        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        # one-to-one mapping: each ID refers to exactly one request
        self.workRequests[request.requestID] = request

    def poll(self, block=False):  # collect finished results and run callbacks
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:  # no outstanding requests
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:  # no worker threads left
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                       (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break

The module has three classes: ThreadPool, WorkRequest, and WorkerThread.
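First, create a ThreadPool instance, which acts as the pool's scheduler and starts as many worker threads as its argument specifies. Next, call makeRequests to create the WorkRequest objects, one for each set of arguments, and use putRequest to place them on the pool's request queue. A WorkerThread then picks up a request, runs its callable, and stores the result in the results queue. If a callback was supplied, it is called with the result when poll or wait collects it.
Note: each element of the results queue is a tuple (request, result), so every result stays paired with the request that produced it.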
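The pairing of requests and results can be shown with a stripped-down Python 3 sketch. It is an assumption-laden illustration rather than the module's implementation: the names worker and run_all are invented, requests are plain (request_id, func, args) tuples instead of WorkRequest objects, and workers are stopped with a None sentinel instead of the Event and poll timeout used in the listing.

```python
import queue
import threading


def worker(requests_queue, results_queue):
    # Take (request_id, func, args) items until a None sentinel arrives.
    while True:
        item = requests_queue.get()
        if item is None:
            break
        request_id, func, args = item
        # Store the result together with the ID of its request.
        results_queue.put((request_id, func(*args)))


def run_all(func, args_list, num_workers=2):
    requests_queue, results_queue = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker,
                                args=(requests_queue, results_queue))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for request_id, args in enumerate(args_list):
        requests_queue.put((request_id, func, args))
    for _ in threads:
        requests_queue.put(None)      # one sentinel per worker
    for t in threads:
        t.join()
    # All workers have exited, so the results queue can be drained safely.
    results = {}
    while not results_queue.empty():
        request_id, result = results_queue.get()
        results[request_id] = result
    return results


if __name__ == "__main__":
    print(run_all(pow, [(2, 3), (3, 2)]))
```

Because each result travels with its request ID, the caller can match results to requests even though the workers may finish in any order.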
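Python thread pool usage example
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# A pool of at most 5 worker threads whose names start with "test_"
threadPool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="test_")


def test(v1, v2):
    # Print the name of the worker thread together with the two arguments.
    print(threading.current_thread().name, v1, v2)
    time.sleep(2)


if __name__ == '__main__':
    # Submit 10 tasks; at most 5 of them run at the same time.
    for i in range(0, 10):
        threadPool.submit(test, i, i + 1)
    # Block until every submitted task has finished.
    threadPool.shutdown(wait=True)

The above reflects my personal experience. I hope it serves as a useful reference, and I hope you will keep supporting 腳本之家.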

