欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java 并發(fā)編程ArrayBlockingQueue的實現(xiàn)

 更新時間:2021年02月19日 09:33:01   作者:maomaoJava  
這篇文章主要介紹了Java 并發(fā)編程ArrayBlockingQueue的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

一、簡介

ArrayBlockingQueue 顧名思義:基于數(shù)組的阻塞隊列。數(shù)組是要指定長度的,所以使用 ArrayBlockingQueue 時必須指定長度,也就是它是一個有界隊列。它實現(xiàn)了 BlockingQueue 接口,有著隊列、集合以及阻塞隊列的所有方法。

ArrayBlockingQueue 是線程安全的,內(nèi)部使用 ReentrantLock 來保證。ArrayBlockingQueue 支持對生產(chǎn)者線程和消費者線程進(jìn)行公平的調(diào)度。當(dāng)然默認(rèn)情況下是不保證公平性的,因為公平性通常會降低吞吐量,但是可以減少可變性和避免線程饑餓問題。

二、數(shù)據(jù)結(jié)構(gòu)

通常,隊列的實現(xiàn)方式有數(shù)組和鏈表兩種方式。對于數(shù)組這種實現(xiàn)方式來說,我們可以通過維護(hù)一個隊尾指針,使得在入隊的時候可以在 O(1)O(1) 的時間內(nèi)完成;但是對于出隊操作,在刪除隊頭元素之后,必須將數(shù)組中的所有元素都往前移動一個位置,這個操作的復(fù)雜度達(dá)到了 O(n)O(n),效果并不是很好。如下圖所示:

為了解決這個問題,我們可以使用另外一種邏輯結(jié)構(gòu)來處理數(shù)組中各個位置之間的關(guān)系。假設(shè)現(xiàn)在我們有一個數(shù)組 A[1…n],我們可以把它想象成一個環(huán)型結(jié)構(gòu),即 A[n] 之后是 A[1],相信了解過一致性 Hash 算法的童鞋應(yīng)該很容易能夠理解。

如下圖所示:我們可以使用兩個指針,分別維護(hù)隊頭和隊尾兩個位置,使入隊和出隊操作都可以在 O(1O(1 )的時間內(nèi)完成。當(dāng)然,這個環(huán)形結(jié)構(gòu)只是邏輯上的結(jié)構(gòu),實際的物理結(jié)構(gòu)還是一個普通的數(shù)組。

講完 ArrayBlockingQueue 的數(shù)據(jù)結(jié)構(gòu),接下來我們從源碼層面看看它是如何實現(xiàn)阻塞的。

三、源碼分析

3.1 屬性

// 隊列的底層結(jié)構(gòu)
final Object[] items;
// 隊頭指針
int takeIndex;
// 隊尾指針
int putIndex;
// 隊列中的元素個數(shù)
int count;

final ReentrantLock lock;

// 并發(fā)時的兩種狀態(tài)
private final Condition notEmpty;
private final Condition notFull;

items 是一個數(shù)組,用來存放入隊的數(shù)據(jù);count 表示隊列中元素的個數(shù);takeIndex 和 putIndex 分別代表隊頭和隊尾指針。

3.2 構(gòu)造方法

public ArrayBlockingQueue(int capacity) {
  this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
  this(capacity, fair);

  final ReentrantLock lock = this.lock;
  lock.lock(); // Lock only for visibility, not mutual exclusion
  try {
    int i = 0;
    try {
      for (E e : c) {
        checkNotNull(e);
        items[i++] = e;
      }
    } catch (ArrayIndexOutOfBoundsException ex) {
      throw new IllegalArgumentException();
    }
    count = i;
    putIndex = (i == capacity) ? 0 : i;
  } finally {
    lock.unlock();
  }
}

第一個構(gòu)造函數(shù)只需要指定隊列大小,默認(rèn)為非公平鎖;第二個構(gòu)造函數(shù)可以手動指定公平性和隊列大小;第三個構(gòu)造函數(shù)里面使用了 ReentrantLock 來加鎖,然后把傳入的集合元素按順序一個個放入 items 中。這里加鎖目的不是使用它的互斥性,而是讓 items 中的元素對其他線程可見(參考 AQS 里的 state 的 volatile 可見性)。

3.3 方法

3.3.1 入隊

ArrayBlockingQueue 提供了多種入隊操作的實現(xiàn)來滿足不同情況下的需求,入隊操作有如下幾種:

  • boolean add(E e)
  • void put(E e)
  • boolean offer(E e)
  • boolean offer(E e, long timeout, TimeUnit unit)

(1)add(E e)

public boolean add(E e) {
  return super.add(e);
}

//super.add(e)
public boolean add(E e) {
  if (offer(e))
    return true;
  else
    throw new IllegalStateException("Queue full");
}

可以看到 add 方法調(diào)用的是父類,也就是 AbstractQueue 的 add 方法,它實際上調(diào)用的就是 offer 方法。

(2)offer(E e)

我們接著上面的 add 方法來看 offer 方法:

public boolean offer(E e) {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    if (count == items.length)
      return false;
    else {
      enqueue(e);
      return true;
    }
  } finally {
    lock.unlock();
  }
}

offer 方法在隊列滿了的時候返回 false,否則調(diào)用 enqueue 方法插入元素,并返回 true。

private void enqueue(E x) {
  final Object[] items = this.items;
  items[putIndex] = x;
  // 圓環(huán)的index操作
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal();
}

enqueue 方法首先把元素放在 items 的 putIndex 位置,接著判斷在 putIndex+1 等于隊列的長度時把 putIndex 設(shè)置為0,也就是上面提到的圓環(huán)的 index 操作。最后喚醒等待獲取元素的線程。

(3)offer(E e, long timeout, TimeUnit unit)

該方法在 offer(E e) 的基礎(chǔ)上增加了超時的概念。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

  checkNotNull(e);
  // 把超時時間轉(zhuǎn)換成納秒
  long nanos = unit.toNanos(timeout);
  final ReentrantLock lock = this.lock;
  // 獲取一個可中斷的互斥鎖
  lock.lockInterruptibly();
  try {
    // while循環(huán)的目的是防止在中斷后沒有到達(dá)傳入的timeout時間,繼續(xù)重試
    while (count == items.length) {
      if (nanos <= 0)
        return false;
      // 等待nanos納秒,返回剩余的等待時間(可被中斷)
      nanos = notFull.awaitNanos(nanos);
    }
    enqueue(e);
    return true;
  } finally {
    lock.unlock();
  }
}

利用了 Condition 的 awaitNanos 方法,等待指定時間,因為該方法可中斷,所以這里利用 while 循環(huán)來處理中斷后還有剩余時間的問題,等待時間到了以后調(diào)用 enqueue 方法放入隊列。

(4)put(E e)

public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == items.length)
      notFull.await();
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

put 方法在 count 等于 items 長度時,一直等待,直到被其他線程喚醒。喚醒后調(diào)用 enqueue 方法放入隊列。

3.3.2 出隊

入隊列的方法說完后,我們來說說出隊列的方法。ArrayBlockingQueue 提供了多種出隊操作的實現(xiàn)來滿足不同情況下的需求,如下:

  • E poll()
  • E poll(long timeout, TimeUnit unit)
  • E take()
  • drainTo(Collection<? super E> c, int maxElements)

(1)poll()

public E poll() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return (count == 0) ? null : dequeue();
  } finally {
    lock.unlock();
  }
}

poll 方法是非阻塞方法,如果隊列沒有元素返回 null,否則調(diào)用 dequeue 把隊首的元素出隊列。

private E dequeue() {
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  notFull.signal();
  return x;
}

dequeue 會根據(jù) takeIndex 獲取到該位置的元素,并把該位置置為 null,接著利用圓環(huán)原理,在 takeIndex 到達(dá)列表長度時設(shè)置為0,最后喚醒等待元素放入隊列的線程。

(2)poll(long timeout, TimeUnit unit)

該方法是 poll() 的可配置超時等待方法,和上面的 offer 一樣,使用 while 循環(huán)配合 Condition 的 awaitNanos 來進(jìn)行等待,等待時間到后執(zhí)行 dequeue 獲取元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  long nanos = unit.toNanos(timeout);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == 0) {
      if (nanos <= 0)
        return null;
      nanos = notEmpty.awaitNanos(nanos);
    }
    return dequeue();
  } finally {
    lock.unlock();
  }
}

(3)take()

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == 0)
      notEmpty.await();
    return dequeue();
  } finally {
    lock.unlock();
  }
}

取走隊列里排在首位的對象,不同于 poll() 方法,若BlockingQueue為空,就阻塞等待直到有新的數(shù)據(jù)被加入。
(4)drainTo()

public int drainTo(Collection<? super E> c) {
  return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
  checkNotNull(c);
  if (c == this)
    throw new IllegalArgumentException();
  if (maxElements <= 0)
    return 0;
  final Object[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    int n = Math.min(maxElements, count);
    int take = takeIndex;
    int i = 0;
    try {
      while (i < n) {
        @SuppressWarnings("unchecked")
        E x = (E) items[take];
        c.add(x);
        items[take] = null;
        if (++take == items.length)
          take = 0;
        i++;
      }
      return n;
    } finally {
      // Restore invariants even if c.add() threw
      if (i > 0) {
        count -= i;
        takeIndex = take;
        if (itrs != null) {
          if (count == 0)
            itrs.queueIsEmpty();
          else if (i > take)
            itrs.takeIndexWrapped();
        }
        for (; i > 0 && lock.hasWaiters(notFull); i--)
          notFull.signal();
      }
    }
  } finally {
    lock.unlock();
  }
}

drainTo 相比于其他獲取方法,能夠一次性從隊列中獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù))。通過該方法,可以提升獲取數(shù)據(jù)效率,不需要多次分批加鎖或釋放鎖。

3.3.3 獲取元素

public E peek() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return itemAt(takeIndex); // null when queue is empty
  } finally {
    lock.unlock();
  }
}

final E itemAt(int i) {
  return (E) items[i];
}

這里獲取元素時上鎖是為了避免臟數(shù)據(jù)的產(chǎn)生。

3.3.4 刪除元素

我們可以想象一下,隊列中刪除某一個元素時,是不是要遍歷整個數(shù)據(jù)找到該元素,并把該元素后的所有元素往前移一位,也就是說,該方法的時間復(fù)雜度為 O(n)O(n)。

public boolean remove(Object o) {
  if (o == null) return false;
  final Object[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    if (count > 0) {
      final int putIndex = this.putIndex;
      int i = takeIndex;
       // 從takeIndex一直遍歷到putIndex,直到找到和元素o相同的元素,調(diào)用removeAt進(jìn)行刪除
      do {
        if (o.equals(items[i])) {
          removeAt(i);
          return true;
        }
        if (++i == items.length)
          i = 0;
      } while (i != putIndex);
    }
    return false;
  } finally {
    lock.unlock();
  }
}

remove 方法比較簡單,它從 takeIndex 一直遍歷到 putIndex,直到找到和元素 o 相同的元素,調(diào)用 removeAt 進(jìn)行刪除。我們重點來看一下 removeAt 方法。

void removeAt(final int removeIndex) {
  final Object[] items = this.items;
  if (removeIndex == takeIndex) {
    // removing front item; just advance
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    if (itrs != null)
      itrs.elementDequeued();
  } else {
    // an "interior" remove
    // slide over all others up through putIndex.
    final int putIndex = this.putIndex;
    for (int i = removeIndex;;) {
      int next = i + 1;
      if (next == items.length)
        next = 0;
      if (next != putIndex) {
        items[i] = items[next];
        i = next;
      } else {
        items[i] = null;
        this.putIndex = i;
        break;
      }
    }
    count--;
    if (itrs != null)
      itrs.removedAt(removeIndex);
  }
  notFull.signal();
}

removeAt 的處理方式和我想的稍微有一點出入,它內(nèi)部分為兩種情況來考慮:

  • removeIndex == takeIndex
  • removeIndex != takeIndex

也就是我考慮的時候沒有考慮邊界問題。當(dāng) removeIndex == takeIndex 時就不需要后面的元素整體往前移了,而只需要把 takeIndex的指向下一個元素即可(類比圓環(huán));當(dāng) removeIndex != takeIndex 時,通過 putIndex 將 removeIndex 后的元素往前移一位。

四、總結(jié)

ArrayBlockingQueue 是一個阻塞隊列,內(nèi)部由 ReentrantLock 來實現(xiàn)線程安全,由 Condition 的 await 和 signal 來實現(xiàn)等待喚醒的功能。它的數(shù)據(jù)結(jié)構(gòu)是數(shù)組,準(zhǔn)確的說是一個循環(huán)數(shù)組(可以類比一個圓環(huán)),所有的下標(biāo)在到達(dá)最大長度時自動從 0 繼續(xù)開始。

到此這篇關(guān)于Java 并發(fā)編程ArrayBlockingQueue的實現(xiàn)的文章就介紹到這了,更多相關(guān)Java 并發(fā)編程ArrayBlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java實現(xiàn)任意矩陣Strassen算法

    java實現(xiàn)任意矩陣Strassen算法

    這篇文章主要介紹了java實現(xiàn)任意矩陣Strassen算法的相關(guān)資料,需要的朋友可以參考下
    2016-02-02
  • SpringCloudGateway網(wǎng)關(guān)處攔截并修改請求的操作方法

    SpringCloudGateway網(wǎng)關(guān)處攔截并修改請求的操作方法

    這篇文章主要介紹了SpringCloudGateway網(wǎng)關(guān)處攔截并修改請求的操作方法,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2023-12-12
  • JDK源碼之Vector與HashSet解析

    JDK源碼之Vector與HashSet解析

    HashSet、HashMap、ArrayList、LinkedList、Vector這幾個在Java編程中經(jīng)常用到,他們之間有很多聯(lián)系,有很多相通的地方,我們這次先了解一下Vector與HashSet
    2021-06-06
  • Spring @Bean注解的使用場景與案例實現(xiàn)

    Spring @Bean注解的使用場景與案例實現(xiàn)

    隨著SpringBoot的流行,我們現(xiàn)在更多采用基于注解式的配置從而替換掉了基于XML的配置,所以本篇文章我們主要探討基于注解的@Bean以及和其他注解的使用
    2023-03-03
  • Java實現(xiàn)的Windows資源管理器實例

    Java實現(xiàn)的Windows資源管理器實例

    這篇文章主要介紹了Java實現(xiàn)的Windows資源管理器,實例分析了基于java實現(xiàn)windows資源管理器的相關(guān)技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-07-07
  • 詳解Java的文件與目錄管理以及輸入輸出相關(guān)操作

    詳解Java的文件與目錄管理以及輸入輸出相關(guān)操作

    這篇文章主要介紹了詳解Java的文件與目錄管理以及輸入輸出相關(guān)操作,是Java入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-09-09
  • 詳解Java synchronized關(guān)鍵字的用法

    詳解Java synchronized關(guān)鍵字的用法

    在多線程編程中常常使用鎖機制來確保同一時刻只有一個線程能夠修改共享內(nèi)存,在Java中一般是使用synchronized作為鎖機制,下面就讓我們來學(xué)習(xí)一下如何使用synchronized實現(xiàn)線程安全吧
    2023-08-08
  • SpringBoot2.x 整合 thumbnailator 圖片處理的示例代碼

    SpringBoot2.x 整合 thumbnailator 圖片處理的示例代碼

    這篇文章主要介紹了SpringBoot2.x 之整合 thumbnailator 圖片處理,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-10-10
  • 詳解mybatis.generator配上最新的mysql 8.0.11的一些坑

    詳解mybatis.generator配上最新的mysql 8.0.11的一些坑

    這篇文章主要介紹了詳解mybatis.generator配上最新的mysql 8.0.11的一些坑,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-10-10
  • Springboot引入多個yml方法(多種方案)

    Springboot引入多個yml方法(多種方案)

    SpringBoot默認(rèn)加載的是application.yml文件,所以想要引入其他配置的yml文件,就要在application.yml中激活該文件這篇文章主要介紹了Springboot引入多個yml方法,需要的朋友可以參考下
    2019-10-10

最新評論