欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

用Python的線程來解決生產(chǎn)者消費問題的示例

 更新時間:2015年04月02日 17:00:21   作者:Akshar Raaj  
這篇文章主要介紹了用Python的線程來解決生產(chǎn)者消費問題的示例,包括對使用線程中容易出現(xiàn)的一些問題給出了相關解答,需要的朋友可以參考下

我們將使用Python線程來解決Python中的生產(chǎn)者—消費者問題。這個問題完全不像他們在學校中說的那么難。

如果你對生產(chǎn)者—消費者問題有了解,看這篇博客會更有意義。

為什么要關心生產(chǎn)者—消費者問題:

  •     可以幫你更好地理解并發(fā)和不同概念的并發(fā)。
  •     信息隊列中的實現(xiàn)中,一定程度上使用了生產(chǎn)者—消費者問題的概念,而你某些時候必然會用到消息隊列。

當我們在使用線程時,你可以學習以下的線程概念:

  •     Condition:線程中的條件。
  •     wait():在條件實例中可用的wait()。
  •     notify() :在條件實例中可用的notify()。

我假設你已經(jīng)有這些基本概念:線程、競態(tài)條件,以及如何解決靜態(tài)條件(例如使用lock)。否則的話,你建議你去看我上一篇文章basics of Threads。

引用維基百科:

生產(chǎn)者的工作是產(chǎn)生一塊數(shù)據(jù),放到buffer中,如此循環(huán)。與此同時,消費者在消耗這些數(shù)據(jù)(例如從buffer中把它們移除),每次一塊。

這里的關鍵詞是“同時”。所以生產(chǎn)者和消費者是并發(fā)運行的,我們需要對生產(chǎn)者和消費者做線程分離。
 

from threading import Thread
 
class ProducerThread(Thread):
  def run(self):
    pass
 
class ConsumerThread(Thread):
  def run(self):
    pass

再次引用維基百科:

這個為描述了兩個共享固定大小緩沖隊列的進程,即生產(chǎn)者和消費者。

假設我們有一個全局變量,可以被生產(chǎn)者和消費者線程修改。生產(chǎn)者產(chǎn)生數(shù)據(jù)并把它加入到隊列。消費者消耗這些數(shù)據(jù)(例如把它移出)。

queue = []

在剛開始,我們不會設置固定大小的條件,而在實際運行時加入(指下述例子)。

一開始帶bug的程序:

from threading import Thread, Lock
import time
import random
 
queue = []
lock = Lock()
 
class ProducerThread(Thread):
  def run(self):
    nums = range(5) #Will create the list [0, 1, 2, 3, 4]
    global queue
    while True:
      num = random.choice(nums) #Selects a random number from list [0, 1, 2, 3, 4]
      lock.acquire()
      queue.append(num)
      print "Produced", num
      lock.release()
      time.sleep(random.random())
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      lock.acquire()
      if not queue:
        print "Nothing in queue, but consumer will try to consume"
      num = queue.pop(0)
      print "Consumed", num
      lock.release()
      time.sleep(random.random())
 
ProducerThread().start()
ConsumerThread().start()

運行幾次并留意一下結果。如果程序在IndexError異常后并沒有自動結束,用Ctrl+Z結束運行。

樣例輸出:
 

Produced 3
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1
Nothing in queue, but consumer will try to consume
Exception in thread Thread-2:
Traceback (most recent call last):
 File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
  self.run()
 File "producer_consumer.py", line 31, in run
  num = queue.pop(0)
IndexError: pop from empty list

解釋:

  •     我們開始了一個生產(chǎn)者線程(下稱生產(chǎn)者)和一個消費者線程(下稱消費者)。
  •     生產(chǎn)者不停地添加(數(shù)據(jù))到隊列,而消費者不停地消耗。
  •     由于隊列是一個共享變量,我們把它放到lock程序塊內(nèi),以防發(fā)生競態(tài)條件。
  •     在某一時間點,消費者把所有東西消耗完畢而生產(chǎn)者還在掛起(sleep)。消費者嘗試繼續(xù)進行消耗,但此時隊列為空,出現(xiàn)IndexError異常。
  •     在每次運行過程中,在發(fā)生IndexError異常之前,你會看到print語句輸出”Nothing in queue, but consumer will try to consume”,這是你出錯的原因。

我們把這個實現(xiàn)作為錯誤行為(wrong behavior)。

什么是正確行為?

當隊列中沒有任何數(shù)據(jù)的時候,消費者應該停止運行并等待(wait),而不是繼續(xù)嘗試進行消耗。而當生產(chǎn)者在隊列中加入數(shù)據(jù)之后,應該有一個渠道去告訴(notify)消費者。然后消費者可以再次從隊列中進行消耗,而IndexError不再出現(xiàn)。

關于條件

    條件(condition)可以讓一個或多個線程進入wait,直到被其他線程notify。參考:?http://docs.python.org/2/library/threading.html#condition-objects

這就是我們所需要的。我們希望消費者在隊列為空的時候wait,只有在被生產(chǎn)者notify后恢復。生產(chǎn)者只有在往隊列中加入數(shù)據(jù)后進行notify。因此在生產(chǎn)者notify后,可以確保隊列非空,因此消費者消費時不會出現(xiàn)異常。

  •     condition內(nèi)含lock。
  •     condition有acquire()和release()方法,用以調(diào)用內(nèi)部的lock的對應方法。

condition的acquire()和release()方法內(nèi)部調(diào)用了lock的acquire()和release()。所以我們可以用condiction實例取代lock實例,但lock的行為不會改變。
生產(chǎn)者和消費者需要使用同一個condition實例, 保證wait和notify正常工作。

重寫消費者代碼:
 

from threading import Condition
 
condition = Condition()
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      condition.acquire()
      if not queue:
        print "Nothing in queue, consumer is waiting"
        condition.wait()
        print "Producer added something to queue and notified the consumer"
      num = queue.pop(0)
      print "Consumed", num
      condition.release()
      time.sleep(random.random())

重寫生產(chǎn)者代碼:
 

class ProducerThread(Thread):
  def run(self):
    nums = range(5)
    global queue
    while True:
      condition.acquire()
      num = random.choice(nums)
      queue.append(num)
      print "Produced", num
      condition.notify()
      condition.release()
      time.sleep(random.random())

樣例輸出:
 

Produced 3
Consumed 3
Produced 1
Consumed 1
Produced 4
Consumed 4
Produced 3
Consumed 3
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 3
Producer added something to queue and notified the consumer
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1

解釋:

  •     對于消費者,在消費前檢查隊列是否為空。
  •     如果為空,調(diào)用condition實例的wait()方法。
  •     消費者進入wait(),同時釋放所持有的lock。
  •     除非被notify,否則它不會運行。
  •     生產(chǎn)者可以acquire這個lock,因為它已經(jīng)被消費者release。
  •     當調(diào)用了condition的notify()方法后,消費者被喚醒,但喚醒不意味著它可以開始運行。
  •     notify()并不釋放lock,調(diào)用notify()后,lock依然被生產(chǎn)者所持有。
  •     生產(chǎn)者通過condition.release()顯式釋放lock。
  •     消費者再次開始運行,現(xiàn)在它可以得到隊列中的數(shù)據(jù)而不會出現(xiàn)IndexError異常。

為隊列增加大小限制

生產(chǎn)者不能向一個滿隊列繼續(xù)加入數(shù)據(jù)。

它可以用以下方式來實現(xiàn):

  •     在加入數(shù)據(jù)前,生產(chǎn)者檢查隊列是否為滿。
  •     如果不為滿,生產(chǎn)者可以繼續(xù)正常流程。
  •     如果為滿,生產(chǎn)者必須等待,調(diào)用condition實例的wait()。
  •     消費者可以運行。消費者消耗隊列,并產(chǎn)生一個空余位置。
  •     然后消費者notify生產(chǎn)者。
  •     當消費者釋放lock,消費者可以acquire這個lock然后往隊列中加入數(shù)據(jù)。

最終程序如下:

from threading import Thread, Condition
import time
import random
 
queue = []
MAX_NUM = 10
condition = Condition()
 
class ProducerThread(Thread):
  def run(self):
    nums = range(5)
    global queue
    while True:
      condition.acquire()
      if len(queue) == MAX_NUM:
        print "Queue full, producer is waiting"
        condition.wait()
        print "Space in queue, Consumer notified the producer"
      num = random.choice(nums)
      queue.append(num)
      print "Produced", num
      condition.notify()
      condition.release()
      time.sleep(random.random())
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      condition.acquire()
      if not queue:
        print "Nothing in queue, consumer is waiting"
        condition.wait()
        print "Producer added something to queue and notified the consumer"
      num = queue.pop(0)
      print "Consumed", num
      condition.notify()
      condition.release()
      time.sleep(random.random())
 
ProducerThread().start()
ConsumerThread().start()

樣例輸出:
 

Produced 0
Consumed 0
Produced 0
Produced 4
Consumed 0
Consumed 4
Nothing in queue, consumer is waiting
Produced 4
Producer added something to queue and notified the consumer
Consumed 4
Produced 3
Produced 2
Consumed 3

更新:
很多網(wǎng)友建議我在lock和condition下使用Queue來代替使用list。我同意這種做法,但我的目的是展示Condition,wait()和notify()如何工作,所以使用了list。

以下用Queue來更新一下代碼。

Queue封裝了Condition的行為,如wait(),notify(),acquire()。

現(xiàn)在不失為一個好機會讀一下Queue的文檔(http://docs.python.org/2/library/queue.html)。

更新程序:

from threading import Thread
import time
import random
from Queue import Queue
 
queue = Queue(10)
 
class ProducerThread(Thread):
  def run(self):
    nums = range(5)
    global queue
    while True:
      num = random.choice(nums)
      queue.put(num)
      print "Produced", num
      time.sleep(random.random())
 
class ConsumerThread(Thread):
  def run(self):
    global queue
    while True:
      num = queue.get()
      queue.task_done()
      print "Consumed", num
      time.sleep(random.random())
 
ProducerThread().start()
ConsumerThread().start()

解釋:

  •     在原來使用list的位置,改為使用Queue實例(下稱隊列)。
  •     這個隊列有一個condition,它有自己的lock。如果你使用Queue,你不需要為condition和lock而煩惱。
  •     生產(chǎn)者調(diào)用隊列的put方法來插入數(shù)據(jù)。
  •     put()在插入數(shù)據(jù)前有一個獲取lock的邏輯。
  •     同時,put()也會檢查隊列是否已滿。如果已滿,它會在內(nèi)部調(diào)用wait(),生產(chǎn)者開始等待。
  •     消費者使用get方法。
  •     get()從隊列中移出數(shù)據(jù)前會獲取lock。
  •     get()會檢查隊列是否為空,如果為空,消費者進入等待狀態(tài)。
  •     get()和put()都有適當?shù)膎otify()。現(xiàn)在就去看Queue的源碼吧。

相關文章

最新評論