欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python中的并發(fā)編程實(shí)例

 更新時(shí)間:2014年07月07日 12:05:58   投稿:junjie  
這篇文章主要介紹了Python中的并發(fā)編程實(shí)例,主要是對(duì)Threading模塊的應(yīng)用,文中自定義了一個(gè)Threading類庫(kù),需要的朋友可以參考下

一、簡(jiǎn)介

  我們將一個(gè)正在運(yùn)行的程序稱為進(jìn)程。每個(gè)進(jìn)程都有它自己的系統(tǒng)狀態(tài),包含內(nèi)存狀態(tài)、打開文件列表、追蹤指令執(zhí)行情況的程序指針以及一個(gè)保存局部變量的調(diào)用棧。通常情況下,一個(gè)進(jìn)程依照一個(gè)單序列控制流順序執(zhí)行,這個(gè)控制流被稱為該進(jìn)程的主線程。在任何給定的時(shí)刻,一個(gè)程序只做一件事情。

  一個(gè)程序可以通過(guò)Python庫(kù)函數(shù)中的os或subprocess模塊創(chuàng)建新進(jìn)程(例如os.fork()或是subprocess.Popen())。然而,這些被稱為子進(jìn)程的進(jìn)程卻是獨(dú)立運(yùn)行的,它們有各自獨(dú)立的系統(tǒng)狀態(tài)以及主線程。因?yàn)檫M(jìn)程之間是相互獨(dú)立的,因此它們同原有的進(jìn)程并發(fā)執(zhí)行。這是指原進(jìn)程可以在創(chuàng)建子進(jìn)程后去執(zhí)行其它工作。

  雖然進(jìn)程之間是相互獨(dú)立的,但是它們能夠通過(guò)名為進(jìn)程間通信(IPC)的機(jī)制進(jìn)行相互通信。一個(gè)典型的模式是基于消息傳遞,可以將其簡(jiǎn)單地理解為一個(gè)純字節(jié)的緩沖區(qū),而send()或recv()操作原語(yǔ)可以通過(guò)諸如管道(pipe)或是網(wǎng)絡(luò)套接字(network socket)等I/O通道傳輸或接收消息。還有一些IPC模式可以通過(guò)內(nèi)存映射(memory-mapped)機(jī)制完成(例如mmap模塊),通過(guò)內(nèi)存映射,進(jìn)程可以在內(nèi)存中創(chuàng)建共享區(qū)域,而對(duì)這些區(qū)域的修改對(duì)所有的進(jìn)程可見。

  多進(jìn)程能夠被用于需要同時(shí)執(zhí)行多個(gè)任務(wù)的場(chǎng)景,由不同的進(jìn)程負(fù)責(zé)任務(wù)的不同部分。然而,另一種將工作細(xì)分到任務(wù)的方法是使用線程。同進(jìn)程類似,線程也有其自己的控制流以及執(zhí)行棧,但線程在創(chuàng)建它的進(jìn)程之內(nèi)運(yùn)行,分享其父進(jìn)程的所有數(shù)據(jù)和系統(tǒng)資源。當(dāng)應(yīng)用需要完成并發(fā)任務(wù)的時(shí)候線程是很有用的,但是潛在的問(wèn)題是任務(wù)間必須分享大量的系統(tǒng)狀態(tài)。

  當(dāng)使用多進(jìn)程或多線程時(shí),操作系統(tǒng)負(fù)責(zé)調(diào)度。這是通過(guò)給每個(gè)進(jìn)程(或線程)一個(gè)很小的時(shí)間片并且在所有活動(dòng)任務(wù)之間快速循環(huán)切換來(lái)實(shí)現(xiàn)的,這個(gè)過(guò)程將CPU時(shí)間分割為小片段分給各個(gè)任務(wù)。例如,如果你的系統(tǒng)中有10個(gè)活躍的進(jìn)程正在執(zhí)行,操作系統(tǒng)將會(huì)適當(dāng)?shù)膶⑹种坏腃PU時(shí)間分配給每個(gè)進(jìn)程并且循環(huán)地在十個(gè)進(jìn)程之間切換。當(dāng)系統(tǒng)不止有一個(gè)CPU核時(shí),操作系統(tǒng)能夠?qū)⑦M(jìn)程調(diào)度到不同的CPU核上,保持系統(tǒng)負(fù)載平均以實(shí)現(xiàn)并行執(zhí)行。

  利用并發(fā)執(zhí)行機(jī)制寫的程序需要考慮一些復(fù)雜的問(wèn)題。復(fù)雜性的主要來(lái)源是關(guān)于同步和共享數(shù)據(jù)的問(wèn)題。通常情況下,多個(gè)任務(wù)同時(shí)試圖更新同一個(gè)數(shù)據(jù)結(jié)構(gòu)會(huì)造成臟數(shù)據(jù)和程序狀態(tài)不一致的問(wèn)題(正式的說(shuō)法是資源競(jìng)爭(zhēng)的問(wèn)題)。為了解決這個(gè)問(wèn)題,需要使用互斥鎖或是其他相似的同步原語(yǔ)來(lái)標(biāo)識(shí)并保護(hù)程序中的關(guān)鍵部分。舉個(gè)例子,如果多個(gè)不同的線程正在試圖同時(shí)向同一個(gè)文件寫入數(shù)據(jù),那么你需要一個(gè)互斥鎖使這些寫操作依次執(zhí)行,當(dāng)一個(gè)線程在寫入時(shí),其他線程必須等待直到當(dāng)前線程釋放這個(gè)資源。

Python中的并發(fā)編程

  Python長(zhǎng)久以來(lái)一直支持不同方式的并發(fā)編程,包括線程、子進(jìn)程以及其他利用生成器(generator function)的并發(fā)實(shí)現(xiàn)。

  Python在大部分系統(tǒng)上同時(shí)支持消息傳遞和基于線程的并發(fā)編程機(jī)制。雖然大部分程序員對(duì)線程接口更為熟悉,但是Python的線程機(jī)制卻有著諸多的限制。Python使用了內(nèi)部全局解釋器鎖(GIL)來(lái)保證線程安全,GIL同時(shí)只允許一個(gè)線程執(zhí)行。這使得Python程序就算在多核系統(tǒng)上也只能在單個(gè)處理器上運(yùn)行。Python界關(guān)于GIL的爭(zhēng)論盡管很多,但在可預(yù)見的未來(lái)卻沒有將其移除的可能。

  Python提供了一些很精巧的工具用于管理基于線程和進(jìn)程的并發(fā)操作。即使是簡(jiǎn)單地程序也能夠使用這些工具使得任務(wù)并發(fā)進(jìn)行從而加快運(yùn)行速度。subprocess模塊為子進(jìn)程的創(chuàng)建和通信提供了API。這特別適合運(yùn)行與文本相關(guān)的程序,因?yàn)檫@些API支持通過(guò)新進(jìn)程的標(biāo)準(zhǔn)輸入輸出通道傳送數(shù)據(jù)。signal模塊將UNIX系統(tǒng)的信號(hào)量機(jī)制暴露給用戶,用以在進(jìn)程之間傳遞事件信息。信號(hào)是異步處理的,通常有信號(hào)到來(lái)時(shí)會(huì)中斷程序當(dāng)前的工作。信號(hào)機(jī)制能夠?qū)崿F(xiàn)粗粒度的消息傳遞系統(tǒng),但是有其他更可靠的進(jìn)程內(nèi)通訊技術(shù)能夠傳遞更復(fù)雜的消息。threading模塊為并發(fā)操作提供了一系列高級(jí)的,面向?qū)ο蟮腁PI。Thread對(duì)象們?cè)谝粋€(gè)進(jìn)程內(nèi)并發(fā)地運(yùn)行,分享內(nèi)存資源。使用線程能夠更好地?cái)U(kuò)展I/O密集型的任務(wù)。multiprocessing模塊同threading模塊類似,不過(guò)它提供了對(duì)于進(jìn)程的操作。每個(gè)進(jìn)程類是真實(shí)的操作系統(tǒng)進(jìn)程,并且沒有共享內(nèi)存資源,但multiprocessing模塊提供了進(jìn)程間共享數(shù)據(jù)以及傳遞消息的機(jī)制。通常情況下,將基于線程的程序改為基于進(jìn)程的很簡(jiǎn)單,只需要修改一些import聲明即可。

Threading模塊示例

  以threading模塊為例,思考這樣一個(gè)簡(jiǎn)單的問(wèn)題:如何使用分段并行的方式完成一個(gè)大數(shù)的累加。

import threading
 
class SummingThread(threading.Thread):
  def __init__(self, low, high):
    super(SummingThread, self).__init__()
    self.low = low
    self.high = high
    self.total = 0
 
  def run(self):
    for i in range(self.low, self.high):
      self.total += i
 
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)

自定義Threading類庫(kù)

  我寫了一個(gè)易于使用threads的小型Python類庫(kù),包含了一些有用的類和函數(shù)。

關(guān)鍵參數(shù):

  * do_threaded_work – 該函數(shù)將一系列給定的任務(wù)分配給對(duì)應(yīng)的處理函數(shù)(分配順序不確定)

  * ThreadedWorker – 該類創(chuàng)建一個(gè)線程,它將從一個(gè)同步的工作隊(duì)列中拉取工作任務(wù)并將處理結(jié)果寫入同步結(jié)果隊(duì)列

  * start_logging_with_thread_info – 將線程id寫入所有日志消息。(依賴日志環(huán)境)

  * stop_logging_with_thread_info – 用于將線程id從所有的日志消息中移除。(依賴日志環(huán)境)

import threading
import logging
 
def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
  """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).
 
    Parameters:
    - num_threads        Default: len(work_items) --- Number of threads to use process items in work_items.
    - per_sync_timeout     Default: 1        --- Each synchronized operation can optionally timeout.
    - preserve_result_ordering Default: True       --- Reorders result_item to match original work_items ordering.
 
    Return: 
    --- list of results from applying work_func to each work_item. Order is optionally preserved.
 
    Example:
 
    def process_url(url):
      # TODO: Do some work with the url
      return url
 
    urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
    # process urls in parallel
    result_items = do_threaded_work(urls_to_process, process_url)
 
    # print(results)
    print(repr(result_items))
  """
  global wrapped_work_func
  if not num_threads:
    num_threads = len(work_items)
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  index = 0
  for work_item in work_items:
    if preserve_result_ordering:
      work_queue.put((index, work_item))
    else:
      work_queue.put(work_item)
    index += 1
 
  if preserve_result_ordering:
    wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))
 
  start_logging_with_thread_info()
 
  #spawn a pool of threads, and pass them queue instance 
  for _ in range(num_threads):
    if preserve_result_ordering:
      t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
    else:
      t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
    t.setDaemon(True)
    t.start()
 
  work_queue.join()
  stop_logging_with_thread_info()
 
  logging.info('work_queue joined')
 
  result_items = []
  while not result_queue.empty():
    result = result_queue.get(timeout=per_sync_timeout)
    logging.info('found result[:500]: ' + repr(result)[:500])
    if result:
      result_items.append(result)
 
  if preserve_result_ordering:
    result_items = [work_item for index, work_item in result_items]
 
  return result_items
 
class ThreadedWorker(threading.Thread):
  """ Generic Threaded Worker
    Input to work_func: item from work_queue
 
  Example usage:
 
  import Queue
 
  urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  def process_url(url):
    # TODO: Do some work with the url
    return url
 
  def main():
    # spawn a pool of threads, and pass them queue instance 
    for i in range(3):
      t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
      t.setDaemon(True)
      t.start()
 
    # populate queue with data  
    for url in urls_to_process:
      work_queue.put(url)
 
    # wait on the queue until everything has been processed   
    work_queue.join()
 
    # print results
    print repr(result_queue)
 
  main()
  """
 
  def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
    threading.Thread.__init__(self)
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.work_func = work_func
    self.stop_when_work_queue_empty = stop_when_work_queue_empty
    self.queue_timeout = queue_timeout
 
  def should_continue_running(self):
    if self.stop_when_work_queue_empty:
      return not self.work_queue.empty()
    else:
      return True
 
  def run(self):
    while self.should_continue_running():
      try:
        # grabs item from work_queue
        work_item = self.work_queue.get(timeout=self.queue_timeout)
 
        # works on item
        work_result = self.work_func(work_item)
 
        #place work_result into result_queue
        self.result_queue.put(work_result, timeout=self.queue_timeout)
 
      except Queue.Empty:
        logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
 
      except Queue.Full:
        logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')
 
      except:
        logging.exception('Error in ThreadedWorker')
 
      finally:
        #signals to work_queue that item is done
        self.work_queue.task_done()
 
def start_logging_with_thread_info():
  try:
    formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to start logging with thread info')
 
def stop_logging_with_thread_info():
  try:
    formatter = logging.Formatter('%(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to stop logging with thread info')

 使用示例

from test import ThreadedWorker
from queue import Queue
 
urls_to_process = ["http://facebook.com", "http://pypix.com"]
 
work_queue = Queue()
result_queue = Queue()
 
def process_url(url):
  # TODO: Do some work with the url
  return url
 
def main():
  # spawn a pool of threads, and pass them queue instance 
  for i in range(5):
    t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
    t.setDaemon(True)
    t.start()
 
  # populate queue with data  
  for url in urls_to_process:
    work_queue.put(url)
 
  # wait on the queue until everything has been processed   
  work_queue.join()
 
  # print results
  print(repr(result_queue))
 
main()

相關(guān)文章

  • web.py在SAE中的Session問(wèn)題解決方法(使用mysql存儲(chǔ))

    web.py在SAE中的Session問(wèn)題解決方法(使用mysql存儲(chǔ))

    這篇文章主要介紹了web.py在SAE中的Session問(wèn)題解決方法(使用mysql存儲(chǔ)),本文直接給出實(shí)現(xiàn)代碼,代碼中包含詳細(xì)注釋,需要的朋友可以參考下
    2015-06-06
  • Windows下實(shí)現(xiàn)pytorch環(huán)境搭建

    Windows下實(shí)現(xiàn)pytorch環(huán)境搭建

    這篇文章主要介紹了Windows下實(shí)現(xiàn)pytorch環(huán)境搭建,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Python綁定方法與非綁定方法詳解

    Python綁定方法與非綁定方法詳解

    這篇文章主要為大家詳細(xì) 介紹了Python綁定方法與非綁定方法的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • python通過(guò)ElementTree操作XML

    python通過(guò)ElementTree操作XML

    這篇文章介紹了python通過(guò)ElementTree操作XML的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-07-07
  • Python自動(dòng)重試HTTP連接裝飾器

    Python自動(dòng)重試HTTP連接裝飾器

    這篇文章主要介紹了Python自動(dòng)重試HTTP連接裝飾器,有時(shí)候我們要去別的接口取數(shù)據(jù),可能因?yàn)榫W(wǎng)絡(luò)原因偶爾失敗,為了能自動(dòng)重試,寫了這么一個(gè)裝飾器,可以實(shí)現(xiàn)自動(dòng)重連2次,需要的朋友可以參考下
    2015-04-04
  • PIL圖像處理模塊paste方法簡(jiǎn)單使用詳解

    PIL圖像處理模塊paste方法簡(jiǎn)單使用詳解

    這篇文章主要介紹了PIL圖像處理模塊paste方法簡(jiǎn)單使用詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • 關(guān)于Python中的海象運(yùn)算符使用方法詳解

    關(guān)于Python中的海象運(yùn)算符使用方法詳解

    這篇文章主要介紹了關(guān)于Python中的海象運(yùn)算符“:=”使用方法詳解,海象運(yùn)算符(walrus?operator)是?Python?3.8?中引入的一種新的語(yǔ)法,需要的朋友可以參考下
    2023-04-04
  • python 的集合類型詳解

    python 的集合類型詳解

    這篇文章主要為大家介紹了python的集合類型,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2021-12-12
  • tensorflow 使用flags定義命令行參數(shù)的方法

    tensorflow 使用flags定義命令行參數(shù)的方法

    本篇文章主要介紹了tensorflow 使用flags定義命令行參數(shù)的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-04-04
  • Windows平臺(tái)Python編程必會(huì)模塊之pywin32介紹

    Windows平臺(tái)Python編程必會(huì)模塊之pywin32介紹

    在Windows平臺(tái)上,從原來(lái)使用C/C++編寫原生EXE程序,到使用Python編寫一些常用腳本程序,成熟的模塊的使用使得編程效率大大提高了
    2019-10-10

最新評(píng)論