詳解ArrayBlockQueue源碼解析
今天要講的是ArrayBlockQueue,ArrayBlockQueue是JUC提供的線程安全的有界的阻塞隊列,一看到Array,第一反應:這貨肯定和數(shù)組有關,既然是數(shù)組,那自然是有界的了,我們先來看看ArrayBlockQueue的基本使用方法,然后再看看ArrayBlockQueue的源碼。
ArrayBlockQueue基本使用
public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<Integer> arrayBlockingQueue=new ArrayBlockingQueue(5); arrayBlockingQueue.offer(10); arrayBlockingQueue.offer(50); arrayBlockingQueue.add(20); arrayBlockingQueue.add(60); System.out.println(arrayBlockingQueue); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue); System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue); System.out.println(arrayBlockingQueue.peek()); System.out.println(arrayBlockingQueue); }
運行結果:
- 創(chuàng)建了一個長度為5的ArrayBlockQueue。
- 用offer方法,向ArrayBlockQueue添加了兩個元素,分別是10,50。
- 用put方法,向ArrayBlockQueue添加了兩個元素,分別是20,60。
- 打印出ArrayBlockQueue,結果是10,50,20,60。
- 用poll方法,彈出ArrayBlockQueue第一個元素,并且打印出來:10。
- 打印出ArrayBlockQueue,結果是50,20,60。
- 用take方法,彈出ArrayBlockQueue第一個元素,并且打印出來:50。
- 打印出ArrayBlockQueue,結果是20,60。
- 用peek方法,彈出ArrayBlockQueue第一個元素,并且打印出來:20。
- 打印出ArrayBlockQueue,結果是20,60。
代碼比較簡單,但是你肯定會有疑問
- offer/add(在上面的代碼中沒有演示)/put都是往隊列里面添加元素,區(qū)別是什么?
- poll/take/peek都是彈出隊列的元素,區(qū)別是什么?
- 底層代碼是如何保證線程安全的?
- 數(shù)據(jù)保存在哪里?
要解決上面幾個疑問,最好的辦法當然是看下源碼,通過親自閱讀源碼所產生的印象遠遠要比看視頻,看博客,死記硬背最后的結論要深刻的多。就算真的忘記了,只要再看看源碼,瞬間可以回憶起來。
ArrayBlockQueue源碼解析
構造方法
ArrayBlockQueue提供了三個構造方法,如下圖所示:
ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
這是最常用的構造方法,傳入capacity,capacity是容量的意思,也就是ArrayBlockingQueue的最大長度,方法內部直接調用了第二個構造方法,傳入的第二個參數(shù)為false。
ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
這個構造方法接受兩個參數(shù),分別是capacity和fair,fair是boolean類型的,代表是公平鎖,還是非公平鎖,可以看出如果我們用第一個構造方法來創(chuàng)建ArrayBlockingQueue的話,采用的是非公平鎖,因為公平鎖會損失一定的性能,在沒有充足的理由的情況下,是沒有必要采用公平鎖的。
方法內部做了幾件事情:
- 創(chuàng)建Object類型的數(shù)組,容量為capacity,并且賦值給當前類對象的items。
- 創(chuàng)建排他鎖。
- 創(chuàng)建條件變量notEmpty 。
- 創(chuàng)建條件變量notFull。
至于排他鎖和兩個條件變量是做什么用的,看到后面就明白了。
ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //調用第二個構造方法,方法內部就是初始化數(shù)組,排他鎖,兩個條件變量 this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 開啟排他鎖 try { int i = 0; try { // 循環(huán)傳入的集合,把集合中的元素賦值給items數(shù)組,其中i會自增 for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i;//把i賦值給count //如果i==capacity,也就是到了最大容量,把0賦值給putIndex,否則把i賦值給putIndex putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock();//釋放排他鎖 } }
- 調用第二個構造方法,方法內部就是初始化數(shù)組items,排他鎖lock,以及兩個條件變量。
- 開啟排他鎖。
- 循環(huán)傳入的集合,將集合中的元素賦值給items數(shù)組,其中i會自增。
- 把i賦值給count。
- 如果i==capacity,說明到了最大的容量,就把0賦值給putIndex,否則把i賦值給putIndex。
- 在finally中釋放排他鎖。
看到這里,我們應該明白這個構造方法的作用是什么了,就是把傳入的集合作為ArrayBlockingQueuede初始化數(shù)據(jù),但是我們又會有一個新的疑問:count,putIndex 是做什么用的。
offer(E e)
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock();//開啟排他鎖 try { if (count == items.length)//如果count==items.length,返回false return false; else { enqueue(e);//入隊 return true;//返回true } } finally { lock.unlock();//釋放鎖 } }
- 開啟排他鎖。
- 如果count==items.length,也就是到了最大的容量,返回false。
- 如果count<items.length,執(zhí)行入隊方法,并且返回true。
- 釋放排他鎖。
看到這里,我們應該可以明白了,ArrayBlockQueue是如何保證線程安全的,還是利用了ReentrantLock排他鎖,count就是用來保存數(shù)組的當前大小的。我們再來看看enqueue方法。
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
這方法比較簡單,在代碼里面就不寫注釋了,做了如下的操作:
- 把x賦值給items[putIndex] 。
- 將putIndex進行自增,如果自增后的值 == items.length,把0賦值給putIndex 。
- 執(zhí)行count++操作。
- 調用條件變量notEmpty的signal方法,說明在某個地方,必定調用了notEmpty的await方法,這里就是喚醒因為調用notEmpty的await方法而被阻塞的線程。
這里就解答了一個疑問:putIndex是做什么的,就是入隊元素的下標。
add(E e)
public boolean add(E e) { return super.add(e); }
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
這個方法內部最終還是調用的offer方法。
put(E e)
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//開啟響應中斷的排他鎖 try { while (count == items.length)//如果隊列滿了,調用notFull的await notFull.await(); enqueue(e);//入隊 } finally { lock.unlock();//釋放排他鎖 } }
- 開啟響應中斷的排他鎖,如果在獲取鎖的過程中,當前的線程被中斷,會拋出異常。
- 如果隊列滿了,調用notFull的await方法,說明在某個地方,必定調用了notFull的signal方法來喚醒當前線程,這里用while循環(huán)是為了防止虛假喚醒。
- 執(zhí)行入隊操作。
- 釋放排他鎖。
可以看到put方法和 offer/add方法的區(qū)別了:
- offer/add:如果隊列滿了,直接返回false。
- put:如果隊列滿了,當前線程被阻塞,等待喚醒。
poll()
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
- 開啟排他鎖。
- 如果count==0,直接返回null,否則執(zhí)行dequeue出隊操作。
- 釋放排他鎖。
我們來看dequeue方法:
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex];//獲得元素的值 items[takeIndex] = null;//把null賦值給items[takeIndex] if (++takeIndex == items.length)//如果takeIndex自增后的值== items.length,就把0賦值給takeIndex takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();//喚醒因為調用notFull的await方法而被阻塞的線程 return x; }
- 獲取元素的值,takeIndex保存的是出隊的下標。
- 把null賦值給items[takeIndex],也就是清空被彈出的元素。
- 如果takeIndex自增后的值== items.length,就把0賦值給takeIndex。
- count--。
- 喚醒因為調用notFull的await方法而被阻塞的線程。
這里調用了notFull的signal方法來喚醒因為調用notFull的await方法而被阻塞的線程,那到底在哪里調用了notFull的await方法呢,還記不記得在put方法中調用了notFull的await方法,我們再看看:
while (count == items.length) notFull.await();
當隊列滿了,就調用 notFull.await()來等待,在出隊操作中,又調用了notFull.signal()來喚醒。
take()
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
- 開啟排他鎖。
- 如果count==0,代表隊列是空的,則調用notEmpty的await方法,用while循環(huán)是為了防止虛假喚醒。
- 執(zhí)行出隊操作。
- 釋放排他鎖。
這里調用了notEmpty的await方法,那么哪里調用了notEmpty的signal方法呢?在enqueue入隊方法里。
我們可以看到take和poll的區(qū)別:
- take:如果隊列為空,會阻塞,直到被喚醒了。
- poll: 如果隊列為空,直接返回null。
peek()
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
final E itemAt(int i) { return (E) items[i]; }
- 開啟排他鎖。
- 獲得元素。
- 釋放排他鎖。
我們可以看到peek和poll/take的區(qū)別:
- peek,只是獲取元素,不會清空元素。
- poll/take,獲取并清空元素。
size()
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
- 開啟排他鎖。
- 返回count。
- 釋放排他鎖。
總結
至此,ArrayBlockQueue的核心源碼就分析完畢了,我們來做一個總結:
- ArrayBlockQueue有幾個比較重要的字段,分別是items,保存的是隊列的數(shù)據(jù),putIndex保存的是入隊的下標,takeIndex保存的是出隊的下標,count用來統(tǒng)計隊列元素的個數(shù),lock用來保證線程的安全性,notEmpty和notFull兩個條件變量實現(xiàn)喚醒和阻塞。
- offer和add是一樣的,其中add方法內部調用的就是offer方法,如果隊列滿了,直接返回false。
- put,如果隊列滿了,會被阻塞。
- peek,只是彈出元素,不會清空元素。
- poll,彈出并清空元素,如果隊列為空,直接返回null。
- take,彈出并清空元素,如果隊列為空,會被阻塞。
以上所述是小編給大家介紹的ArrayBlockQueue源碼解析詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關文章
JDK1.7 之java.nio.file.Files 讀取文件僅需一行代碼實現(xiàn)
下面小編就為大家分享一篇JDK1.7 之java.nio.file.Files 讀取文件僅需一行代碼實現(xiàn),具有很好的參考價值,希望對大家有所幫助2017-11-11解決spring 處理request.getInputStream()輸入流只能讀取一次問題
這篇文章主要介紹了解決spring 處理request.getInputStream()輸入流只能讀取一次問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09淺談Java并發(fā) J.U.C之AQS:CLH同步隊列
AQS內部維護著一個FIFO隊列,該隊列就是CLH同步隊列。下面小編來簡單介紹下這個隊列2019-05-05SpringBoot Redis用注釋實現(xiàn)接口限流詳解
Redis 除了做緩存,還能干很多很多事情:分布式鎖、限流、處理請求接口冪等性。。。太多太多了~今天想和小伙伴們聊聊用 Redis 處理接口限流,這也是最近的 項目涉及到這個知識點了,我就拎出來和大家聊聊這個話題2022-07-07