如何通過(guò)Python實(shí)現(xiàn)RabbitMQ延遲隊(duì)列
最近在做一任務(wù)時(shí),遇到需要延遲處理的數(shù)據(jù),最開(kāi)始的做法是現(xiàn)將數(shù)據(jù)存儲(chǔ)在數(shù)據(jù)庫(kù),然后寫(xiě)個(gè)腳本,隔五分鐘掃描數(shù)據(jù)表再處理數(shù)據(jù),實(shí)際效果并不好。因?yàn)橄到y(tǒng)本身一直在用RabbitMQ做異步處理任務(wù)的中間件,所以想到是否可以利用RabbitMQ實(shí)現(xiàn)延遲隊(duì)列。功夫不負(fù)有心人,RabbitMQ雖然沒(méi)有現(xiàn)成可用的延遲隊(duì)列,但是可以利用其兩個(gè)重要特性來(lái)實(shí)現(xiàn)之:1、Time To Live(TTL)消息超時(shí)機(jī)制;2、Dead Letter Exchanges(DLX)死信隊(duì)列。下面將具體描述實(shí)現(xiàn)原理以及實(shí)現(xiàn)代
延遲隊(duì)列的基礎(chǔ)原理Time To Live(TTL)
RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl,來(lái)控制消息的生存時(shí)間,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)
RabbitMQ消息的過(guò)期時(shí)間有兩種方法設(shè)置。
通過(guò)隊(duì)列(Queue)的屬性設(shè)置,隊(duì)列中所有的消息都有相同的過(guò)期時(shí)間。(本次延遲隊(duì)列采用的方案)對(duì)消息單獨(dú)設(shè)置,每條消息TTL可以不同。
如果同時(shí)使用,則消息的過(guò)期時(shí)間以?xún)烧咧gTTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過(guò)設(shè)置的TTL值,就成為死信(dead letter)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。
- x-dead-letter-exchange:出現(xiàn)死信(dead letter)之后將dead letter重新發(fā)送到指定exchange
- x-dead-letter-routing-key:出現(xiàn)死信(dead letter)之后將dead letter重新按照指定的routing-key發(fā)送
隊(duì)列中出現(xiàn)死信(dead letter)的情況有:
- 消息或者隊(duì)列的TTL過(guò)期。(延遲隊(duì)列利用的特性)
- 隊(duì)列達(dá)到最大長(zhǎng)度
- 消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false
綜合上面兩個(gè)特性,將隊(duì)列設(shè)置TTL規(guī)則,隊(duì)列TTL過(guò)期后消息會(huì)變成死信,然后利用DLX特性將其轉(zhuǎn)發(fā)到另外的交換機(jī)和隊(duì)列就可以被重新消費(fèi),達(dá)到延遲消費(fèi)效果。

延遲隊(duì)列設(shè)計(jì)及實(shí)現(xiàn)(Python)
從上面描述,延遲隊(duì)列的實(shí)現(xiàn)大致分為兩步:
產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,因?yàn)槲业男枨笾惺撬械南⒀舆t處理時(shí)間相同,所以本實(shí)現(xiàn)中采用 Queue TTL設(shè)置隊(duì)列的TTL,如果需要將隊(duì)列中的消息設(shè)置不同的延遲處理時(shí)間,則設(shè)置Per-Message TTL(官方文檔)
設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則,Dead Letter Exchanges設(shè)置方法(官方文檔)
完整代碼如下:
"""
Created on Fri Aug 3 17:00:44 2018
@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
self.exchange_type = "direct"
self.connection_string = conn_str
self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
self.channel = self.connection.channel()
self._declare_retry_queue() #RetryQueue and RetryExchange
logging.debug("connection established")
def close_connection(self):
self.connection.close()
logging.debug("connection closed")
def declare_exchange(self, exchange):
self.channel.exchange_declare(exchange=exchange,
exchange_type=self.exchange_type,
durable=True)
def declare_queue(self, queue):
self.channel.queue_declare(queue=queue,
durable=True,)
def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
"""
創(chuàng)建延遲隊(duì)列
:param TTL: ttl的單位是us,ttl=60000 表示 60s
:param queue:
:param DLX:死信轉(zhuǎn)發(fā)的exchange
:return:
"""
arguments={}
if DLX:
#設(shè)置死信轉(zhuǎn)發(fā)的exchange
arguments[ 'x-dead-letter-exchange']=DLX
if TTL:
arguments['x-message-ttl']=TTL
print(arguments)
self.channel.queue_declare(queue=queue,
durable=True,
arguments=arguments)
def _declare_retry_queue(self):
"""
創(chuàng)建異常交換器和隊(duì)列,用于存放沒(méi)有正常處理的消息。
:return:
"""
self.channel.exchange_declare(exchange='RetryExchange',
exchange_type='fanout',
durable=True)
self.channel.queue_declare(queue='RetryQueue',
durable=True)
self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
"""
發(fā)送消息到指定的交換器
:param exchange: RabbitMQ交換器
:param msg: 消息實(shí)體,是一個(gè)序列化的JSON字符串
:return:
"""
if delay==0:
self.declare_queue(routing_key)
else:
self.declare_delay_queue(routing_key,TTL=TTL)
if exchange!='':
self.declare_exchange(exchange)
self.channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2,
type=exchange
))
self.close_connection()
print("message send out to %s" % exchange)
logging.debug("message send out to %s" % exchange)
def start_consume(self,callback,queue='#',delay=1):
"""
啟動(dòng)消費(fèi)者,開(kāi)始消費(fèi)RabbitMQ中的消息
:return:
"""
if delay==1:
queue='RetryQueue'
else:
self.declare_queue(queue)
self.channel.basic_qos(prefetch_count=1)
try:
self.channel.basic_consume( # 消費(fèi)消息
callback, # 如果收到消息,就調(diào)用callback函數(shù)來(lái)處理消息
queue=queue, # 你要從那個(gè)隊(duì)列里收消息
)
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop_consuming()
def stop_consuming(self):
self.channel.stop_consuming()
self.close_connection()
def message_handle_successfully(channel, method):
"""
如果消息處理正常完成,必須調(diào)用此方法,
否則RabbitMQ會(huì)認(rèn)為消息處理不成功,重新將消息放回待執(zhí)行隊(duì)列中
:param channel: 回調(diào)函數(shù)的channel參數(shù)
:param method: 回調(diào)函數(shù)的method參數(shù)
:return:
"""
channel.basic_ack(delivery_tag=method.delivery_tag)
def message_handle_failed(channel, method):
"""
如果消息處理失敗,應(yīng)該調(diào)用此方法,會(huì)自動(dòng)將消息放入異常隊(duì)列
:param channel: 回調(diào)函數(shù)的channel參數(shù)
:param method: 回調(diào)函數(shù)的method參數(shù)
:return:
"""
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
發(fā)布消息代碼如下:
from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")
消費(fèi)者代碼如下:
from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
msg = body.decode()
print(msg)
# 如果處理成功,則調(diào)用此消息回復(fù)ack,表示消息成功處理完成。
RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解
- RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)
- 手把手帶你掌握SpringBoot RabbitMQ延遲隊(duì)列
- RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)詳解
- Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- C#實(shí)現(xiàn)rabbitmq 延遲隊(duì)列功能實(shí)例代碼
- rabbitmq延遲隊(duì)列的使用方式
相關(guān)文章
編寫(xiě)Python腳本把sqlAlchemy對(duì)象轉(zhuǎn)換成dict的教程
這篇文章主要介紹了編寫(xiě)Python腳本把sqlAlchemy對(duì)象轉(zhuǎn)換成dict的教程,主要是基于Python的model類(lèi)構(gòu)建一個(gè)轉(zhuǎn)換的方法,需要的朋友可以參考下2015-05-05
Python如何對(duì)音視頻文件進(jìn)行解析詳解
在腳本或應(yīng)用程序中,我們需要執(zhí)行音頻處理任務(wù),下面這篇文章主要給大家介紹了關(guān)于Python如何對(duì)音視頻文件進(jìn)行解析的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-11-11
Pandas?計(jì)算相關(guān)性系數(shù)corr()方式
這篇文章主要介紹了Pandas?計(jì)算相關(guān)性系數(shù)corr()方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
淺談python編譯pyc工程--導(dǎo)包問(wèn)題解決
這篇文章主要介紹了python編譯pyc工程--導(dǎo)包問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03
Django使用jinja2模板的實(shí)現(xiàn)
本文主要介紹了Django使用jinja2模板的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02
基于python的漢字轉(zhuǎn)GBK碼實(shí)現(xiàn)代碼
今天想用python調(diào)用百度框計(jì)算的搜過(guò)結(jié)果,看到了URL里面的漢字用GBK編碼,雖然可以直接在URL里面加入中文,之前也做過(guò)一個(gè)簡(jiǎn)體字轉(zhuǎn)GBK碼的python函數(shù),但還是略嫌麻煩,今天改了一下2012-02-02
OpenCV?Python身份證信息識(shí)別過(guò)程詳解
本篇文章使用OpenCV-Python和CnOcr來(lái)實(shí)現(xiàn)身份證信息識(shí)別的案例,本篇文章使用的Python版本為3.6,OpenCV-Python版本為3.4.1.15,如果是4.x版本的同學(xué),可能會(huì)有一些Api操作不同,下面跟隨小編看下OpenCV?Python身份證信息識(shí)別過(guò)程2022-04-04

