python使用pika庫(kù)調(diào)用rabbitmq參數(shù)使用詳情
前言:
python使用pika庫(kù)調(diào)用rabbitmq的參數(shù)有三種方式,分別如下所述:
1、應(yīng)答參數(shù)
auto_ack=False ch.basic_ack(delivery_tag=method.delivery_tag)
生產(chǎn)者模式:
示例代碼:
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建隊(duì)列 channel.queue_declare(queue='hello') # 3.向指定隊(duì)列插入數(shù)據(jù) channel.basic_publish(exchange='', # 簡(jiǎn)單模式 routing_key='hello', # 指定隊(duì)列 body='Hello World!') # 向隊(duì)列中添加的數(shù)據(jù) print(" [x] Sent 'Hello World!'")
運(yùn)行結(jié)果:
消費(fèi)者模式:
示例代碼:
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建隊(duì)列 # 注意:這一步不是必須的,但是如果消費(fèi)者先啟動(dòng)而不是生成者先啟動(dòng)時(shí),這時(shí)隊(duì)列中還沒有hello隊(duì)列,這時(shí)就會(huì)報(bào)錯(cuò) channel.queue_declare(queue='hello') # 3.確定回調(diào)函數(shù) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 4.確定監(jiān)聽隊(duì)列參數(shù) channel.basic_consume(queue='hello', auto_ack=False, # 手動(dòng)應(yīng)答方式 on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') # 5.正式監(jiān)聽 channel.start_consuming()
運(yùn)行結(jié)果:
注意:添加應(yīng)答參數(shù)的好處是當(dāng)消費(fèi)者處理回調(diào)函數(shù)的時(shí),萬(wàn)一程序報(bào)錯(cuò),此時(shí)數(shù)據(jù)就會(huì)消失的。使用應(yīng)答方式后,消費(fèi)者程序萬(wàn)一報(bào)錯(cuò),修改完程序后重新啟動(dòng)程序還是可以繼續(xù)消費(fèi)上一次的數(shù)據(jù)的。使用應(yīng)答參數(shù)后,沒處理完一條數(shù)據(jù)都會(huì)給隊(duì)列一個(gè)反饋消息的,也就是說消費(fèi)完一條消息后隊(duì)列才會(huì)刪除這條消息。這種方式效率會(huì)降低一些,根據(jù)項(xiàng)目中數(shù)據(jù)的重要性可以選擇是否需要這個(gè)參數(shù)。
2、持久化參數(shù)
#聲明queue channel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個(gè)名字 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) )
生成者方式:
示例代碼:
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建持久化隊(duì)列 # 注意:非持久化隊(duì)列不能變持久化隊(duì)列,反之也是這樣的,所有創(chuàng)建隊(duì)列中不能創(chuàng)建和非持久化隊(duì)列重名的隊(duì)列 channel.queue_declare(queue='hello2', durable=True) # 3.向指定隊(duì)列插入數(shù)據(jù) channel.basic_publish(exchange='', # 簡(jiǎn)單模式 routing_key='hello2', # 指定隊(duì)列 body='Hello World!', # 向隊(duì)列中添加的數(shù)據(jù) properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent 'Hello World!'")
運(yùn)行結(jié)果:
消費(fèi)者方式:
示例代碼:
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建持久化隊(duì)列 # 注意:非持久化隊(duì)列不能變持久化隊(duì)列,反之也是這樣的,所有創(chuàng)建隊(duì)列中不能創(chuàng)建和非持久化隊(duì)列重名的隊(duì)列 # 注意:這一步不是必須的,但是如果消費(fèi)者先啟動(dòng)而不是生成者先啟動(dòng)時(shí),這時(shí)隊(duì)列中還沒有hello2隊(duì)列,這時(shí)就會(huì)報(bào)錯(cuò) channel.queue_declare(queue='hello2', durable=True) # 3.確定回調(diào)函數(shù) def callback(ch, method, properties, body): print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 4.確定監(jiān)聽隊(duì)列參數(shù) channel.basic_consume(queue='hello2', # 指定隊(duì)列 auto_ack=False, # 手動(dòng)應(yīng)答方式 on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') # 5.正式監(jiān)聽 channel.start_consuming()
運(yùn)行結(jié)果:
注意:加入持久化參數(shù)的好處,當(dāng)rabbitmq隊(duì)列萬(wàn)一崩了時(shí),此時(shí)隊(duì)列中的所有數(shù)據(jù)都會(huì)丟失,rabbitmq隊(duì)列中的數(shù)據(jù)是保存在內(nèi)存中,當(dāng)加入持久化參數(shù)后,數(shù)據(jù)將會(huì)保存在硬盤中,rabbitmq崩了或者重啟不會(huì)丟失數(shù)據(jù)。
3、分發(fā)參數(shù)
有兩個(gè)消費(fèi)者同時(shí)監(jiān)聽一個(gè)的隊(duì)列。其中一個(gè)線程sleep2秒,另一個(gè)消費(fèi)者線程sleep1秒,但是處理的消息是一樣多。這種方式叫輪詢分發(fā)(round-robin)不管誰(shuí)忙,都不會(huì)多給消息,總是你一個(gè)我一個(gè)。想要做到公平分發(fā)(fair dispatch),必須關(guān)閉自動(dòng)應(yīng)答ack,改成手動(dòng)應(yīng)答。使用basicQos(perfetch=1)限制每次只發(fā)送不超過1條消息到同一個(gè)消費(fèi)者,消費(fèi)者必須手動(dòng)反饋告知隊(duì)列,才會(huì)發(fā)送下一個(gè)。
channel.basic_qos(prefetch_count=1)
生產(chǎn)者模式:
示例代碼: 【為了產(chǎn)生多條數(shù)據(jù),將此程序執(zhí)行多次】
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建隊(duì)列 channel.queue_declare(queue='hello3') # 3.向指定隊(duì)列插入數(shù)據(jù) channel.basic_publish(exchange='', # 簡(jiǎn)單模式 routing_key='hello3', # 指定隊(duì)列 body='Hello World666!', # 向隊(duì)列中添加的數(shù)據(jù) properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) ) print(" [x] Sent 'Hello World!'")
運(yùn)行結(jié)果:
消費(fèi)者模式:
示例代碼1:
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建隊(duì)列 # 注意:這一步不是必須的,但是如果消費(fèi)者先啟動(dòng)而不是生成者先啟動(dòng)時(shí),這時(shí)隊(duì)列中還沒有hello2隊(duì)列,這時(shí)就會(huì)報(bào)錯(cuò) channel.queue_declare(queue='hello3') # 3.確定回調(diào)函數(shù) def callback(ch, method, properties, body): import time time.sleep(15) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 公平分發(fā),若不加下面這行代碼,默認(rèn)是輪詢分發(fā) channel.basic_qos(prefetch_count=1) # 4.確定監(jiān)聽隊(duì)列參數(shù) channel.basic_consume(queue='hello3', # 指定隊(duì)列 auto_ack=False, # 手動(dòng)應(yīng)答方式 on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') # 5.正式監(jiān)聽 channel.start_consuming()
運(yùn)行結(jié)果:
示例代碼2:
import pika # 1.連接rabbit connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.124.104')) channel = connection.channel() # 2.創(chuàng)建隊(duì)列 # 注意:這一步不是必須的,但是如果消費(fèi)者先啟動(dòng)而不是生成者先啟動(dòng)時(shí),這時(shí)隊(duì)列中還沒有hello2隊(duì)列,這時(shí)就會(huì)報(bào)錯(cuò) channel.queue_declare(queue='hello3') # 3.確定回調(diào)函數(shù) def callback(ch, method, properties, body): import time time.sleep(3) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) # 公平分發(fā),若不加下面這行代碼,默認(rèn)是輪詢分發(fā) channel.basic_qos(prefetch_count=1) # 4.確定監(jiān)聽隊(duì)列參數(shù) channel.basic_consume(queue='hello3', # 指定隊(duì)列 auto_ack=False, # 手動(dòng)應(yīng)答方式 on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') # 5.正式監(jiān)聽 channel.start_consuming()
注意:當(dāng)一個(gè)py文件執(zhí)行多次時(shí),會(huì)有下面提示:
運(yùn)行結(jié)果:
到此這篇關(guān)于python使用pika庫(kù)調(diào)用rabbitmq參數(shù)使用詳情的文章就介紹到這了,更多相關(guān)python pika庫(kù)調(diào)用 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)現(xiàn)藍(lán)線挑戰(zhàn)特效的示例代碼
在抖音曾經(jīng)火了一陣子的藍(lán)線挑戰(zhàn)特效,其原理很簡(jiǎn)單。本文將試著用opencv-python實(shí)現(xiàn)這個(gè)效果,做了攝像頭版本和視頻處理版本,感興趣的可以學(xué)習(xí)一下2022-10-10使用Matplotlib 繪制精美的數(shù)學(xué)圖形例子
今天小編就為大家分享一篇使用Matplotlib 繪制精美的數(shù)學(xué)圖形例子,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2019-12-12python中關(guān)于property的最詳細(xì)使用方法
這篇文章主要介紹了python中關(guān)于property的最詳細(xì)使用方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04教你用Python寫一個(gè)京東自動(dòng)下單搶購(gòu)腳本
很多朋友都有網(wǎng)購(gòu)搶購(gòu)限量商品的經(jīng)歷,有時(shí)候蹲點(diǎn)搶怎么也搶不到,今天小編帶你們學(xué)習(xí)怎么用Python寫一個(gè)京東自動(dòng)下單搶購(gòu)腳本,以后再也不用拼手速拼網(wǎng)速啦,快來(lái)一起看看吧2023-03-03使用wxPython和ECharts實(shí)現(xiàn)生成和保存HTML圖表
wxPython是一個(gè)基于wxWidgets的Python?GUI庫(kù),ECharts是一個(gè)用于數(shù)據(jù)可視化的JavaScript庫(kù),本文主要為大家介紹了如何使用wxPython和ECharts庫(kù)來(lái)生成和保存HTML圖表,感興趣的可以學(xué)習(xí)一下2023-08-08Python使用struct處理二進(jìn)制(pack和unpack用法)
這篇文章主要介紹了Python使用struct處理二進(jìn)制(pack和unpack用法),幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-11-11python簡(jiǎn)單實(shí)例訓(xùn)練(21~30)
上篇文章給大家介紹了python簡(jiǎn)單實(shí)例訓(xùn)練的1-10,這里繼續(xù)為大家介紹python的一些用法,希望大家每個(gè)例子都打出來(lái)測(cè)試一下2017-11-11