Java?DelayQueue實現(xiàn)任務(wù)延時示例講解
在項目中有使用到延時隊列的場景,做個簡單的記錄說明;首先DelayQueue實現(xiàn)了BlockingQueue,加入其中的元素必須實現(xiàn)Delayed接口;
當(dāng)生產(chǎn)者元素調(diào)用put往其中加入元素時,出發(fā)Delayed接口的compareTo方法進行排序,這個排序是按照時間的,按照計劃執(zhí)行的時間排序,先執(zhí)行的在前面,后執(zhí)行的排后面;消費者獲取元素時,調(diào)用getDelay方法返回的值大于0,則消費者線程wait返回的這個時間后,再從隊列頭部取出元素;下面是個簡單的例子
import org.jetbrains.annotations.NotNull; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayEntity implements Delayed { private static final Long currentTime = System.currentTimeMillis(); private String str; private Long scheduleTime; public DelayEntity(String str, Long delayed) { this.str = str; scheduleTime = System.currentTimeMillis() + (1000) * delayed; } @Override public long getDelay(@NotNull TimeUnit unit) { return unit.convert(scheduleTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(@NotNull Delayed o) { return (int) (this.scheduleTime - ((DelayEntity) o).scheduleTime); } public String getStr() { return str; } public Long getScheduleTime() { return scheduleTime; } public String showScheduleTime() { return "計劃執(zhí)行時間:" + new Date(this.scheduleTime).toString(); } }
@Test public void test() throws InterruptedException { DelayQueue<DelayEntity> delayQueue = new DelayQueue<>(); delayQueue.put(new DelayEntity("1", 1l)); delayQueue.put(new DelayEntity("2", 2l)); delayQueue.put(new DelayEntity("4", 3l)); while (true) { DelayEntity take = delayQueue.take(); System.out.println("參數(shù):" + take.getStr() + ";計劃執(zhí)行時間:" + take.showScheduleTime() + ";實際執(zhí)行時間:" + new Date().toString()); } }
下面看下take方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
這里可以看到,他不是一直循環(huán)的,是獲取到第一個元素的delay等待的時間,之后等待這個時間才去喚醒其他線程;
另外,添加元素時,add方法和put方法都是調(diào)用的offer方法,區(qū)別是一個返回值,一個沒有;
延伸下BlockingQueue的幾個常用的操作;
1.offer方法是BlockingQueue的,offer不會阻塞執(zhí)行的方法,可以添加返回true,否則返回false;
2.BlockingQueue的put方法,如果沒有空間,會阻塞一直等到有空間
3.poll獲取元素,不會阻塞,獲取不到就返回null;
4.take,獲取不到就阻塞
到此這篇關(guān)于Java DelayQueue實現(xiàn)任務(wù)延時示例講解的文章就介紹到這了,更多相關(guān)Java DelayQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決org.springframework.context.ApplicationContextException報錯的
這篇文章主要介紹了解決org.springframework.context.ApplicationContextException報錯的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06SpringBoot+WebSocket實現(xiàn)IM及時通訊的代碼示例
項目中碰到需要及時通訊的場景,使用springboot集成websocket,即可實現(xiàn)簡單的及時通訊,本文介紹springboot如何集成websocket、IM及時通訊需要哪些模塊、開發(fā)和部署過程中遇到的問題、以及實現(xiàn)小型IM及時通訊的代碼,需要的朋友可以參考下2023-10-10Springboot+AOP實現(xiàn)返回數(shù)據(jù)提示語國際化的示例代碼
這篇文章主要介紹了Springboot+AOP實現(xiàn)返回數(shù)據(jù)提示語國際化的示例代碼,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-07-07Mysql?json類型字段Java+Mybatis數(shù)據(jù)字典功能的實踐方式
這篇文章主要介紹了Mysql?json類型字段Java+Mybatis數(shù)據(jù)字典功能的實踐方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-08-08