欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

用Python的線程來解決生產(chǎn)者消費(fèi)問題的示例

 更新時(shí)間:2015年04月02日 17:00:21   作者:Akshar Raaj  
這篇文章主要介紹了用Python的線程來解決生產(chǎn)者消費(fèi)問題的示例,包括對使用線程中容易出現(xiàn)的一些問題給出了相關(guān)解答,需要的朋友可以參考下

我們將使用Python線程來解決Python中的生產(chǎn)者—消費(fèi)者問題。這個(gè)問題完全不像他們在學(xué)校中說的那么難。

如果你對生產(chǎn)者—消費(fèi)者問題有了解,看這篇博客會(huì)更有意義。

為什么要關(guān)心生產(chǎn)者—消費(fèi)者問題:

  •     可以幫你更好地理解并發(fā)和不同概念的并發(fā)。
  •     信息隊(duì)列中的實(shí)現(xiàn)中,一定程度上使用了生產(chǎn)者—消費(fèi)者問題的概念,而你某些時(shí)候必然會(huì)用到消息隊(duì)列。

當(dāng)我們在使用線程時(shí),你可以學(xué)習(xí)以下的線程概念:

  •     Condition:線程中的條件。
  •     wait():在條件實(shí)例中可用的wait()。
  •     notify() :在條件實(shí)例中可用的notify()。

我假設(shè)你已經(jīng)有這些基本概念:線程、競態(tài)條件,以及如何解決靜態(tài)條件(例如使用lock)。否則的話,你建議你去看我上一篇文章basics of Threads。

引用維基百科:

生產(chǎn)者的工作是產(chǎn)生一塊數(shù)據(jù),放到buffer中,如此循環(huán)。與此同時(shí),消費(fèi)者在消耗這些數(shù)據(jù)(例如從buffer中把它們移除),每次一塊。

這里的關(guān)鍵詞是“同時(shí)”。所以生產(chǎn)者和消費(fèi)者是并發(fā)運(yùn)行的,我們需要對生產(chǎn)者和消費(fèi)者做線程分離。
 

from threading import Thread
 
class ProducerThread(Thread):
  def run(self):
    pass
 
class ConsumerThread(Thread):
  def run(self):
    pass

再次引用維基百科:

這個(gè)為描述了兩個(gè)共享固定大小緩沖隊(duì)列的進(jìn)程,即生產(chǎn)者和消費(fèi)者。

假設(shè)我們有一個(gè)全局變量,可以被生產(chǎn)者和消費(fèi)者線程修改。生產(chǎn)者產(chǎn)生數(shù)據(jù)并把它加入到隊(duì)列。消費(fèi)者消耗這些數(shù)據(jù)(例如把它移出)。

queue = []

在剛開始,我們不會(huì)設(shè)置固定大小的條件,而在實(shí)際運(yùn)行時(shí)加入(指下述例子)。

一開始帶bug的程序:

from threading import Thread, Lock
import time
import random
 
queue = []
lock = Lock()
 
class ProducerThread(Thread):
  def run(self):
    nums = range(5) #Will create the list [0, 1, 2, 3, 4]
    global queue
    while True:
      num = random.choice(nums) #Selects a random number from list [0, 1, 2, 3, 4]
      lock.acquire()
      queue.append(num)
      print "Produced", num
      lock.release()
      time.sleep(random.random())
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      lock.acquire()
      if not queue:
        print "Nothing in queue, but consumer will try to consume"
      num = queue.pop(0)
      print "Consumed", num
      lock.release()
      time.sleep(random.random())
 
ProducerThread().start()
ConsumerThread().start()

運(yùn)行幾次并留意一下結(jié)果。如果程序在IndexError異常后并沒有自動(dòng)結(jié)束,用Ctrl+Z結(jié)束運(yùn)行。

樣例輸出:
 

Produced 3
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1
Nothing in queue, but consumer will try to consume
Exception in thread Thread-2:
Traceback (most recent call last):
 File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
  self.run()
 File "producer_consumer.py", line 31, in run
  num = queue.pop(0)
IndexError: pop from empty list

解釋:

  •     我們開始了一個(gè)生產(chǎn)者線程(下稱生產(chǎn)者)和一個(gè)消費(fèi)者線程(下稱消費(fèi)者)。
  •     生產(chǎn)者不停地添加(數(shù)據(jù))到隊(duì)列,而消費(fèi)者不停地消耗。
  •     由于隊(duì)列是一個(gè)共享變量,我們把它放到lock程序塊內(nèi),以防發(fā)生競態(tài)條件。
  •     在某一時(shí)間點(diǎn),消費(fèi)者把所有東西消耗完畢而生產(chǎn)者還在掛起(sleep)。消費(fèi)者嘗試?yán)^續(xù)進(jìn)行消耗,但此時(shí)隊(duì)列為空,出現(xiàn)IndexError異常。
  •     在每次運(yùn)行過程中,在發(fā)生IndexError異常之前,你會(huì)看到print語句輸出”Nothing in queue, but consumer will try to consume”,這是你出錯(cuò)的原因。

我們把這個(gè)實(shí)現(xiàn)作為錯(cuò)誤行為(wrong behavior)。

什么是正確行為?

當(dāng)隊(duì)列中沒有任何數(shù)據(jù)的時(shí)候,消費(fèi)者應(yīng)該停止運(yùn)行并等待(wait),而不是繼續(xù)嘗試進(jìn)行消耗。而當(dāng)生產(chǎn)者在隊(duì)列中加入數(shù)據(jù)之后,應(yīng)該有一個(gè)渠道去告訴(notify)消費(fèi)者。然后消費(fèi)者可以再次從隊(duì)列中進(jìn)行消耗,而IndexError不再出現(xiàn)。

關(guān)于條件

    條件(condition)可以讓一個(gè)或多個(gè)線程進(jìn)入wait,直到被其他線程notify。參考:?http://docs.python.org/2/library/threading.html#condition-objects

這就是我們所需要的。我們希望消費(fèi)者在隊(duì)列為空的時(shí)候wait,只有在被生產(chǎn)者notify后恢復(fù)。生產(chǎn)者只有在往隊(duì)列中加入數(shù)據(jù)后進(jìn)行notify。因此在生產(chǎn)者notify后,可以確保隊(duì)列非空,因此消費(fèi)者消費(fèi)時(shí)不會(huì)出現(xiàn)異常。

  •     condition內(nèi)含lock。
  •     condition有acquire()和release()方法,用以調(diào)用內(nèi)部的lock的對應(yīng)方法。

condition的acquire()和release()方法內(nèi)部調(diào)用了lock的acquire()和release()。所以我們可以用condiction實(shí)例取代lock實(shí)例,但lock的行為不會(huì)改變。
生產(chǎn)者和消費(fèi)者需要使用同一個(gè)condition實(shí)例, 保證wait和notify正常工作。

重寫消費(fèi)者代碼:
 

from threading import Condition
 
condition = Condition()
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      condition.acquire()
      if not queue:
        print "Nothing in queue, consumer is waiting"
        condition.wait()
        print "Producer added something to queue and notified the consumer"
      num = queue.pop(0)
      print "Consumed", num
      condition.release()
      time.sleep(random.random())

重寫生產(chǎn)者代碼:
 

class ProducerThread(Thread):
  def run(self):
    nums = range(5)
    global queue
    while True:
      condition.acquire()
      num = random.choice(nums)
      queue.append(num)
      print "Produced", num
      condition.notify()
      condition.release()
      time.sleep(random.random())

樣例輸出:
 

Produced 3
Consumed 3
Produced 1
Consumed 1
Produced 4
Consumed 4
Produced 3
Consumed 3
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 3
Producer added something to queue and notified the consumer
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1

解釋:

  •     對于消費(fèi)者,在消費(fèi)前檢查隊(duì)列是否為空。
  •     如果為空,調(diào)用condition實(shí)例的wait()方法。
  •     消費(fèi)者進(jìn)入wait(),同時(shí)釋放所持有的lock。
  •     除非被notify,否則它不會(huì)運(yùn)行。
  •     生產(chǎn)者可以acquire這個(gè)lock,因?yàn)樗呀?jīng)被消費(fèi)者release。
  •     當(dāng)調(diào)用了condition的notify()方法后,消費(fèi)者被喚醒,但喚醒不意味著它可以開始運(yùn)行。
  •     notify()并不釋放lock,調(diào)用notify()后,lock依然被生產(chǎn)者所持有。
  •     生產(chǎn)者通過condition.release()顯式釋放lock。
  •     消費(fèi)者再次開始運(yùn)行,現(xiàn)在它可以得到隊(duì)列中的數(shù)據(jù)而不會(huì)出現(xiàn)IndexError異常。

為隊(duì)列增加大小限制

生產(chǎn)者不能向一個(gè)滿隊(duì)列繼續(xù)加入數(shù)據(jù)。

它可以用以下方式來實(shí)現(xiàn):

  •     在加入數(shù)據(jù)前,生產(chǎn)者檢查隊(duì)列是否為滿。
  •     如果不為滿,生產(chǎn)者可以繼續(xù)正常流程。
  •     如果為滿,生產(chǎn)者必須等待,調(diào)用condition實(shí)例的wait()。
  •     消費(fèi)者可以運(yùn)行。消費(fèi)者消耗隊(duì)列,并產(chǎn)生一個(gè)空余位置。
  •     然后消費(fèi)者notify生產(chǎn)者。
  •     當(dāng)消費(fèi)者釋放lock,消費(fèi)者可以acquire這個(gè)lock然后往隊(duì)列中加入數(shù)據(jù)。

最終程序如下:

from threading import Thread, Condition
import time
import random
 
queue = []
MAX_NUM = 10
condition = Condition()
 
class ProducerThread(Thread):
  def run(self):
    nums = range(5)
    global queue
    while True:
      condition.acquire()
      if len(queue) == MAX_NUM:
        print "Queue full, producer is waiting"
        condition.wait()
        print "Space in queue, Consumer notified the producer"
      num = random.choice(nums)
      queue.append(num)
      print "Produced", num
      condition.notify()
      condition.release()
      time.sleep(random.random())
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      condition.acquire()
      if not queue:
        print "Nothing in queue, consumer is waiting"
        condition.wait()
        print "Producer added something to queue and notified the consumer"
      num = queue.pop(0)
      print "Consumed", num
      condition.notify()
      condition.release()
      time.sleep(random.random())
 
ProducerThread().start()
ConsumerThread().start()

樣例輸出:
 

Produced 0
Consumed 0
Produced 0
Produced 4
Consumed 0
Consumed 4
Nothing in queue, consumer is waiting
Produced 4
Producer added something to queue and notified the consumer
Consumed 4
Produced 3
Produced 2
Consumed 3

更新:
很多網(wǎng)友建議我在lock和condition下使用Queue來代替使用list。我同意這種做法,但我的目的是展示Condition,wait()和notify()如何工作,所以使用了list。

以下用Queue來更新一下代碼。

Queue封裝了Condition的行為,如wait(),notify(),acquire()。

現(xiàn)在不失為一個(gè)好機(jī)會(huì)讀一下Queue的文檔(http://docs.python.org/2/library/queue.html)。

更新程序:

from threading import Thread
import time
import random
from Queue import Queue
 
queue = Queue(10)
 
class ProducerThread(Thread):
  def run(self):
    nums = range(5)
    global queue
    while True:
      num = random.choice(nums)
      queue.put(num)
      print "Produced", num
      time.sleep(random.random())
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      num = queue.get()
      queue.task_done()
      print "Consumed", num
      time.sleep(random.random())
 
ProducerThread().start()
ConsumerThread().start()

解釋:

  •     在原來使用list的位置,改為使用Queue實(shí)例(下稱隊(duì)列)。
  •     這個(gè)隊(duì)列有一個(gè)condition,它有自己的lock。如果你使用Queue,你不需要為condition和lock而煩惱。
  •     生產(chǎn)者調(diào)用隊(duì)列的put方法來插入數(shù)據(jù)。
  •     put()在插入數(shù)據(jù)前有一個(gè)獲取lock的邏輯。
  •     同時(shí),put()也會(huì)檢查隊(duì)列是否已滿。如果已滿,它會(huì)在內(nèi)部調(diào)用wait(),生產(chǎn)者開始等待。
  •     消費(fèi)者使用get方法。
  •     get()從隊(duì)列中移出數(shù)據(jù)前會(huì)獲取lock。
  •     get()會(huì)檢查隊(duì)列是否為空,如果為空,消費(fèi)者進(jìn)入等待狀態(tài)。
  •     get()和put()都有適當(dāng)?shù)膎otify()?,F(xiàn)在就去看Queue的源碼吧。

相關(guān)文章

  • Python?迭代器Iterator詳情

    Python?迭代器Iterator詳情

    這篇文章主要介紹了Python?迭代器Iterator詳情,迭代器可以幫助我們解決面對復(fù)雜的數(shù)據(jù)場景時(shí),快速簡便的獲取數(shù)據(jù),下文關(guān)于其詳細(xì)介紹,需要的小伙伴可以參考一下
    2022-05-05
  • Python 內(nèi)置函數(shù)之隨機(jī)函數(shù)詳情

    Python 內(nèi)置函數(shù)之隨機(jī)函數(shù)詳情

    這篇文章主要介紹了Python 內(nèi)置函數(shù)之隨機(jī)函數(shù),文章將圍繞Python 內(nèi)置函數(shù)、隨機(jī)函數(shù)的相關(guān)資料展開內(nèi)容,需要的朋友可以參考一下,希望對你有所幫助
    2021-11-11
  • python如何快速拼接字符串

    python如何快速拼接字符串

    這篇文章主要介紹了python如何快速拼接字符串,幫助大家理解和學(xué)習(xí)python,感興趣的朋友可以了解下
    2020-10-10
  • Django如何實(shí)現(xiàn)上傳圖片功能

    Django如何實(shí)現(xiàn)上傳圖片功能

    這篇文章主要介紹了Django如何實(shí)現(xiàn)上傳圖片功能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-08-08
  • python打包壓縮、讀取指定目錄下的指定類型文件

    python打包壓縮、讀取指定目錄下的指定類型文件

    這篇文章主要介紹了python打包壓縮、讀取指定目錄下的指定類型文件,需要的朋友可以參考下
    2018-04-04
  • python自動(dòng)生成證件號的方法示例

    python自動(dòng)生成證件號的方法示例

    這篇文章主要給大家介紹了關(guān)于python自動(dòng)生成證件號的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • python使用正則表達(dá)式替換匹配成功的組并輸出替換的次數(shù)

    python使用正則表達(dá)式替換匹配成功的組并輸出替換的次數(shù)

    正則表達(dá)式是一個(gè)特殊的字符序列,它能幫助你方便的檢查一個(gè)字符串是否與某種模式匹配。這篇文章主要介紹了python使用正則表達(dá)式替換匹配成功的組并輸出替換的次數(shù),需要的朋友可以參考下
    2017-11-11
  • python GUI庫圖形界面開發(fā)之pyinstaller打包python程序?yàn)閑xe安裝文件

    python GUI庫圖形界面開發(fā)之pyinstaller打包python程序?yàn)閑xe安裝文件

    這篇文章主要介紹了python GUI庫圖形界面開發(fā)之pyinstaller打包python程序?yàn)閑xe安裝文件,需要的朋友可以參考下
    2020-02-02
  • 代碼實(shí)例講解python3的編碼問題

    代碼實(shí)例講解python3的編碼問題

    在本篇內(nèi)容里小編給各位分享了關(guān)于python3的編碼問題以及相關(guān)實(shí)例代碼,有需要的朋友們參考一下。
    2019-07-07
  • PyCharm專業(yè)最新版2019.1安裝步驟(含激活碼)

    PyCharm專業(yè)最新版2019.1安裝步驟(含激活碼)

    這篇文章主要介紹了PyCharm專業(yè)最新版2019.1安裝步驟(含激活碼),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-10-10

最新評論