RocketMQ 延時(shí)級(jí)別配置方式
RocketMQ 支持定時(shí)消息,但是不支持任意時(shí)間精度,僅支持特定的 level,例如定時(shí) 5s, 10s, 1m 等。
其中,level=0 級(jí)表示不延時(shí),level=1 表示 1 級(jí)延時(shí),level=2 表示 2 級(jí)延時(shí),以此類(lèi)推。
如何配置:
在服務(wù)器端(rocketmq-broker端)的屬性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各級(jí)別與延時(shí)時(shí)間的對(duì)應(yīng)映射關(guān)系。
這個(gè)配置項(xiàng)配置了從1級(jí)開(kāi)始各級(jí)延時(shí)的時(shí)間,如1表示延時(shí)1s,2表示延時(shí)5s,14表示延時(shí)10m,可以修改這個(gè)指定級(jí)別的延時(shí)時(shí)間;
時(shí)間單位支持:s、m、h、d,分別表示秒、分、時(shí)、天;
默認(rèn)值就是上面聲明的,可手工調(diào)整;
默認(rèn)值已經(jīng)夠用,不建議調(diào)整【僅供參考,還是根據(jù)實(shí)際需要調(diào)整。調(diào)整默認(rèn)值時(shí)注意同時(shí)要修改時(shí)間對(duì)應(yīng)的level級(jí)別的值】
如何發(fā)送延時(shí)消息:
發(fā)送延時(shí)消息只需要在客戶(hù)端(rocketmq-client端)待發(fā)送的消息( com.alibaba.rocketmq.common.message.Message )中設(shè)置延時(shí)級(jí)別delayLevel即可。
Message msg = new Message(topicName,"",keys,message.getBytes()); msg.setDelayTimeLevel(delayLevel); SendResult sendResult = getMQProducer.send(msg);
RocketMQ定時(shí)(延遲)消息
RocketMQ 不支持任意時(shí)間自定義的延遲消息,僅支持內(nèi)置預(yù)設(shè)值的延遲時(shí)間間隔的延遲消息。
預(yù)設(shè)值的延遲時(shí)間間隔為:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
延時(shí)消息的使用場(chǎng)景
比如電商里,提交了一個(gè)訂單就可以發(fā)送一個(gè)延時(shí)消息,1h后去檢查這個(gè)訂單的狀態(tài),如果還是未付款就取消訂單釋放庫(kù)存。
生產(chǎn)
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class ProducerDelay { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.10.11:9876"); producer.start(); Message msg1 = new Message( JmsConfig.TOPIC, "訂單001".getBytes()); msg1.setDelayTimeLevel(2);//延遲5秒 Message msg2 = new Message( JmsConfig.TOPIC, "訂單001".getBytes()); msg2.setDelayTimeLevel(4);//延遲30秒 SendResult sendResult1 = producer.send(msg1); SendResult sendResult2 = producer.send(msg2); System.out.println("Product1-同步發(fā)送-Product信息={}" + sendResult1); System.out.println("Product2-同步發(fā)送-Product信息={}" + sendResult2); producer.shutdown(); } }
消費(fèi)
package com.xin.rocketmq.demo.testrun; import com.xin.rocketmq.demo.config.JmsConfig; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ConsumerDelay { public static void main(String[] args) throws Exception { // 實(shí)例化消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 設(shè)置NameServer的地址 consumer.setNamesrvAddr("192.168.10.11:9876"); // 訂閱一個(gè)或者多個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注冊(cè)消息監(jiān)聽(tīng)者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者 consumer.start(); } }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot接入釘釘自定義機(jī)器人預(yù)警通知
本文主要介紹了SpringBoot接入釘釘自定義機(jī)器人預(yù)警通知,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07如何使用Springfox?Swagger實(shí)現(xiàn)API自動(dòng)生成單元測(cè)試
Springfox是一個(gè)使用Java語(yǔ)言開(kāi)發(fā)開(kāi)源的API Doc的框架,它的前身是swagger-springmvc,可以將我們的Controller中的方法以文檔的形式展現(xiàn),這篇文章主要介紹了如何使用Springfox?Swagger實(shí)現(xiàn)API自動(dòng)生成單元測(cè)試,感興趣的朋友跟隨小編一起看看吧2024-04-04Maven項(xiàng)目web多圖片上傳及格式驗(yàn)證的實(shí)現(xiàn)
本文主要介紹了Maven項(xiàng)目web多圖片上傳及格式驗(yàn)證的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10Java設(shè)置Excel數(shù)據(jù)驗(yàn)證的示例代碼
數(shù)據(jù)驗(yàn)證是Excel 2013版本中,數(shù)據(jù)功能組下面的一個(gè)功能。本文將通過(guò)Java程序代碼演示數(shù)據(jù)驗(yàn)證的設(shè)置方法及結(jié)果,感興趣的可以了解一下2022-05-05