欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python線程池如何使用

 更新時(shí)間:2020年05月28日 14:51:18   作者:流芳  
在本篇文章里小編給各位分享的是關(guān)于python中的線程池用法及實(shí)例內(nèi)容,需要的朋友們可以參考下。

線程池的使用

線程池的基類是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個(gè)子類,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于創(chuàng)建線程池,而 ProcessPoolExecutor 用于創(chuàng)建進(jìn)程池。

如果使用線程池/進(jìn)程池來管理并發(fā)編程,那么只要將相應(yīng)的 task 函數(shù)提交給線程池/進(jìn)程池,剩下的事情就由線程池/進(jìn)程池來搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關(guān)鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。
  • map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會(huì)啟動(dòng)多個(gè)線程,以異步方式立即對(duì) iterables 執(zhí)行 map 處理。
  • shutdown(wait=True):關(guān)閉線程池。

程序?qū)?task 函數(shù)提交(submit)給線程池后,submit 方法會(huì)返回一個(gè) Future 對(duì)象,F(xiàn)uture 類主要用于獲取線程任務(wù)函數(shù)的返回值。由于線程任務(wù)會(huì)在新線程中以異步方式執(zhí)行,因此,線程執(zhí)行的函數(shù)相當(dāng)于一個(gè)“將來完成”的任務(wù),所以 Python 使用 Future 來代表。

實(shí)際上,在 Java 的多線程編程中同樣有 Future,此處的 Future 與 Java 的 Future 大同小異。

Future 提供了如下方法:

  • cancel():取消該 Future 代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會(huì)取消該任務(wù),并返回 True。
  • cancelled():返回 Future 代表的線程任務(wù)是否被成功取消。
  • running():如果該 Future 代表的線程任務(wù)正在執(zhí)行、不可被取消,該方法返回 True。
  • done():如果該 Funture 代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回 True。
  • result(timeout=None):獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會(huì)阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。
  • exception(timeout=None):獲取該 Future 代表的線程任務(wù)所引發(fā)的異常。如果該任務(wù)成功完成,沒有異常,則該方法返回 None。
  • add_done_callback(fn):為該 Future 代表的線程任務(wù)注冊一個(gè)“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時(shí),程序會(huì)自動(dòng)觸發(fā)該 fn 函數(shù)。

在用完一個(gè)線程池后,應(yīng)該調(diào)用該線程池的 shutdown() 方法,該方法將啟動(dòng)線程池的關(guān)閉序列。調(diào)用 shutdown() 方法后的線程池不再接收新任務(wù),但會(huì)將以前所有的已提交任務(wù)執(zhí)行完成。當(dāng)線程池中的所有任務(wù)都執(zhí)行完成后,該線程池中的所有線程都會(huì)死亡。

使用線程池來執(zhí)行線程任務(wù)的步驟如下:

a、調(diào)用 ThreadPoolExecutor 類的構(gòu)造器創(chuàng)建一個(gè)線程池。

b、定義一個(gè)普通函數(shù)作為線程任務(wù)。

c、調(diào)用 ThreadPoolExecutor 對(duì)象的 submit() 方法來提交線程任務(wù)。

d、當(dāng)不想提交任何任務(wù)時(shí),調(diào)用 ThreadPoolExecutor 對(duì)象的 shutdown() 方法來關(guān)閉線程池。

下面程序示范了如何使用線程池來執(zhí)行線程任務(wù):

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個(gè)準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
  my_sum = 0
  for i in range(max):
    print(threading.current_thread().name + ' ' + str(i))
    my_sum += i
  return my_sum
# 創(chuàng)建一個(gè)包含2條線程的線程池
pool = ThreadPoolExecutor(max_workers=2)
# 向線程池提交一個(gè)task, 50會(huì)作為action()函數(shù)的參數(shù)
future1 = pool.submit(action, 50)
# 向線程池再提交一個(gè)task, 100會(huì)作為action()函數(shù)的參數(shù)
future2 = pool.submit(action, 100)
# 判斷future1代表的任務(wù)是否結(jié)束
print(future1.done())
time.sleep(3)
# 判斷future2代表的任務(wù)是否結(jié)束
print(future2.done())
# 查看future1代表的任務(wù)返回的結(jié)果
print(future1.result())
# 查看future2代表的任務(wù)返回的結(jié)果
print(future2.result())
# 關(guān)閉線程池
pool.shutdown()

上面程序中,第 13 行代碼創(chuàng)建了一個(gè)包含兩個(gè)線程的線程池,接下來的兩行代碼只要將 action() 函數(shù)提交(submit)給線程池,該線程池就會(huì)負(fù)責(zé)啟動(dòng)線程來執(zhí)行 action() 函數(shù)。這種啟動(dòng)線程的方法既優(yōu)雅,又具有更高的效率。

當(dāng)程序把 action() 函數(shù)提交給線程池時(shí),submit() 方法會(huì)返回該任務(wù)所對(duì)應(yīng)的 Future 對(duì)象,程序立即判斷 futurel 的 done() 方法,該方法將會(huì)返回 False(表明此時(shí)該任務(wù)還未完成)。接下來主程序暫停 3 秒,然后判斷 future2 的 done() 方法,如果此時(shí)該任務(wù)已經(jīng)完成,那么該方法將會(huì)返回 True。

程序最后通過 Future 的 result() 方法來獲取兩個(gè)異步任務(wù)返回的結(jié)果。

讀者可以自己運(yùn)行此代碼查看運(yùn)行結(jié)果,這里不再演示。

當(dāng)程序使用 Future 的 result() 方法來獲取結(jié)果時(shí),該方法會(huì)阻塞當(dāng)前線程,如果沒有指定 timeout 參數(shù),當(dāng)前線程將一直處于阻塞狀態(tài),直到 Future 代表的任務(wù)返回。

獲取執(zhí)行結(jié)果

前面程序調(diào)用了 Future 的 result() 方法來獲取線程任務(wù)的運(yùn)回值,但該方法會(huì)阻塞當(dāng)前主線程,只有等到錢程任務(wù)完成后,result() 方法的阻塞才會(huì)被解除。

如果程序不希望直接調(diào)用 result() 方法阻塞線程,則可通過 Future 的 add_done_callback() 方法來添加回調(diào)函數(shù),該回調(diào)函數(shù)形如 fn(future)。當(dāng)線程任務(wù)完成后,程序會(huì)自動(dòng)觸發(fā)該回調(diào)函數(shù),并將對(duì)應(yīng)的 Future 對(duì)象作為參數(shù)傳給該回調(diào)函數(shù)。

下面程序使用 add_done_callback() 方法來獲取線程任務(wù)的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個(gè)準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
  my_sum = 0
  for i in range(max):
    print(threading.current_thread().name + ' ' + str(i))
    my_sum += i
  return my_sum
# 創(chuàng)建一個(gè)包含2條線程的線程池
with ThreadPoolExecutor(max_workers=2) as pool:
  # 向線程池提交一個(gè)task, 50會(huì)作為action()函數(shù)的參數(shù)
  future1 = pool.submit(action, 50)
  # 向線程池再提交一個(gè)task, 100會(huì)作為action()函數(shù)的參數(shù)
  future2 = pool.submit(action, 100)
  def get_result(future):
    print(future.result())
  # 為future1添加線程完成的回調(diào)函數(shù)
  future1.add_done_callback(get_result)
  # 為future2添加線程完成的回調(diào)函數(shù)
  future2.add_done_callback(get_result)
  print('--------------')

上面主程序分別為 future1、future2 添加了同一個(gè)回調(diào)函數(shù),該回調(diào)函數(shù)會(huì)在線程任務(wù)結(jié)束時(shí)獲取其返回值。

主程序的最后一行代碼打印了一條橫線。由于程序并未直接調(diào)用 future1、future2 的 result() 方法,因此主線程不會(huì)被阻塞,可以立即看到輸出主線程打印出的橫線。接下來將會(huì)看到兩個(gè)新線程并發(fā)執(zhí)行,當(dāng)線程任務(wù)執(zhí)行完成后,get_result() 函數(shù)被觸發(fā),輸出線程任務(wù)的返回值。

另外,由于線程池實(shí)現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此,程序可以使用 with 語句來管理線程池,這樣即可避免手動(dòng)關(guān)閉線程池,如上面的程序所示。

此外,Exectuor 還提供了一個(gè) map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能類似于全局函數(shù) map(),區(qū)別在于線程池的 map() 方法會(huì)為 iterables 的每個(gè)元素啟動(dòng)一個(gè)線程,以并發(fā)方式來執(zhí)行 func 函數(shù)。這種方式相當(dāng)于啟動(dòng) len(iterables) 個(gè)線程,井收集每個(gè)線程的執(zhí)行結(jié)果。

例如,如下程序使用 Executor 的 map() 方法來啟動(dòng)線程,并收集線程任務(wù)的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個(gè)準(zhǔn)備作為線程任務(wù)的函數(shù)
def action(max):
  my_sum = 0
  for i in range(max):
    print(threading.current_thread().name + ' ' + str(i))
    my_sum += i
  return my_sum
# 創(chuàng)建一個(gè)包含4條線程的線程池
with ThreadPoolExecutor(max_workers=4) as pool:
  # 使用線程執(zhí)行map計(jì)算
  # 后面元組有3個(gè)元素,因此程序啟動(dòng)3條線程來執(zhí)行action函數(shù)
  results = pool.map(action, (50, 100, 150))
  print('--------------')
  for r in results:
print(r)

上面程序使用 map() 方法來啟動(dòng) 3 個(gè)線程(該程序的線程池包含 4 個(gè)線程,如果繼續(xù)使用只包含兩個(gè)線程的線程池,此時(shí)將有一個(gè)任務(wù)處于等待狀態(tài),必須等其中一個(gè)任務(wù)完成,線程空閑出來才會(huì)獲得執(zhí)行的機(jī)會(huì)),map() 方法的返回值將會(huì)收集每個(gè)線程任務(wù)的返回結(jié)果。

運(yùn)行上面程序,同樣可以看到 3 個(gè)線程并發(fā)執(zhí)行的結(jié)果,最后通過 results 可以看到 3 個(gè)線程任務(wù)的返回結(jié)果。

通過上面程序可以看出,使用 map() 方法來啟動(dòng)線程,并收集線程的執(zhí)行結(jié)果,不僅具有代碼簡單的優(yōu)點(diǎn),而且雖然程序會(huì)以并發(fā)方式來執(zhí)行 action() 函數(shù),但最后收集的 action() 函數(shù)的執(zhí)行結(jié)果,依然與傳入?yún)?shù)的結(jié)果保持一致。也就是說,上面 results 的第一個(gè)元素是 action(50) 的結(jié)果,第二個(gè)元素是 action(100) 的結(jié)果,第三個(gè)元素是 action(150) 的結(jié)果。

實(shí)例擴(kuò)展:

# coding:utf-8
  
import Queue
import threading
import sys
import time
import math
  
  
class WorkThread(threading.Thread):
  
  def __init__(self, task_queue):
    threading.Thread.__init__(self)
    self.setDaemon(True)
    self.task_queue = task_queue
    self.start()
    self.idle = True
  
  def run(self):
    sleep_time = 0.01 # 第1次無任務(wù)可做時(shí)休息10毫秒
    multiply = 0
    while True:
      try:
        # 從隊(duì)列中取一個(gè)任務(wù)
        func, args, kwargs = self.task_queue.get(block=False)
        self.idle = False
        multiply = 0
        # 執(zhí)行之
        func(*args, **kwargs)
      except Queue.Empty:
        time.sleep(sleep_time * math.pow(2, multiply))
        self.idle = True
        multiply += 1
        continue
      except:
        print sys.exc_info()
        raise
  
  
class ThreadPool:
  
  def __init__(self, thread_num=10, max_queue_len=1000):
    self.max_queue_len = max_queue_len
    self.task_queue = Queue.Queue(max_queue_len) # 任務(wù)等待隊(duì)列
    self.threads = []
    self.__create_pool(thread_num)
  
  def __create_pool(self, thread_num):
    for i in xrange(thread_num):
      thread = WorkThread(self.task_queue)
      self.threads.append(thread)
  
  def add_task(self, func, *args, **kwargs):
    '''添加一個(gè)任務(wù),返回任務(wù)等待隊(duì)列的長度
      調(diào)用該方法前最后先調(diào)用isSafe()判斷一下等待的任務(wù)是不是很多,以防止提交的任務(wù)被拒絕
    '''
    try:
      self.task_queue.put((func, args, kwargs))
    except Queue.Full:
      raise # 隊(duì)列已滿時(shí)直接拋出異常,不給執(zhí)行
    return self.task_queue.qsize()
  
  def isSafe(self):
    '''等待的任務(wù)數(shù)量離警界線還比較遠(yuǎn)
    '''
    return self.task_queue.qsize() < 0.9 * self.max_queue_len
  
  def wait_for_complete(self):
    '''等待提交到線程池的所有任務(wù)都執(zhí)行完畢
    '''
    #首先任務(wù)等待隊(duì)列要變成空
    while not self.task_queue.empty():
      time.sleep(1)
    # 其次,所以計(jì)算線程要變成idle狀態(tài)
    while True:
      all_idle = True
      for th in self.threads:
        if not th.idle:
          all_idle = False
          break
      if all_idle:
        break
      else:
        time.sleep(1)
  
  
if __name__ == '__main__':
  def foo(a, b):
    print a + b
    time.sleep(0.01)
  thread_pool = ThreadPool(10, 100)
  '''在Windows上測試不通過,Windows上Queue.Queue不是線程安全的'''
  size = 0
  for i in xrange(10000):
    try:
      size = thread_pool.add_task(foo, i, 2 * i)
    except Queue.Full:
      print 'queue full, queue size is ', size
  time.sleep(2)

到此這篇關(guān)于python線程池如何使用的文章就介紹到這了,更多相關(guān)python中的線程池詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Ubuntu+python將nii圖像保存成png格式

    Ubuntu+python將nii圖像保存成png格式

    這篇文章主要介紹了Ubuntu+python將nii圖像保存成png格式,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • Python下載網(wǎng)絡(luò)文本數(shù)據(jù)到本地內(nèi)存的四種實(shí)現(xiàn)方法示例

    Python下載網(wǎng)絡(luò)文本數(shù)據(jù)到本地內(nèi)存的四種實(shí)現(xiàn)方法示例

    這篇文章主要介紹了Python下載網(wǎng)絡(luò)文本數(shù)據(jù)到本地內(nèi)存的四種實(shí)現(xiàn)方法,涉及Python網(wǎng)絡(luò)傳輸、文本讀寫、內(nèi)存I/O、矩陣運(yùn)算等相關(guān)操作技巧,代碼中包含了較為詳盡的注釋說明便于理解,需要的朋友可以參考下
    2018-02-02
  • Python機(jī)器學(xué)習(xí)利用隨機(jī)森林對(duì)特征重要性計(jì)算評(píng)估

    Python機(jī)器學(xué)習(xí)利用隨機(jī)森林對(duì)特征重要性計(jì)算評(píng)估

    本文是對(duì)隨機(jī)森林如何用在特征選擇上做一個(gè)簡單的介紹。隨機(jī)森林非常簡單,易于實(shí)現(xiàn),計(jì)算開銷也很小,更令人驚奇的是它在分類和回歸上表現(xiàn)出了十分驚人的性能
    2021-10-10
  • PyQt5中QSpinBox計(jì)數(shù)器的實(shí)現(xiàn)

    PyQt5中QSpinBox計(jì)數(shù)器的實(shí)現(xiàn)

    這篇文章主要介紹了PyQt5中QSpinBox計(jì)數(shù)器的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • python?Pydub簡單易用的音頻處理庫使用實(shí)例探索

    python?Pydub簡單易用的音頻處理庫使用實(shí)例探索

    Pydub是一個(gè)簡單易用的Python庫,它讓音頻處理變得像處理列表或字符串一樣簡單,你可以用Pydub來剪輯、合并、調(diào)整音頻文件,以及執(zhí)行許多其他的音頻處理任務(wù),它支持多種音頻格式,包括常見的MP3、WAV和AAC
    2024-01-01
  • 閉包在python中的應(yīng)用之translate和maketrans用法詳解

    閉包在python中的應(yīng)用之translate和maketrans用法詳解

    這篇文章主要介紹了閉包在python中的應(yīng)用之translate和maketrans用法,是比較實(shí)用的技巧,需要的朋友可以參考下
    2014-08-08
  • 簡單談?wù)凱ython中的閉包

    簡單談?wù)凱ython中的閉包

    一般來說閉包這個(gè)概念在很多語言中都有涉及,簡單說,閉包就是根據(jù)不同的配置信息得到不同的結(jié)果,下面我們來專門講下在Python中的閉包
    2016-11-11
  • Python每天必學(xué)之bytes字節(jié)

    Python每天必學(xué)之bytes字節(jié)

    Python每天必學(xué)之bytes字節(jié),針對(duì)Python中的bytes字節(jié)進(jìn)行學(xué)習(xí)理解,感興趣的小伙伴們可以參考一下
    2016-01-01
  • Python IndexError報(bào)錯(cuò)分析及解決方法

    Python IndexError報(bào)錯(cuò)分析及解決方法

    在Python編程中,IndexError是一種常見的異常類型,它通常發(fā)生在嘗試訪問序列(如列表、元組或字符串)中不存在的索引時(shí),本文將深入分析IndexError的成因、表現(xiàn)形式,并提供相應(yīng)的解決辦法,同時(shí)附帶詳細(xì)的代碼示例,需要的朋友可以參考下
    2024-07-07
  • Python訪問OPCUA服務(wù)器,訂閱一個(gè)變量標(biāo)簽方式

    Python訪問OPCUA服務(wù)器,訂閱一個(gè)變量標(biāo)簽方式

    這篇文章主要介紹了Python訪問OPCUA服務(wù)器,訂閱一個(gè)變量標(biāo)簽方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02

最新評(píng)論