一文講透RabbitMQ 消息隊(duì)列中的拒絕機(jī)制
在消息隊(duì)列系統(tǒng)中,如果消費(fèi)者由于某些原因無(wú)法處理當(dāng)前接收到的消息,可以通過(guò)以下機(jī)制拒絕消息,并控制消息的后續(xù)處理方式(如重新入隊(duì)或丟棄)。以下是具體實(shí)現(xiàn)和注意事項(xiàng):
一. RabbitMQ 中的拒絕機(jī)制
1. RabbitMQ 中的拒絕機(jī)制
在 RabbitMQ 中,消費(fèi)者可以通過(guò)以下兩種方式拒絕消息:
(1)basic.reject(拒絕單條消息)
功能:拒絕單條消息,并指定是否將消息重新放回隊(duì)列。
參數(shù):
deliveryTag:消息的唯一標(biāo)識(shí)符(由 RabbitMQ 分配)。requeue:布爾值,決定消息是否重新入隊(duì)(true表示重新入隊(duì),false表示丟棄)。
示例代碼(Python):
import pika
def callback(ch, method, properties, body):
print(f"Received message: {body}")
# 模擬處理失敗
if some_condition:
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='my_queue', on_message_callback=callback)
(2)basic.nack(批量拒絕消息)
功能:拒絕多條消息,支持批量操作。
參數(shù):
deliveryTag:消息的唯一標(biāo)識(shí)符。multiple:布爾值,決定是否拒絕deliveryTag之前的所有未確認(rèn)消息。requeue:布爾值,決定消息是否重新入隊(duì)。
示例代碼(Python):
def callback(ch, method, properties, body):
print(f"Received message: {body}")
# 模擬處理失敗
if some_condition:
ch.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=True)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
2. 拒絕消息后的處理方式
拒絕消息后,消息的處理方式取決于 requeue 參數(shù)的值:
| requeue 值 | 消息處理方式 |
|---|---|
| True | 消息重新入隊(duì),可能被其他消費(fèi)者或當(dāng)前消費(fèi)者再次消費(fèi)(需確保隊(duì)列未滿)。 |
| False | 消息直接丟棄,或進(jìn)入 死信隊(duì)列(需提前配置死信隊(duì)列規(guī)則)。 |
3. 死信隊(duì)列(Dead Letter Queue, DLQ)
如果消息被拒絕且 requeue=False,消息可能被丟棄。為了避免消息丟失,可以通過(guò)配置 死信隊(duì)列 將消息轉(zhuǎn)發(fā)到另一個(gè)隊(duì)列,供后續(xù)分析或重試。
配置死信隊(duì)列的步驟(RabbitMQ):
聲明隊(duì)列時(shí)綁定死信交換器:
channel.queue_declare(
queue='my_queue',
arguments={
'x-dead-letter-exchange': 'dlx_exchange', # 死信交換器名稱
'x-message-ttl': 60000 # 可選:消息過(guò)期時(shí)間(毫秒)
}
)
聲明死信交換器和隊(duì)列:
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct') channel.queue_declare(queue='dlx_queue') channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx_key')
消費(fèi)者處理失敗時(shí)拒絕消息并進(jìn)入死信隊(duì)列:
def callback(ch, method, properties, body):
if some_condition:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
二. Spring AMQP 中的拒絕機(jī)制
4. Spring AMQP 中的拒絕機(jī)制
在 Spring AMQP 中,可以通過(guò)以下方式實(shí)現(xiàn)消息拒絕:
(1)手動(dòng)確認(rèn)模式(AcknowledgeMode.MANUAL)
代碼示例:
@RabbitListener(queues = "my_queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
try {
// 處理消息
if (someCondition) {
// 拒絕消息并重新入隊(duì)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else {
// 確認(rèn)消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
// 異常處理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
(2)自動(dòng)確認(rèn)模式(AcknowledgeMode.AUTO)
Spring 會(huì)根據(jù)方法是否拋出異常自動(dòng)決定是否發(fā)送 basic.nack 或 basic.ack。
配置示例(YAML):
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
三. RocketMQ 中的拒絕機(jī)制
5. RocketMQ 中的拒絕機(jī)制
在 RocketMQ 中,消費(fèi)者無(wú)法直接“拒絕”消息,但可以通過(guò)以下方式模擬:
(1)消費(fèi)失敗時(shí)返回ConsumeConcurrentlyStatus.RECONSUME_LATER
功能:消息會(huì)重新投遞(默認(rèn)延遲10秒)。
代碼示例(Java):
public class MyConsumer implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 處理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消費(fèi)失敗,重新投遞
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
(2)限制重試次數(shù)
通過(guò) maxReconsumeTimes 配置最大重試次數(shù),避免無(wú)限循環(huán):
consumer.setMaxReconsumeTimes(3); // 最大重試3次
四 、常見(jiàn)場(chǎng)景&注意事項(xiàng)
6.常見(jiàn)場(chǎng)景與處理建議
場(chǎng)景 1:消息格式錯(cuò)誤或業(yè)務(wù)邏輯異常
- 處理方式:
- 拒絕消息(
requeue=False)并記錄日志。 - 配置死信隊(duì)列,將消息轉(zhuǎn)發(fā)到專門(mén)的錯(cuò)誤隊(duì)列供人工處理。
- 拒絕消息(
場(chǎng)景 2:資源不足或臨時(shí)故障
- 處理方式:
- 拒絕消息并重新入隊(duì)(
requeue=True),等待資源恢復(fù)后重新消費(fèi)。 - 在死信隊(duì)列中設(shè)置重試邏輯(如定時(shí)任務(wù)重新投遞)。
- 拒絕消息并重新入隊(duì)(
場(chǎng)景 3:消息已過(guò)期或無(wú)效
- 處理方式:
拒絕消息(
requeue=False)并丟棄。配置死信隊(duì)列,記錄過(guò)期消息用于分析。
7. 注意事項(xiàng)
- 避免無(wú)限循環(huán):
- 如果消息多次被拒絕并重新入隊(duì),可能導(dǎo)致無(wú)限消費(fèi)循環(huán)。需結(jié)合 死信隊(duì)列 或 重試次數(shù)限制 解決。
- 資源占用:
- 頻繁拒絕消息并重新入隊(duì)可能增加系統(tǒng)負(fù)載,需合理配置
requeue和prefetchCount。
- 頻繁拒絕消息并重新入隊(duì)可能增加系統(tǒng)負(fù)載,需合理配置
- 消息可靠性:
- 使用 手動(dòng)確認(rèn) 和 死信隊(duì)列 保障消息不丟失。
總結(jié)
消費(fèi)者拒絕消息的核心在于通過(guò) basic.reject/basic.nack(RabbitMQ)或 RECONSUME_LATER(RocketMQ)控制消息的后續(xù)處理。結(jié)合 死信隊(duì)列 和 重試機(jī)制,可以有效處理異常場(chǎng)景,確保消息的可靠性和系統(tǒng)的健壯性。
到此這篇關(guān)于一文講透RabbitMQ 消息隊(duì)列中的拒絕機(jī)制的文章就介紹到這了,更多相關(guān)RabbitMQ 拒絕機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Maui Blazor 使用攝像頭實(shí)現(xiàn)代碼
由于Maui Blazor中界面是由WebView渲染,所以再使用Android的攝像頭時(shí)無(wú)法去獲取,因?yàn)樵臄z像頭需要綁定界面組件,這篇文章主要介紹了Maui Blazor 使用攝像頭實(shí)現(xiàn),需要的朋友可以參考下2023-01-01
深度卷積神經(jīng)網(wǎng)絡(luò)各種改進(jìn)結(jié)構(gòu)塊匯總
這篇文章主要為大家介紹了深度卷積神經(jīng)網(wǎng)絡(luò)各種改進(jìn)結(jié)構(gòu)塊匯總,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
asp php 清空access mysql mssql數(shù)據(jù)庫(kù)的代碼
php清空mysql asp情況access或mssql2008-12-12
TIOBE編程語(yǔ)言排行榜前20的語(yǔ)言入門(mén)書(shū)籍推薦
這篇文章主要為大家推薦了TIOBE編程語(yǔ)言排行榜前20的語(yǔ)言入門(mén)書(shū)籍,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11
skynet.rawcall使用應(yīng)用場(chǎng)景分析
skynet.rawcall 是 Skynet 框架中用于直接傳遞原始二進(jìn)制數(shù)據(jù)的低級(jí)通信接口,適用于需要繞過(guò)自動(dòng)序列化/反序列化、手動(dòng)控制內(nèi)存或?qū)崿F(xiàn)高性能傳輸?shù)膱?chǎng)景,下面給大家介紹skynet.rawcall使用應(yīng)用場(chǎng)景分析,感興趣的朋友一起看看吧2025-04-04

