python對RabbitMQ的簡單入門使用教程
(一)RabbitMQ的簡介
RabbitMq 是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理中間件。消息隊列是一種應(yīng)用程序?qū)?yīng)用程序的通行方式,應(yīng)用程序通過寫消息,將消息傳遞于隊列,由另一應(yīng)用程序讀取 完成通信。而作為中間件的 RabbitMq 無疑是目前最流行的消息隊列之一。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
RabbitMQ總體架構(gòu)
PS:生產(chǎn)者和消費者可能在不同的程序或主機(jī)中,當(dāng)然也有可能一個程序有可能既是生產(chǎn)者,也是消費者。
RabbitMq 應(yīng)用場景廣泛:
1.系統(tǒng)的高可用:日常生活當(dāng)中各種商城秒殺,高流量,高并發(fā)的場景。當(dāng)服務(wù)器接收到如此大量請求處理業(yè)務(wù)時,有宕機(jī)的風(fēng)險。某些業(yè)務(wù)可能極其復(fù)雜,但這部分不是高時效性,不需要立即反饋給用戶,我們可以將這部分處理請求拋給隊列,讓程序后置去處理,減輕服務(wù)器在高并發(fā)場景下的壓力。
2.分布式系統(tǒng),集成系統(tǒng),子系統(tǒng)之間的對接,以及架構(gòu)設(shè)計中常常需要考慮消息隊列的應(yīng)用。
(二)RabbitMQ的安裝
apt-get update apt-get install erlang apt-get install rabbitmq-server #啟動rabbitmq: service rabbitmq-server start #停止rabbitmq: service rabbitmq-server stop #重啟rabbitmq: service rabbitmq-server restart #啟動rabbitmq插件:rabbitmq-plugins enable rabbitmq_management
啟用rabbitmq_management插件后就可以登錄后臺管理頁面了,瀏覽器輸入ip:15672
自帶的密碼和用戶名都是guest,但是只能本機(jī)登錄
所以下面我們添加新用戶,和自定義權(quán)限
#添加新用戶 rabbitmqctl add_user 用戶名 密碼 #給指定用戶添加管理員權(quán)限 rabbitmqctl set_user_tags 用戶名 administrator 給用戶添加權(quán)限 rabbitmqctl set_permissions -p / 用戶名 ".*" ".*" ".*"
在web頁面輸入用戶名,和密碼
(三)python操作RabbitMQ
python中使用pika操作RabbitMQ
pip install pika #皮卡皮卡,哈哈
(四)RabbitMQ簡單模式
上代碼
# coding=utf-8 ### 生產(chǎn)者 import pika import time user_info = pika.PlainCredentials('root', 'root')#用戶名和密碼 connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))#連接服務(wù)器上的RabbitMQ服務(wù) # 創(chuàng)建一個channel channel = connection.channel() # 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句 channel.queue_declare(queue='hello') for i in range(0, 100): channel.basic_publish(exchange='',#當(dāng)前是一個簡單模式,所以這里設(shè)置為空字符串就可以了 routing_key='hello',# 指定消息要發(fā)送到哪個queue body='{}'.format(i)# 指定要發(fā)送的消息 ) time.sleep(1) # 關(guān)閉連接 # connection.close()
PS:RabbitMQ中所有的消息都要先通過交換機(jī),空字符串表示使用默認(rèn)的交換機(jī)
# coding=utf-8 ### 消費者 import pika user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,生產(chǎn)者和消費者都做這一步的好處是 # 這樣生產(chǎn)者和消費者就沒有必要的先后啟動順序了 channel.queue_declare(queue='hello') # 回調(diào)函數(shù) def callback(ch, method, properties, body): print('消費者收到:{}'.format(body)) # channel: 包含channel的一切屬性和方法 # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key # properties: basic_publish 通過 properties 傳入的參數(shù) # body: basic_publish發(fā)送的消息 channel.basic_consume(queue='hello', # 接收指定queue的消息 auto_ack=True, # 指定為True,表示消息接收到后自動給消息發(fā)送方回復(fù)確認(rèn),已收到消息 on_message_callback=callback # 設(shè)置收到消息的回調(diào)函數(shù) ) print('Waiting for messages. To exit press CTRL+C') # 一直處于等待接收消息的狀態(tài),如果沒收到消息就一直處于阻塞狀態(tài),收到消息就調(diào)用上面的回調(diào)函數(shù) channel.start_consuming()
對于上面的這種模式,有一下兩個不好的地方:
一個是在我們的消費者還沒開始消費完隊列里的消息,如果這時rabbitmq服務(wù)掛了,那么消息隊列里的消息將會全部丟失,解決方法是在聲明隊列時,聲明隊列為可持久化存儲隊列,并且在生產(chǎn)者在將消息插入到消息隊列時,設(shè)置消息持久化存儲,具體如下
# coding=utf-8 ### 生產(chǎn)者 import pika import time user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) # 創(chuàng)建一個channel channel = connection.channel() # 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句 channel.queue_declare(queue='durable_queue',durable=True) #PS:這里不同種隊列不允許名字相同 for i in range(0, 100): channel.basic_publish(exchange='', routing_key='durable_queue', body='{}'.format(i), properties=pika.BasicProperties(delivery_mode=2) ) # 關(guān)閉連接 # connection.close()
消費者與上面的消費者沒有什么不同,具體的就是消費聲明的隊列,也要是可持久化的隊列,還有就是,即使在生產(chǎn)者插入消息時,設(shè)置當(dāng)前消息持久化存儲(properties=pika.BasicProperties(delivery_mode=2)),并不能百分百保證消息真的被持久化,因為RabbitMQ掛掉的時候它可能還保存在緩存中,沒來得及同步到磁盤中
在生產(chǎn)者插入消息后,立刻停止rabbitmq,并重新啟動,其實我們在web管理頁面也可看到未被消費的信息,當(dāng)然在啟動消費者后也成功接收到了消息
上面說的第二點不好就是,如果在消費者獲取到隊列里的消息后,在回調(diào)函數(shù)的處理過程中,消費者突然出錯或程序崩潰等異常,那么就會造成這條消息并未被實際正常的處理掉。為了解決這個問題,我們只需在消費者basic_consume(auto_ack=False),并在回調(diào)函數(shù)中設(shè)置手動應(yīng)答即可ch.basic_ack(delivery_tag=method.delivery_tag),具體如下
# coding=utf-8 ### 消費者 import pika import time user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,生產(chǎn)者和消費者都做這一步的好處是 # 這樣生產(chǎn)者和消費者就沒有必要的先后啟動順序了 channel.queue_declare(queue='queue') # 回調(diào)函數(shù) def callback(ch, method, properties, body): time.sleep(5) ch.basic_ack(delivery_tag=method.delivery_tag) print('消費者收到:{}'.format(body.decode('utf-8'))) # channel: 包含channel的一切屬性和方法 # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key # properties: basic_publish 通過 properties 傳入的參數(shù) # body: basic_publish發(fā)送的消息 channel.basic_consume(queue='queue', # 接收指定queue的消息 auto_ack=False, # 指定為False,表示取消自動應(yīng)答,交由回調(diào)函數(shù)手動應(yīng)答 on_message_callback=callback # 設(shè)置收到消息的回調(diào)函數(shù) ) # 應(yīng)答的本質(zhì)是告訴消息隊列可以將這條消息銷毀了 print('Waiting for messages. To exit press CTRL+C') # 一直處于等待接收消息的狀態(tài),如果沒收到消息就一直處于阻塞狀態(tài),收到消息就調(diào)用上面的回調(diào)函數(shù) channel.start_consuming()
這里只需要配置消費者,生產(chǎn)者并不要修改
還有就是在上的使用方式在,都是一個生產(chǎn)者和一個消費者,還有一種情況就是,一個生產(chǎn)者和多個消費者,即多個消費者同時監(jiān)聽一個消息隊列,這時候隊列里的消息就是輪詢分發(fā)(即如果消息隊列里有100條信息,如果有2個消費者,那么每個就會收到50條信息),但是在某些情況下,不同的消費者處理任務(wù)的能力是不同的,這時還按照輪詢的方式分發(fā)消息并不是很合理,那么只需要再配合手動應(yīng)答的方式,設(shè)置消費者接收的消息沒有處理完,隊列就不要給我放送新的消息即可,具體配置方式如下:
# coding=utf-8 ### 消費者 import pika import time user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,生產(chǎn)者和消費者都做這一步的好處是 # 這樣生產(chǎn)者和消費者就沒有必要的先后啟動順序了 channel.queue_declare(queue='queue') # 回調(diào)函數(shù) def callback(ch, method, properties, body): time.sleep(0)#通過設(shè)置休眠時間來模擬不同消費者的處理時間 ch.basic_ack(delivery_tag=method.delivery_tag) print('消費者收到:{}'.format(body.decode('utf-8'))) # prefetch_count表示接收的消息數(shù)量,當(dāng)我接收的消息沒有處理完(用basic_ack標(biāo)記消息已處理完畢)之前不會再接收新的消息了 channel.basic_qos(prefetch_count=1) # 還有就是這個設(shè)置必須在basic_consume之上,否則不生效 channel.basic_consume(queue='queue', # 接收指定queue的消息 auto_ack=False, # 指定為False,表示取消自動應(yīng)答,交由回調(diào)函數(shù)手動應(yīng)答 on_message_callback=callback # 設(shè)置收到消息的回調(diào)函數(shù) ) # 應(yīng)答的本質(zhì)是告訴消息隊列可以將這條消息銷毀了 print('Waiting for messages. To exit press CTRL+C') # 一直處于等待接收消息的狀態(tài),如果沒收到消息就一直處于阻塞狀態(tài),收到消息就調(diào)用上面的回調(diào)函數(shù) channel.start_consuming()
PS:這種情況必須關(guān)閉自動應(yīng)答ack,改成手動應(yīng)答。使用basicQos(perfetch=1)限制每次只發(fā)送不超過1條消息到同一個消費者,消費者必須手動反饋告知隊列,才會發(fā)送下一個
(五)RabbitMQ發(fā)布訂閱模式
發(fā)布訂閱會將消息發(fā)送給所有的訂閱者,而消息隊列中的數(shù)據(jù)被消費一次便消失。所以,RabbitMQ實現(xiàn)發(fā)布和訂閱時,會為每一個訂閱者創(chuàng)建一個隊列,而發(fā)布者發(fā)布消息時,會將消息放置在所有相關(guān)隊列中
這個模式中會引入交換機(jī)的概念,其實在RabbitMQ中,所有的生產(chǎn)者都不會直接把消息發(fā)送到隊列中,甚至生產(chǎn)者都不知道消息在發(fā)出后有沒有發(fā)送到queue中,事實上,生產(chǎn)者只能將消息發(fā)送給交換機(jī),由交換機(jī)來決定發(fā)送到哪個隊列中。
交換機(jī)的一端用來從生產(chǎn)者中接收消息,另一端用來發(fā)送消息到隊列,交換機(jī)的類型規(guī)定了怎么處理接收到的消息,發(fā)布訂閱模式使用到的交換機(jī)類型為 fanout ,這種交換機(jī)類型非常簡單,就是將接收到的消息廣播給已知的(即綁定到此交換機(jī)的)所有消費者。
當(dāng)然,如果不想使用特定的交換機(jī),可以使用 exchange=’’ 表示使用默認(rèn)的交換機(jī),默認(rèn)的交換機(jī)會將消息發(fā)送到 routing_key 指定的queue,可以參考簡單模式。
上代碼:
#生產(chǎn)者 import pika user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 創(chuàng)建一個指定名稱的交換機(jī),并指定類型為fanout,用于將接收到的消息廣播到所有queue中 channel.exchange_declare(exchange='交換機(jī)', exchange_type='fanout') # 將消息發(fā)送給指定的交換機(jī),在fanout類型中,routing_key=''表示不用發(fā)送到指定queue中, # 而是將發(fā)送到綁定到此交換機(jī)的所有queue channel.basic_publish(exchange='交換機(jī)', routing_key='', body='這是一條測試消息')
#消費者 import pika user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() channel.exchange_declare(exchange='交換機(jī)', exchange_type='fanout') # 使用RabbitMQ給自己生成一個專有的queue result = channel.queue_declare(queue='333') # result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # 這里如果設(shè)置exclusive=True參數(shù),那么該隊列就是一個只有隊列,在消費者結(jié)束后,該專有隊列也會自動清除,如果queue=''沒有設(shè)置名字的話,那么就會自動生成一個 # 不會重復(fù)的隊列名 # 將queue綁定到指定交換機(jī) channel.queue_bind(exchange='交換機(jī)', queue=queue_name) print(' [*] Waiting for message.') def callback(ch, method, properties, body): print("消費者收到:{}".format(body.decode('utf-8'))) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
該模式與簡單模式的還有一個區(qū)別就是,這里的消息隊列都是由消費者聲明的,所以如果是生產(chǎn)者先啟動,并將消息發(fā)給交換機(jī)的畫,這里的消息就會丟失,所以我們也可以在消費者端聲明隊列并綁定交換機(jī)(不能是專有隊列),所以仔細(xì)想想,其實這所謂的發(fā)布訂閱模式并沒有說什么了不起,它不過是讓交換機(jī)同時推送多條消息給綁定的隊列,我們當(dāng)然也可以在簡單模式的基礎(chǔ)上多進(jìn)行幾次basic_publish發(fā)送消息到指定的隊列。當(dāng)然我們這樣做的話,可能就沒辦法做到由交換機(jī)的同時發(fā)送了,效率可能也沒有一次basic_publish的高
(六)RabbitMQ RPC模式
下面實現(xiàn)由rpc遠(yuǎn)程調(diào)用加減運算
客戶端
import pika import uuid import json class RPC(object): def __init__(self): self.call_id = None self.response = None user_info = pika.PlainCredentials('root', 'root') self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) self.channel = self.connection.channel() # 創(chuàng)建一個此客戶端專用的queue,用于接收服務(wù)端發(fā)過來的消息 result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): # 判斷接收到的response是否屬于對應(yīng)request if self.call_id == props.correlation_id: self.response = json.loads(body.decode('utf-8')).get('result') def call(self, func, param): self.response = None self.call_id = str(uuid.uuid4()) # 為該消息指定uuid,類似于請求id self.channel.queue_declare(queue='rpc_queue') self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 將消息發(fā)送到該queue properties=pika.BasicProperties( reply_to=self.callback_queue, # 從該queue中取消息 correlation_id=self.call_id, # 為此次消息指定uuid ), body=json.dumps( { 'func': func, 'param': {'a': param[0], 'b': param[1]} } ) ) self.connection.process_data_events(time_limit=3)# 與start_consuming()相似,可以設(shè)置超時參數(shù) return self.response rpc = RPC() print("發(fā)送消息到消費者,等待返回結(jié)果") response = rpc.call(func='del', param=(1, 2)) print("收到來自消費者返回的結(jié)果:{}".format(response))
服務(wù)端
import pika import json user_info = pika.PlainCredentials('root', 'root') connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info)) channel = connection.channel() # 指定接收消息的queue channel.queue_declare(queue='rpc_queue') def add_number(a, b): return a + b def del_num(a, b): return a - b execute_map = { 'add': add_number, 'del': del_num } def on_request(ch, method, props, body): body = json.loads(body.decode('utf-8')) func = body.get('func') param = body.get('param') result = execute_map.get(func)(param.get('a'), param.get('b')) print('進(jìn)行{}運算,并將結(jié)果返回個消費者'.format(func)) ch.basic_publish(exchange='', # 使用默認(rèn)交換機(jī) routing_key=props.reply_to, # response發(fā)送到該queue properties=pika.BasicProperties( correlation_id=props.correlation_id), # 使用correlation_id讓此response與請求消息對應(yīng)起來 body=json.dumps({'result': result})) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) # 從rpc_queue中取消息,然后使用on_request進(jìn)行處理 channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()
(七)說點啥
對于rabbitmq的模式還有Routing模式和Topics模式等,這里就不復(fù)述了,其實pika對于RabbitMQ的使用還有很多細(xì)節(jié)和參數(shù)值得深究。這篇博客也就是簡單的記錄下我對pika操作raabbitmq過程和簡單的理解
參考鏈接:
https://www.cnblogs.com/guyuyun/p/14970592.html
https://blog.csdn.net/wohu1104/category_9023593.html
(八)結(jié)語
到此這篇關(guān)于python對RabbitMQ的簡單入門使用的文章就介紹到這了,更多相關(guān)python RabbitMQ使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
用sqlalchemy構(gòu)建Django連接池的實例
今天小編就為大家分享一篇用sqlalchemy構(gòu)建Django連接池的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08python rolling regression. 使用 Python 實現(xiàn)滾動回歸操作
這篇文章主要介紹了python rolling regression. 使用 Python 實現(xiàn)滾動回歸操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-06-06Effective Python bytes 與 str 的區(qū)別
這篇文章主要介紹了Effective Python bytes 與 str 的區(qū)別,Python 有兩種類型可以表示字符序列,下面圍繞Python bytes 與 str 的相關(guān)資料展開內(nèi)容,需要的朋友可以參考一下2021-11-11詳解Python數(shù)據(jù)可視化編程 - 詞云生成并保存(jieba+WordCloud)
這篇文章主要介紹了Python數(shù)據(jù)可視化編程 - 詞云生成并保存(jieba+WordCloud),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03Python-apply(lambda x: )的使用及說明
這篇文章主要介紹了Python-apply(lambda x: )的使用及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02簡要講解Python編程中線程的創(chuàng)建與鎖的使用
這篇文章主要介紹了簡要講解Python編程中線程的創(chuàng)建與鎖的使用,Python中雖然有GIL的存在,但依然是能夠創(chuàng)建多個線程來交替使用的,需要的朋友可以參考下2016-02-02Pycharm及python安裝詳細(xì)步驟及PyCharm配置整理(推薦)
這篇文章主要介紹了Pycharm及python安裝詳細(xì)步驟以及PyCharm配置整理,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04