解決python3 pika之連接斷開(kāi)的問(wèn)題
問(wèn)題描述
在消費(fèi)rabbitMQ隊(duì)列時(shí), 每次進(jìn)入回調(diào)函數(shù)內(nèi)需要進(jìn)行一些比較耗時(shí)的操作;操作完成后給rabbitMQ server發(fā)送ack信號(hào)以dequeue本條消息。
問(wèn)題就發(fā)生在發(fā)送ack操作時(shí), 程序提示鏈接已被斷開(kāi)或socket error。
源碼示例
#!/usr/bin
#coding: utf-8
import pika
import time
USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'
def callback(ch, method, properties, body):
print(body)
time.sleep(600)
ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
chan.basic_consume(callback, queue=TEST_QUEUE)
chan.start_consuming()
if __name__ == "__main__":
test_main()
運(yùn)行一段時(shí)間后, 就會(huì)報(bào)錯(cuò):
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None [CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed [ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')
問(wèn)題排查
猜測(cè):pika客戶(hù)端沒(méi)有及時(shí)發(fā)送心跳,連接被server斷開(kāi)
一開(kāi)始修改了heartbeat_interval參數(shù)值, 示例如下:
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.PlainCredentials(USER, PWD)))
# ....
修改后運(yùn)行依然報(bào)錯(cuò),后來(lái)想想應(yīng)該單線(xiàn)程被一直占用,pika無(wú)法發(fā)送心跳;
于是又加了個(gè)心跳線(xiàn)程, 示例如下:
#!/usr/bin
#coding: utf-8
import pika
import time
import logging
import threading
USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'
class Heartbeat(threading.Thread):
def __init__(self, connection):
super(Heartbeat, self).__init__()
self.lock = threading.Lock()
self.connection = connection
self.quitflag = False
self.stopflag = True
self.setDaemon(True)
def run(self):
while not self.quitflag:
time.sleep(10)
self.lock.acquire()
if self.stopflag :
self.lock.release()
continue
try:
self.connection.process_data_events()
except Exception as ex:
logging.warn("Error format: %s"%(str(ex)))
self.lock.release()
return
self.lock.release()
def startHeartbeat(self):
self.lock.acquire()
if self.quitflag==True:
self.lock.release()
return
self.stopflag=False
self.lock.release()
def callback(ch, method, properties, body):
logging.info("recv_body:%s" % body)
time.sleep(600)
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.BlockingConnection(
pika.ConnectionParameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.PlainCredentials(USER, PWD)))
chan = s_conn.channel()
chan.queue_declare(queue=TEST_QUEUE)
chan.basic_consume(callback,
queue=TEST_QUEUE)
heartbeat = Heartbeat(s_conn)
heartbeat.start() #開(kāi)啟心跳線(xiàn)程
heartbeat.startHeartbeat()
chan.start_consuming()
if __name__ == "__main__":
test_main()
嘗試運(yùn)行,結(jié)果還是不行,不得不安靜下來(lái)思考自己是不是想錯(cuò)了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_interval: How often to send heartbeats.
Min between this value and server's proposal
will be used. Use 0 to deactivate heartbeats
and None to accept server's proposal.
按這樣說(shuō)法,應(yīng)該還是沒(méi)有把心跳值給設(shè)置好。上面的程序期望是10秒發(fā)一次心跳,但是理論上發(fā)送心跳的間隔會(huì)比10秒多一點(diǎn)。所以艾瑪,我應(yīng)該是把heartbeat_interval的作用搞錯(cuò)了, 它是指超過(guò)這個(gè)時(shí)間間隔不發(fā)心跳或不給server任何信息,server就會(huì)斷開(kāi)連接, 而不是說(shuō)pika會(huì)按這個(gè)間隔來(lái)發(fā)心跳。 結(jié)果我把heartbeat_interval值設(shè)置高一點(diǎn)(比實(shí)際發(fā)送心跳/信息的間隔更長(zhǎng)),比如上面設(shè)置成60秒,就正常運(yùn)行了。
如果不指定heartbeat_interval, 它默認(rèn)為None, 意味著按rabbitMQ server的配置來(lái)檢測(cè)心跳是否正常。
如果設(shè)置heartbeat_interval=0, 意味著不檢測(cè)心跳,server端將不會(huì)主動(dòng)斷開(kāi)連接。
以上這篇解決python3 pika之連接斷開(kāi)的問(wèn)題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
python 遞歸遍歷文件夾,并打印滿(mǎn)足條件的文件路徑實(shí)例
下面小編就為大家?guī)?lái)一篇python 遞歸遍歷文件夾,并打印滿(mǎn)足條件的文件路徑實(shí)例。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-08-08
通過(guò)Python編寫(xiě)一個(gè)簡(jiǎn)單登錄功能過(guò)程解析
這篇文章主要介紹了通過(guò)Python編寫(xiě)一個(gè)簡(jiǎn)單登錄功能過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09
pd.DataFrame中的幾種索引變換的實(shí)現(xiàn)
本文主要介紹了pd.DataFrame中的幾種索引變換的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06
詳解Python對(duì)某地區(qū)二手房房?jī)r(jià)數(shù)據(jù)分析
這篇文章主要為大家介紹了Python數(shù)據(jù)分析某地區(qū)二手房房?jī)r(jià),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2021-12-12
Python cookbook(數(shù)據(jù)結(jié)構(gòu)與算法)將序列分解為單獨(dú)變量的方法
這篇文章主要介紹了Python cookbook(數(shù)據(jù)結(jié)構(gòu)與算法)將序列分解為單獨(dú)變量的方法,結(jié)合實(shí)例形式分析了Python序列賦值實(shí)現(xiàn)的分解成單獨(dú)變量功能相關(guān)操作技巧,需要的朋友可以參考下2018-02-02
python操作redis數(shù)據(jù)庫(kù)的三種方法
這篇文章主要介紹了python操作redis數(shù)據(jù)庫(kù)的三種方法,幫助大家更好的理解和使用python,感興趣的朋友可以了解下2020-09-09
淺析Python 實(shí)現(xiàn)一個(gè)自動(dòng)化翻譯和替換的工具
這篇文章主要介紹了Python 實(shí)現(xiàn)一個(gè)自動(dòng)化翻譯和替換的工具,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-04-04
Pytorch使用MNIST數(shù)據(jù)集實(shí)現(xiàn)基礎(chǔ)GAN和DCGAN詳解
今天小編就為大家分享一篇Pytorch使用MNIST數(shù)據(jù)集實(shí)現(xiàn)基礎(chǔ)GAN和DCGAN詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-01-01

