Java延遲隊(duì)列原理與用法實(shí)例詳解
本文實(shí)例講述了Java延遲隊(duì)列原理與用法。分享給大家供大家參考,具體如下:
延時(shí)隊(duì)列,第一他是個(gè)隊(duì)列,所以具有對(duì)列功能第二就是延時(shí),這就是延時(shí)對(duì)列,功能也就是將任務(wù)放在該延時(shí)對(duì)列中,只有到了延時(shí)時(shí)刻才能從該延時(shí)對(duì)列中獲取任務(wù)否則獲取不到……
應(yīng)用場(chǎng)景比較多,比如延時(shí)1分鐘發(fā)短信,延時(shí)1分鐘再次執(zhí)行等,下面先看看延時(shí)隊(duì)列demo之后再看延時(shí)隊(duì)列在項(xiàng)目中的使用:
簡(jiǎn)單的延時(shí)隊(duì)列要有三部分:第一實(shí)現(xiàn)了Delayed接口的消息體、第二消費(fèi)消息的消費(fèi)者、第三存放消息的延時(shí)隊(duì)列,那下面就來(lái)看看延時(shí)隊(duì)列demo。
一、消息體
package com.delqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 消息體定義 實(shí)現(xiàn)Delayed接口就是實(shí)現(xiàn)兩個(gè)方法即compareTo 和 getDelay最重要的就是getDelay方法,這個(gè)方法用來(lái)判斷是否到期……
*
* @author whd
* @date 2017年9月24日 下午8:57:14
*/
public class Message implements Delayed {
private int id;
private String body; // 消息內(nèi)容
private long excuteTime;// 延遲時(shí)長(zhǎng),這個(gè)是必須的屬性因?yàn)橐凑者@個(gè)判斷延時(shí)時(shí)長(zhǎng)。
public int getId() {
return id;
}
public String getBody() {
return body;
}
public long getExcuteTime() {
return excuteTime;
}
public Message(int id, String body, long delayTime) {
this.id = id;
this.body = body;
this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
}
// 自定義實(shí)現(xiàn)比較方法返回 1 0 -1三個(gè)參數(shù)
@Override
public int compareTo(Delayed delayed) {
Message msg = (Message) delayed;
return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1
: (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);
}
// 延遲任務(wù)是否到時(shí)就是按照這個(gè)方法判斷如果返回的是負(fù)數(shù)則說(shuō)明到期否則還沒(méi)到期
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
二、消息消費(fèi)者
package com.delqueue;
import java.util.concurrent.DelayQueue;
public class Consumer implements Runnable {
// 延時(shí)隊(duì)列 ,消費(fèi)者從其中獲取消息進(jìn)行消費(fèi)
private DelayQueue<Message> queue;
public Consumer(DelayQueue<Message> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Message take = queue.take();
System.out.println("消費(fèi)消息id:" + take.getId() + " 消息體:" + take.getBody());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
三、延時(shí)隊(duì)列
package com.delqueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DelayQueueTest {
public static void main(String[] args) {
// 創(chuàng)建延時(shí)隊(duì)列
DelayQueue<Message> queue = new DelayQueue<Message>();
// 添加延時(shí)消息,m1 延時(shí)3s
Message m1 = new Message(1, "world", 3000);
// 添加延時(shí)消息,m2 延時(shí)10s
Message m2 = new Message(2, "hello", 10000);
//將延時(shí)消息放到延時(shí)隊(duì)列中
queue.offer(m2);
queue.offer(m1);
// 啟動(dòng)消費(fèi)線程 消費(fèi)添加到延時(shí)隊(duì)列中的消息,前提是任務(wù)到了延期時(shí)間
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(new Consumer(queue));
exec.shutdown();
}
}
將消息體放入延遲隊(duì)列中,在啟動(dòng)消費(fèi)者線程去消費(fèi)延遲隊(duì)列中的消息,如果延遲隊(duì)列中的消息到了延遲時(shí)間則可以從中取出消息否則無(wú)法取出消息也就無(wú)法消費(fèi)。
這就是延遲隊(duì)列demo,下面我們來(lái)說(shuō)說(shuō)在真實(shí)環(huán)境下的使用。
使用場(chǎng)景描述:
在打車(chē)軟件中對(duì)訂單進(jìn)行派單的流程,當(dāng)有訂單的時(shí)候給該訂單篩選司機(jī),然后給當(dāng)訂單綁定司機(jī),但是有時(shí)運(yùn)氣沒(méi)那么好,訂單進(jìn)來(lái)后第一次沒(méi)有篩選到合適的司機(jī),但我們也不能就此結(jié)束派單,而是將該訂單的信息放到延時(shí)隊(duì)列中過(guò)個(gè)2秒鐘在進(jìn)行一次,其實(shí)這個(gè)2秒鐘就是一個(gè)延遲,所以這里我們就可以使用延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)……
下面看看簡(jiǎn)單的流程圖:

下面來(lái)看看具體代碼實(shí)現(xiàn):
在項(xiàng)目中有如下幾個(gè)類(lèi):第一 、任務(wù)類(lèi) 第二、按照任務(wù)類(lèi)組裝的消息體類(lèi) 第三、延遲隊(duì)列管理類(lèi)
任務(wù)類(lèi)即執(zhí)行篩選司機(jī)、綁單、push消息的任務(wù)類(lèi)
package com.test.delayqueue;
/**
* 具體執(zhí)行相關(guān)業(yè)務(wù)的業(yè)務(wù)類(lèi)
* @author whd
* @date 2017年9月25日 上午12:49:32
*/
public class DelayOrderWorker implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
//相關(guān)業(yè)務(wù)邏輯處理
System.out.println(Thread.currentThread().getName()+" do something ……");
}
}
消息體類(lèi),在延時(shí)隊(duì)列中這個(gè)實(shí)現(xiàn)了Delayed接口的消息類(lèi)是比不可少的,實(shí)現(xiàn)接口時(shí)有一個(gè)getDelay(TimeUnit unit)方法,這個(gè)方法就是判斷是否到期的
這里定義的是一個(gè)泛型類(lèi),所以可以將我們上面的任務(wù)類(lèi)作為其中的task,這樣就將任務(wù)類(lèi)分裝成了一個(gè)消息體
package com.test.delayqueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延時(shí)隊(duì)列中的消息體將任務(wù)封裝為消息體
*
* @author whd
* @date 2017年9月25日 上午12:48:30
* @param <T>
*/
public class DelayOrderTask<T extends Runnable> implements Delayed {
private final long time;
private final T task; // 任務(wù)類(lèi),也就是之前定義的任務(wù)類(lèi)
/**
* @param timeout
* 超時(shí)時(shí)間(秒)
* @param task
* 任務(wù)
*/
public DelayOrderTask(long timeout, T task) {
this.time = System.nanoTime() + timeout;
this.task = task;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
DelayOrderTask other = (DelayOrderTask) o;
long diff = time - other.time;
if (diff > 0) {
return 1;
} else if (diff < 0) {
return -1;
} else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int hashCode() {
return task.hashCode();
}
public T getTask() {
return task;
}
}
延時(shí)隊(duì)列管理類(lèi),這個(gè)類(lèi)主要就是將任務(wù)類(lèi)封裝成消息并并添加到延時(shí)隊(duì)列中,以及輪詢(xún)延時(shí)隊(duì)列從中取出到時(shí)的消息體,在獲取任務(wù)類(lèi)放到線程池中執(zhí)行任務(wù)
package com.test.delayqueue;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 延時(shí)隊(duì)列管理類(lèi),用來(lái)添加任務(wù)、執(zhí)行任務(wù)
*
* @author whd
* @date 2017年9月25日 上午12:44:59
*/
public class DelayOrderQueueManager {
private final static int DEFAULT_THREAD_NUM = 5;
private static int thread_num = DEFAULT_THREAD_NUM;
// 固定大小線程池
private ExecutorService executor;
// 守護(hù)線程
private Thread daemonThread;
// 延時(shí)隊(duì)列
private DelayQueue<DelayOrderTask<?>> delayQueue;
private static final AtomicLong atomic = new AtomicLong(0);
private final long n = 1;
private static DelayOrderQueueManager instance = new DelayOrderQueueManager();
private DelayOrderQueueManager() {
executor = Executors.newFixedThreadPool(thread_num);
delayQueue = new DelayQueue<>();
init();
}
public static DelayOrderQueueManager getInstance() {
return instance;
}
/**
* 初始化
*/
public void init() {
daemonThread = new Thread(() -> {
execute();
});
daemonThread.setName("DelayQueueMonitor");
daemonThread.start();
}
private void execute() {
while (true) {
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
System.out.println("當(dāng)前存活線程數(shù)量:" + map.size());
int taskNum = delayQueue.size();
System.out.println("當(dāng)前延時(shí)任務(wù)數(shù)量:" + taskNum);
try {
// 從延時(shí)隊(duì)列中獲取任務(wù)
DelayOrderTask<?> delayOrderTask = delayQueue.take();
if (delayOrderTask != null) {
Runnable task = delayOrderTask.getTask();
if (null == task) {
continue;
}
// 提交到線程池執(zhí)行task
executor.execute(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 添加任務(wù)
*
* @param task
* @param time
* 延時(shí)時(shí)間
* @param unit
* 時(shí)間單位
*/
public void put(Runnable task, long time, TimeUnit unit) {
// 獲取延時(shí)時(shí)間
long timeout = TimeUnit.NANOSECONDS.convert(time, unit);
// 將任務(wù)封裝成實(shí)現(xiàn)Delayed接口的消息體
DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);
// 將消息體放到延時(shí)隊(duì)列中
delayQueue.put(delayOrder);
}
/**
* 刪除任務(wù)
*
* @param task
* @return
*/
public boolean removeTask(DelayOrderTask task) {
return delayQueue.remove(task);
}
}
測(cè)試類(lèi)
package com.delqueue;
import java.util.concurrent.TimeUnit;
import com.test.delayqueue.DelayOrderQueueManager;
import com.test.delayqueue.DelayOrderWorker;
public class Test {
public static void main(String[] args) {
DelayOrderWorker work1 = new DelayOrderWorker();// 任務(wù)1
DelayOrderWorker work2 = new DelayOrderWorker();// 任務(wù)2
DelayOrderWorker work3 = new DelayOrderWorker();// 任務(wù)3
// 延遲隊(duì)列管理類(lèi),將任務(wù)轉(zhuǎn)化消息體并將消息體放入延遲對(duì)列中等待執(zhí)行
DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();
manager.put(work1, 3000, TimeUnit.MILLISECONDS);
manager.put(work2, 6000, TimeUnit.MILLISECONDS);
manager.put(work3, 9000, TimeUnit.MILLISECONDS);
}
}
OK 這就是項(xiàng)目中的具體使用情況,當(dāng)然具體內(nèi)容被忽略,整體框架就是這樣,還有這里使用java的延時(shí)隊(duì)列但是這種方式是有問(wèn)題的如果如果down機(jī)則會(huì)出現(xiàn)任務(wù)丟失,所以也可以考慮使用mq、redis來(lái)實(shí)現(xiàn)……
更多關(guān)于java算法相關(guān)內(nèi)容感興趣的讀者可查看本站專(zhuān)題:《Java數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Java操作DOM節(jié)點(diǎn)技巧總結(jié)》、《Java文件與目錄操作技巧匯總》和《Java緩存操作技巧匯總》
希望本文所述對(duì)大家java程序設(shè)計(jì)有所幫助。
相關(guān)文章
Springboot詳細(xì)講解RocketMQ實(shí)現(xiàn)順序消息的發(fā)送與消費(fèi)流程
RocketMQ作為一款純java、分布式、隊(duì)列模型的開(kāi)源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等,本篇我們了解如何實(shí)現(xiàn)順序消息的發(fā)送與消費(fèi)2022-06-06
springboot使用war包部署到外部tomcat過(guò)程解析
這篇文章主要介紹了springboot使用war包部署到外部tomcat過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01
Red?Hat?安裝JDK與IntelliJ?IDEA的詳細(xì)過(guò)程
YUM是基于Red Hat的Linux發(fā)行版的一個(gè)強(qiáng)大而用戶(hù)友好的包管理工具,這篇文章主要介紹了Red?Hat安裝JDK與IntelliJ IDEA,需要的朋友可以參考下2023-08-08
偵聽(tīng)消息隊(duì)列的Message Listener類(lèi)示例詳解
Spring AMQP 是基于 Spring 框架的AMQP消息解決方案,提供模板化的發(fā)送和接收消息的抽象層,提供基于消息驅(qū)動(dòng)的 POJO的消息監(jiān)聽(tīng)等,簡(jiǎn)化了我們對(duì)于RabbitMQ相關(guān)程序的開(kāi)發(fā),本文給大家介紹偵聽(tīng)消息隊(duì)列的Message Listener類(lèi),感興趣的朋友一起看看吧2023-12-12
Struts2中validate數(shù)據(jù)校驗(yàn)的兩種方法詳解附Struts2常用校驗(yàn)器
這篇文章主要介紹了Struts2中validate數(shù)據(jù)校驗(yàn)的兩種方法及Struts2常用校驗(yàn)器,本文介紹的非常詳細(xì),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧2016-09-09

