欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python+Pika+RabbitMQ環(huán)境部署及實(shí)現(xiàn)工作隊(duì)列的實(shí)例教程

 更新時(shí)間:2016年06月29日 17:59:35   作者:陳杰斌  
RabbitMQ是一個(gè)消息隊(duì)列服務(wù)器,在本文中我們將學(xué)習(xí)到Python+Pika+RabbitMQ環(huán)境部署及實(shí)現(xiàn)工作隊(duì)列的實(shí)例教程,需要的朋友可以參考下

rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,即消息隊(duì)列的意思。前面還有個(gè)rabbit單詞,就是兔子的意思,和python語(yǔ)言叫python一樣,老外還是蠻幽默的。rabbitmq服務(wù)類似于mysql、apache服務(wù),只是提供的功能不一樣。rabbimq是用來(lái)提供發(fā)送消息的服務(wù),可以用在不同的應(yīng)用程序之間進(jìn)行通信。

安裝rabbitmq
先來(lái)安裝下rabbitmq,在ubuntu 12.04下可以直接通過(guò)apt-get安裝:

sudo apt-get install rabbitmq-server

安裝好后,rabbitmq服務(wù)就已經(jīng)啟動(dòng)好了。接下來(lái)看下python編寫Hello World!的實(shí)例。實(shí)例的內(nèi)容就是從send.py發(fā)送“Hello World!”到rabbitmq,receive.py從rabbitmq接收send.py發(fā)送的信息。

2016629175148628.png (288×100)

其中P表示produce,生產(chǎn)者的意思,也可以稱為發(fā)送者,實(shí)例中表現(xiàn)為send.py;C表示consumer,消費(fèi)者的意思,也可以稱為接收者,實(shí)例中表現(xiàn)為receive.py;中間紅色的表示隊(duì)列的意思,實(shí)例中表現(xiàn)為hello隊(duì)列。

python使用rabbitmq服務(wù),可以使用現(xiàn)成的類庫(kù)pika、txAMQP或者py-amqplib,這里選擇了pika。

安裝pika

安裝pika可以使用pip來(lái)進(jìn)行安裝,pip是python的軟件管理包,如果沒(méi)有安裝,可以通過(guò)apt-get安裝

sudo apt-get install python-pip

通過(guò)pip安裝pika:

sudo pip install pika

send.py代碼

連接到rabbitmq服務(wù)器,因?yàn)槭窃诒镜販y(cè)試,所以就用localhost就可以了。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

聲明消息隊(duì)列,消息將在這個(gè)隊(duì)列中進(jìn)行傳遞。如果將消息發(fā)送到不存在的隊(duì)列,rabbitmq將會(huì)自動(dòng)清除這些消息。

channel.queue_declare(queue='hello')

發(fā)送消息到上面聲明的hello隊(duì)列,其中exchange表示交換器,能精確指定消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列,routing_key設(shè)置為隊(duì)列的名稱,body就是發(fā)送的內(nèi)容,具體發(fā)送細(xì)節(jié)暫時(shí)先不關(guān)注。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

關(guān)閉連接

connection.close()

完整代碼

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

先來(lái)執(zhí)行下這個(gè)程序,執(zhí)行成功的話,rabbitmqctl應(yīng)該成功增加了hello隊(duì)列,并且隊(duì)列里應(yīng)該有一條信息,用rabbitmqctl命令來(lái)查看下

rabbitmqctl list_queues

在筆者的電腦上輸出如下信息:

2016629175215597.png (193×71)

確實(shí)有一個(gè)hello隊(duì)列,并且隊(duì)列里有一條信息。接下來(lái)用receive.py來(lái)獲取隊(duì)列里的信息。

receive.py代碼

和send.py的前面兩個(gè)步驟一樣,都是要先連接服務(wù)器,然后聲明消息的隊(duì)列,這里就不再貼同樣代碼了。

接收消息更為復(fù)雜一些,需要定義一個(gè)回調(diào)函數(shù)來(lái)處理,這邊的回調(diào)函數(shù)就是將信息打印出來(lái)。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

告訴rabbitmq使用callback來(lái)接收信息

channel.basic_consume(callback, queue='hello', no_ack=True)

開始接收信息,并進(jìn)入阻塞狀態(tài),隊(duì)列里有信息才會(huì)調(diào)用callback進(jìn)行處理。按ctrl+c退出。

channel.start_consuming()

完整代碼

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

執(zhí)行程序,就能夠接收到隊(duì)列hello里的消息Hello World!,然后打印在屏幕上。換一個(gè)終端,再次執(zhí)行send.py,可以看到receive.py這邊會(huì)再次接收到信息。

工作隊(duì)列示例

1.準(zhǔn)備工作(Preparation)

在實(shí)例程序中,用new_task.py來(lái)模擬任務(wù)分配者, worker.py來(lái)模擬工作者。

修改send.py,從命令行參數(shù)里接收信息,并發(fā)送

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

修改receive.py的回調(diào)函數(shù)。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

這邊先打開兩個(gè)終端,都運(yùn)行worker.py,處于監(jiān)聽(tīng)狀態(tài),這邊就相當(dāng)于兩個(gè)工作者。打開第三個(gè)終端,運(yùn)行new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

觀察worker.py接收到任務(wù),其中一個(gè)工作者接收到3個(gè)任務(wù) :
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

另外一個(gè)工作者接收到2個(gè)任務(wù) :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

從上面來(lái)看,每個(gè)工作者,都會(huì)依次分配到任務(wù)。那么如果一個(gè)工作者,在處理任務(wù)的時(shí)候掛掉,這個(gè)任務(wù)就沒(méi)有完成,應(yīng)當(dāng)交由其他工作者處理。所以應(yīng)當(dāng)有一種機(jī)制,當(dāng)一個(gè)工作者完成任務(wù)時(shí),會(huì)反饋消息。

2.消息確認(rèn)(Message acknowledgment)

消息確認(rèn)就是當(dāng)工作者完成任務(wù)后,會(huì)反饋給rabbitmq。修改worker.py中的回調(diào)函數(shù):

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

這邊停頓5秒,可以方便ctrl+c退出。

去除no_ack=True參數(shù)或者設(shè)置為False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用這個(gè)代碼運(yùn)行,即使其中一個(gè)工作者ctrl+c退出后,正在執(zhí)行的任務(wù)也不會(huì)丟失,rabbitmq會(huì)將任務(wù)重新分配給其他工作者。

3.消息持久化存儲(chǔ)(Message durability)

雖然有了消息反饋機(jī)制,但是如果rabbitmq自身掛掉的話,那么任務(wù)還是會(huì)丟失。所以需要將任務(wù)持久化存儲(chǔ)起來(lái)。聲明持久化存儲(chǔ):

channel.queue_declare(queue='hello', durable=True)

但是這個(gè)程序會(huì)執(zhí)行錯(cuò)誤,因?yàn)閔ello這個(gè)隊(duì)列已經(jīng)存在,并且是非持久化的,rabbitmq不允許使用不同的參數(shù)來(lái)重新定義存在的隊(duì)列。重新定義一個(gè)隊(duì)列:

channel.queue_declare(queue='task_queue', durable=True)

在發(fā)送任務(wù)的時(shí)候,用delivery_mode=2來(lái)標(biāo)記任務(wù)為持久化存儲(chǔ):

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平調(diào)度(Fair dispatch)

上面實(shí)例中,雖然每個(gè)工作者是依次分配到任務(wù),但是每個(gè)任務(wù)不一定一樣??赡苡械娜蝿?wù)比較重,執(zhí)行時(shí)間比較久;有的任務(wù)比較輕,執(zhí)行時(shí)間比較短。如果能公平調(diào)度就最好了,使用basic_qos設(shè)置prefetch_count=1,使得rabbitmq不會(huì)在同一時(shí)間給工作者分配多個(gè)任務(wù),即只有工作者完成任務(wù)之后,才會(huì)再次接收到任務(wù)。

channel.basic_qos(prefetch_count=1)

new_task.py完整代碼

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代碼

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()

相關(guān)文章

  • python文件讀寫操作與linux shell變量命令交互執(zhí)行的方法

    python文件讀寫操作與linux shell變量命令交互執(zhí)行的方法

    這篇文章主要介紹了python文件讀寫操作與linux shell變量命令交互執(zhí)行的方法,涉及對(duì)文件操作及Linux shell交互的技巧,需要的朋友可以參考下
    2015-01-01
  • python實(shí)現(xiàn)從web抓取文檔的方法

    python實(shí)現(xiàn)從web抓取文檔的方法

    這篇文章主要介紹了python實(shí)現(xiàn)從web抓取文檔的方法,以抓取人人網(wǎng)頁(yè)面為例講述了完整的web文檔抓取方法,需要的朋友可以參考下
    2014-09-09
  • OpenCV-Python直方圖均衡化實(shí)現(xiàn)圖像去霧

    OpenCV-Python直方圖均衡化實(shí)現(xiàn)圖像去霧

    直方圖均衡化可以達(dá)到增強(qiáng)圖像顯示效果的目的。最常用的比如去霧。本文就來(lái)實(shí)現(xiàn)直方圖均衡化實(shí)現(xiàn)圖像去霧,感興趣的可以了解一下
    2021-06-06
  • Python綁定方法與非綁定方法詳解

    Python綁定方法與非綁定方法詳解

    這篇文章主要為大家詳細(xì) 介紹了Python綁定方法與非綁定方法的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-08-08
  • Python3.4解釋器用法簡(jiǎn)單示例

    Python3.4解釋器用法簡(jiǎn)單示例

    這篇文章主要介紹了Python3.4解釋器用法,結(jié)合簡(jiǎn)單實(shí)例形式分析了Windows與Linux系統(tǒng)環(huán)境中Python3.4解釋器的簡(jiǎn)單使用方法,需要的朋友可以參考下
    2019-03-03
  • Python實(shí)現(xiàn)二分查找與bisect模塊詳解

    Python實(shí)現(xiàn)二分查找與bisect模塊詳解

    二分查找又叫折半查找,二分查找應(yīng)該屬于減治技術(shù)的成功應(yīng)用。python標(biāo)準(zhǔn)庫(kù)中還有一個(gè)灰常給力的模塊,那就是bisect。這個(gè)庫(kù)接受有序的序列,內(nèi)部實(shí)現(xiàn)就是二分。下面這篇文章就詳細(xì)介紹了Python如何實(shí)現(xiàn)二分查找與bisect模塊,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。
    2017-01-01
  • Python Tkinter GUI編程實(shí)現(xiàn)Frame切換

    Python Tkinter GUI編程實(shí)現(xiàn)Frame切換

    本文主要介紹了Python Tkinter GUI編程實(shí)現(xiàn)Frame切換,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • YOLOv5車牌識(shí)別實(shí)戰(zhàn)教程(四)模型優(yōu)化與部署

    YOLOv5車牌識(shí)別實(shí)戰(zhàn)教程(四)模型優(yōu)化與部署

    這篇文章主要介紹了YOLOv5車牌識(shí)別實(shí)戰(zhàn)教程(四)模型優(yōu)化與部署,在這個(gè)教程中,我們將一步步教你如何使用YOLOv5進(jìn)行車牌識(shí)別,幫助你快速掌握YOLOv5車牌識(shí)別技能,需要的朋友可以參考下
    2023-04-04
  • Python實(shí)現(xiàn)圖算法、堆操作和并查集代碼實(shí)例

    Python實(shí)現(xiàn)圖算法、堆操作和并查集代碼實(shí)例

    這篇文章主要介紹了Python實(shí)現(xiàn)圖算法、堆操作和并查集代碼實(shí)例,圖算法、堆操作和并查集是計(jì)算機(jī)科學(xué)中常用的數(shù)據(jù)結(jié)構(gòu)和算法,它們?cè)诮鉀Q各種實(shí)際問(wèn)題中具有重要的應(yīng)用價(jià)值,需要的朋友可以參考下
    2023-08-08
  • Python中使用Chaco繪圖庫(kù)

    Python中使用Chaco繪圖庫(kù)

    這篇文章主要介紹了Python中使用Chaco繪圖庫(kù),Chaco是一個(gè)2D的繪圖庫(kù),如果你安裝了Python(x,y)的話,可以在pythonxy的安裝目錄下的找到Chaco的demo程序,Chaco提供了類似Matlab和pylab的繪圖方式,我們稱之為面向腳本的繪圖方式
    2023-11-11

最新評(píng)論