欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解決python3 pika之連接斷開的問題

 更新時(shí)間:2018年12月18日 09:51:07   作者:moxiaomomo  
今天小編就為大家分享一篇解決python3 pika之連接斷開的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧

問題描述

在消費(fèi)rabbitMQ隊(duì)列時(shí), 每次進(jìn)入回調(diào)函數(shù)內(nèi)需要進(jìn)行一些比較耗時(shí)的操作;操作完成后給rabbitMQ server發(fā)送ack信號(hào)以dequeue本條消息。

問題就發(fā)生在發(fā)送ack操作時(shí), 程序提示鏈接已被斷開或socket error。

源碼示例

#!/usr/bin
#coding: utf-8

import pika
import time


USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

def callback(ch, method, properties, body):
 print(body)
 time.sleep(600)
 ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
 ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1', 
   credentials=pika.PlainCredentials(USER, PWD)))
 chan = s_conn.channel()
 chan.queue_declare(queue=TEST_QUEUE)

 chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
 chan.basic_consume(callback, queue=TEST_QUEUE)
 chan.start_consuming()

if __name__ == "__main__":
 test_main()

運(yùn)行一段時(shí)間后, 就會(huì)報(bào)錯(cuò):

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

問題排查

猜測(cè):pika客戶端沒有及時(shí)發(fā)送心跳,連接被server斷開

一開始修改了heartbeat_interval參數(shù)值, 示例如下:

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1', 
   heartbeat_interval=10,
   socket_timeout=5,
   credentials=pika.PlainCredentials(USER, PWD)))
 # ....

修改后運(yùn)行依然報(bào)錯(cuò),后來(lái)想想應(yīng)該單線程被一直占用,pika無(wú)法發(fā)送心跳;

于是又加了個(gè)心跳線程, 示例如下:

#!/usr/bin
#coding: utf-8

import pika
import time
import logging
import threading

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

class Heartbeat(threading.Thread):
 def __init__(self, connection):
  super(Heartbeat, self).__init__()
  self.lock = threading.Lock()
  self.connection = connection
  self.quitflag = False
  self.stopflag = True
  self.setDaemon(True)

 def run(self):
  while not self.quitflag:
   time.sleep(10)
   self.lock.acquire()
   if self.stopflag :
    self.lock.release()
    continue
   try:
    self.connection.process_data_events()
   except Exception as ex:
    logging.warn("Error format: %s"%(str(ex)))
    self.lock.release()
    return
   self.lock.release()

 def startHeartbeat(self):
  self.lock.acquire()
  if self.quitflag==True:
   self.lock.release()
   return
  self.stopflag=False
  self.lock.release()

def callback(ch, method, properties, body):
 logging.info("recv_body:%s" % body)
 time.sleep(600)
 ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1', 
   heartbeat_interval=10,
   socket_timeout=5,
   credentials=pika.PlainCredentials(USER, PWD)))
 chan = s_conn.channel()
 chan.queue_declare(queue=TEST_QUEUE)
 chan.basic_consume(callback,
      queue=TEST_QUEUE)

 heartbeat = Heartbeat(s_conn)
 heartbeat.start()   #開啟心跳線程
 heartbeat.startHeartbeat()
 chan.start_consuming()

if __name__ == "__main__":
 test_main()

嘗試運(yùn)行,結(jié)果還是不行,不得不安靜下來(lái)思考自己是不是想錯(cuò)了。

去看它的api,看到heartbeat_interval的解析:

:param int heartbeat_interval: How often to send heartbeats.
         Min between this value and server's proposal
         will be used. Use 0 to deactivate heartbeats
         and None to accept server's proposal.

按這樣說(shuō)法,應(yīng)該還是沒有把心跳值給設(shè)置好。上面的程序期望是10秒發(fā)一次心跳,但是理論上發(fā)送心跳的間隔會(huì)比10秒多一點(diǎn)。所以艾瑪,我應(yīng)該是把heartbeat_interval的作用搞錯(cuò)了, 它是指超過(guò)這個(gè)時(shí)間間隔不發(fā)心跳或不給server任何信息,server就會(huì)斷開連接, 而不是說(shuō)pika會(huì)按這個(gè)間隔來(lái)發(fā)心跳。 結(jié)果我把heartbeat_interval值設(shè)置高一點(diǎn)(比實(shí)際發(fā)送心跳/信息的間隔更長(zhǎng)),比如上面設(shè)置成60秒,就正常運(yùn)行了。

如果不指定heartbeat_interval, 它默認(rèn)為None, 意味著按rabbitMQ server的配置來(lái)檢測(cè)心跳是否正常。

如果設(shè)置heartbeat_interval=0, 意味著不檢測(cè)心跳,server端將不會(huì)主動(dòng)斷開連接。

以上這篇解決python3 pika之連接斷開的問題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論