python RabbitMQ 使用詳細介紹(小結)
上節(jié)回顧
主要講了協(xié)程、進程、異步IO多路復用。
協(xié)程和IO多路復用都是單線程的。
epoll 在linux下通過這個模塊libevent.so實現(xiàn)
gevent 在底層也是用了libevent.so
gevent可以理解為一個更上層的封裝。
使用select或者selectors,每接收或發(fā)送數(shù)據(jù)一次都要select一次
twisted異步網(wǎng)絡框架,強大又龐大,不支持python3 (代碼量python中排top3)。幾乎把所有的網(wǎng)絡服務都重寫了一遍。
一、RabbitMQ 消息隊列介紹
RabbitMQ也是消息隊列,那RabbitMQ和之前python的Queue有什么區(qū)別么?
py 消息隊列:
線程 queue(同一進程下線程之間進行交互)
進程 Queue(父子進程進行交互 或者 同屬于同一進程下的多個子進程進行交互)
如果是兩個完全獨立的python程序,也是不能用上面兩個queue進行交互的,或者和其他語言交互有哪些實現(xiàn)方式呢。
【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個程序之間交互,可以支持多個程序,可以維護好多個程序的隊列。
像這種公共的中間件有好多成熟的產(chǎn)品:
RabbitMQ
ZeroMQ
ActiveMQ
……
RabbitMQ:erlang語言 開發(fā)的。
Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務隊列) 、haigha
可以維護很多的隊列
RabbitMQ 教程官網(wǎng):http://www.rabbitmq.com/getstarted.html
幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關鍵字,exchange根據(jù)這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務
二、RabbitMQ基本示例.
1、Rabbitmq 安裝
ubuntu系統(tǒng)
install rabbitmq-server # 直接搞定
以下centos系統(tǒng)
1)Install Erlang
# For EL5: rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm # For EL6: rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm # For EL7: rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm yum install erlang
2)Install RabbitMQ Server
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum install rabbitmq-server-3.6.5-1.noarch.rpm
3)use RabbitMQ Server
chkconfig rabbitmq-server on service rabbitmq-server stop/start
2、基本示例
發(fā)送端 producer
import pika
# 建立一個實例
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672) # 默認端口5672,可不寫
)
# 聲明一個管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello', # queue名字
body='Hello World!') # 消息內(nèi)容
print(" [x] Sent 'Hello World!'")
connection.close() # 隊列關閉
接收端 consumer
import pika
import time
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()
# 為什么又聲明了一個‘hello'隊列?
# 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個機器先運行,所以要聲明兩次。
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body): # 四個參數(shù)為標準格式
print(ch, method, properties) # 打印看一下是什么
# 管道內(nèi)存對象 內(nèi)容相關信息 后面講
print(" [x] Received %r" % body)
time.sleep(15)
ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,消息處理完成
channel.basic_consume( # 消費消息
callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
queue='hello', # 你要從那個隊列里收消息
# no_ack=True # 寫的話,如果接收消息,機器宕機消息就丟了
# 一般不寫。宕機則生產(chǎn)者檢測到發(fā)給其他消費者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費消息
3、RabbitMQ 消息分發(fā)輪詢
上面的只是一個生產(chǎn)者、一個消費者,能不能一個生產(chǎn)者多個消費者呢?
可以上面的例子,多啟動幾個消費者consumer,看一下消息的接收情況。
采用輪詢機制;把消息依次分發(fā)
假如消費者處理消息需要15秒,如果當機了,那這個消息處理明顯還沒處理完,怎么處理?
(可以模擬消費端斷了,分別注釋和不注釋 no_ack=True 看一下)
你沒給我回復確認,就代表消息沒處理完。
上面的效果消費端斷了就轉到另外一個消費端去了,但是生產(chǎn)者怎么知道消費端斷了呢?
因為生產(chǎn)者和消費者是通過socket連接的,socket斷了,就說明消費端斷開了。
上面的模式只是依次分發(fā),實際情況是機器配置不一樣。怎么設置類似權重的操作?
RabbitMQ怎么辦呢,RabbitMQ做了簡單的處理就能實現(xiàn)公平的分發(fā)。
就是RabbitMQ給消費者發(fā)消息的時候檢測下消費者里的消息數(shù)量,如果超過指定值(比如1條),就不給你發(fā)了。
只需要在消費者端,channel.basic_consume前加上就可以了。
channel.basic_qos(prefetch_count=1) # 類似權重,按能力分發(fā),如果有一個消息,就不在給你發(fā) channel.basic_consume( # 消費消息
三、RabbitMQ 消息持久化(durable、properties)
1、RabbitMQ 相關命令
rabbitmqctl list_queues # 查看當前queue數(shù)量及queue里消息數(shù)量
2、消息持久化
如果隊列里還有消息,RabbitMQ 服務端宕機了呢?消息還在不在?
把RabbitMQ服務重啟,看一下消息在不在。
上面的情況下,宕機了,消息就久了,下面看看如何把消息持久化。
每次聲明隊列的時候,都加上durable,注意每個隊列都得寫,客戶端、服務端聲明的時候都得寫。
# 在管道里聲明queue channel.queue_declare(queue='hello2', durable=True)
測試結果發(fā)現(xiàn),只是把隊列持久化了,但是隊列里的消息沒了。
durable的作用只是把隊列持久化。離消息持久話還差一步:
發(fā)送端發(fā)送消息時,加上properties
properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 )
發(fā)送端 producer
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost',5672)) # 默認端口5672,可不寫
channel = connection.channel()
#聲明queue
channel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個名字
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello2',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
connection.close()
接收端 consumer
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(10)
ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生產(chǎn)者,消息處理完成
channel.basic_qos(prefetch_count=1) # 類似權重,按能力分發(fā),如果有一個消息,就不在給你發(fā)
channel.basic_consume( # 消費消息
callback, # 如果收到消息,就調(diào)用callback
queue='hello2',
# no_ack=True # 一般不寫,處理完接收處理結果。宕機則發(fā)給其他消費者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
四、RabbitMQ 廣播模式(exchange)
前面的效果都是一對一發(fā),如果做一個廣播效果可不可以,這時候就要用到exchange了
exchange必須精確的知道收到的消息要發(fā)給誰。exchange的類型決定了怎么處理,
類型有以下幾種:
- fanout: 所有綁定到此exchange的queue都可以接收消息
- direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
- topic: 所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
1、fanout 純廣播、all
需要queue和exchange綁定,因為消費者不是和exchange直連的,消費者是連在queue上,queue綁定在exchange上,消費者只會在queu里度消息
|------------------------|
| /—— queue <—|—> consumer1
producer —|—exchange1 <bind |
\ | \—— queue <—|—> consumer2
\-|-exchange2 …… |
|------------------------|
發(fā)送端 publisher 發(fā)布、廣播
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
# 注意:這里是廣播,不需要聲明queue
channel.exchange_declare(exchange='logs', # 聲明廣播管道
type='fanout')
# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='', # 注意此處空,必須有
body=message)
print(" [x] Sent %r" % message)
connection.close()
接收端 subscriber 訂閱
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
result = channel.queue_declare(exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)
channel.queue_bind(exchange='logs', # queue綁定到轉發(fā)器上
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
注意:廣播,是實時的,收不到就沒了,消息不會存下來,類似收音機。
2、direct 有選擇的接收消息
接收者可以過濾消息,只收我想要的消息
發(fā)送端publisher
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
# 重要程度級別,這里默認定義為 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
接收端subscriber
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運行腳本所有的參數(shù)
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 循環(huán)列表去綁定
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
運行接收端,指定接收級別的參數(shù),例:
python direct_sonsumer.py info warning
python direct_sonsumer.py warning error
3、topic 更細致的過濾
比如把error中,apache和mysql的分別或取出來
發(fā)送端publisher
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
接收端 subscriber
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
運行接收端,指定接收哪些消息,例:
python topic_sonsumer.py *.info python topic_sonsumer.py *.error mysql.* python topic_sonsumer.py '#' # 接收所有消息 # 接收所有的 logs run: # python receive_logs_topic.py "#" # To receive all logs from the facility "kern": # python receive_logs_topic.py "kern.*" # Or if you want to hear only about "critical" logs: # python receive_logs_topic.py "*.critical" # You can create multiple bindings: # python receive_logs_topic.py "kern.*" "*.critical" # And to emit a log with a routing key "kern.critical" type: # python emit_log_topic.py "kern.critical" "A critical kernel error"
4、RabbitMQ RPC 實現(xiàn)(Remote procedure call)
不知道你有沒有發(fā)現(xiàn),上面的流都是單向的,如果遠程的機器執(zhí)行完返回結果,就實現(xiàn)不了了。
如果返回,這種模式叫什么呢,RPC(遠程過程調(diào)用),snmp就是典型的RPC
RabbitMQ能不能返回呢,怎么返回呢?既是發(fā)送端又是接收端。
但是接收端返回消息怎么返回?可以發(fā)送到發(fā)過來的queue里么?不可以。
返回時,再建立一個queue,把結果發(fā)送新的queue里
為了服務端返回的queue不寫死,在客戶端給服務端發(fā)指令的的時候,同時帶一條消息說,你結果返回給哪個queue
RPC client
import pika
import uuid
import time
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, # 只要一收到消息就調(diào)用on_response
no_ack=True,
queue=self.callback_queue) # 收這個queue的消息
def on_response(self, ch, method, props, body): # 必須四個參數(shù)
# 如果收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None # 初始self.response為None
self.corr_id = str(uuid.uuid4()) # 隨機唯一字符串
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue', # 發(fā)消息到rpc_queue
properties=pika.BasicProperties( # 消息持久化
reply_to = self.callback_queue, # 讓服務端命令結果返回到callback_queue
correlation_id = self.corr_id, # 把隨機uuid同時發(fā)給服務器
),
body=str(n)
)
while self.response is None: # 當沒有數(shù)據(jù),就一直循環(huán)
# 啟動后,on_response函數(shù)接到消息,self.response 值就不為空了
self.connection.process_data_events() # 非阻塞版的start_consuming()
# print("no msg……")
# time.sleep(0.5)
# 收到消息就調(diào)用on_response
return int(self.response)
if __name__ == '__main__':
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(7)")
response = fibonacci_rpc.call(7)
print(" [.] Got %r" % response)
RPC server
import pika
import time
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(
exchange='', # 把執(zhí)行結果發(fā)回給客戶端
routing_key=props.reply_to, # 客戶端要求返回想用的queue
# 返回客戶端發(fā)過來的correction_id 為了讓客戶端驗證消息一致性
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response)
)
ch.basic_ack(delivery_tag = method.delivery_tag) # 任務完成,告訴客戶端
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue') # 聲明一個rpc_queue ,
channel.basic_qos(prefetch_count=1)
# 在rpc_queue里收消息,收到消息就調(diào)用on_request
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
- Python rabbitMQ如何實現(xiàn)生產(chǎn)消費者模式
- Python RabbitMQ實現(xiàn)簡單的進程間通信示例
- Python實現(xiàn)RabbitMQ6種消息模型的示例代碼
- Python隊列RabbitMQ 使用方法實例記錄
- Python操作rabbitMQ的示例代碼
- python實現(xiàn)RabbitMQ的消息隊列的示例代碼
- Python RabbitMQ消息隊列實現(xiàn)rpc
- Python+Pika+RabbitMQ環(huán)境部署及實現(xiàn)工作隊列的實例教程
- 利用Python學習RabbitMQ消息隊列
- 基于python實現(xiàn)監(jiān)聽Rabbitmq系統(tǒng)日志代碼示例
相關文章
Selenium及python實現(xiàn)滾動操作多種方法
這篇文章主要介紹了Selenium及python實現(xiàn)滾動操作多種方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07
python腳本實現(xiàn)音頻m4a格式轉成MP3格式的實例代碼
這篇文章主要介紹了python腳本實現(xiàn)音頻m4a格式轉成MP3格式的實例代碼,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2019-10-10
python引入requests報錯could?not?be?resolved解決方案
這篇文章主要為大家介紹了python引入requests報錯could?not?be?resolved解決方案,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05
利用django和mysql實現(xiàn)一個簡單的web登錄頁面
這篇文章主要給大家介紹了關于如何利用django和mysql實現(xiàn)一個簡單的web登錄頁面的相關資料,文中通過圖文以及實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2023-05-05
詳解pandas刪除缺失數(shù)據(jù)(pd.dropna()方法)
這篇文章主要介紹了pandas刪除缺失數(shù)據(jù)(pd.dropna()方法),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-06-06

