Java延遲隊列原理與用法實例詳解
本文實例講述了Java延遲隊列原理與用法。分享給大家供大家參考,具體如下:
延時隊列,第一他是個隊列,所以具有對列功能第二就是延時,這就是延時對列,功能也就是將任務(wù)放在該延時對列中,只有到了延時時刻才能從該延時對列中獲取任務(wù)否則獲取不到……
應(yīng)用場景比較多,比如延時1分鐘發(fā)短信,延時1分鐘再次執(zhí)行等,下面先看看延時隊列demo之后再看延時隊列在項目中的使用:
簡單的延時隊列要有三部分:第一實現(xiàn)了Delayed接口的消息體、第二消費消息的消費者、第三存放消息的延時隊列,那下面就來看看延時隊列demo。
一、消息體
package com.delqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 消息體定義 實現(xiàn)Delayed接口就是實現(xiàn)兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期…… * * @author whd * @date 2017年9月24日 下午8:57:14 */ public class Message implements Delayed { private int id; private String body; // 消息內(nèi)容 private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。 public int getId() { return id; } public String getBody() { return body; } public long getExcuteTime() { return excuteTime; } public Message(int id, String body, long delayTime) { this.id = id; this.body = body; this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); } // 自定義實現(xiàn)比較方法返回 1 0 -1三個參數(shù) @Override public int compareTo(Delayed delayed) { Message msg = (Message) delayed; return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); } // 延遲任務(wù)是否到時就是按照這個方法判斷如果返回的是負(fù)數(shù)則說明到期否則還沒到期 @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); } }
二、消息消費者
package com.delqueue; import java.util.concurrent.DelayQueue; public class Consumer implements Runnable { // 延時隊列 ,消費者從其中獲取消息進(jìn)行消費 private DelayQueue<Message> queue; public Consumer(DelayQueue<Message> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Message take = queue.take(); System.out.println("消費消息id:" + take.getId() + " 消息體:" + take.getBody()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
三、延時隊列
package com.delqueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class DelayQueueTest { public static void main(String[] args) { // 創(chuàng)建延時隊列 DelayQueue<Message> queue = new DelayQueue<Message>(); // 添加延時消息,m1 延時3s Message m1 = new Message(1, "world", 3000); // 添加延時消息,m2 延時10s Message m2 = new Message(2, "hello", 10000); //將延時消息放到延時隊列中 queue.offer(m2); queue.offer(m1); // 啟動消費線程 消費添加到延時隊列中的消息,前提是任務(wù)到了延期時間 ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new Consumer(queue)); exec.shutdown(); } }
將消息體放入延遲隊列中,在啟動消費者線程去消費延遲隊列中的消息,如果延遲隊列中的消息到了延遲時間則可以從中取出消息否則無法取出消息也就無法消費。
這就是延遲隊列demo,下面我們來說說在真實環(huán)境下的使用。
使用場景描述:
在打車軟件中對訂單進(jìn)行派單的流程,當(dāng)有訂單的時候給該訂單篩選司機(jī),然后給當(dāng)訂單綁定司機(jī),但是有時運氣沒那么好,訂單進(jìn)來后第一次沒有篩選到合適的司機(jī),但我們也不能就此結(jié)束派單,而是將該訂單的信息放到延時隊列中過個2秒鐘在進(jìn)行一次,其實這個2秒鐘就是一個延遲,所以這里我們就可以使用延時隊列來實現(xiàn)……
下面看看簡單的流程圖:
下面來看看具體代碼實現(xiàn):
在項目中有如下幾個類:第一 、任務(wù)類 第二、按照任務(wù)類組裝的消息體類 第三、延遲隊列管理類
任務(wù)類即執(zhí)行篩選司機(jī)、綁單、push消息的任務(wù)類
package com.test.delayqueue; /** * 具體執(zhí)行相關(guān)業(yè)務(wù)的業(yè)務(wù)類 * @author whd * @date 2017年9月25日 上午12:49:32 */ public class DelayOrderWorker implements Runnable { @Override public void run() { // TODO Auto-generated method stub //相關(guān)業(yè)務(wù)邏輯處理 System.out.println(Thread.currentThread().getName()+" do something ……"); } }
消息體類,在延時隊列中這個實現(xiàn)了Delayed接口的消息類是比不可少的,實現(xiàn)接口時有一個getDelay(TimeUnit unit)方法,這個方法就是判斷是否到期的
這里定義的是一個泛型類,所以可以將我們上面的任務(wù)類作為其中的task,這樣就將任務(wù)類分裝成了一個消息體
package com.test.delayqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 延時隊列中的消息體將任務(wù)封裝為消息體 * * @author whd * @date 2017年9月25日 上午12:48:30 * @param <T> */ public class DelayOrderTask<T extends Runnable> implements Delayed { private final long time; private final T task; // 任務(wù)類,也就是之前定義的任務(wù)類 /** * @param timeout * 超時時間(秒) * @param task * 任務(wù) */ public DelayOrderTask(long timeout, T task) { this.time = System.nanoTime() + timeout; this.task = task; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub DelayOrderTask other = (DelayOrderTask) o; long diff = time - other.time; if (diff > 0) { return 1; } else if (diff < 0) { return -1; } else { return 0; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int hashCode() { return task.hashCode(); } public T getTask() { return task; } }
延時隊列管理類,這個類主要就是將任務(wù)類封裝成消息并并添加到延時隊列中,以及輪詢延時隊列從中取出到時的消息體,在獲取任務(wù)類放到線程池中執(zhí)行任務(wù)
package com.test.delayqueue; import java.util.Map; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 延時隊列管理類,用來添加任務(wù)、執(zhí)行任務(wù) * * @author whd * @date 2017年9月25日 上午12:44:59 */ public class DelayOrderQueueManager { private final static int DEFAULT_THREAD_NUM = 5; private static int thread_num = DEFAULT_THREAD_NUM; // 固定大小線程池 private ExecutorService executor; // 守護(hù)線程 private Thread daemonThread; // 延時隊列 private DelayQueue<DelayOrderTask<?>> delayQueue; private static final AtomicLong atomic = new AtomicLong(0); private final long n = 1; private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); private DelayOrderQueueManager() { executor = Executors.newFixedThreadPool(thread_num); delayQueue = new DelayQueue<>(); init(); } public static DelayOrderQueueManager getInstance() { return instance; } /** * 初始化 */ public void init() { daemonThread = new Thread(() -> { execute(); }); daemonThread.setName("DelayQueueMonitor"); daemonThread.start(); } private void execute() { while (true) { Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); System.out.println("當(dāng)前存活線程數(shù)量:" + map.size()); int taskNum = delayQueue.size(); System.out.println("當(dāng)前延時任務(wù)數(shù)量:" + taskNum); try { // 從延時隊列中獲取任務(wù) DelayOrderTask<?> delayOrderTask = delayQueue.take(); if (delayOrderTask != null) { Runnable task = delayOrderTask.getTask(); if (null == task) { continue; } // 提交到線程池執(zhí)行task executor.execute(task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 添加任務(wù) * * @param task * @param time * 延時時間 * @param unit * 時間單位 */ public void put(Runnable task, long time, TimeUnit unit) { // 獲取延時時間 long timeout = TimeUnit.NANOSECONDS.convert(time, unit); // 將任務(wù)封裝成實現(xiàn)Delayed接口的消息體 DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); // 將消息體放到延時隊列中 delayQueue.put(delayOrder); } /** * 刪除任務(wù) * * @param task * @return */ public boolean removeTask(DelayOrderTask task) { return delayQueue.remove(task); } }
測試類
package com.delqueue; import java.util.concurrent.TimeUnit; import com.test.delayqueue.DelayOrderQueueManager; import com.test.delayqueue.DelayOrderWorker; public class Test { public static void main(String[] args) { DelayOrderWorker work1 = new DelayOrderWorker();// 任務(wù)1 DelayOrderWorker work2 = new DelayOrderWorker();// 任務(wù)2 DelayOrderWorker work3 = new DelayOrderWorker();// 任務(wù)3 // 延遲隊列管理類,將任務(wù)轉(zhuǎn)化消息體并將消息體放入延遲對列中等待執(zhí)行 DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); manager.put(work1, 3000, TimeUnit.MILLISECONDS); manager.put(work2, 6000, TimeUnit.MILLISECONDS); manager.put(work3, 9000, TimeUnit.MILLISECONDS); } }
OK 這就是項目中的具體使用情況,當(dāng)然具體內(nèi)容被忽略,整體框架就是這樣,還有這里使用java的延時隊列但是這種方式是有問題的如果如果down機(jī)則會出現(xiàn)任務(wù)丟失,所以也可以考慮使用mq、redis來實現(xiàn)……
更多關(guān)于java算法相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Java數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Java操作DOM節(jié)點技巧總結(jié)》、《Java文件與目錄操作技巧匯總》和《Java緩存操作技巧匯總》
希望本文所述對大家java程序設(shè)計有所幫助。
相關(guān)文章
Springboot詳細(xì)講解RocketMQ實現(xiàn)順序消息的發(fā)送與消費流程
RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時消息、消息回溯等,本篇我們了解如何實現(xiàn)順序消息的發(fā)送與消費2022-06-06springboot使用war包部署到外部tomcat過程解析
這篇文章主要介紹了springboot使用war包部署到外部tomcat過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-01-01Red?Hat?安裝JDK與IntelliJ?IDEA的詳細(xì)過程
YUM是基于Red Hat的Linux發(fā)行版的一個強(qiáng)大而用戶友好的包管理工具,這篇文章主要介紹了Red?Hat安裝JDK與IntelliJ IDEA,需要的朋友可以參考下2023-08-08Struts2中validate數(shù)據(jù)校驗的兩種方法詳解附Struts2常用校驗器
這篇文章主要介紹了Struts2中validate數(shù)據(jù)校驗的兩種方法及Struts2常用校驗器,本文介紹的非常詳細(xì),具有參考借鑒價值,感興趣的朋友一起看看吧2016-09-09