關(guān)于利用RabbitMQ實現(xiàn)延遲任務(wù)的方法詳解
開發(fā)過程中通常會碰到這樣的需求:
- 淘寶訂單業(yè)務(wù):下單后 30min 之內(nèi)沒有付款,就自動取消訂單。
- 餓了嗎訂餐通知:下單成功后 60s 之后給用戶發(fā)送短信通知。
- 關(guān)閉空閑連接:服務(wù)器中有很多客戶端的連接,空閑一段時間之后需要關(guān)閉之。
- 緩存:緩存中的對象,超過了空閑時間,從緩存中移出。
- 任務(wù)超時處理:在網(wǎng)絡(luò)協(xié)議滑動窗口請求應(yīng)答式交互時,處理超時未響應(yīng)的請求。
- 失敗重試機制:業(yè)務(wù)操作失敗后,間隔一定的時間進行失敗重試。
這類業(yè)務(wù)的特點就是:需要延遲工作,需要進行失敗重試。一種比較笨的方式是使用一個后臺線程,遍歷所有對象,挨個檢查。這種方法簡單好用,但是對象數(shù)量過多時,可能存在性能問題,檢查間隔時間不好設(shè)置,間隔時間過大,影響精確度,過小則存在效率問題,而且做不到按超時的時間順序處理。
再比如常見的場景:
場景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會遇到向終端下發(fā)命令,如果命令一段時間沒有應(yīng)答,就需要設(shè)置成超時。
場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動取消訂單。
上述類似的需求是我們經(jīng)常會遇見的問題。最常用的方法是定期輪訓(xùn)數(shù)據(jù)庫,設(shè)置狀態(tài)。在數(shù)據(jù)量小的時候并沒有什么大的問題,但是數(shù)據(jù)量一大輪訓(xùn)數(shù)據(jù)庫的方式就會變得特別耗資源。當面對千萬級、上億級數(shù)據(jù)量時,本身寫入的IO就比較高,導(dǎo)致長時間查詢或者根本就查不出來,更別說分庫分表以后了。除此之外,還有優(yōu)先級隊列,基于優(yōu)先級隊列的JDK延遲隊列,時間輪等方式。但如果系統(tǒng)的架構(gòu)中本身就有RabbitMQ的話,那么選擇RabbitMQ來實現(xiàn)類似的功能也是一種選擇。
使用RabbitMQ來實現(xiàn)延遲任務(wù)必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,通過這兩者的組合來實現(xiàn)上述需求。
消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設(shè)置TTL。對隊列設(shè)置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設(shè)置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設(shè)置了,消息也設(shè)置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設(shè)置)。這里單講單個消息的TTL,因為它才是實現(xiàn)延遲任務(wù)的關(guān)鍵。
可以通過設(shè)置消息的expiration字段或者x-message-ttl屬性來設(shè)置時間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個int類型的字符串:
byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
當上面的消息扔到隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去。單靠死信還不能實現(xiàn)延遲任務(wù),還要靠Dead Letter Exchange。
Dead Letter Exchanges
Exchage的概念在這里就不在贅述,一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應(yīng)很多隊列。
1. 一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
2. 上面的消息的TTL到了,消息過期了。
3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創(chuàng)建其他exchange沒有兩樣。只是在某一個設(shè)置Dead Letter Exchange的隊列中有消息過期了,會自動觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去。
實現(xiàn)延遲隊列
延遲任務(wù)通過消息的TTL和Dead Letter Exchange來實現(xiàn)。我們需要建立2個隊列,一個用于發(fā)送消息,一個用于消息過期后的轉(zhuǎn)發(fā)目標隊列。
生產(chǎn)者輸出消息到Queue1,并且這個消息是設(shè)置有有效時間的,比如60s。消息會在Queue1中等待60s,如果沒有消費者收掉的話,它就是被轉(zhuǎn)發(fā)到Queue2,Queue2有消費者,收到,處理延遲任務(wù)。
具體實現(xiàn)步驟如下:
第一步, 首先需要創(chuàng)建2個隊列。Queue1和Queue2。Queue1是一個消息緩沖隊列,在這個隊列里面實現(xiàn)消息的過期轉(zhuǎn)發(fā)。如下圖,設(shè)置Dead letter exchange和Dead letter routing key。設(shè)置這兩個屬性就是當消息在這個隊列中expire后,采用哪個路由發(fā)送。這個dlx的exchange需要事先創(chuàng)建好,就是一個普通的exchange。由于我們還需要向Queue1發(fā)送消息,那么還需要創(chuàng)建一個exchange,并且和Queue1綁定。例子中,exchange同樣取名:queue1。
我們還需要建一個Queue2,這個隊列用于消息在Queue1中過期后轉(zhuǎn)發(fā)的目標隊列。所以這個Queue2隊列建好以后,需要綁定Queue1設(shè)置的死信路由:dlx。完成Queue2的綁定以后,環(huán)境就搭建完成了。
第二步,實現(xiàn)消息的Producer。由于我們的目的是讓進入Queue1的消息過期,然后自動轉(zhuǎn)送到Queue2中,所以發(fā)送的時候,需要設(shè)置過期時間。
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("bsp"); factory.setPassword("123456"); factory.setVirtualHost("/"); factory.setHost("10.23.22.42"); factory.setPort(5672); conn = factory.newConnection(); channel = conn.createChannel(); byte[] messageBodyBytes = "Hello, world!".getBytes(); byte i = 10; while (i-- > 0) { channel.basicPublish("queue1", "queue1", new AMQP.BasicProperties.Builder().expiration(String.valueOf(i * 1000)).build(), new byte[] { i }); }
上面的代碼我模擬了1-10號消息,消息的內(nèi)容里面是1-10。過期的時間是10-1秒。這里要注意,雖然10是第一個發(fā)送,但是它過期的時間最長。
第三步,實現(xiàn)消息的Consumer。Consumer就是延遲任務(wù)的具體實施者。由于具體的任務(wù)往往是一個比較耗時的任務(wù),所以一般來說,任務(wù)一般在異步線程中執(zhí)行。
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("bsp"); factory.setPassword("123456"); factory.setVirtualHost("/"); factory.setHost("10.23.22.42"); factory.setPort(5672); conn = factory.newConnection(); channel = conn.createChannel(); channel.basicConsume("queue2", true, "consumer", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); //do some work async System.out.println(body[0]); } });
運行后如上面的程序,過了10s以后,消費者開始收到數(shù)據(jù),但是它是一次性收到如下結(jié)果:
10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一個收到的還是10。雖然10是第一個放進隊列,但是它的過期時間最長。所以由此可見,即使一個消息比在同一隊列中的其他消息提前過期,提前過期的也不會優(yōu)先進入死信隊列,它們還是按照入庫的順序讓消費者消費。如果第一進去的消息過期時間是1小時,那么死信隊列的消費者也許等1小時才能收到第一個消息。參考官方文檔發(fā)現(xiàn)“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當過期的消息到了隊列的頂端(隊首),才會被真正的丟棄或者進入死信隊列。
所以在考慮使用RabbitMQ來實現(xiàn)延遲任務(wù)隊列的時候,需要確保業(yè)務(wù)上每個任務(wù)的延遲時間是一致的。如果遇到不同的任務(wù)類型需要不同的延時的話,需要為每一種不同延遲時間的消息建立單獨的消息隊列。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關(guān)文章
ASP.NET Core 3.x 并發(fā)限制的實現(xiàn)代碼
這篇文章主要介紹了ASP.NET Core 3.x 并發(fā)限制的實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11ASP.NET Core使用JWT自定義角色并實現(xiàn)策略授權(quán)需要的接口
這篇文章介紹了ASP.NET Core使用JWT自定義角色并實現(xiàn)策略授權(quán)需要的接口,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-01-01asp.net(c#)ref,out ,params的區(qū)別
C#中有三個關(guān)鍵字-ref,out ,params,雖然本人不喜歡這三個關(guān)鍵字,因為它們疑似破壞面向?qū)ο筇匦?。但是既然m$把融入在c#體系中,那么我們就來認識一下參數(shù)修飾符ref,out ,params吧,還有它們的區(qū)別。2009-12-12