Python操作RabbitMQ服務(wù)器實(shí)現(xiàn)消息隊(duì)列的路由功能
Python使用Pika庫(安裝:sudo pip install pika)可以操作RabbitMQ消息隊(duì)列服務(wù)器(安裝:sudo apt-get install rabbitmq-server),這里我們來看一下MQ相關(guān)的路由功能。
路由鍵的實(shí)現(xiàn)
比如有一個(gè)需要給所有接收端發(fā)送消息的場景,但是如果需要自由定制,有的消息發(fā)給其中一些接收端,有些消息發(fā)送給另外一些接收端,要怎么辦呢?這種情況下就要用到路由鍵了。
路由鍵的工作原理:每個(gè)接收端的消息隊(duì)列在綁定交換機(jī)的時(shí)候,可以設(shè)定相應(yīng)的路由鍵。發(fā)送端通過交換機(jī)發(fā)送信息時(shí),可以指明路由鍵 ,交換機(jī)會(huì)根據(jù)路由鍵把消息發(fā)送到相應(yīng)的消息隊(duì)列,這樣接收端就能接收到消息了。
這邊繼上一篇,還是用send.py和receive.py來模擬實(shí)現(xiàn)路由鍵的功能。send.py表示發(fā)送端,receive.py表示接收端。實(shí)例的功能就是將info、warning、error三種級別的信息發(fā)送到不同的接收端。
send.py代碼分析
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #定義交換機(jī),設(shè)置類型為direct channel.exchange_declare(exchange='messages', type='direct') #定義三個(gè)路由鍵 routings = ['info', 'warning', 'error'] #將消息依次發(fā)送到交換機(jī),并設(shè)置路由鍵 for routing in routings: message = '%s message.' % routing channel.basic_publish(exchange='messages', routing_key=routing, body=message) print message connection.close()
receive.py代碼分析
#!/usr/bin/env python #coding=utf8 import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #定義交換機(jī),設(shè)置類型為direct channel.exchange_declare(exchange='messages', type='direct') #從命令行獲取路由鍵參數(shù),如果沒有,則設(shè)置為info routings = sys.argv[1:] if not routings: routings = ['info'] #生成臨時(shí)隊(duì)列,并綁定到交換機(jī)上,設(shè)置路由鍵 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for routing in routings: channel.queue_bind(exchange='messages', queue=queue_name, routing_key=routing) def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
打開兩個(gè)終端,一個(gè)運(yùn)行代碼python receive.py info warning,表示只接收info和warning的消息。另外一個(gè)終端運(yùn)行send.py,可以觀察到接收終端只接收到了info和warning的消息。如果打開多個(gè)終端運(yùn)行receive.py,并傳入不同的路由鍵參數(shù),可以看到更明顯的效果。
當(dāng)接收端正在運(yùn)行時(shí),可以使用rabbitmqctl list_bindings來查看綁定情況。
路由鍵模糊匹配
路由鍵模糊匹配,就是可以使用正則表達(dá)式,和常用的正則表示式不同,這里的話“#”表示所有、全部的意思;“*”只匹配到一個(gè)詞??赐晔纠湍苊靼琢恕?/p>
這邊繼上面的例子,還是用send.py和receive.py來實(shí)現(xiàn)路由鍵模糊匹配的功能。send.py表示發(fā)送端,receive.py表示接收端。實(shí)例的功能大概是這樣:比如你有個(gè)知心好朋友,不管開心、傷心、工作上的還是生活上的事情都可以和她說;還有一些朋友可以分享開心的事情;還有一些朋友,你可以把不開心的事情和她說。
send.py代碼分析
因?yàn)橐M(jìn)行路由鍵模糊匹配,所以交換機(jī)的類型要設(shè)置為topic,設(shè)置為topic,就可以使用#,*的匹配符號了。
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #定義交換機(jī),設(shè)置類型為topic channel.exchange_declare(exchange='messages', type='topic') #定義路由鍵 routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life'] #將消息依次發(fā)送到交換機(jī),并設(shè)定路由鍵 for routing in routings: message = '%s message.' % routing channel.basic_publish(exchange='messages', routing_key=routing, body=message) print message connection.close()
上例中定義了四種類型的消息,容易理解,就不解釋了,然后依次發(fā)送出去。
receive.py代碼分析
同樣,交換機(jī)的類型要設(shè)定為topic就可以了。從命令行接收參數(shù)的功能稍微調(diào)整了一下,就是沒有參數(shù)時(shí)報(bào)錯(cuò)退出。
#!/usr/bin/env python #coding=utf8 import pika, sys connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #定義交換機(jī),設(shè)置類型為topic channel.exchange_declare(exchange='messages', type='topic') #從命令行獲取路由參數(shù),如果沒有,則報(bào)錯(cuò)退出 routings = sys.argv[1:] if not routings: print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[0],) exit() #生成臨時(shí)隊(duì)列,并綁定到交換機(jī)上,設(shè)置路由鍵 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for routing in routings: channel.queue_bind(exchange='messages', queue=queue_name, routing_key=routing) def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
打開四個(gè)終端,一個(gè)運(yùn)行如下,表示任何事情都可以和她說:
python receive.py "#"
另外一個(gè)終端 運(yùn)行如下,表示可以和她分享開心的事:
python receive.py "happy.*"
第三個(gè)運(yùn)行如下,表示工作上的事情可以和她分享:
python receive.py "*.work"
最后一個(gè)運(yùn)行python send.py。結(jié)果不難想象出來,就不貼出來了。
- Python進(jìn)程間通信Queue消息隊(duì)列用法分析
- 利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列
- Python中線程的MQ消息隊(duì)列實(shí)現(xiàn)以及消息隊(duì)列的優(yōu)點(diǎn)解析
- 詳解Python操作RabbitMQ服務(wù)器消息隊(duì)列的遠(yuǎn)程結(jié)果返回
- Python的消息隊(duì)列包SnakeMQ使用初探
- 利用Python操作消息隊(duì)列RabbitMQ的方法教程
- Python RabbitMQ消息隊(duì)列實(shí)現(xiàn)rpc
- python實(shí)現(xiàn)RabbitMQ的消息隊(duì)列的示例代碼
- Python多進(jìn)程庫multiprocessing中進(jìn)程池Pool類的使用詳解
- Python 多進(jìn)程并發(fā)操作中進(jìn)程池Pool的實(shí)例
- Python多進(jìn)程池 multiprocessing Pool用法示例
- Python高級編程之消息隊(duì)列(Queue)與進(jìn)程池(Pool)實(shí)例詳解
相關(guān)文章
Python+Matplotlib繪制發(fā)散條形圖的示例代碼
發(fā)散條形圖(Diverging Bar)是一種用于顯示數(shù)據(jù)分布的圖表,可以幫助我們比較不同類別或分組的數(shù)據(jù)的差異和相對性,本文介紹了Matplotlib繪制發(fā)散條形圖的函數(shù)源碼,需要的可以參考一下2023-06-06Python編程scoketServer實(shí)現(xiàn)多線程同步實(shí)例代碼
這篇文章主要介紹了Python編程scoketServer實(shí)現(xiàn)多線程同步實(shí)例代碼,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01jupyter notebook 中輸出pyecharts圖實(shí)例
這篇文章主要介紹了jupyter notebook 中輸出pyecharts圖實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06python從網(wǎng)絡(luò)讀取圖片并直接進(jìn)行處理的方法
這篇文章主要介紹了python從網(wǎng)絡(luò)讀取圖片并直接進(jìn)行處理的方法,涉及cStringIO模塊模擬本地文件的使用技巧,需要的朋友可以參考下2015-05-05python優(yōu)化數(shù)據(jù)預(yù)處理方法Pandas pipe詳解
在本文中,我們將重點(diǎn)討論一個(gè)將多個(gè)預(yù)處理操作組織成單個(gè)操作的特定函數(shù):pipe。我將通過示例方式來展示如何使用它,讓我們從數(shù)據(jù)創(chuàng)建數(shù)據(jù)幀開始吧2021-11-1110行Python代碼計(jì)算汽車數(shù)量的實(shí)現(xiàn)方法
這篇文章主要介紹了10行Python代碼計(jì)算汽車數(shù)量的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10