欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中的延遲隊(duì)列DelayQueue詳細(xì)解析

 更新時(shí)間:2023年12月08日 08:30:44   作者:菜鳥(niǎo)-小胖  
這篇文章主要介紹了Java中的延遲隊(duì)列DelayQueue詳細(xì)解析,JDK自身支持延遲隊(duì)列的數(shù)據(jù)結(jié)構(gòu),其實(shí)類:java.util.concurrent.DelayQueue,<BR>我們通過(guò)閱讀源碼的方式理解該延遲隊(duì)列類的實(shí)現(xiàn)過(guò)程,需要的朋友可以參考下

前言

JDK自身支持延遲隊(duì)列的數(shù)據(jù)結(jié)構(gòu),其實(shí)類:java.util.concurrent.DelayQueue。

我們通過(guò)閱讀源碼的方式理解該延遲隊(duì)列類的實(shí)現(xiàn)過(guò)程。

1.定義

DelayQueue:是一種支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列。

特性:

  1. 線程安全(多生產(chǎn)者,多消費(fèi)者)(單機(jī),如果想實(shí)現(xiàn)分布式,可以結(jié)合redis 消息分發(fā),如果需要較高數(shù)據(jù)可靠性可以考慮結(jié)合消息中間件等);
  2. 內(nèi)部元素有“延遲”特性:只有延遲到期的元素才允許被獲取;
  3. 具有優(yōu)先級(jí)特性的無(wú)界隊(duì)列,優(yōu)先級(jí)以元素延遲時(shí)間為標(biāo)準(zhǔn),最先過(guò)期的元素優(yōu)先級(jí)最高(隊(duì)首);
  4. 入隊(duì)操作不會(huì)被阻塞,獲取元素在特定情況會(huì)阻塞(隊(duì)列為空,隊(duì)首元素延遲未到期等);

根據(jù)其源碼分析為何如此定義以及其特性的由來(lái)。

DelayQueue繼承關(guān)系:

DelayQueue

類圖分析:

其核心繼承/實(shí)現(xiàn):

1.BlockingQueue:說(shuō)明其具有阻塞隊(duì)列的特性;

2.元素必實(shí)現(xiàn)接口Delayed,而Delayed繼承了接口Comparable。因此所有元素必須實(shí)現(xiàn)兩個(gè)方法:

compareTo方法用于元素比較; getDelay方法用于獲取元素剩余延時(shí)時(shí)間。

public interface Delayed extends Comparable<Delayed> {
    /**
     * 返回關(guān)聯(lián)對(duì)象的剩余延遲時(shí)間(可指定時(shí)間單位)
     */
    long getDelay(TimeUnit unit);
}

2.源碼

public class DelayQueue<E extends Delayed>
        extends AbstractQueue<E>
        implements BlockingQueue<E> {
    /**
     * 可重入鎖,用于保證線程安全
     */
    private final transient ReentrantLock lock = new ReentrantLock();
    /**
     * 優(yōu)先隊(duì)列(容器),實(shí)際存儲(chǔ)元素的地方
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
     * 等待取元素線程的領(lǐng)導(dǎo)(leader)線程,有且僅有一個(gè)leader。
     * 具有最高優(yōu)先級(jí),第一個(gè)嘗試獲取元素的線程。
     * leader取完元素后,會(huì)喚醒新的等待線程成為新的leader。
     */
    private Thread leader = null;
    /**
     * 觸發(fā)條件,表示是否可以從隊(duì)列中讀取元素.
     * 用于等待(await())/通知(signal())其他線程
     */
    private final Condition available = lock.newCondition();
    /**
     * 構(gòu)造函數(shù)
     */
    public DelayQueue() {
    }
    /**
     * 構(gòu)造函數(shù): 調(diào)用addAll()方法:將集合c 存入隊(duì)列中
     *
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
    /*--------------------------添加元素(非阻塞)-------------------------------*/
    /**
     * 插入新元素.
     * 核心內(nèi)容見(jiàn):public boolean offer(E e)
     */
    public boolean add(E e) {
        return offer(e);
    }
    /**
     * 插入新元素.
     * 核心內(nèi)容見(jiàn):public boolean offer(E e)
     */
    public void put(E e) {
        offer(e);
    }
    /**
     * 插入新元素.
     * 核心內(nèi)容見(jiàn):public boolean offer(E e)
     * @param e       元素
     * @param timeout 此參數(shù)將被忽略,因?yàn)樵摲椒◤牟蛔枞◤U棄)
     * @param unit    此參數(shù)將被忽略,因?yàn)樵摲椒◤牟蛔枞◤U棄)
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }
    /**
     * 插入新元素.(線程安全 lock)
     * 邏輯:
     *  1.入隊(duì);
     *  2.如果入隊(duì)元素為隊(duì)首元素(原隊(duì)列為空),喚醒一個(gè)等待的線程,通知獲取數(shù)據(jù)。
     *
     * @param e 元素
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 入隊(duì)
            q.offer(e);
            // 若該元素為隊(duì)列頭部元素(說(shuō)明原隊(duì)列為空),可以喚醒等待的線程取元素?cái)?shù)據(jù)
            if (q.peek() == e) {
                // 如果隊(duì)首元素是剛插入的元素,則設(shè)置leader為null(騰位置)
                leader = null;
                // 喚醒一個(gè)等待的線程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------取出(返回并刪除)元素-------------------------------*/
    /**
     * 取出延遲到期元素(非阻塞的).(線程安全 lock)
     * poll() 方法是非阻塞的,即調(diào)用之后無(wú)論元素是否存在/延遲到期都會(huì)立即返回。
     * 邏輯:
     * 1.查詢隊(duì)首元素;
     * 2.元素延遲到期返回,否則返回null
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 查詢隊(duì)首元素
            E first = q.peek();
            // 隊(duì)首元素為空或者延時(shí)未到期 返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0) {
                return null;
            } else {
                // 如果到期,取出并刪除隊(duì)首元素
                return q.poll();
            }
        } finally {
            lock.unlock();
        }
    }
  /**
     * 取出延遲到期元素(帶有超時(shí)時(shí)間,阻塞).(線程安全 lock)
     * 如果隊(duì)首元素未到期或者為null,等待:直到隊(duì)首元素延遲到期或者超出指定等待時(shí)間(timeout)
     * 邏輯(無(wú)限循環(huán)等待獲?。?
     * 宗旨:在不超出timeout的時(shí)間內(nèi),循環(huán)去取出延遲到期的隊(duì)首元素(前提無(wú)其他線程正在取數(shù)--互斥).
     * 1.查詢隊(duì)首元素;
     *  2.1.隊(duì)列空:等待timeout一段時(shí)間,直到等待超時(shí)(即timeout被重置小于等于0);
     *  2.2.隊(duì)列不為空:
     *      2.2.1. 隊(duì)首元素延遲到期,取出隊(duì)首元素(poll());
     *      2.2.2. 隊(duì)首元素延遲未到期:
     *          2.2.3 等待超時(shí) ,返回null;
     *          2.2.4 等待未超時(shí),等待時(shí)間<延遲時(shí)間或者有其他線程正在取數(shù)據(jù),繼續(xù)等待到超時(shí)到期
     *          2.2.5 等待為超時(shí),等待時(shí)間>=延遲時(shí)間并且無(wú)其他線程正在取數(shù)據(jù),該線程設(shè)置為leader等待到延遲到期(最后清空l(shuí)eader)
     * 3. 循環(huán)后,如果leader=null(無(wú)正在取數(shù)線程)并且隊(duì)列還有數(shù)據(jù),喚醒一個(gè)等待線程最終成為leader.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 以可中斷方式獲取鎖
        lock.lockInterruptibly();
        try {
            for (; ; ) {
                // 獲取隊(duì)首元素
                E first = q.peek();
                if (first == null) {
                    // 若隊(duì)首元素為空(即隊(duì)列為空,這時(shí)就需要關(guān)注,當(dāng)前取值請(qǐng)求是否需要阻塞等待
                    // 等待時(shí)間小于等于0 ,不阻塞等待,直接返回null)
                    if (nanos <= 0) {
                        return null;
                    } else {
                        // 等待相應(yīng)的時(shí)間
                        nanos = available.awaitNanos(nanos);
                    }
                } else {
                    // 若隊(duì)列元素非空,獲取隊(duì)首元素剩余延遲時(shí)間
                    long delay = first.getDelay(NANOSECONDS);
                    // 延時(shí)過(guò)期 返回元素
                    if (delay <= 0) {
                        return q.poll();
                    }
                    // 延時(shí)未過(guò)期  等待時(shí)間超時(shí) ,不等待,直接返回null
                    if (nanos <= 0) {
                        return null;
                    }
                    first = null;
                    // 延時(shí)和等待都未到期且等待時(shí)間<延遲時(shí)間 或者 有其他線程在取數(shù)據(jù),當(dāng)前請(qǐng)求繼續(xù)等待
                    if (nanos < delay || leader != null) {
                        nanos = available.awaitNanos(nanos);
                    } else {
                        // 沒(méi)有其他線程等待,將當(dāng)前線程設(shè)置為 leader,類似于“獨(dú)占”操作
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待直到延遲到期
                            long timeLeft = available.awaitNanos(delay);
                            // 計(jì)算超時(shí)時(shí)間
                            nanos -= delay - timeLeft;
                        } finally {
                            // 該線程操作完畢,把 leader 置空
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // 如果leader線程為空 并且  queue非空,則喚醒其他等待線程
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }
   /**
     * 取出延遲到期元素(無(wú)超時(shí)時(shí)間限制,阻塞).(線程安全 lock)
     * 邏輯(無(wú)限循環(huán)等待獲?。?
     * 其邏輯參考poll(long timeout, TimeUnit unit).
     * 其區(qū)別在于:不受超時(shí)時(shí)間限制(timeout)
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 以可中斷方式獲取鎖
        lock.lockInterruptibly();
        try {
            // 無(wú)限循環(huán)
            for (; ; ) {
                // 獲取隊(duì)首元素
                E first = q.peek();
                if (first == null) {
                    // 若隊(duì)首元素為空(隊(duì)列為空),則等待
                    available.await();
                } else {
                    // 若隊(duì)列元素非空,獲取隊(duì)首元素剩余延遲時(shí)間
                    long delay = first.getDelay(NANOSECONDS);
                    // 延遲到期,獲取隊(duì)首元素
                    if (delay <= 0) {
                        return q.poll();
                    }
                    // 延時(shí)未過(guò)期
                    first = null;
                    // leader 不為空表示有其他線程在讀取數(shù)據(jù),當(dāng)前線程等待
                    if (leader != null) {
                        available.await();
                    } else {
                        // 沒(méi)有其他線程等待,將當(dāng)前線程設(shè)置為 leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待延遲時(shí)間過(guò)期
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // 如果leader線程為空 并且  queue非空,則喚醒其他等待線程
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }
    /*--------------------------讀取隊(duì)首元素-------------------------------*/
    /**
     * 讀取隊(duì)首元素.(線程安全 lock)
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------讀取隊(duì)列長(zhǎng)度-------------------------------*/
    /**
     * 獲取隊(duì)列數(shù)據(jù)的長(zhǎng)度.(線程安全 lock)
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------獲取延遲到期元素集合-------------------------------*/
    /**
     * 將隊(duì)列中延遲到期數(shù)據(jù) 收集到集合C中.(線程安全 lock)
     * 
     * @return  返回延遲到期元素?cái)?shù)量
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            //  peekExpired() 判斷隊(duì)首元素是否延遲到期
            for (E e; (e = peekExpired()) != null; ) {
                c.add(e);       
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 將隊(duì)列中延遲到期數(shù)據(jù) 收集到集合C中(C集合總數(shù)有限制小于maxElements).(線程安全 lock)
     * @return  返回延遲到期元素?cái)?shù)量
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            // peekExpired() 判斷隊(duì)首元素是否延遲到期。并且到期元素總數(shù)不允許超過(guò)maxElements
            for (E e; n < maxElements && (e = peekExpired()) != null; ) {
                c.add(e);       
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 讀取隊(duì)首元素(已延遲到期).(私有方法)
     */
    private E peekExpired() {
        // 獲取隊(duì)首元素
        E first = q.peek();
        // 隊(duì)首元素存在并且延遲到期,否則返回null
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
                null : first;
    }
    /*--------------------------刪除元素-------------------------------*/
    /**
     * 清除隊(duì)列中所有元素(線程安全 lock)--暴力清除
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }
    /**
     * 刪除指定元素O.(線程安全 lock)
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }
    /**
     * 刪除指定元素O.(這里指的是相同的對(duì)象引用/內(nèi)存地址)(線程安全 lock)
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                // 使用了對(duì)象引用/內(nèi)存地址相等比較
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------隊(duì)列轉(zhuǎn)數(shù)組-------------------------------*/
    /**
     * 將隊(duì)列元素都復(fù)制到數(shù)組中(無(wú)序).(線程安全 lock)
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }
    /**
     * 將隊(duì)列元素都復(fù)制到數(shù)組a中(無(wú)序).
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }
    /*--------------------------私有內(nèi)部類--迭代器-------------------------------*/
   /**
     * 返回此隊(duì)列中所有元素(已過(guò)期和未過(guò)期)的迭代器。迭代器不按任何特定順序返回元素。
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }
    /**
     * 快照迭代器,用于處理底層 隊(duì)列/數(shù)組的副本。
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such
        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }
        public boolean hasNext() {
            return cursor < array.length;
        }
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E) array[cursor++];
        }
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }
}

3.使用demo

使用DelayQueue實(shí)現(xiàn)延遲隊(duì)列:

優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單。

缺點(diǎn):可擴(kuò)展性較差,內(nèi)存限制、無(wú)持久化機(jī)制等。

 @SneakyThrows
    public static void main(String[] args) {
        DelayQueue<TestTask> testTaskDelayQueue = new DelayQueue<>();
        long time = System.currentTimeMillis();
        testTaskDelayQueue.offer(TestTask.builder().name("test_1").endTime(time + 10 * 1000).build());
        testTaskDelayQueue.offer(TestTask.builder().name("test_2").endTime(time + 4 * 1000).build());
        testTaskDelayQueue.offer(TestTask.builder().name("test_3").endTime(time + 16 * 1000).build());
        for(;;){
            System.out.println(testTaskDelayQueue.take());
            TimeUnit.SECONDS.sleep(2);
        }
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    private static class TestTask implements Delayed {
        private String name;
        private Long endTime;
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

到此這篇關(guān)于Java中的延遲隊(duì)列DelayQueue詳細(xì)解析的文章就介紹到這了,更多相關(guān)延遲隊(duì)列DelayQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java觀察者模式的深入了解

    Java觀察者模式的深入了解

    這篇文章主要為大家介紹了Java觀察者模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-01-01
  • spring boot創(chuàng)建和數(shù)據(jù)庫(kù)關(guān)聯(lián)模塊詳解

    spring boot創(chuàng)建和數(shù)據(jù)庫(kù)關(guān)聯(lián)模塊詳解

    這篇文章主要給大家介紹了關(guān)于spring boot創(chuàng)建和數(shù)據(jù)庫(kù)關(guān)聯(lián)模塊的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • idea如何反編譯jar包

    idea如何反編譯jar包

    文章介紹了如何使用IntelliJ IDEA反編譯JAR包,并詳細(xì)步驟包括安裝JavaBytecodeDecompiler插件、使用命令行反編譯、解決Java版本不兼容問(wèn)題以及推薦其他反編譯工具
    2025-02-02
  • Java實(shí)現(xiàn)的生成二維碼和解析二維碼URL操作示例

    Java實(shí)現(xiàn)的生成二維碼和解析二維碼URL操作示例

    這篇文章主要介紹了Java實(shí)現(xiàn)的生成二維碼和解析二維碼URL操作,結(jié)合實(shí)例形式分析了Java創(chuàng)建與解析二維碼,以及文件讀寫等相關(guān)操作技巧,需要的朋友可以參考下
    2018-07-07
  • Java如何實(shí)現(xiàn)文件壓縮與上傳FTP

    Java如何實(shí)現(xiàn)文件壓縮與上傳FTP

    這篇文章主要介紹了Java如何實(shí)現(xiàn)文件壓縮與上傳FTP,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • java?設(shè)計(jì)模式從風(fēng)控鏈理解責(zé)任鏈模式

    java?設(shè)計(jì)模式從風(fēng)控鏈理解責(zé)任鏈模式

    這篇文章主要為大家介紹了java?設(shè)計(jì)模式從風(fēng)控鏈理解責(zé)任鏈模式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • Java Annotation注解相關(guān)原理代碼總結(jié)

    Java Annotation注解相關(guān)原理代碼總結(jié)

    這篇文章主要介紹了Java Annotation注解相關(guān)原理代碼總結(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • IDEA 創(chuàng)建多級(jí)文件夾的操作

    IDEA 創(chuàng)建多級(jí)文件夾的操作

    這篇文章主要介紹了IDEA 創(chuàng)建多級(jí)文件夾的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • 解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題

    解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題

    這篇文章主要介紹了解決Intellij IDEA運(yùn)行報(bào)Command line is too long的問(wèn)題,本文通過(guò)兩種方案給大家詳細(xì)介紹,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-05-05
  • SpringBoot整合Swagger框架過(guò)程解析

    SpringBoot整合Swagger框架過(guò)程解析

    這篇文章主要介紹了SpringBoot整合Swagger框架過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05

最新評(píng)論