欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Python操作RabbitMQ服務(wù)器消息隊(duì)列的遠(yuǎn)程結(jié)果返回

 更新時(shí)間:2016年06月30日 09:29:19   作者:陳杰斌  
RabbitMQ是一款基于MQ的服務(wù)器,Python可以通過(guò)Pika庫(kù)來(lái)進(jìn)行程序操控,這里我們將來(lái)詳解Python操作RabbitMQ服務(wù)器消息隊(duì)列的遠(yuǎn)程結(jié)果返回:

先說(shuō)一下筆者這里的測(cè)試環(huán)境:Ubuntu14.04 + Python 2.7.4
RabbitMQ服務(wù)器

sudo apt-get install rabbitmq-server

Python使用RabbitMQ需要Pika庫(kù)

sudo pip install pika

遠(yuǎn)程結(jié)果返回
消息發(fā)送端發(fā)送消息出去后沒(méi)有結(jié)果返回。如果只是單純發(fā)送消息,當(dāng)然沒(méi)有問(wèn)題了,但是在實(shí)際中,常常會(huì)需要接收端將收到的消息進(jìn)行處理之后,返回給發(fā)送端。

處理方法描述:發(fā)送端在發(fā)送信息前,產(chǎn)生一個(gè)接收消息的臨時(shí)隊(duì)列,該隊(duì)列用來(lái)接收返回的結(jié)果。其實(shí)在這里接收端、發(fā)送端的概念已經(jīng)比較模糊了,因?yàn)榘l(fā)送端也同樣要接收消息,接收端同樣也要發(fā)送消息,所以這里筆者使用另外的示例來(lái)演示這一過(guò)程。

示例內(nèi)容:假設(shè)有一個(gè)控制中心和一個(gè)計(jì)算節(jié)點(diǎn),控制中心會(huì)將一個(gè)自然數(shù)N發(fā)送給計(jì)算節(jié)點(diǎn),計(jì)算節(jié)點(diǎn)將N值加1后,返回給控制中心。這里用center.py模擬控制中心,compute.py模擬計(jì)算節(jié)點(diǎn)。

compute.py代碼分析

#!/usr/bin/env python
#coding=utf8
import pika
 
#連接rabbitmq服務(wù)器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
#定義隊(duì)列
channel.queue_declare(queue='compute_queue')
print ' [*] Waiting for n'
 
#將n值加1
def increase(n):
  return n + 1
 
#定義接收到消息的處理方法
def request(ch, method, properties, body):
  print " [.] increase(%s)" % (body,)
 
  response = increase(int(body))
 
  #將計(jì)算結(jié)果發(fā)送回控制中心
  ch.basic_publish(exchange='',
           routing_key=properties.reply_to,
           body=str(response))
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(request, queue='compute_queue')
 
channel.start_consuming()

計(jì)算節(jié)點(diǎn)的代碼比較簡(jiǎn)單,值得一提的是,原來(lái)的接收方法都是直接將消息打印出來(lái),這邊進(jìn)行了加一的計(jì)算,并將結(jié)果發(fā)送回控制中心。

center.py代碼分析

#!/usr/bin/env python
#coding=utf8
import pika
 
class Center(object):
  def __init__(self):
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
 
    self.channel = self.connection.channel()
     
    #定義接收返回消息的隊(duì)列
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
 
    self.channel.basic_consume(self.on_response,
                  no_ack=True,
                  queue=self.callback_queue)
 
  #定義接收到返回消息的處理方法
  def on_response(self, ch, method, props, body):
    self.response = body
   
   
  def request(self, n):
    self.response = None
    #發(fā)送計(jì)算請(qǐng)求,并聲明返回隊(duì)列
    self.channel.basic_publish(exchange='',
                  routing_key='compute_queue',
                  properties=pika.BasicProperties(
                     reply_to = self.callback_queue,
                     ),
                  body=str(n))
    #接收返回的數(shù)據(jù)
    while self.response is None:
      self.connection.process_data_events()
    return int(self.response)
 
center = Center()
 
print " [x] Requesting increase(30)"
response = center.request(30)
print " [.] Got %r" % (response,)

上例代碼定義了接收返回?cái)?shù)據(jù)的隊(duì)列和處理方法,并且在發(fā)送請(qǐng)求的時(shí)候?qū)⒃撽?duì)列賦值給reply_to,在計(jì)算節(jié)點(diǎn)代碼中就是通過(guò)這個(gè)參數(shù)來(lái)獲取返回隊(duì)列的。

打開(kāi)兩個(gè)終端,一個(gè)運(yùn)行代碼python compute.py,另外一個(gè)終端運(yùn)行center.py,如果執(zhí)行成功,應(yīng)該就能看到效果了。

筆者在測(cè)試的時(shí)候,出了些小問(wèn)題,就是在center.py發(fā)送消息時(shí)沒(méi)有指明返回隊(duì)列,結(jié)果compute.py那邊在計(jì)算完結(jié)果要發(fā)回?cái)?shù)據(jù)時(shí)報(bào)錯(cuò),提示routing_key不存在,再次運(yùn)行也報(bào)錯(cuò)。用rabbitmqctl list_queues查看隊(duì)列,發(fā)現(xiàn)compute_queue隊(duì)列有1條數(shù)據(jù),每次重新運(yùn)行compute.py的時(shí)候,都會(huì)重新處理這條數(shù)據(jù)。后來(lái)使用/etc/init.d/rabbitmq-server restart重新啟動(dòng)下rabbitmq就ok了。

相互關(guān)聯(lián)編號(hào)correlation id
上一遍演示了遠(yuǎn)程結(jié)果返回的示例,但是有一個(gè)沒(méi)有提到,就是correlation id,這個(gè)是個(gè)什么東東呢?

假設(shè)有多個(gè)計(jì)算節(jié)點(diǎn),控制中心開(kāi)啟多個(gè)線程,往這些計(jì)算節(jié)點(diǎn)發(fā)送數(shù)字,要求計(jì)算結(jié)果并返回,但是控制中心只開(kāi)啟了一個(gè)隊(duì)列,所有線程都是從這個(gè)隊(duì)列里獲取消息,每個(gè)線程如何確定收到的消息就是該線程對(duì)應(yīng)的呢?這個(gè)就是correlation id的用處了。correlation翻譯成中文就是相互關(guān)聯(lián),也表達(dá)了這個(gè)意思。

correlation id運(yùn)行原理:控制中心發(fā)送計(jì)算請(qǐng)求時(shí)設(shè)置correlation id,而后計(jì)算節(jié)點(diǎn)將計(jì)算結(jié)果,連同接收到的correlation id一起返回,這樣控制中心就能通過(guò)correlation id來(lái)標(biāo)識(shí)請(qǐng)求。其實(shí)correlation id也可以理解為請(qǐng)求的唯一標(biāo)識(shí)碼。

示例內(nèi)容:控制中心開(kāi)啟多個(gè)線程,每個(gè)線程都發(fā)起一次計(jì)算請(qǐng)求,通過(guò)correlation id,每個(gè)線程都能準(zhǔn)確收到相應(yīng)的計(jì)算結(jié)果。

compute.py代碼分析

和上面一篇相比,只需修改一個(gè)地方:將計(jì)算結(jié)果發(fā)送回控制中心時(shí),增加參數(shù)correlation_id的設(shè)定,該參數(shù)的值其實(shí)是從控制中心發(fā)送過(guò)來(lái)的,這里只是再次發(fā)送回去。代碼如下:

#!/usr/bin/env python
#coding=utf8
import pika
 
#連接rabbitmq服務(wù)器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
#定義隊(duì)列
channel.queue_declare(queue='compute_queue')
print ' [*] Waiting for n'
 
#將n值加1
def increase(n):
  return n + 1
 
#定義接收到消息的處理方法
def request(ch, method, props, body):
  print " [.] increase(%s)" % (body,)
 
  response = increase(int(body))
 
  #將計(jì)算結(jié)果發(fā)送回控制中心,增加correlation_id的設(shè)定
  ch.basic_publish(exchange='',
           routing_key=props.reply_to,
           properties=pika.BasicProperties(correlation_id = \
                           props.correlation_id),
           body=str(response))
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(request, queue='compute_queue')
 
channel.start_consuming()

center.py代碼分析

控制中心代碼稍微復(fù)雜些,其中比較關(guān)鍵的有三個(gè)地方:

使用python的uuid來(lái)產(chǎn)生唯一的correlation_id。
發(fā)送計(jì)算請(qǐng)求時(shí),設(shè)定參數(shù)correlation_id。
定義一個(gè)字典來(lái)保存返回的數(shù)據(jù),并且鍵值為相應(yīng)線程產(chǎn)生的correlation_id。
代碼如下:

#!/usr/bin/env python
#coding=utf8
import pika, threading, uuid
 
#自定義線程類,繼承threading.Thread
class MyThread(threading.Thread):
  def __init__(self, func, num):
    super(MyThread, self).__init__()
    self.func = func
    self.num = num
 
  def run(self):
    print " [x] Requesting increase(%d)" % self.num
    response = self.func(self.num)
    print " [.] increase(%d)=%d" % (self.num, response)
 
#控制中心類
class Center(object):
  def __init__(self):
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
 
    self.channel = self.connection.channel()
 
    #定義接收返回消息的隊(duì)列
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
 
    self.channel.basic_consume(self.on_response,
                  no_ack=True,
                  queue=self.callback_queue)
 
    #返回的結(jié)果都會(huì)存儲(chǔ)在該字典里
    self.response = {}
 
  #定義接收到返回消息的處理方法
  def on_response(self, ch, method, props, body):
    self.response[props.correlation_id] = body
 
  def request(self, n):
    corr_id = str(uuid.uuid4())
    self.response[corr_id] = None
 
    #發(fā)送計(jì)算請(qǐng)求,并設(shè)定返回隊(duì)列和correlation_id
    self.channel.basic_publish(exchange='',
                  routing_key='compute_queue',
                  properties=pika.BasicProperties(
                     reply_to = self.callback_queue,
                     correlation_id = corr_id,
                     ),
                  body=str(n))
    #接收返回的數(shù)據(jù)
    while self.response[corr_id] is None:
      self.connection.process_data_events()
    return int(self.response[corr_id])
 
center = Center()
#發(fā)起5次計(jì)算請(qǐng)求
nums= [10, 20, 30, 40 ,50]
threads = []
for num in nums:
  threads.append(MyThread(center.request, num))
for thread in threads:
  thread.start()
for thread in threads:
  thread.join()

筆者開(kāi)啟了兩個(gè)終端,來(lái)運(yùn)行compute.py,開(kāi)啟一個(gè)終端來(lái)運(yùn)行center.py,最后結(jié)果輸出截圖如下:

201663092126912.png (280×164)

可以看到雖然獲取的結(jié)果不是順序輸出,但是結(jié)果和源數(shù)據(jù)都是對(duì)應(yīng)的。

這邊示例的做法就是創(chuàng)建一個(gè)隊(duì)列,使用correlation id來(lái)標(biāo)識(shí)每次請(qǐng)求。也有做法可以不使用correlation id,就是每請(qǐng)求一次,就創(chuàng)建一個(gè)臨時(shí)隊(duì)列,不過(guò)這樣太消耗性能了,官方也不推薦這么做。

相關(guān)文章

  • python怎么去除字符串最后的換行符‘\n’

    python怎么去除字符串最后的換行符‘\n’

    這篇文章主要介紹了python去除字符串最后的換行符'\n',本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-04-04
  • Python如何實(shí)現(xiàn)拆分?jǐn)?shù)據(jù)集

    Python如何實(shí)現(xiàn)拆分?jǐn)?shù)據(jù)集

    這篇文章主要介紹了Python如何實(shí)現(xiàn)拆分?jǐn)?shù)據(jù)集問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • python celery分布式任務(wù)隊(duì)列的使用詳解

    python celery分布式任務(wù)隊(duì)列的使用詳解

    這篇文章主要介紹了python celery分布式任務(wù)隊(duì)列的使用詳解,Celery 是一個(gè) 基于python開(kāi)發(fā)的分布式異步消息任務(wù)隊(duì)列,通過(guò)它可以輕松的實(shí)現(xiàn)任務(wù)的異步處理, 如果你的業(yè)務(wù)場(chǎng)景中需要用到異步任務(wù),就可以考慮使用celery,需要的朋友可以參考下
    2019-07-07
  • 詳解python-docx處理Word必備工具

    詳解python-docx處理Word必備工具

    這篇文章主要介紹了python-docx處理Word必備工具,我主要講講自己用到的幾個(gè)內(nèi)容是怎么設(shè)置的,對(duì)python-docx處理Word的相關(guān)知識(shí)感興趣的朋友一起看看吧
    2021-10-10
  • Python3.6+selenium2.53.6自動(dòng)化測(cè)試_讀取excel文件的方法

    Python3.6+selenium2.53.6自動(dòng)化測(cè)試_讀取excel文件的方法

    這篇文章主要介紹了Python3.6+selenium2.53.6自動(dòng)化測(cè)試_讀取excel文件的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-09-09
  • Python區(qū)塊鏈塊的添加教程

    Python區(qū)塊鏈塊的添加教程

    這篇文章主要為大家介紹了Python區(qū)塊鏈塊的添加教程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • python內(nèi)置函數(shù)zip詳解

    python內(nèi)置函數(shù)zip詳解

    這篇文章主要為大家介紹了python內(nèi)置函數(shù)zip,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-01-01
  • Python MySQL 日期時(shí)間格式化作為參數(shù)的操作

    Python MySQL 日期時(shí)間格式化作為參數(shù)的操作

    這篇文章主要介紹了Python MySQL 日期時(shí)間格式化作為參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-03-03
  • MAC中PyCharm設(shè)置python3解釋器

    MAC中PyCharm設(shè)置python3解釋器

    本文給大家分享的是修改MACA中pycharm的默認(rèn)的Python解釋器,由于默認(rèn)解釋器是Python2,使用起來(lái)各種不便,下面給大家講解下如何修改
    2017-12-12
  • Python繪制圓形方法及turtle模塊詳解

    Python繪制圓形方法及turtle模塊詳解

    這篇文章主要給大家介紹了關(guān)于Python繪制圓形方法及turtle模塊詳解的相關(guān)資料,Turtle庫(kù)是Python語(yǔ)言中一個(gè)很流行的繪制圖像的函數(shù)庫(kù),文中介紹的非常詳細(xì),需要的朋友可以參考下
    2023-12-12

最新評(píng)論