python隊(duì)列通信:rabbitMQ的使用(實(shí)例講解)
(一)、前言
為什么引入消息隊(duì)列?
1.程序解耦
2.提升性能
3.降低多業(yè)務(wù)邏輯復(fù)雜度
(二)、python操作rabbit mq
rabbitmq配置安裝基本使用參見(jiàn)上節(jié)文章,不再?gòu)?fù)述。
若想使用python操作rabbitmq,需安裝pika模塊,直接pip安裝:
pip install pika
1.最簡(jiǎn)單的rabbitmq producer端與consumer端對(duì)話:
producer:
#Author :ywq import pika auth=pika.PlainCredentials('ywq','qwe') #save auth indo connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #connect to rabbit channel = connection.channel() #create channel channel.queue_declare(queue='hello') #declare queue #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') #the body is the msg content print(" [x] Sent 'Hello World!'") connection.close()
consumer:
#Author :ywq import pika auth=pika.PlainCredentials('ywq','qwe') #auth info connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #connect to rabbit channel = connection.channel() #create channel channel.queue_declare(queue='hello') #decalre queue def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
消息傳遞消費(fèi)過(guò)程中,可以在rabbit web管理頁(yè)面實(shí)時(shí)查看隊(duì)列消息信息。
2.持久化的消息隊(duì)列,避免宕機(jī)等意外情況造成消息隊(duì)列丟失。
consumer端無(wú)需改變,在producer端代碼內(nèi)加上兩個(gè)屬性,分別使消息持久化、隊(duì)列持久化,只選其一還是會(huì)出現(xiàn)消息丟失,必須同時(shí)開(kāi)啟:
delivery_mode=2 #make msg persisdent durable=True
屬性插入位置見(jiàn)如下代碼(producer端):
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.queue_declare(queue='test1',durable=True) #durable=Ture, make queue persistent msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='', routing_key='test1', body=msg, properties=pika.BasicProperties( delivery_mode=2 #make msg persisdent ) ) print('Send done:',msg) connection.close()
3.公平分發(fā)
在多consumer的情況下,默認(rèn)rabbit是輪詢發(fā)送消息的,但有的consumer消費(fèi)速度快,有的消費(fèi)速度慢,為了資源使用更平衡,引入ack確認(rèn)機(jī)制。consumer消費(fèi)完消息后會(huì)給rabbit發(fā)送ack,一旦未ack的消息數(shù)量超過(guò)指定允許的數(shù)量,則不再往該consumer發(fā)送,改為發(fā)送給其他consumer。
producer端代碼不用改變,需要給consumer端代碼插入兩個(gè)屬性:
channel.basic_qos(prefetch_count= *) #define the max non_ack_count channel.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbitmq
屬性插入位置見(jiàn)如下代碼(consumer端):
#Author :ywq import pika,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.queue_declare(queue='test2',durable=True) def callback(chann,deliver,properties,body): print('Recv:',body) time.sleep(5) chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit channel.basic_qos(prefetch_count=1) ''' 注意,no_ack=False 注意,這里的no_ack類型僅僅是告訴rabbit該消費(fèi)者隊(duì)列是否返回ack,若要返回ack,需要在callback內(nèi)定義 prefetch_count=1,未ack的msg數(shù)量超過(guò)1個(gè),則此consumer不再接受msg,此配置需寫(xiě)在channel.basic_consume上方,否則會(huì)造成non_ack情況出現(xiàn)。 ''' channel.basic_consume( callback, queue='test2' ) channel.start_consuming()
三、消息發(fā)布/訂閱
上方的幾種模式都是producer端發(fā)送一次,則consumer端接收一次,能不能實(shí)現(xiàn)一個(gè)producer發(fā)送,多個(gè)關(guān)聯(lián)的consumer同時(shí)接收呢?of course,rabbit支持消息發(fā)布訂閱,共支持三種模式,通過(guò)組件exchange轉(zhuǎn)發(fā)器,實(shí)現(xiàn)3種模式:
fanout: 所有bind到此exchange的queue都可以接收消息,類似廣播。
direct: 通過(guò)routingKey和exchange決定的哪個(gè)唯一的queue可以接收消息,推送給綁定了該queue的consumer,類似組播。
topic:所有符合routingKey(此時(shí)可以是一個(gè)表達(dá)式)的routingKey所bind的queue可以接收消息,類似前綴列表匹配路由。
1.fanout
publish端(producer):
#Author :ywq import pika,sys,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='hello', exchange_type='fanout' ) msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time() channel.basic_publish( exchange='hello', routing_key='', body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) print('send done') connection.close()
subscribe端(consumer):
#Author :ywq import pika auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare( exchange='hello', exchange_type='fanout' ) random_num=channel.queue_declare(exclusive=True) #隨機(jī)與rabbit建立一個(gè)queue,comsumer斷開(kāi)后,該queue立即刪除釋放 queue_name=random_num.method.queue channel.basic_qos(prefetch_count=1) channel.queue_bind( queue=queue_name, exchange='hello' ) def callback(chann,deliver,properties,body): print('Recv:',body) chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit channel.basic_consume( callback, queue=queue_name, ) channel.start_consuming()
實(shí)現(xiàn)producer一次發(fā)送,多個(gè)關(guān)聯(lián)consumer接收。
使用exchange模式時(shí):
1.producer端不再申明queue,直接申明exchange
2.consumer端仍需綁定隊(duì)列并指定exchange來(lái)接收message
3.consumer最好創(chuàng)建隨機(jī)queue,使用完后立即釋放。
隨機(jī)隊(duì)列名在web下可以檢測(cè)到:
2.direct
使用exchange同時(shí)consumer有選擇性的接收消息。隊(duì)列綁定關(guān)鍵字,producer將數(shù)據(jù)根據(jù)關(guān)鍵字發(fā)送到消息exchange,exchange根據(jù) 關(guān)鍵字 判定應(yīng)該將數(shù)據(jù)發(fā)送至指定隊(duì)列,consumer相應(yīng)接收。即在fanout基礎(chǔ)上增加了routing key.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='direct_log', exchange_type='direct', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='direct_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='direct_log', exchange_type='direct' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='direct_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[level:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
同時(shí)開(kāi)啟多個(gè)consumer,其中兩個(gè)接收notice,兩個(gè)接收warning,運(yùn)行效果如下:
3.topic
相較于direct,topic能實(shí)現(xiàn)模糊匹配式工作方式(在consumer端指定匹配方式),只要routing key包含指定的關(guān)鍵字,則將該msg發(fā)往綁定的queue上。
rabbitmq通配符規(guī)則:
符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“”匹配一個(gè)詞。因此“abc.#”能夠匹配到“abc.m.n”,但是“abc.*‘' 只會(huì)匹配到“abc.m”。‘.'號(hào)為分割符。使用通配符匹配時(shí)必須使用‘.'號(hào)分割。
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='topic_log', exchange_type='topic', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='topic_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='topic_log', exchange_type='topic' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='topic_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[type:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
運(yùn)行效果:
rabbitmq三種publish/subscribe模型簡(jiǎn)單介紹完畢。
以上這篇python隊(duì)列通信:rabbitMQ的使用(實(shí)例講解)就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Python實(shí)現(xiàn)RabbitMQ6種消息模型的示例代碼
- Python隊(duì)列RabbitMQ 使用方法實(shí)例記錄
- Python操作rabbitMQ的示例代碼
- python實(shí)現(xiàn)RabbitMQ的消息隊(duì)列的示例代碼
- python RabbitMQ 使用詳細(xì)介紹(小結(jié))
- Python RabbitMQ消息隊(duì)列實(shí)現(xiàn)rpc
- rabbitmq(中間消息代理)在python中的使用詳解
- 利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列
- Python rabbitMQ如何實(shí)現(xiàn)生產(chǎn)消費(fèi)者模式
相關(guān)文章
python使用MkDocs自動(dòng)生成文檔的操作方法
python代碼注釋風(fēng)格有很多,比較主流的有 reStructuredText風(fēng)格、numpy風(fēng)格、Google風(fēng)格,自動(dòng)生成文檔的工具也有很多,常見(jiàn)的有:Pydocs,Sphinx和MkDocs,本文給大家介紹了python使用MkDocs自動(dòng)生成文檔的操作方法,需要的朋友可以參考下2024-06-06python數(shù)據(jù)結(jié)構(gòu)之二叉樹(shù)的統(tǒng)計(jì)與轉(zhuǎn)換實(shí)例
這篇文章主要介紹了python數(shù)據(jù)結(jié)構(gòu)之二叉樹(shù)的統(tǒng)計(jì)與轉(zhuǎn)換實(shí)例,例如統(tǒng)計(jì)二叉樹(shù)的葉子、分支節(jié)點(diǎn),以及二叉樹(shù)的左右兩樹(shù)互換等,需要的朋友可以參考下2014-04-04用python3讀取python2的pickle數(shù)據(jù)方式
今天小編就為大家分享一篇用python3讀取python2的pickle數(shù)據(jù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12python實(shí)現(xiàn)sm2和sm4國(guó)密(國(guó)家商用密碼)算法的示例
這篇文章主要介紹了python實(shí)現(xiàn)sm2和sm4國(guó)密(國(guó)家商用密碼)算法的示例,幫助大家使用python加密文件,感興趣的朋友可以了解下2020-09-09Python實(shí)現(xiàn)按學(xué)生年齡排序的實(shí)際問(wèn)題詳解
這篇文章主要給大家介紹了關(guān)于Python實(shí)現(xiàn)按學(xué)生年齡排序?qū)嶋H問(wèn)題的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-08-08解決Python報(bào)錯(cuò)問(wèn)題[SSL:?SSLV3_ALERT_HANDSHAKE_FAILURE]
這篇文章主要介紹了解決Python報(bào)錯(cuò)問(wèn)題[SSL:?SSLV3_ALERT_HANDSHAKE_FAILURE],具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07使用PySpark實(shí)現(xiàn)數(shù)據(jù)清洗與JSON格式轉(zhuǎn)換的實(shí)踐詳解
在大數(shù)據(jù)處理中,PySpark?提供了強(qiáng)大的工具來(lái)處理海量數(shù)據(jù),特別是在數(shù)據(jù)清洗和轉(zhuǎn)換方面,本文將介紹如何使用?PySpark?進(jìn)行數(shù)據(jù)清洗,并將數(shù)據(jù)格式轉(zhuǎn)換為?JSON?格式的實(shí)踐,感興趣的可以了解下2023-12-12Python內(nèi)置函數(shù)ord()的實(shí)現(xiàn)示例
ord()函數(shù)是用于返回字符的Unicode碼點(diǎn),適用于處理文本和國(guó)際化應(yīng)用,它只能處理單個(gè)字符,超過(guò)一字符或非字符串類型會(huì)引發(fā)TypeError,示例代碼展示了如何使用ord()進(jìn)行字符轉(zhuǎn)換和比較2024-09-09