詳解Python操作RabbitMQ服務(wù)器消息隊(duì)列的遠(yuǎn)程結(jié)果返回
先說(shuō)一下筆者這里的測(cè)試環(huán)境:Ubuntu14.04 + Python 2.7.4
RabbitMQ服務(wù)器
sudo apt-get install rabbitmq-server
Python使用RabbitMQ需要Pika庫(kù)
sudo pip install pika
遠(yuǎn)程結(jié)果返回
消息發(fā)送端發(fā)送消息出去后沒(méi)有結(jié)果返回。如果只是單純發(fā)送消息,當(dāng)然沒(méi)有問(wèn)題了,但是在實(shí)際中,常常會(huì)需要接收端將收到的消息進(jìn)行處理之后,返回給發(fā)送端。
處理方法描述:發(fā)送端在發(fā)送信息前,產(chǎn)生一個(gè)接收消息的臨時(shí)隊(duì)列,該隊(duì)列用來(lái)接收返回的結(jié)果。其實(shí)在這里接收端、發(fā)送端的概念已經(jīng)比較模糊了,因?yàn)榘l(fā)送端也同樣要接收消息,接收端同樣也要發(fā)送消息,所以這里筆者使用另外的示例來(lái)演示這一過(guò)程。
示例內(nèi)容:假設(shè)有一個(gè)控制中心和一個(gè)計(jì)算節(jié)點(diǎn),控制中心會(huì)將一個(gè)自然數(shù)N發(fā)送給計(jì)算節(jié)點(diǎn),計(jì)算節(jié)點(diǎn)將N值加1后,返回給控制中心。這里用center.py模擬控制中心,compute.py模擬計(jì)算節(jié)點(diǎn)。
compute.py代碼分析
#!/usr/bin/env python #coding=utf8 import pika #連接rabbitmq服務(wù)器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定義隊(duì)列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #將n值加1 def increase(n): return n + 1 #定義接收到消息的處理方法 def request(ch, method, properties, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #將計(jì)算結(jié)果發(fā)送回控制中心 ch.basic_publish(exchange='', routing_key=properties.reply_to, body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
計(jì)算節(jié)點(diǎn)的代碼比較簡(jiǎn)單,值得一提的是,原來(lái)的接收方法都是直接將消息打印出來(lái),這邊進(jìn)行了加一的計(jì)算,并將結(jié)果發(fā)送回控制中心。
center.py代碼分析
#!/usr/bin/env python #coding=utf8 import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定義接收返回消息的隊(duì)列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定義接收到返回消息的處理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #發(fā)送計(jì)算請(qǐng)求,并聲明返回隊(duì)列 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的數(shù)據(jù) while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() print " [x] Requesting increase(30)" response = center.request(30) print " [.] Got %r" % (response,)
上例代碼定義了接收返回?cái)?shù)據(jù)的隊(duì)列和處理方法,并且在發(fā)送請(qǐng)求的時(shí)候?qū)⒃撽?duì)列賦值給reply_to,在計(jì)算節(jié)點(diǎn)代碼中就是通過(guò)這個(gè)參數(shù)來(lái)獲取返回隊(duì)列的。
打開(kāi)兩個(gè)終端,一個(gè)運(yùn)行代碼python compute.py,另外一個(gè)終端運(yùn)行center.py,如果執(zhí)行成功,應(yīng)該就能看到效果了。
筆者在測(cè)試的時(shí)候,出了些小問(wèn)題,就是在center.py發(fā)送消息時(shí)沒(méi)有指明返回隊(duì)列,結(jié)果compute.py那邊在計(jì)算完結(jié)果要發(fā)回?cái)?shù)據(jù)時(shí)報(bào)錯(cuò),提示routing_key不存在,再次運(yùn)行也報(bào)錯(cuò)。用rabbitmqctl list_queues查看隊(duì)列,發(fā)現(xiàn)compute_queue隊(duì)列有1條數(shù)據(jù),每次重新運(yùn)行compute.py的時(shí)候,都會(huì)重新處理這條數(shù)據(jù)。后來(lái)使用/etc/init.d/rabbitmq-server restart重新啟動(dòng)下rabbitmq就ok了。
相互關(guān)聯(lián)編號(hào)correlation id
上一遍演示了遠(yuǎn)程結(jié)果返回的示例,但是有一個(gè)沒(méi)有提到,就是correlation id,這個(gè)是個(gè)什么東東呢?
假設(shè)有多個(gè)計(jì)算節(jié)點(diǎn),控制中心開(kāi)啟多個(gè)線程,往這些計(jì)算節(jié)點(diǎn)發(fā)送數(shù)字,要求計(jì)算結(jié)果并返回,但是控制中心只開(kāi)啟了一個(gè)隊(duì)列,所有線程都是從這個(gè)隊(duì)列里獲取消息,每個(gè)線程如何確定收到的消息就是該線程對(duì)應(yīng)的呢?這個(gè)就是correlation id的用處了。correlation翻譯成中文就是相互關(guān)聯(lián),也表達(dá)了這個(gè)意思。
correlation id運(yùn)行原理:控制中心發(fā)送計(jì)算請(qǐng)求時(shí)設(shè)置correlation id,而后計(jì)算節(jié)點(diǎn)將計(jì)算結(jié)果,連同接收到的correlation id一起返回,這樣控制中心就能通過(guò)correlation id來(lái)標(biāo)識(shí)請(qǐng)求。其實(shí)correlation id也可以理解為請(qǐng)求的唯一標(biāo)識(shí)碼。
示例內(nèi)容:控制中心開(kāi)啟多個(gè)線程,每個(gè)線程都發(fā)起一次計(jì)算請(qǐng)求,通過(guò)correlation id,每個(gè)線程都能準(zhǔn)確收到相應(yīng)的計(jì)算結(jié)果。
compute.py代碼分析
和上面一篇相比,只需修改一個(gè)地方:將計(jì)算結(jié)果發(fā)送回控制中心時(shí),增加參數(shù)correlation_id的設(shè)定,該參數(shù)的值其實(shí)是從控制中心發(fā)送過(guò)來(lái)的,這里只是再次發(fā)送回去。代碼如下:
#!/usr/bin/env python #coding=utf8 import pika #連接rabbitmq服務(wù)器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定義隊(duì)列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #將n值加1 def increase(n): return n + 1 #定義接收到消息的處理方法 def request(ch, method, props, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #將計(jì)算結(jié)果發(fā)送回控制中心,增加correlation_id的設(shè)定 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
center.py代碼分析
控制中心代碼稍微復(fù)雜些,其中比較關(guān)鍵的有三個(gè)地方:
使用python的uuid來(lái)產(chǎn)生唯一的correlation_id。
發(fā)送計(jì)算請(qǐng)求時(shí),設(shè)定參數(shù)correlation_id。
定義一個(gè)字典來(lái)保存返回的數(shù)據(jù),并且鍵值為相應(yīng)線程產(chǎn)生的correlation_id。
代碼如下:
#!/usr/bin/env python #coding=utf8 import pika, threading, uuid #自定義線程類,繼承threading.Thread class MyThread(threading.Thread): def __init__(self, func, num): super(MyThread, self).__init__() self.func = func self.num = num def run(self): print " [x] Requesting increase(%d)" % self.num response = self.func(self.num) print " [.] increase(%d)=%d" % (self.num, response) #控制中心類 class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定義接收返回消息的隊(duì)列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #返回的結(jié)果都會(huì)存儲(chǔ)在該字典里 self.response = {} #定義接收到返回消息的處理方法 def on_response(self, ch, method, props, body): self.response[props.correlation_id] = body def request(self, n): corr_id = str(uuid.uuid4()) self.response[corr_id] = None #發(fā)送計(jì)算請(qǐng)求,并設(shè)定返回隊(duì)列和correlation_id self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = corr_id, ), body=str(n)) #接收返回的數(shù)據(jù) while self.response[corr_id] is None: self.connection.process_data_events() return int(self.response[corr_id]) center = Center() #發(fā)起5次計(jì)算請(qǐng)求 nums= [10, 20, 30, 40 ,50] threads = [] for num in nums: threads.append(MyThread(center.request, num)) for thread in threads: thread.start() for thread in threads: thread.join()
筆者開(kāi)啟了兩個(gè)終端,來(lái)運(yùn)行compute.py,開(kāi)啟一個(gè)終端來(lái)運(yùn)行center.py,最后結(jié)果輸出截圖如下:
可以看到雖然獲取的結(jié)果不是順序輸出,但是結(jié)果和源數(shù)據(jù)都是對(duì)應(yīng)的。
這邊示例的做法就是創(chuàng)建一個(gè)隊(duì)列,使用correlation id來(lái)標(biāo)識(shí)每次請(qǐng)求。也有做法可以不使用correlation id,就是每請(qǐng)求一次,就創(chuàng)建一個(gè)臨時(shí)隊(duì)列,不過(guò)這樣太消耗性能了,官方也不推薦這么做。
- Python高級(jí)編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool)實(shí)例詳解
- Python進(jìn)程間通信Queue消息隊(duì)列用法分析
- python實(shí)現(xiàn)RabbitMQ的消息隊(duì)列的示例代碼
- Python RabbitMQ消息隊(duì)列實(shí)現(xiàn)rpc
- Python的消息隊(duì)列包SnakeMQ使用初探
- Python中線程的MQ消息隊(duì)列實(shí)現(xiàn)以及消息隊(duì)列的優(yōu)點(diǎn)解析
- 利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列
- 基于python實(shí)現(xiàn)操作redis及消息隊(duì)列
相關(guān)文章
Python如何實(shí)現(xiàn)拆分?jǐn)?shù)據(jù)集
這篇文章主要介紹了Python如何實(shí)現(xiàn)拆分?jǐn)?shù)據(jù)集問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09python celery分布式任務(wù)隊(duì)列的使用詳解
這篇文章主要介紹了python celery分布式任務(wù)隊(duì)列的使用詳解,Celery 是一個(gè) 基于python開(kāi)發(fā)的分布式異步消息任務(wù)隊(duì)列,通過(guò)它可以輕松的實(shí)現(xiàn)任務(wù)的異步處理, 如果你的業(yè)務(wù)場(chǎng)景中需要用到異步任務(wù),就可以考慮使用celery,需要的朋友可以參考下2019-07-07Python3.6+selenium2.53.6自動(dòng)化測(cè)試_讀取excel文件的方法
這篇文章主要介紹了Python3.6+selenium2.53.6自動(dòng)化測(cè)試_讀取excel文件的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-09-09Python MySQL 日期時(shí)間格式化作為參數(shù)的操作
這篇文章主要介紹了Python MySQL 日期時(shí)間格式化作為參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03