利用Python學習RabbitMQ消息隊列
RabbitMQ可以當做一個消息代理,它的核心原理非常簡單:即接收和發(fā)送消息,可以把它想象成一個郵局:我們把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處,RabbitMQ就是一個郵箱、郵局、投遞員功能綜合體,整個過程就是:郵箱接收信件,郵局轉發(fā)信件,投遞員投遞信件到達收件人處。
RabbitMQ和郵局的主要區(qū)別就是RabbitMQ接收、存儲和發(fā)送的是二進制數(shù)據(jù)----消息。
rabbitmq基本管理命令:
一步啟動Erlang node和Rabbit應用:sudo rabbitmq-server
在后臺啟動Rabbit node:sudo rabbitmq-server -detached
關閉整個節(jié)點(包括應用):sudo rabbitmqctl stop
add_user <UserName> <Password> delete_user <UserName> change_password <UserName> <NewPassword> list_users add_vhost <VHostPath> delete_vhost <VHostPath> list_vhosts set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> clear_permissions [-p <VHostPath>] <UserName> list_permissions [-p <VHostPath>] list_user_permissions <UserName> list_queues [-p <VHostPath>] [<QueueInfoItem> ...] list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...]
Demo:
producer.py
#!/usr/bin/env python # -*- coding: utf_ -*- # Date: 年月日 # Author:蔚藍行 # 博客 http://www.cnblogs.com/duanv/ import pika import sys #創(chuàng)建連接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #創(chuàng)建虛擬連接channel cha = con.channel() #創(chuàng)建隊列anheng,durable參數(shù)為真時,隊列將持久化;exclusive為真時,建立臨時隊列 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False) #創(chuàng)建名為yanfa,類型為fanout的exchange,其他類型還有direct和topic,如果指定durable為真,exchange將持久化 cha.exchange_declare(durable=False, exchange='yanfa', type='direct',) #綁定exchange和queue,result.method.queue獲取的是隊列名稱 cha.queue_bind(exchange='yanfa', queue=result.method.queue, routing_key='',) #公平分發(fā),使每個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message cha.basic_qos(prefetch_count=) #發(fā)送信息到隊列‘a(chǎn)nheng' message = ' '.join(sys.argv[:]) #消息持久化指定delivery_mode=; cha.basic_publish(exchange='', routing_key='anheng', body=message, properties=pika.BasicProperties( delivery_mode = , )) print '[x] Sent %r' % (message,) #關閉連接 con.close()
consumer.py
#!/usr/bin/env python # -*- coding: utf_ -*- # Date: 年月日 # Author:蔚藍行 # 博客 http://www.cnblogs.com/duanv/ import pika #建立連接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #創(chuàng)建虛擬連接channel cha = con.channel() #創(chuàng)建隊列anheng result=cha.queue_declare(queue='anheng',durable=True) #創(chuàng)建名為yanfa,類型為fanout的交換機,其他類型還有direct和topic cha.exchange_declare(durable=False, exchange='yanfa', type='direct',) #綁定exchange和queue,result.method.queue獲取的是隊列名稱 cha.queue_bind(exchange='yanfa', queue=result.method.queue, routing_key='',) #公平分發(fā),使每個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message cha.basic_qos(prefetch_count=) print ' [*] Waiting for messages. To exit press CTRL+C' #定義回調(diào)函數(shù) def callback(ch, method, properties, body): print " [x] Received %r" % (body,) ch.basic_ack(delivery_tag = method.delivery_tag) cha.basic_consume(callback, queue='anheng', no_ack=False,) cha.start_consuming()
一、概念:
Connection: 一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。程序的起始處就是建立這個TCP連接。
Channels: 虛擬連接。建立在上述的TCP連接中。數(shù)據(jù)流動都是在Channel中進行的。一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
二、隊列:
首先建立一個Connection,然后建立Channels,在channel上建立隊列
建立時指定durable參數(shù)為真,隊列將持久化;指定exclusive為真,隊列為臨時隊列,關閉consumer后該隊列將不再存在,一般情況下建立臨時隊列并不指定隊列名稱,rabbitmq將隨機起名,通過result.method.queue來獲取隊列名:
result = channel.queue_declare(exclusive=True)
result.method.queue
區(qū)別:durable是隊列持久化與否,如果為真,隊列將在rabbitmq服務重啟后仍存在,如果為假,rabbitmq服務重啟前不會消失,與consumer關閉與否無關;
而exclusive是建立臨時隊列,當consumer關閉后,該隊列就會被刪除
三、exchange和bind
Exchange中durable參數(shù)指定exchange是否持久化,exchange參數(shù)指定exchange名稱,type指定exchange類型。Exchange類型有direct,fanout和topic。
Bind是將exchange與queue進行關聯(lián),exchange參數(shù)和queue參數(shù)分別指定要進行bind的exchange和queue,routing_key為可選參數(shù)。
Exchange的三種模式:
Direct:
任何發(fā)送到Direct Exchange的消息都會被轉發(fā)到routing_key中指定的Queue
1.一般情況可以使用rabbitMQ自帶的Exchange:””(該Exchange的名字為空字符串);
2.這種模式下不需要將Exchange進行任何綁定(bind)操作;
3.消息傳遞時需要一個“routing_key”,可以簡單的理解為要發(fā)送到的隊列名字;
4.如果vhost中不存在routing_key中指定的隊列名,則該消息會被拋棄。
Demo中雖然聲明了一個exchange='yanfa'和queue='anheng'的bind,但是在后面發(fā)送消息時并沒有使用該exchange和bind,而是采用了direct的模式,沒有指定exchange,而是指定了routing_key的名稱為隊列名,消息將發(fā)送到指定隊列。
如果一個exchange 聲明為direct,并且bind中指定了routing_key,那么發(fā)送消息時需要同時指明該exchange和routing_key.
Fanout:
任何發(fā)送到Fanout Exchange的消息都會被轉發(fā)到與該Exchange綁定(Binding)的所有Queue上
1.可以理解為路由表的模式
2.這種模式不需要routing_key
3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
Demo中創(chuàng)建了一個將一個exchange和一個queue進行fanout類型的bind.但是發(fā)送信息時沒有用到它,如果要用到它,只要在發(fā)送消息時指定該exchange的名稱即可,該exchange就會將消息發(fā)送到所有和它bind的隊列中。在fanout模式下,指定的routing_key是無效的 。
Topic:
任何發(fā)送到Topic Exchange的消息都會被轉發(fā)到所有關心routing_key中指定話題的Queue上
1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(routing_key),Exchange會將消息轉發(fā)到所有關注主題能與routing_key模糊匹配的隊列。
2.這種模式需要routing_key,也許要提前綁定Exchange與Queue。
3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及l(fā)og的消息(一個routing_key為”MQ.log.error”的消息會被轉發(fā)到該隊列)。
4.“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“l(fā)og.*”能與“l(fā)og.warn”匹配,無法與“l(fā)og.warn.timeout”匹配;但是“l(fā)og.#”能與上述兩者匹配。
5.同樣,如果Exchange沒有發(fā)現(xiàn)能夠與routing_key匹配的Queue,則會拋棄此消息。
四、任務分發(fā)
1.Rabbitmq的任務是循環(huán)分發(fā)的,如果開啟兩個consumer,producer發(fā)送的信息是輪流發(fā)送到兩個consume的。
2.在producer端使用cha.basic_publish()來發(fā)送消息,其中body參數(shù)就是要發(fā)送的消息,properties=pika.BasicProperties(delivery_mode = 2,)啟用消息持久化,可以防止RabbitMQ Server 重啟或者crash引起的數(shù)據(jù)丟失。
3.在接收端使用cha.basic_consume()無限循環(huán)監(jiān)聽,如果設置no-ack參數(shù)為真,每次Consumer接到數(shù)據(jù)后,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。為了保證數(shù)據(jù)不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數(shù)據(jù)后發(fā)送ack。
在處理數(shù)據(jù)后發(fā)送的ack,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收,處理完成,RabbitMQ可以去安全的刪除它了。如果Consumer退出了但是沒有發(fā)送ack,那么RabbitMQ就會把這個Message發(fā)送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會丟失。
這里并沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message并沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數(shù)據(jù)處理。
Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告訴rabbitmq消息已經(jīng)正確處理。如果沒有這條代碼,Consumer退出時,Message會重新分發(fā)。然后RabbitMQ會占用越來越多的內(nèi)存,由于RabbitMQ會長時間運行,因此這個“內(nèi)存泄漏”是致命的。去調(diào)試這種錯誤,可以通過一下命令打印un-acked Messages:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
4.公平分發(fā):設置cha.basic_qos(prefetch_count=1),這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發(fā)給它。
五、注意:
生產(chǎn)者和消費者都應該聲明建立隊列,網(wǎng)上教程上說第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會被修改。
可能因為版本問題,在我的測試中如果第二次聲明建立的隊列屬性和第一次不完全相同,將報類似這種錯406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"
如果是exchange第二次創(chuàng)建屬性不同,將報這種錯406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"
如果第一次聲明建立隊列也出現(xiàn)這個錯誤,說明之前存在名字相同的隊列且本次聲明的某些屬性和之前聲明不同,可通過命令sudo rabbitmqctl list_queues查看當前有哪些隊列。解決方法是聲明建立另一名稱的隊列或刪除原有隊列,如果原有隊列是非持久化的,可通過重啟rabbitmq服務刪除原有隊列,如果原有隊列是持久化的,只能刪除它所在的vhost,然后再重建vhost,再設置vhost的權限(先確認該vhost中沒有其他有用隊列)。
sudo rabbitmqctl delete_vhost / sudo rabbitmqctl add_vhost / sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'
以上內(nèi)容是小編給大家介紹的利用Python學習RabbitMQ消息隊列,希望大家喜歡。
- Python rabbitMQ如何實現(xiàn)生產(chǎn)消費者模式
- Python RabbitMQ實現(xiàn)簡單的進程間通信示例
- Python實現(xiàn)RabbitMQ6種消息模型的示例代碼
- Python隊列RabbitMQ 使用方法實例記錄
- Python操作rabbitMQ的示例代碼
- python實現(xiàn)RabbitMQ的消息隊列的示例代碼
- python RabbitMQ 使用詳細介紹(小結)
- Python RabbitMQ消息隊列實現(xiàn)rpc
- Python+Pika+RabbitMQ環(huán)境部署及實現(xiàn)工作隊列的實例教程
- 基于python實現(xiàn)監(jiān)聽Rabbitmq系統(tǒng)日志代碼示例
相關文章
Windows11使用Cpython?編譯文件報錯?error:?Unable?to?find?vcvars
這篇文章主要介紹了Windows11使用Cpython編譯文件報錯error:Unable?to find?vcvarsall.bat完美解決方法,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-05-05python KNN算法實現(xiàn)鳶尾花數(shù)據(jù)集分類
這篇文章主要介紹了python KNN算法實現(xiàn)鳶尾花數(shù)據(jù)集分類,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-10-10Python實現(xiàn)將Excel內(nèi)容批量導出為PDF文件
這篇文章主要為大家介紹了如何利用Python實現(xiàn)將Excel表格內(nèi)容批量導出為PDF文件,文中的實現(xiàn)步驟講解詳細,感興趣的小伙伴可以了解一下2022-04-04Python的Django框架中的數(shù)據(jù)庫配置指南
這篇文章主要介紹了Python的Django框架中的數(shù)據(jù)庫配置指南,文中舉了Python內(nèi)置的SQLite的示例,需要的朋友可以參考下2015-07-07