欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何通過Python實現(xiàn)RabbitMQ延遲隊列

 更新時間:2020年11月28日 15:12:04   作者:Bge的博客  
這篇文章主要介紹了如何通過Python實現(xiàn)RabbitMQ延遲隊列,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

最近在做一任務(wù)時,遇到需要延遲處理的數(shù)據(jù),最開始的做法是現(xiàn)將數(shù)據(jù)存儲在數(shù)據(jù)庫,然后寫個腳本,隔五分鐘掃描數(shù)據(jù)表再處理數(shù)據(jù),實際效果并不好。因為系統(tǒng)本身一直在用RabbitMQ做異步處理任務(wù)的中間件,所以想到是否可以利用RabbitMQ實現(xiàn)延遲隊列。功夫不負有心人,RabbitMQ雖然沒有現(xiàn)成可用的延遲隊列,但是可以利用其兩個重要特性來實現(xiàn)之:1、Time To Live(TTL)消息超時機制;2、Dead Letter Exchanges(DLX)死信隊列。下面將具體描述實現(xiàn)原理以及實現(xiàn)代

延遲隊列的基礎(chǔ)原理Time To Live(TTL)

RabbitMQ可以針對Queue設(shè)置x-expires 或者 針對Message設(shè)置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設(shè)置以最先到期的時間為準),則消息變?yōu)閐ead letter(死信)
RabbitMQ消息的過期時間有兩種方法設(shè)置。

通過隊列(Queue)的屬性設(shè)置,隊列中所有的消息都有相同的過期時間。(本次延遲隊列采用的方案)對消息單獨設(shè)置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準。消息在隊列的生存時間一旦超過設(shè)置的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了dead letter,則按照這兩個參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊列。

  • x-dead-letter-exchange:出現(xiàn)死信(dead letter)之后將dead letter重新發(fā)送到指定exchange
  • x-dead-letter-routing-key:出現(xiàn)死信(dead letter)之后將dead letter重新按照指定的routing-key發(fā)送

隊列中出現(xiàn)死信(dead letter)的情況有:

  • 消息或者隊列的TTL過期。(延遲隊列利用的特性)
  • 隊列達到最大長度
  • 消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false

綜合上面兩個特性,將隊列設(shè)置TTL規(guī)則,隊列TTL過期后消息會變成死信,然后利用DLX特性將其轉(zhuǎn)發(fā)到另外的交換機和隊列就可以被重新消費,達到延遲消費效果。

延遲隊列設(shè)計及實現(xiàn)(Python)

從上面描述,延遲隊列的實現(xiàn)大致分為兩步:

產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,因為我的需求中是所有的消息延遲處理時間相同,所以本實現(xiàn)中采用 Queue TTL設(shè)置隊列的TTL,如果需要將隊列中的消息設(shè)置不同的延遲處理時間,則設(shè)置Per-Message TTL(官方文檔

設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則,Dead Letter Exchanges設(shè)置方法(官方文檔

完整代碼如下:

"""
Created on Fri Aug 3 17:00:44 2018

@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
  def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
    self.exchange_type = "direct"
    self.connection_string = conn_str
    self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
    self.channel = self.connection.channel()
    self._declare_retry_queue() #RetryQueue and RetryExchange
    logging.debug("connection established")
  def close_connection(self):
    self.connection.close()
    logging.debug("connection closed")
  def declare_exchange(self, exchange):
    self.channel.exchange_declare(exchange=exchange,
                   exchange_type=self.exchange_type,
                   durable=True)
  def declare_queue(self, queue):
    self.channel.queue_declare(queue=queue,
                  durable=True,)
  def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
    """
    創(chuàng)建延遲隊列
    :param TTL: ttl的單位是us,ttl=60000 表示 60s
    :param queue:
    :param DLX:死信轉(zhuǎn)發(fā)的exchange
    :return:
    """
    arguments={}
    if DLX:
      #設(shè)置死信轉(zhuǎn)發(fā)的exchange
      arguments[ 'x-dead-letter-exchange']=DLX
    if TTL:
      arguments['x-message-ttl']=TTL
    print(arguments)
    self.channel.queue_declare(queue=queue,
                  durable=True,
                  arguments=arguments)
  def _declare_retry_queue(self):
    """
    創(chuàng)建異常交換器和隊列,用于存放沒有正常處理的消息。
    :return:
    """
    self.channel.exchange_declare(exchange='RetryExchange',
                   exchange_type='fanout',
                   durable=True)
    self.channel.queue_declare(queue='RetryQueue',
                  durable=True)
    self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
  def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
    """
    發(fā)送消息到指定的交換器
    :param exchange: RabbitMQ交換器
    :param msg: 消息實體,是一個序列化的JSON字符串
    :return:
    """
    if delay==0:
      self.declare_queue(routing_key)
    else:
      self.declare_delay_queue(routing_key,TTL=TTL)
    if exchange!='':
      self.declare_exchange(exchange)
    self.channel.basic_publish(exchange=exchange,
                  routing_key=routing_key,
                  body=msg,
                  properties=pika.BasicProperties(
                    delivery_mode=2,
                    type=exchange
                  ))
    self.close_connection()
    print("message send out to %s" % exchange)
    logging.debug("message send out to %s" % exchange)
  def start_consume(self,callback,queue='#',delay=1):
    """
    啟動消費者,開始消費RabbitMQ中的消息
    :return:
    """
    if delay==1:
      queue='RetryQueue'
    else:
      self.declare_queue(queue)
    self.channel.basic_qos(prefetch_count=1)
    try:
      self.channel.basic_consume( # 消費消息
        callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
        queue=queue, # 你要從那個隊列里收消息
      )
      self.channel.start_consuming()
    except KeyboardInterrupt:
      self.stop_consuming()
  def stop_consuming(self):
    self.channel.stop_consuming()
    self.close_connection()
  def message_handle_successfully(channel, method):
    """
    如果消息處理正常完成,必須調(diào)用此方法,
    否則RabbitMQ會認為消息處理不成功,重新將消息放回待執(zhí)行隊列中
    :param channel: 回調(diào)函數(shù)的channel參數(shù)
    :param method: 回調(diào)函數(shù)的method參數(shù)
    :return:
    """
    channel.basic_ack(delivery_tag=method.delivery_tag)
  def message_handle_failed(channel, method):
    """
    如果消息處理失敗,應(yīng)該調(diào)用此方法,會自動將消息放入異常隊列
    :param channel: 回調(diào)函數(shù)的channel參數(shù)
    :param method: 回調(diào)函數(shù)的method參數(shù)
    :return:
    """
    channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

發(fā)布消息代碼如下:

from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")

消費者代碼如下:

from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
    msg = body.decode()
    print(msg)
    # 如果處理成功,則調(diào)用此消息回復(fù)ack,表示消息成功處理完成。
    RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • Python新手學習raise用法

    Python新手學習raise用法

    在本篇文章里小編給大家分享的是一篇關(guān)于Python新手學習raise用法的相關(guān)知識點,需要的朋友們可以參考下。
    2020-06-06
  • 編寫Python腳本把sqlAlchemy對象轉(zhuǎn)換成dict的教程

    編寫Python腳本把sqlAlchemy對象轉(zhuǎn)換成dict的教程

    這篇文章主要介紹了編寫Python腳本把sqlAlchemy對象轉(zhuǎn)換成dict的教程,主要是基于Python的model類構(gòu)建一個轉(zhuǎn)換的方法,需要的朋友可以參考下
    2015-05-05
  • Python如何對音視頻文件進行解析詳解

    Python如何對音視頻文件進行解析詳解

    在腳本或應(yīng)用程序中,我們需要執(zhí)行音頻處理任務(wù),下面這篇文章主要給大家介紹了關(guān)于Python如何對音視頻文件進行解析的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-11-11
  • minconda安裝pytorch的詳細方法

    minconda安裝pytorch的詳細方法

    這篇文章主要介紹了minconda安裝pytorch的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • Pandas?計算相關(guān)性系數(shù)corr()方式

    Pandas?計算相關(guān)性系數(shù)corr()方式

    這篇文章主要介紹了Pandas?計算相關(guān)性系數(shù)corr()方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • 淺談python編譯pyc工程--導(dǎo)包問題解決

    淺談python編譯pyc工程--導(dǎo)包問題解決

    這篇文章主要介紹了python編譯pyc工程--導(dǎo)包問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-03-03
  • Django使用jinja2模板的實現(xiàn)

    Django使用jinja2模板的實現(xiàn)

    本文主要介紹了Django使用jinja2模板的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-02-02
  • 基于python的漢字轉(zhuǎn)GBK碼實現(xiàn)代碼

    基于python的漢字轉(zhuǎn)GBK碼實現(xiàn)代碼

    今天想用python調(diào)用百度框計算的搜過結(jié)果,看到了URL里面的漢字用GBK編碼,雖然可以直接在URL里面加入中文,之前也做過一個簡體字轉(zhuǎn)GBK碼的python函數(shù),但還是略嫌麻煩,今天改了一下
    2012-02-02
  • Python中Collection的使用小技巧

    Python中Collection的使用小技巧

    這篇文章主要介紹了Python中Collection的使用小技巧,對初學者來說有不錯的學習借鑒價值,需要的朋友可以參考下
    2014-08-08
  • OpenCV?Python身份證信息識別過程詳解

    OpenCV?Python身份證信息識別過程詳解

    本篇文章使用OpenCV-Python和CnOcr來實現(xiàn)身份證信息識別的案例,本篇文章使用的Python版本為3.6,OpenCV-Python版本為3.4.1.15,如果是4.x版本的同學,可能會有一些Api操作不同,下面跟隨小編看下OpenCV?Python身份證信息識別過程
    2022-04-04

最新評論