欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java并發(fā)編程中的阻塞隊列解析

 更新時間:2023年08月29日 09:25:18   作者:方騰飛  
這篇文章主要介紹了Java并發(fā)編程中的阻塞隊列解析,阻塞隊列BlockingQueue是一個支持兩個附加操作的隊列,這兩個附加的操作是在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?當隊列滿時,存儲元素的線程會等待隊列可用,需要的朋友可以參考下

1. 什么是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。

這兩個附加的操作是:

  • 在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡?/li>
  • 當隊列滿時,存儲元素的線程會等待隊列可用。

阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。

阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。

阻塞隊列提供了處理方法:

  • 返回特殊值:插入方法會返回是否成功,成功則返回 true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回 null
  • 一直阻塞:當阻塞隊列滿時,如果生產(chǎn)者線程往隊列里 put 元素,隊列會一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里 take 元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產(chǎn)者線程一段時間,如果超過一定的時間,生產(chǎn)者線程就會退出。

2. Java 里的阻塞隊列

JDK7 提供了 7 個阻塞隊列。分別是

  • ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。

ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產(chǎn)者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產(chǎn)者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創(chuàng)建一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現(xiàn)的,代碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
       if (capacity <= 0)
           throw new IllegalArgumentException();
       this.items = new Object[capacity];
       lock = new ReentrantLock(fair);
       notEmpty = lock.newCondition();
       notFull =  lock.newCondition();
}

LinkedBlockingQueue 是一個用鏈表實現(xiàn)的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

PriorityBlockingQueue 是一個支持優(yōu)先級的無界隊列。默認情況下元素采取自然順序排列,也可以通過比較器 comparator 來指定元素的排序規(guī)則。元素按照升序排列。

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現(xiàn)。隊列中的元素必須實現(xiàn) Delayed 接口,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。我們可以將 DelayQueue 運用在以下應用場景:

  • 緩存系統(tǒng)的設計:可以用 DelayQueue 保存緩存元素的有效期,使用一個線程循環(huán)查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示緩存有效期到了。
  • 定時任務調(diào)度。使用 DelayQueue 保存當天將會執(zhí)行的任務和執(zhí)行時間,一旦從 DelayQueue 中獲取到任務就開始執(zhí)行,從比如 TimerQueue 就是使用 DelayQueue 實現(xiàn)的。

隊列中的 Delayed 必須實現(xiàn) compareTo 來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實現(xiàn)代碼如下:

public int compareTo(Delayed other) {
          if (other == this) // compare zero ONLY if same object
               return 0;
           if (other instanceof ScheduledFutureTask) {
               ScheduledFutureTask x = (ScheduledFutureTask)other;
               long diff = time - x.time;
               if (diff < 0)
                   return -1;
               else if (diff > 0)
                   return 1;
      else if (sequenceNumber < x.sequenceNumber)
                   return -1;
               else
                   return 1;
           }
           long d = (getDelay(TimeUnit.NANOSECONDS) -
                     other.getDelay(TimeUnit.NANOSECONDS));
           return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
       }

如何實現(xiàn) Delayed 接口

我們可以參考 ScheduledThreadPoolExecutor 里 ScheduledFutureTask 類。

這個類實現(xiàn)了 Delayed 接口。

首先:在對象創(chuàng)建的時候,使用 time 記錄前對象什么時候可以使用,代碼如下:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
           super(r, result);
           this.time = ns;
           this.period = period;
           this.sequenceNumber = sequencer.getAndIncrement();
}

然后使用 getDelay 可以查詢當前元素還需要延時多久,代碼如下:

public long getDelay(TimeUnit unit) {
           return unit.convert(time - now(), TimeUnit.NANOSECONDS);
       }

通過構(gòu)造函數(shù)可以看出延遲時間參數(shù) ns 的單位是納秒,自己設計的時候最好使用納秒,因為 getDelay 時可以指定任意單位,一旦以納秒作為單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當 time 小于當前時間時,getDelay 會返回負數(shù)。

如何實現(xiàn)延時隊列

延時隊列的實現(xiàn)很簡單,當消費者從隊列里獲取元素時,如果元素沒有達到延時時間,就阻塞當前線程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
                   if (delay <= 0)
                       return q.poll();
                   else if (leader != null)
                       available.await();

SynchronousQueue 是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則不能繼續(xù)添加元素。SynchronousQueue 可以看成是一個傳球手,負責把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合于傳遞性場景, 比如在一個線程中使用的數(shù)據(jù),傳遞給另外一個線程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue 是一個由鏈表結(jié)構(gòu)組成的無界阻塞 TransferQueue 隊列。相對于其他阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

transfer 方法。如果當前有消費者正在等待接收元素(消費者使用 take() 方法或帶時間限制的 poll() 方法時),transfer 方法可以把生產(chǎn)者傳入的元素立刻 transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer 方法會將元素存放在隊列的 tail 節(jié)點,并等到該元素被消費者消費了才返回。transfer 方法的關(guān)鍵代碼如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代碼是試圖把存放當前元素的 s 節(jié)點作為 tail 節(jié)點。第二行代碼是讓 CPU 自旋等待消費者消費元素。因為自旋會消耗 CPU,所以自旋一定的次數(shù)后使用 Thread.yield() 方法來暫停當前正在執(zhí)行的線程,并執(zhí)行其他線程。

tryTransfer 方法。則是用來試探下生產(chǎn)者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回 false。和 transfer 方法的區(qū)別是 tryTransfer 方法無論消費者是否接收,方法立即返回。而 transfer 方法是必須等到消費者消費了才返回。

對于帶有時間限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,則是試圖把生產(chǎn)者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回 false,如果在超時時間內(nèi)消費了元素,則返回 true。

LinkedBlockingDeque 是一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊列的第一個元素。以 Last 單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法卻等同于 takeFirst,不知道是不是 Jdk 的 bug,使用時還是用帶有 First 和 Last 后綴的方法更清楚。

在初始化 LinkedBlockingDeque 時可以設置容量防止其過渡膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。

3. 阻塞隊列的實現(xiàn)原理

如果隊列是空的,消費者會一直等待,當生產(chǎn)者添加元素時候,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生產(chǎn)者和消費者能夠高效率的進行通訊呢?讓我們先來看看 JDK 是如何實現(xiàn)的。

使用通知模式實現(xiàn)。所謂通知模式,就是當生產(chǎn)者往滿的隊列里添加元素時會阻塞住生產(chǎn)者,當消費者消費了一個隊列中的元素后,會通知生產(chǎn)者當前隊列可用。通過查看 JDK 源碼發(fā)現(xiàn) ArrayBlockingQueue 使用了 Condition 來實現(xiàn),代碼如下:

private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
       // 省略其他代碼
       notEmpty = lock.newCondition();
       notFull =  lock.newCondition();
   }
public void put(E e) throws InterruptedException {
       checkNotNull(e);
       final ReentrantLock lock = this.lock;
       lock.lockInterruptibly();
       try {
           while (count == items.length)
               notFull.await();
           insert(e);
       } finally {
           lock.unlock();
       }
}
public E take() throws InterruptedException {
       final ReentrantLock lock = this.lock;
       lock.lockInterruptibly();
       try {
           while (count == 0)
               notEmpty.await();
           return extract();
 } finally {
           lock.unlock();
       }
}
private void insert(E x) {
       items[putIndex] = x;
       putIndex = inc(putIndex);
       ++count;
       notEmpty.signal();
   }

當我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產(chǎn)者主要通過 LockSupport.park(this); 來實現(xiàn)

public final void await() throws InterruptedException {
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               LockSupport.park(this);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null) // clean up if cancelled
               unlinkCancelledWaiters();
           if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
       }

繼續(xù)進入源碼,發(fā)現(xiàn)調(diào)用 setBlocker 先保存下將要阻塞的線程,然后調(diào)用 unsafe.park 阻塞當前線程。

public static void park(Object blocker) {
       Thread t = Thread.currentThread();
       setBlocker(t, blocker);
       unsafe.park(false, 0L);
       setBlocker(t, null);
   }

unsafe.park 是個 native 方法,代碼如下:

public native void park(boolean isAbsolute, long time);

park 這個方法會阻塞當前線程,只有以下四種情況中的一種發(fā)生時,該方法才會返回。

  • 與 park 對應的 unpark 執(zhí)行或已經(jīng)執(zhí)行時。注意:已經(jīng)執(zhí)行是指 unpark 先執(zhí)行,然后再執(zhí)行的 park。
  • 線程被中斷時。
  • 如果參數(shù)中的 time 不是零,等待了指定的毫秒數(shù)時。
  • 發(fā)生異?,F(xiàn)象時。這些異常事先無法確定。

我們繼續(xù)看一下 JVM 是如何實現(xiàn) park 方法的,park 在不同的操作系統(tǒng)使用不同的方式實現(xiàn),在 linux 下是使用的是系統(tǒng)方法 pthread_cond_wait 實現(xiàn)。

實現(xiàn)代碼在 JVM 源碼路徑 src/os/linux/vm/os_linux.cpp 里的 os::PlatformEvent::park 方法,代碼如下:

void os::PlatformEvent::park() {      
    	     int v ;
        for (;;) {
   	v = _Event ;
        if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
        }
        guarantee (v >= 0, "invariant") ;
        if (v == 0) {
        // Do this the hard way by blocking ...
        int status = pthread_mutex_lock(_mutex);
        assert_status(status == 0, status, "mutex_lock");
        guarantee (_nParked == 0, "invariant") ;
        ++ _nParked ;
        while (_Event < 0) {
        status = pthread_cond_wait(_cond, _mutex);
        // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
        // Treat this the same as if the wait was interrupted
        if (status == ETIME) { status = EINTR; }
        assert_status(status == 0 || status == EINTR, status, "cond_wait");
        }
        -- _nParked ;
        // In theory we could move the ST of 0 into _Event past the unlock(),
        // but then we'd need a MEMBAR after the ST.
        _Event = 0 ;
        status = pthread_mutex_unlock(_mutex);
        assert_status(status == 0, status, "mutex_unlock");
        }
        guarantee (_Event >= 0, "invariant") ;
        }
    }

pthread_cond_wait 是一個多線程的條件變量函數(shù),cond 是 condition 的縮寫,字面意思可以理解為線程在等待一個條件發(fā)生,這個條件是一個全局變量。

這個方法接收兩個參數(shù),一個共享變量 _cond,一個互斥量 _mutex。

而 unpark 方法在 linux 下是使用 pthread_cond_signal 實現(xiàn)的。

park 在 windows 下則是使用 WaitForSingleObject 實現(xiàn)的。

當隊列滿時,生產(chǎn)者往阻塞隊列里插入一個元素,生產(chǎn)者線程會進入 WAITING (parking) 狀態(tài)。

我們可以使用 jstack dump 阻塞的生產(chǎn)者線程看到這點:

"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
       at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
       at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)

到此這篇關(guān)于Java并發(fā)編程中的阻塞隊列解析的文章就介紹到這了,更多相關(guān)Java阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 關(guān)于RestTemplate的使用深度解析

    關(guān)于RestTemplate的使用深度解析

    這篇文章主要介紹了對RestTemplate的深度解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • SpringBoot深入講解單元測試與熱部署應用

    SpringBoot深入講解單元測試與熱部署應用

    這篇文章介紹了SpringBoot單元測試與熱部署,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-06-06
  • Java?HttpURLConnection使用方法與實例演示分析

    Java?HttpURLConnection使用方法與實例演示分析

    這篇文章主要介紹了Java?HttpURLConnection使用方法與實例演示,HttpURLConnection一個抽象類是標準的JAVA接口,該類位于java.net包中,它提供了基本的URL請求,響應等功能,下面我們來深入看看
    2023-10-10
  • springboot中JSONObject遍歷并替換部分json值

    springboot中JSONObject遍歷并替換部分json值

    這篇文章主要介紹了springboot中JSONObject遍歷并替換部分json值,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11
  • Java 實現(xiàn)常見的非對稱加密算法

    Java 實現(xiàn)常見的非對稱加密算法

    這篇文章主要介紹了Java 實現(xiàn)常見的非對稱加密算法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-11-11
  • Spring AOP實現(xiàn)功能權(quán)限校驗功能的示例代碼

    Spring AOP實現(xiàn)功能權(quán)限校驗功能的示例代碼

    本篇文章主要介紹了Spring AOP實現(xiàn)功能權(quán)限校驗功能的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-12-12
  • java反射機制的一些學習心得小結(jié)

    java反射機制的一些學習心得小結(jié)

    這篇文章主要給大家介紹了關(guān)于java反射機制的一些學習心得,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-02-02
  • javaweb圖書商城設計之圖書模塊(4)

    javaweb圖書商城設計之圖書模塊(4)

    這篇文章主要介紹了javaweb圖書商城設計之圖書模塊的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-11-11
  • 詳解springboot?springsecuroty中的注銷和權(quán)限控制問題

    詳解springboot?springsecuroty中的注銷和權(quán)限控制問題

    這篇文章主要介紹了springboot-springsecuroty?注銷和權(quán)限控制,賬戶注銷需要在SecurityConfig中加入開啟注銷功能的代碼,權(quán)限控制要導入springsecurity和thymeleaf的整合依賴,本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧
    2022-03-03
  • Java實現(xiàn)公眾號功能、關(guān)注及消息推送實例代碼

    Java實現(xiàn)公眾號功能、關(guān)注及消息推送實例代碼

    公眾號開發(fā)近些年是一個比較熱門的方向,下面這篇文章主要給大家介紹了關(guān)于Java實現(xiàn)公眾號功能、關(guān)注及消息推送的相關(guān)資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2023-11-11

最新評論