RabbitMQ延時(shí)隊(duì)列詳解與Java代碼實(shí)現(xiàn)
作者:小威要向諸佬學(xué)習(xí)呀
RabbitMQ 延時(shí)隊(duì)列介紹
RabbitMQ 延時(shí)隊(duì)列是指消息在發(fā)送到隊(duì)列后,并不立即被消費(fèi)者消費(fèi),而是等待一段時(shí)間后再被消費(fèi)者消費(fèi)。這種隊(duì)列通常用于實(shí)現(xiàn)定時(shí)任務(wù),例如,訂單超時(shí)未支付系統(tǒng)取消訂單釋放所占庫(kù)存等。
RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的方法有多種,其中比較常見(jiàn)的是使用插件或者通過(guò)DLX(Dead Letter Exchange)機(jī)制實(shí)現(xiàn)。
使用插件實(shí)現(xiàn)延時(shí)隊(duì)列
RabbitMQ提供了rabbitmq_delayed_message_exchange插件,可以通過(guò)該插件實(shí)現(xiàn)延時(shí)隊(duì)列。該插件的原理是在消息發(fā)送時(shí),將消息發(fā)送到一個(gè)特定的Exchange中,然后該Exchange會(huì)根據(jù)消息中的延時(shí)時(shí)間將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,從而實(shí)現(xiàn)延時(shí)隊(duì)列的功能。
使用該插件需要先安裝插件,然后創(chuàng)建一個(gè)Exchange,并將該Exchange的類型設(shè)置為x-delayed-message,然后將該Exchange與隊(duì)列綁定即可。
使用DLX機(jī)制實(shí)現(xiàn)延時(shí)隊(duì)列
消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。而對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的 設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì) 列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。可以通過(guò)設(shè)置消息的expiration字段或者x- message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。
DLX機(jī)制是RabbitMQ提供的一種消息轉(zhuǎn)發(fā)機(jī)制,它可以將無(wú)法被處理的消息轉(zhuǎn)發(fā)到指定的Exchange中,從而實(shí)現(xiàn)消息的延時(shí)處理。具體實(shí)現(xiàn)步驟如下:
- 創(chuàng)建一個(gè)普通的Exchange和Queue,并將它們綁定在一起。
- 創(chuàng)建一個(gè)DLX Exchange,并將普通Exchange綁定到該DLX Exchange上。
- 將Queue設(shè)置為具有TTL(Time To Live)屬性,并設(shè)置消息過(guò)期時(shí)間。
- 將Queue綁定到DLX Exchange上。
當(dāng)消息過(guò)期后,會(huì)被發(fā)送到DLX Exchange中,然后再由DLX Exchange將消息轉(zhuǎn)發(fā)到指定的Exchange中,從而實(shí)現(xiàn)延時(shí)隊(duì)列的功能。
使用DLX機(jī)制實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)點(diǎn)是不需要安裝額外的插件,但是需要對(duì)消息的過(guò)期時(shí)間進(jìn)行精確控制,否則可能會(huì)出現(xiàn)消息過(guò)期時(shí)間不準(zhǔn)確的情況。
Java語(yǔ)言設(shè)置延時(shí)隊(duì)列
下面是使用 Java 語(yǔ)言通過(guò) RabbitMQ 設(shè)置延時(shí)隊(duì)列的步驟:
安裝插件
首先,需要安裝 rabbitmq_delayed_message_exchange
插件??梢酝ㄟ^(guò)以下命令安裝:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
創(chuàng)建延時(shí)交換機(jī)
延時(shí)隊(duì)列需要使用延時(shí)交換機(jī)??梢允褂?x-delayed-message
類型創(chuàng)建一個(gè)延時(shí)交換機(jī)。以下是創(chuàng)建延時(shí)交換機(jī)的示例代碼:
Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
創(chuàng)建延時(shí)隊(duì)列
創(chuàng)建延時(shí)隊(duì)列時(shí),需要將隊(duì)列綁定到延時(shí)交換機(jī)上,并設(shè)置隊(duì)列的 TTL(Time To Live)參數(shù)。以下是創(chuàng)建延時(shí)隊(duì)列的示例代碼:
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "delayed-exchange"); args.put("x-dead-letter-routing-key", "delayed-queue"); args.put("x-message-ttl", 5000); channel.queueDeclare("delayed-queue", true, false, false, args); channel.queueBind("delayed-queue", "delayed-exchange", "delayed-queue");
在上述代碼中,將隊(duì)列綁定到延時(shí)交換機(jī)上,并設(shè)置了隊(duì)列的 TTL 參數(shù)為 5000 毫秒,即消息在發(fā)送到隊(duì)列后,如果在 5000 毫秒內(nèi)沒(méi)有被消費(fèi)者消費(fèi),則會(huì)被轉(zhuǎn)發(fā)到 delayed-exchange
交換機(jī)上,并發(fā)送到 delayed-queue
隊(duì)列中。
發(fā)送延時(shí)消息
發(fā)送延時(shí)消息時(shí),需要設(shè)置消息的 expiration
屬性,該屬性表示消息的過(guò)期時(shí)間。以下是發(fā)送延時(shí)消息的示例代碼:
Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", 5000); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .headers(headers) .expiration("5000") .build(); channel.basicPublish("delayed-exchange", "delayed-queue", properties, "Hello, delayed queue!".getBytes());
在上述代碼中,設(shè)置了消息的 expiration
屬性為 5000 毫秒,并將消息發(fā)送到 delayed-exchange
交換機(jī)上,路由鍵為 delayed-queue
,消息內(nèi)容為 “Hello, delayed queue!”。
消費(fèi)延時(shí)消息
消費(fèi)延時(shí)消息時(shí),需要設(shè)置消費(fèi)者的 QOS(Quality of Service)參數(shù),以控制消費(fèi)者的并發(fā)處理能力。以下是消費(fèi)延時(shí)消息的示例代碼:
channel.basicQos(1); channel.basicConsume("delayed-queue", false, (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received message: " + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); });
在上述代碼中,設(shè)置了 QOS 參數(shù)為 1,即每次只處理一個(gè)消息。然后使用 basicConsume
方法消費(fèi) delayed-queue
隊(duì)列中的消息,并在消費(fèi)完成后,使用 basicAck
方法確認(rèn)消息已被消費(fèi)。
通過(guò)上述步驟,就可以實(shí)現(xiàn) RabbitMQ 延時(shí)隊(duì)列,用于實(shí)現(xiàn)定時(shí)任務(wù)等功能。
RabbitMQ延時(shí)隊(duì)列是一種常見(jiàn)的消息隊(duì)列應(yīng)用場(chǎng)景,它可以在消息發(fā)送后指定一定的時(shí)間后才能被消費(fèi)者消費(fèi),通常用于實(shí)現(xiàn)一些延時(shí)任務(wù),例如訂單超時(shí)未支付自動(dòng)取消等。
RabbitMQ延時(shí)隊(duì)列具體代碼
下面是具體代碼(附注釋):
import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class DelayedQueueExample { private static final String EXCHANGE_NAME = "delayed_exchange"; private static final String QUEUE_NAME = "delayed_queue"; private static final String ROUTING_KEY = "delayed_routing_key"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; */ // 創(chuàng)建一個(gè)支持延時(shí)隊(duì)列的Exchange Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, arguments); // 創(chuàng)建一個(gè)延時(shí)隊(duì)列,設(shè)置x-dead-letter-exchange和x-dead-letter-routing-key參數(shù) Map<String, Object> queueArguments = new HashMap<>(); queueArguments.put("x-dead-letter-exchange", ""); queueArguments.put("x-dead-letter-routing-key", QUEUE_NAME); queueArguments.put("x-message-ttl", 5000); channel.queueDeclare(QUEUE_NAME, true, false, false, queueArguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 發(fā)送消息到延時(shí)隊(duì)列中,設(shè)置expiration參數(shù) AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") .build(); String message = "Hello, delayed queue!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes()); System.out.println("Sent message to delayed queue: " + message); channel.close(); connection.close(); } }
在上面的代碼中,我們創(chuàng)建了一個(gè)支持延時(shí)隊(duì)列的Exchange,并創(chuàng)建了一個(gè)延時(shí)隊(duì)列,設(shè)置了x-dead-letter-exchange和x-dead-letter-routing-key參數(shù)。然后,我們發(fā)送了一條消息到延時(shí)隊(duì)列中,設(shè)置了expiration參數(shù),表示這條消息延時(shí)10秒后才能被消費(fèi)。
注意,如果我們想要消費(fèi)延時(shí)隊(duì)列中的消息,需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者,并監(jiān)聽(tīng)這個(gè)隊(duì)列。當(dāng)消息被消費(fèi)時(shí),需要發(fā)送ack確認(rèn)消息已經(jīng)被消費(fèi),否則消息會(huì)一直留在隊(duì)列中。
到此這篇關(guān)于RabbitMQ延時(shí)隊(duì)列詳解與Java代碼實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java實(shí)現(xiàn)RabbitMQ延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!