欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python中線程的MQ消息隊(duì)列實(shí)現(xiàn)以及消息隊(duì)列的優(yōu)點(diǎn)解析

 更新時(shí)間:2016年06月29日 17:14:07   作者:bravezhe  
消息隊(duì)列(MQ,Message Queue)在消息數(shù)據(jù)傳輸中的保存作用為數(shù)據(jù)通信提供了保障和實(shí)時(shí)處理上的便利,這里我們就來看一下Python中線程的MQ消息隊(duì)列實(shí)現(xiàn)以及消息隊(duì)列的優(yōu)點(diǎn)解析

“消息隊(duì)列”是在消息的傳輸過程中保存消息的容器。消息隊(duì)列管理器在將消息從它的源中繼到它的目標(biāo)時(shí)充當(dāng)中間人。隊(duì)列的主要目的是提供路由并保證消息的傳遞;如果發(fā)送消息時(shí)接收者不可用,消息隊(duì)列會(huì)保留消息,直到可以成功地傳遞它。相信對(duì)任何架構(gòu)或應(yīng)用來說,消息隊(duì)列都是一個(gè)至關(guān)重要的組件,下面是十個(gè)理由:

Python的消息隊(duì)列示例:

1.threading+Queue實(shí)現(xiàn)線程隊(duì)列

#!/usr/bin/env python
 
import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
 """沒打印一個(gè)數(shù)字等待1秒,并發(fā)打印10個(gè)數(shù)字需要多少秒?"""
 def __init__(self, queue):
  threading.Thread.__init__(self)
  self.queue = queue
 
 def run(self):
  whileTrue:
   #消費(fèi)者端,從隊(duì)列中獲取num
   num = self.queue.get()
   print"i'm num %s"%(num)
   time.sleep(1)
   #在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)
   self.queue.task_done()
 
start = time.time()
def main():
 #產(chǎn)生一個(gè) threads pool, 并把消息傳遞給thread函數(shù)進(jìn)行處理,這里開啟10個(gè)并發(fā)
 for i in range(10):
  t = ThreadNum(queue)
  t.setDaemon(True)
  t.start()
  
 #往隊(duì)列中填錯(cuò)數(shù)據(jù) 
 for num in range(10):
   queue.put(num)
 #wait on the queue until everything has been processed
 queue.join()
 
main()
print"Elapsed Time: %s" % (time.time() - start)

運(yùn)行結(jié)果:

i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896

解讀:
具體工作步驟描述如下:
1,創(chuàng)建一個(gè) Queue.Queue() 的實(shí)例,然后使用數(shù)據(jù)對(duì)它進(jìn)行填充。
2,將經(jīng)過填充數(shù)據(jù)的實(shí)例傳遞給線程類,后者是通過繼承 threading.Thread 的方式創(chuàng)建的。
3,生成守護(hù)線程池。
4,每次從隊(duì)列中取出一個(gè)項(xiàng)目,并使用該線程中的數(shù)據(jù)和 run 方法以執(zhí)行相應(yīng)的工作。
5,在完成這項(xiàng)工作之后,使用 queue.task_done() 函數(shù)向任務(wù)已經(jīng)完成的隊(duì)列發(fā)送一個(gè)信號(hào)。
6,對(duì)隊(duì)列執(zhí)行 join 操作,實(shí)際上意味著等到隊(duì)列為空,再退出主程序。
在使用這個(gè)模式時(shí)需要注意一點(diǎn):通過將守護(hù)線程設(shè)置為 true,程序運(yùn)行完自動(dòng)退出。好處是在退出之前,可以對(duì)隊(duì)列執(zhí)行 join 操作、或者等到隊(duì)列為空。


2.多個(gè)隊(duì)列
所謂多個(gè)隊(duì)列,一個(gè)隊(duì)列的輸出可以作為另一個(gè)隊(duì)列的輸入!

#!/usr/bin/env python
import Queue
import threading
import time
 
queue = Queue.Queue()
out_queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  """bkeep"""
  def __init__(self, queue, out_queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #從隊(duì)列中取消息
      num = self.queue.get()
      bkeep = num
      
      #將bkeep放入隊(duì)列中
      self.out_queue.put(bkeep)
 
      #signals to queue job is done
      self.queue.task_done()
 
class PrintLove(threading.Thread):
  """Threaded Url Grab"""
  def __init__(self, out_queue):
    threading.Thread.__init__(self)
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #從隊(duì)列中獲取消息并賦值給bkeep
      bkeep = self.out_queue.get()  
      keke = "I love " + str(bkeep)
      print keke,
      print self.getName()
      time.sleep(1)
 
      #signals to queue job is done
      self.out_queue.task_done()
 
start = time.time()
def main():
  #populate queue with data
  for num in range(10):
    queue.put(num)
    
  #spawn a pool of threads, and pass them queue instance
  for i in range(5):
    t = ThreadNum(queue, out_queue)
    t.setDaemon(True)
    t.start()
 
 
  for i in range(5):
    pl = PrintLove(out_queue)
    pl.setDaemon(True)
    pl.start()
 
  #wait on the queue until everything has been processed
  queue.join()
  out_queue.join()
 
main()
print"Elapsed Time: %s" % (time.time() - start)

運(yùn)行結(jié)果:

I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098

 
解讀:
ThreadNum 類工作流程
定義隊(duì)列--->繼承threading---->初始化queue---->定義run函數(shù)--->get queue中的數(shù)據(jù)---->處理數(shù)據(jù)---->put數(shù)據(jù)到另外一個(gè)queue-->發(fā)信號(hào)告訴queue該條處理完畢
 
main函數(shù)工作流程:
--->往自定義queue中扔數(shù)據(jù)
--->for循環(huán)確定啟動(dòng)的線程數(shù)---->實(shí)例化ThreadNum類---->啟動(dòng)線程并設(shè)置守護(hù)
--->for循環(huán)確定啟動(dòng)的線程數(shù)---->實(shí)例化PrintLove類--->啟動(dòng)線程并設(shè)置為守護(hù)
--->等待queue中的消息處理完畢后執(zhí)行join。即退出主程序。

了解了MQ的大概實(shí)現(xiàn)以后,我們來總結(jié)一下消息隊(duì)列的優(yōu)點(diǎn)
1. 解耦

在項(xiàng)目啟動(dòng)之初來預(yù)測(cè)將來項(xiàng)目會(huì)碰到什么需求,是極其困難的。消息隊(duì)列在處理過程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2. 冗余

有時(shí)在處理數(shù)據(jù)的時(shí)候處理過程會(huì)失敗。除非數(shù)據(jù)被持久化,否則將永遠(yuǎn)丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。在被許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理過程明確的指出該消息已經(jīng)被處理完畢,確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

3. 擴(kuò)展性

因?yàn)橄㈥?duì)列解耦了你的處理過程,所以增大消息入隊(duì)和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡(jiǎn)單。

4. 靈活性 & 峰值處理能力

當(dāng)你的應(yīng)用上了Hacker News的首頁,你將發(fā)現(xiàn)訪問流量攀升到一個(gè)不同尋常的水平。在訪問量劇增的情況下,你的應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為 以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住增長(zhǎng)的訪問壓力,而不是因?yàn)槌鲐?fù)荷的請(qǐng)求而完全崩潰。 請(qǐng)查看我們關(guān)于峰值處理能力的博客文章了解更多此方面的信息。

5. 可恢復(fù)性

當(dāng)體系的一部分組件失效,不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。而這種允許重試或者延后處理請(qǐng)求的能力通常是造就一個(gè)略感不便的用戶和一個(gè)沮喪透頂?shù)挠脩糁g的區(qū)別。

6. 送達(dá)保證

消息隊(duì)列提供的冗余機(jī)制保證了消息能被實(shí)際的處理,只要一個(gè)進(jìn)程讀取了該隊(duì)列即可。在此基礎(chǔ)上,IronMQ提供了一個(gè)"只送達(dá)一次"保證。無論有多少進(jìn) 程在從隊(duì)列中領(lǐng)取數(shù)據(jù),每一個(gè)消息只能被處理一次。這之所以成為可能,是因?yàn)楂@取一個(gè)消息只是"預(yù)定"了這個(gè)消息,暫時(shí)把它移出了隊(duì)列。除非客戶端明確的 表示已經(jīng)處理完了這個(gè)消息,否則這個(gè)消息會(huì)被放回隊(duì)列中去,在一段可配置的時(shí)間之后可再次被處理。

7.排序保證

在許多情況下,數(shù)據(jù)處理的順序都很重要。消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理。IronMO保證消息漿糊通過FIFO(先進(jìn)先出)的順序來處理,因此消息在隊(duì)列中的位置就是從隊(duì)列中檢索他們的位置。

8.緩沖

在任何重要的系統(tǒng)中,都會(huì)有需要不同的處理時(shí)間的元素。例如,加載一張圖片比應(yīng)用過濾器花費(fèi)更少的時(shí)間。消息隊(duì)列通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行--寫入隊(duì)列的處理會(huì)盡可能的快速,而不受從隊(duì)列讀的預(yù)備處理的約束。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。

9. 理解數(shù)據(jù)流

在一個(gè)分布式系統(tǒng)里,要得到一個(gè)關(guān)于用戶操作會(huì)用多長(zhǎng)時(shí)間及其原因的總體印象,是個(gè)巨大的挑戰(zhàn)。消息系列通過消息被處理的頻率,來方便的輔助確定那些表現(xiàn)不佳的處理過程或領(lǐng)域,這些地方的數(shù)據(jù)流都不夠優(yōu)化。

10. 異步通信

很多時(shí)候,你不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許你把一個(gè)消息放入隊(duì)列,但并不立即處理它。你想向隊(duì)列中放入多少消息就放多少,然后在你樂意的時(shí)候再去處理它們。

相關(guān)文章

  • Tensorflow tf.dynamic_partition矩陣拆分示例(Python3)

    Tensorflow tf.dynamic_partition矩陣拆分示例(Python3)

    今天小編就為大家分享一篇Tensorflow tf.dynamic_partition矩陣拆分示例(Python3) ,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-02-02
  • PyCharm控制臺(tái)堆棧亂碼問題解決方案

    PyCharm控制臺(tái)堆棧亂碼問題解決方案

    PyCharm環(huán)境都已經(jīng)配置成了UTF-8編碼,控制臺(tái)打印中文也不會(huì)出現(xiàn)亂碼,但報(bào)錯(cuò)堆棧信息中如果有中文會(huì)出現(xiàn)中文亂碼,遇到這樣的問題如何解決呢,下面小編給大家?guī)砹薖yCharm控制臺(tái)堆棧亂碼問題解決方案,感興趣的朋友一起看看吧
    2023-12-12
  • Python 爬取攜程所有機(jī)票的實(shí)例代碼

    Python 爬取攜程所有機(jī)票的實(shí)例代碼

    這篇文章主要介紹了Python 爬取攜程所有機(jī)票功能,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2018-06-06
  • win10下opencv-python特定版本手動(dòng)安裝與pip自動(dòng)安裝教程

    win10下opencv-python特定版本手動(dòng)安裝與pip自動(dòng)安裝教程

    這篇文章主要介紹了win10下opencv-python特定版本手動(dòng)安裝與pip自動(dòng)安裝教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-03-03
  • Python應(yīng)用實(shí)現(xiàn)雙指數(shù)函數(shù)及擬合代碼實(shí)例

    Python應(yīng)用實(shí)現(xiàn)雙指數(shù)函數(shù)及擬合代碼實(shí)例

    這篇文章主要介紹了Python應(yīng)用實(shí)現(xiàn)雙指數(shù)函數(shù)及擬合代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Python3.7 新特性之dataclass裝飾器

    Python3.7 新特性之dataclass裝飾器

    Python 3.7中一個(gè)令人興奮的新特性是 data classes 。這篇文章主要介紹了Python3.7 新特性之dataclass裝飾器,需要的朋友可以參考下
    2019-05-05
  • python爬蟲爬取淘寶商品信息(selenum+phontomjs)

    python爬蟲爬取淘寶商品信息(selenum+phontomjs)

    這篇文章主要為大家詳細(xì)介紹了python爬蟲爬取淘寶商品信息,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-02-02
  • Python求離散序列導(dǎo)數(shù)的示例

    Python求離散序列導(dǎo)數(shù)的示例

    今天小編就為大家分享一篇Python求離散序列導(dǎo)數(shù)的示例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-07-07
  • Python高級(jí)過濾器之filter函數(shù)詳解

    Python高級(jí)過濾器之filter函數(shù)詳解

    在Python中,filter()是一個(gè)非常有用的內(nèi)置函數(shù),它能夠根據(jù)指定的函數(shù)來篩選出可迭代對(duì)象中滿足條件的元素,本文將從入門到精通,全面介紹filter()函數(shù)的用法和相關(guān)知識(shí)點(diǎn)
    2023-08-08
  • Python使用multiprocessing如何實(shí)現(xiàn)多進(jìn)程

    Python使用multiprocessing如何實(shí)現(xiàn)多進(jìn)程

    這篇文章主要介紹了Python使用multiprocessing如何實(shí)現(xiàn)多進(jìn)程問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02

最新評(píng)論