python操作RabbitMq的三種工作模式
一、簡介:
RabbitMq 是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理中間件。消息隊列是一種應(yīng)用程序?qū)?yīng)用程序的通行方式,應(yīng)用程序通過寫消息,將消息傳遞于隊列,由另一應(yīng)用程序讀取 完成通信。而作為中間件的 RabbitMq 無疑是目前最流行的消息隊列之一。
RabbitMq 應(yīng)用場景廣泛:
- 系統(tǒng)的高可用:日常生活當(dāng)中各種商城秒殺,高流量,高并發(fā)的場景。當(dāng)服務(wù)器接收到如此大量請求處理業(yè)務(wù)時,有宕機(jī)的風(fēng)險。某些業(yè)務(wù)可能極其復(fù)雜,但這部分不是高時效性,不需要立即反饋給用戶,我們可以將這部分處理請求拋給隊列,讓程序后置去處理,減輕服務(wù)器在高并發(fā)場景下的壓力。
- 分布式系統(tǒng),集成系統(tǒng),子系統(tǒng)之間的對接,以及架構(gòu)設(shè)計中常常需要考慮消息隊列的應(yīng)用。
二、RabbitMq 生產(chǎn)和消費(fèi)
生產(chǎn)者(producter):隊列消息的產(chǎn)生者,負(fù)責(zé)生產(chǎn)消息,并將消息傳入隊列
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼 # 虛擬隊列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創(chuàng)建 result = channel.queue_declare(queue = 'python-test') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向隊列插入數(shù)值 routing_key是隊列名 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print(message) connection.close()
消費(fèi)者(consumer):隊列消息的接收者,負(fù)責(zé) 接收并處理 消息隊列中的消息
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 申明消息隊列,消息在這個隊列傳遞,如果不存在,則創(chuàng)建隊列 channel.queue_declare(queue = 'python-test', durable = False) # 定義一個回調(diào)函數(shù)來處理消息隊列中的消息,這里是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) # 告訴rabbitmq,用callback來接收消息 channel.basic_consume('python-test',callback) # 開始接收信息,并進(jìn)入阻塞狀態(tài),隊列里有信息才會調(diào)用callback進(jìn)行處理 channel.start_consuming()
三、RabbitMq 持久化
MQ默認(rèn)建立的是臨時 queue 和 exchange,如果不聲明持久化,一旦 rabbitmq 掛掉,queue、exchange 將會全部丟失。所以我們一般在創(chuàng)建 queue 或者 exchange 的時候會聲明 持久化。
1.queue 聲明持久化
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表消息隊列持久化存儲,F(xiàn)alse 非持久化存儲 result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 聲明持久化
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建.durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲 channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一個非持久化的 queue 或 exchange ,執(zhí)行上述代碼會報錯,因為當(dāng)前狀態(tài)不能更改 queue 或 exchange 存儲屬性,需要刪除重建。如果 queue 和 exchange 中一個聲明了持久化,另一個沒有聲明持久化,則不允許綁定。
3.消息持久化
雖然 exchange 和 queue 都申明了持久化,但如果消息只存在內(nèi)存里,rabbitmq 重啟后,內(nèi)存里的東西還是會丟失。所以必須聲明消息也是持久化,從內(nèi)存轉(zhuǎn)存到硬盤。
# 向隊列插入數(shù)值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 消息不丟失
消費(fèi)者(consumer)調(diào)用callback函數(shù)時,會存在處理消息失敗的風(fēng)險,如果處理失敗,則消息丟失。但是也可以選擇消費(fèi)者處理失敗時,將消息回退給 rabbitmq ,重新再被消費(fèi)者消費(fèi),這個時候需要設(shè)置確認(rèn)標(biāo)識。
channel.basic_consume(callback,queue = 'python-test', # no_ack 設(shè)置成 False,在調(diào)用callback函數(shù)時,未收到確認(rèn)標(biāo)識,消息會重回隊列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 no_ack = False)
四、RabbitMq 發(fā)布與訂閱
rabbitmq 的發(fā)布與訂閱要借助交換機(jī)(Exchange)的原理實現(xiàn):
Exchange 一共有三種工作模式:fanout, direct, topicd
模式一:fanout
這種模式下,傳遞到 exchange 的消息將會轉(zhuǎn)發(fā)到所有與其綁定的 queue 上。
- 不需要指定 routing_key ,即使指定了也是無效。
- 需要提前將 exchange 和 queue 綁定,一個 exchange 可以綁定多個 queue,一個queue可以綁定多個exchange。
- 需要先啟動 訂閱者,此模式下的隊列是 consumer 隨機(jī)生成的,發(fā)布者 僅僅發(fā)布消息到 exchange ,由 exchange 轉(zhuǎn)發(fā)消息至 queue。
發(fā)布者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼 # 虛擬隊列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向隊列插入數(shù)值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
訂閱者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 創(chuàng)建臨時隊列,隊列名傳空字符,consumer關(guān)閉后,隊列自動刪除 result = channel.queue_declare('',exclusive=True) # 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') # 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個隊列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue) # 定義一個回調(diào)函數(shù)來處理消息隊列中的消息,這里是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) channel.basic_consume(result.method.queue,callback,# 設(shè)置成 False,在調(diào)用callback函數(shù)時,未收到確認(rèn)標(biāo)識,消息會重回隊列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 auto_ack = False) channel.start_consuming()
模式二:direct
這種工作模式的原理是 消息發(fā)送至 exchange,exchange 根據(jù) 路由鍵(routing_key)轉(zhuǎn)發(fā)到相對應(yīng)的 queue 上。
- 可以使用默認(rèn) exchange =' ' ,也可以自定義 exchange
- 這種模式下不需要將 exchange 和 任何進(jìn)行綁定,當(dāng)然綁定也是可以的。可以將 exchange 和 queue ,routing_key 和 queue 進(jìn)行綁定
- 傳遞或接受消息時 需要 指定 routing_key
- 需要先啟動 訂閱者,此模式下的隊列是 consumer 隨機(jī)生成的,發(fā)布者 僅僅發(fā)布消息到 exchange ,由 exchange 轉(zhuǎn)發(fā)消息至 queue。
發(fā)布者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼 # 虛擬隊列需要指定參數(shù) virtual_host,如果是默認(rèn)的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 指定 routing_key。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
消費(fèi)者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 創(chuàng)建臨時隊列,隊列名傳空字符,consumer關(guān)閉后,隊列自動刪除 result = channel.queue_declare('',exclusive=True) # 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創(chuàng)建。durable = True 代表exchange持久化存儲,F(xiàn)alse 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') # 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應(yīng)該到哪個隊列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId') # 定義一個回調(diào)函數(shù)來處理消息隊列中的消息,這里是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #channel.basic_qos(prefetch_count=1) # 告訴rabbitmq,用callback來接受消息 channel.basic_consume(result.method.queue,callback, # 設(shè)置成 False,在調(diào)用callback函數(shù)時,未收到確認(rèn)標(biāo)識,消息會重回隊列。True,無論調(diào)用callback成功與否,消息都被消費(fèi)掉 auto_ack = False) channel.start_consuming()
模式三:topicd
這種模式和第二種模式差不多,exchange 也是通過 路由鍵 routing_key 來轉(zhuǎn)發(fā)消息到指定的 queue 。 不同點(diǎn)是 routing_key 使用正則表達(dá)式支持模糊匹配,但匹配規(guī)則又與常規(guī)的正則表達(dá)式不同,比如“#”是匹配全部,“*”是匹配一個詞。
舉例:routing_key =“#orderid#”,意思是將消息轉(zhuǎn)發(fā)至所有 routing_key 包含 “orderid” 字符的隊列中。代碼和模式二 類似,就不貼出來了。
以上就是python操作RabbitMq的三種工作模式的詳細(xì)內(nèi)容,更多關(guān)于python操作RabbitMq工作模式的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實現(xiàn)關(guān)鍵路徑和七格圖計算詳解
這篇文章主要為大家詳細(xì)介紹了如何利用Python實現(xiàn)關(guān)鍵路徑和七格圖計算,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下2023-03-03Python flask框架實現(xiàn)查詢數(shù)據(jù)庫并顯示數(shù)據(jù)
這篇文章主要介紹了Python flask框架實現(xiàn)查詢數(shù)據(jù)庫并顯示數(shù)據(jù),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06基于OpenCV(python)的實現(xiàn)文本分割之垂直投影法
本文主要介紹了基于OpenCV(python)的實現(xiàn)文本分割之垂直投影法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08Django實現(xiàn)WebSocket在線聊天室功能(channels庫)
本文基于channels庫Django實現(xiàn)WebSocket在線聊天室功能,包括安裝及創(chuàng)建django項目的全過程,通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-09-09