欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解

 更新時(shí)間:2023年12月08日 08:58:20   作者:GeorgiaStar  
這篇文章主要介紹了Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解,阻塞隊(duì)列是一個(gè)支持兩個(gè)附加操作的隊(duì)列,這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),從隊(duì)列中獲取元素的消費(fèi)者線(xiàn)程會(huì)一直等待直到隊(duì)列變?yōu)榉强?需要的朋友可以參考下

前言

從阻塞隊(duì)列說(shuō)起,阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。

這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),從隊(duì)列中獲取元素的消費(fèi)者線(xiàn)程會(huì)一直等待直到隊(duì)列變?yōu)榉强铡?/p>

當(dāng)隊(duì)列滿(mǎn)時(shí),向隊(duì)列中放置元素的生產(chǎn)者線(xiàn)程會(huì)等待直到隊(duì)列可用。

阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線(xiàn)程,消費(fèi)者是從隊(duì)列里拿元素的線(xiàn)程。

在阻塞隊(duì)列不可用時(shí),這兩個(gè)附加操作提供了4種處理方式:

  • 拋出異常:當(dāng)隊(duì)列滿(mǎn)時(shí),插入元素會(huì)拋出IllegalStateException;
  • 返回特殊值:offer()是入隊(duì)方法,當(dāng)插入成功時(shí)返回true,插入失敗返回false;poll()是出隊(duì)方法,當(dāng)出隊(duì)成功時(shí)返回元素的值,隊(duì)列為空時(shí)返回null
  • 一直阻塞:當(dāng)隊(duì)列滿(mǎn)時(shí),阻塞執(zhí)行插入方法的線(xiàn)程;當(dāng)隊(duì)列空時(shí),阻塞執(zhí)行出隊(duì)方法的線(xiàn)程
  • 超時(shí)退出:顧名思義

下面是Java常見(jiàn)的阻塞隊(duì)列。

  • ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
  • LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
  • PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列
  • DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列
  • SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列
  • LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列
  • LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列

DelayQueue解析

DelayQueue隊(duì)列中每個(gè)元素都有一個(gè)過(guò)期時(shí)間,并且隊(duì)列是個(gè)優(yōu)先級(jí)隊(duì)列,當(dāng)從隊(duì)列獲取元素的時(shí)候,只有過(guò)期元素才會(huì)出隊(duì),DelayQueue的類(lèi)結(jié)構(gòu)如下圖所示:

如圖DelayQueue中內(nèi)部使用的是PriorityQueue存放數(shù)據(jù),使用ReentrantLock實(shí)現(xiàn)線(xiàn)程同步。

另外隊(duì)列里面的元素要實(shí)現(xiàn)Delayed接口,一個(gè)是獲取當(dāng)前剩余時(shí)間的接口,一個(gè)是元素比較的接口,因?yàn)檫@個(gè)是有優(yōu)先級(jí)的隊(duì)列。

DelayQueue類(lèi)的主要成員

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 持有內(nèi)部重入鎖。
    private final transient ReentrantLock lock = new ReentrantLock();
    // 優(yōu)先級(jí)隊(duì)列,存放工作任務(wù)。
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    // 依賴(lài)于重入鎖的condition。
    private final Condition available = lock.newCondition();
}

元素入隊(duì)列

插入元素到隊(duì)列中主要三個(gè)方法,但實(shí)際上底層調(diào)用的都是offer(e)方法

/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {
    return offer(e);
}
/**
 * Inserts the specified element into this delay queue. As the queue is
 * unbounded this method will never block.
 *
 * @param e the element to add
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) {
    offer(e);
}
/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true}
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    //獲取到重入鎖
    lock.lock();
    try {
        q.offer(e);
        //添加成功元素
        if (q.peek() == e) {
            leader = null;
            //將等待隊(duì)列中的頭節(jié)點(diǎn)移動(dòng)到同步隊(duì)列。
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

首先獲取獨(dú)占鎖,然后添加元素到優(yōu)先級(jí)隊(duì)列,由于q是優(yōu)先級(jí)隊(duì)列,所以添加完元素后,peek()方法返回的并不一定是剛才添加的元素,如果判斷為true,說(shuō)明當(dāng)前元素e的優(yōu)先級(jí)最小也就是即將過(guò)期的,這時(shí)候激活avaliable變量條件隊(duì)列里面的線(xiàn)程,通知它們隊(duì)列里面有元素了。

從隊(duì)列中取元素

有兩個(gè)方法可以取元素(都是取隊(duì)頭),poll()方法取隊(duì)頭當(dāng)隊(duì)頭元素沒(méi)過(guò)期時(shí)返回null,take()方法取隊(duì)頭當(dāng)隊(duì)頭元素沒(méi)過(guò)期時(shí)會(huì)一直等待。

/**
 * Retrieves and removes the head of this queue, or returns {@code null}
 * if this queue has no elements with an expired delay.
 *
 * @return the head of this queue, or {@code null} if this
 *         queue has no elements with an expired delay
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        //如果隊(duì)列為空,或者不為空但是隊(duì)頭元素沒(méi)有過(guò)期則返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    // 獲取鎖。每個(gè)延遲隊(duì)列內(nèi)聚了一個(gè)重入鎖。
    final ReentrantLock lock = this.lock;
    // 獲取可中斷的鎖。
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 嘗試從優(yōu)先級(jí)隊(duì)列中獲取隊(duì)列頭部元素,獲取但不移除
            E first = q.peek();
            if (first == null)
                //無(wú)元素,當(dāng)前線(xiàn)程節(jié)點(diǎn)加入等待隊(duì)列,并阻塞當(dāng)前線(xiàn)程
                available.await();
            else {
                // 通過(guò)延遲任務(wù)的getDelay()方法獲取延遲時(shí)間
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //延遲時(shí)間到期,獲取并刪除頭部元素。
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 線(xiàn)程節(jié)點(diǎn)進(jìn)入等待隊(duì)列 x 納秒。
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 若還存在元素的話(huà),則將等待隊(duì)列頭節(jié)點(diǎn)中的線(xiàn)程節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中。
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

重點(diǎn)說(shuō)一下take()方法,第一次調(diào)用take時(shí)候由于隊(duì)列空,所以把當(dāng)前線(xiàn)程放入available的條件隊(duì)列中等待,當(dāng)執(zhí)行offer()成功并且添加的新元素恰好就是優(yōu)先級(jí)隊(duì)列的隊(duì)首時(shí)就會(huì)通知最先等待的線(xiàn)程激活,循環(huán)重新獲取隊(duì)首元素,這時(shí)候first假如不空,則調(diào)用getDelay()方法看該元素還剩下多少時(shí)間就過(guò)期了,如果delay<=0則說(shuō)明已經(jīng)過(guò)期,則直接出隊(duì)返回。

否則看leader是否為null,不為null則說(shuō)明是其他線(xiàn)程也在執(zhí)行take()則把當(dāng)前線(xiàn)程放入條件隊(duì)列,否則就是只有當(dāng)前線(xiàn)程在執(zhí)行take()方法,則當(dāng)前線(xiàn)程await直到剩余過(guò)期時(shí)間到,這期間該線(xiàn)程會(huì)釋放鎖,所以其他線(xiàn)程可以offer()添加元素,也可以take()阻塞自己,剩余過(guò)期時(shí)間到后,當(dāng)前線(xiàn)程會(huì)重新競(jìng)爭(zhēng)鎖,重新進(jìn)入循環(huán)。

如果已經(jīng)具備了JUC包中的Lock接口以及AQS的相關(guān)知識(shí),上述代碼大部分應(yīng)該都比較容易理解。

DelayQueue將實(shí)現(xiàn)了Delayed接口的對(duì)象添加到優(yōu)先級(jí)隊(duì)列中,通過(guò)在可重入鎖的Condition上調(diào)用await()方法,實(shí)現(xiàn)了延遲獲取阻塞隊(duì)列中元素的功能。

總結(jié)

  • DelayQueue是一個(gè)內(nèi)部依靠AQS隊(duì)列同步器所實(shí)現(xiàn)的無(wú)界延遲阻塞隊(duì)列。
  • 隊(duì)列中的延遲對(duì)象需要覆蓋getDelay()與compareTo()方法,并且要注意 getDelay()的時(shí)間單位的統(tǒng)一,compareTo()根據(jù)業(yè)務(wù)邏輯進(jìn)行合理的比較邏輯重寫(xiě)。
  • DelayQueue中內(nèi)聚的可重入鎖是非公平的。
  • 延遲隊(duì)列是實(shí)現(xiàn)定時(shí)任務(wù)的關(guān)鍵,ScheduledThreadPoolExecutor中的任務(wù)隊(duì)列是DelayedWorkQueue,其和DelayedQueue高度類(lèi)似,也是一個(gè)延遲隊(duì)列。

DelayQueue使用例子

寫(xiě)一個(gè)簡(jiǎn)單的例子:

public class DelayQueueTest {
    public static final int SIZE = 10;
    public static void main(String[] args) {
    	DelayQueueTest test = new DelayQueueTest();
        //初始化線(xiàn)程池
        BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor
            (5, 10, 10, TimeUnit.MILLISECONDS,
                arrayBlockingQueue, Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        DelayQueue<DelayedTask> delayTaskQueue = new DelayQueue<>();
        //模擬SIZE個(gè)延遲任務(wù)
        for (byte i = 0; i < SIZE; i++) {
            Long runAt = System.currentTimeMillis() + 1000 * i;
            String name = "Zhang_" + i;
            byte age = (byte)(10 + i);
            String gender = (i % 2 == 0 ? "male" : "female");
            Student student = new StudentBuilder(name, age, gender).height(150 + i).province("ZheJiang").build();
            delayTaskQueue.put(new DelayedTask<Student>(student, 1, function -> test.print(student), runAt));
        }
        while (true) {
            if (delayTaskQueue.size() == 0) {
                break;
            }
            try {
                //從延遲隊(duì)列中取值,如果沒(méi)有對(duì)象過(guò)期則取到null
                DelayedTask delayedTask = delayTaskQueue.poll();
                if (delayedTask != null) {
                    threadPool.execute(delayedTask);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }
    public String print(Object object) {
      System.out.println(Thread.currentThread().getName());
        String str = ">>>junit log>>>" + object.getClass().getSimpleName() + ":" + object.toString();
        System.out.println(str);
        return str;
    }
    private static class DelayedTask<T> implements Delayed, Runnable {
        /**
         * 任務(wù)參數(shù)
         */
        private T taskParam;
        /**
         * 任務(wù)類(lèi)型
         */
        private Integer type;
        /**
         * 任務(wù)函數(shù)
         */
        private Function<T, String> function;
        /**
         * 任務(wù)執(zhí)行時(shí)刻
         */
        private Long runAt;
        public T getTaskParam() {
            return taskParam;
        }
        public Integer getType() {
            return type;
        }
        public Function<T, String> getFunction() {
            return function;
        }
        public Long getRunAt() {
            return runAt;
        }
        DelayedTask(T taskParam, Integer type, Function<T, String> function, Long runAt) {
            this.taskParam = taskParam;
            this.type = type;
            this.function = function;
            this.runAt = runAt;
        }
        @Override
        public void run() {
            if (taskParam != null) {
                function.apply(taskParam);
            }
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.runAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
            DelayedTask object = (DelayedTask)o;
            return this.runAt.compareTo(object.getRunAt());
        }
    }
}

運(yùn)行結(jié)果如下,由于10個(gè)元素的延遲時(shí)間均相差1秒,可以看到逐步打印的效果。

DelayQueue典型場(chǎng)景是重試機(jī)制實(shí)現(xiàn),比如當(dāng)調(diào)用接口失敗后,把當(dāng)前調(diào)用信息放入delay=10s的元素,然后把元素放入隊(duì)列,那么這個(gè)隊(duì)列就是一個(gè)重試隊(duì)列,一個(gè)線(xiàn)程通過(guò)take()方法獲取需要重試的接口,take()返回則接口進(jìn)行重試,失敗則再次放入隊(duì)列,同時(shí)也可以在元素加上重試次數(shù)。

到此這篇關(guān)于Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解的文章就介紹到這了,更多相關(guān)阻塞延遲隊(duì)列DelayQueue原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring注解中@Autowired和@Bean的區(qū)別詳解

    Spring注解中@Autowired和@Bean的區(qū)別詳解

    這篇文章主要詳細(xì)介紹了Spring注解中@Autowired和@Bean二者有什么區(qū)別,文中通過(guò)兩個(gè)注解的使用場(chǎng)景介紹了二者的區(qū)別,感興趣的同學(xué)可以參考閱讀
    2023-06-06
  • Java?Stream流中的filter()使用方法舉例詳解

    Java?Stream流中的filter()使用方法舉例詳解

    filter()是Java?Stream?API中的中間操作,用于根據(jù)給定的Predicate條件篩選流中的元素,它通過(guò)接收一個(gè)返回boolean值的函數(shù)(斷言)作為參數(shù),篩選出滿(mǎn)足條件的元素并收集到新的流中,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-11-11
  • Spring Boot項(xiàng)目中jar包在服務(wù)器上啟動(dòng)的正確姿勢(shì)

    Spring Boot項(xiàng)目中jar包在服務(wù)器上啟動(dòng)的正確姿勢(shì)

    這篇文章主要給大家介紹了關(guān)于Spring Boot項(xiàng)目中jar包在服務(wù)器上啟動(dòng)的正確姿勢(shì),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-01-01
  • 2018年java技術(shù)面試題整理

    2018年java技術(shù)面試題整理

    小編為大家整理了2018年最新的關(guān)于java技術(shù)相關(guān)的面試題,以及給出了最簡(jiǎn)簡(jiǎn)答方式,學(xué)習(xí)下吧。
    2018-02-02
  • SpringBoot數(shù)據(jù)校驗(yàn)及多環(huán)境配置的問(wèn)題詳解

    SpringBoot數(shù)據(jù)校驗(yàn)及多環(huán)境配置的問(wèn)題詳解

    這篇文章主要介紹了SpringBoot數(shù)據(jù)校驗(yàn)及多環(huán)境配置,本文以SpringBoot-02-Config 項(xiàng)目為例,給大家詳細(xì)介紹,需要的朋友可以參考下
    2021-09-09
  • Java數(shù)據(jù)類(lèi)型轉(zhuǎn)換實(shí)例解析

    Java數(shù)據(jù)類(lèi)型轉(zhuǎn)換實(shí)例解析

    這篇文章主要介紹了Java數(shù)據(jù)類(lèi)型轉(zhuǎn)換實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Java volatile關(guān)鍵字原理剖析與實(shí)例講解

    Java volatile關(guān)鍵字原理剖析與實(shí)例講解

    volatile是Java提供的一種輕量級(jí)的同步機(jī)制,Java?語(yǔ)言包含兩種內(nèi)在的同步機(jī)制:同步塊(或方法)和?volatile?變量,本文將詳細(xì)為大家總結(jié)Java volatile關(guān)鍵字,通過(guò)詳細(xì)的代碼示例給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2023-07-07
  • Java如何對(duì)返回參數(shù)進(jìn)行處理

    Java如何對(duì)返回參數(shù)進(jìn)行處理

    這篇文章主要介紹了Java如何對(duì)返回參數(shù)進(jìn)行處理問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • spring boot實(shí)現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可)

    spring boot實(shí)現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可)

    這篇文章主要介紹了spring boot實(shí)現(xiàn)阿里云視頻點(diǎn)播上傳視頻功能(復(fù)制粘貼即可),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-12-12
  • Java NIO 文件通道 FileChannel 用法及原理

    Java NIO 文件通道 FileChannel 用法及原理

    這篇文章主要介紹了Java NIO 文件通道 FileChannel 用法和原理,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-01-01

最新評(píng)論