Java線程池隊列PriorityBlockingQueue原理分析
一、什么是PriorityBlockingQueue?
PriorityBlockingQueue隊列是 JDK1.5 的時候出來的一個阻塞隊列。但是該隊列入隊的時候是不會阻塞的,永遠(yuǎn)會加到隊尾。下面我們介紹下它的幾個特點:
- PriorityBlockingQueue 和 ArrayBlockingQueue 一樣是基于數(shù)組實現(xiàn)的,但后者在初始化時需要指定長度,前者默認(rèn)長度是 11。
- 該隊列可以說是真正的無界隊列,它在隊列滿的時候會進(jìn)行擴(kuò)容,而前面說的無界阻塞隊列其實都有有界,只是界限太大可以忽略(最大值是 2147483647)
- 該隊列屬于權(quán)重隊列,可以理解為它可以進(jìn)行排序,但是排序不是從小到大排或從大到小排,是基于數(shù)組的堆結(jié)構(gòu)(具體如何排下面會進(jìn)行分析)
- 出隊方式和前面的也不同,是根據(jù)權(quán)重來進(jìn)行出隊,和前面所說隊列中那種先進(jìn)先出或者先進(jìn)后出方式不同。
- 其存入的元素必須實現(xiàn)Comparator,或者在創(chuàng)建隊列的時候自定義Comparator
注意:
- 堆結(jié)構(gòu)實際上是一種完全二叉樹,建議學(xué)習(xí)前了解一下二叉樹。
- 堆又分為大頂堆和小頂堆。大頂堆中第一個元素肯定是所有元素中最大的,小頂堆中第一個元素是所有元素中最小的。
二、PriorityBlockingQueue類圖:
三、源碼解析
1、字段講解
從下面的字段我們可以知道,該隊列可以排序,使用顯示鎖來保證操作的原子性,在空隊列時,出隊線程會堵塞等。
/** * 默認(rèn)數(shù)組長度 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 最大達(dá)容量,分配時超出可能會出現(xiàn) OutOfMemoryError 異常 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 隊列,存儲我們的元素 */ private transient Object[] queue; /** * 隊列長度 */ private transient int size; /** * 比較器,入隊進(jìn)行權(quán)重的比較 */ private transient Comparator<? super E> comparator; /** * 顯示鎖 */ private final ReentrantLock lock; /** * 空隊列時進(jìn)行線程阻塞的 Condition 對象 */ private final Condition notEmpty;
2、構(gòu)造方法
/** * 默認(rèn)構(gòu)造,使用長度為 11 的數(shù)組,比較器為空 */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * 自定義數(shù)據(jù)長度構(gòu)造,比較器為空 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * 自定義數(shù)組長度,可以自定義比較器 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
3、入隊方法
3.1、put方法
入隊方法,下面可以看到 put 方法最終會調(diào)用 offer 方法,所以我們只看 offer 方法即可。
public void put(E e) { offer(e); // never need to block } public boolean offer(E e) { //判斷是否為空 if (e == null) throw new NullPointerException(); //顯示鎖 final ReentrantLock lock = this.lock; lock.lock(); //定義臨時對象 int n, cap; Object[] array; //判斷數(shù)組是否滿了 while ((n = size) >= (cap = (array = queue).length)) //數(shù)組擴(kuò)容 tryGrow(array, cap); try { //拿到比較器 Comparator<? super E> cmp = comparator; //判斷是否有自定義比較器 if (cmp == null) //堆上浮 siftUpComparable(n, e, array); else //使用自定義比較器進(jìn)行堆上浮 siftUpUsingComparator(n, e, array, cmp); //隊列長度 +1 size = n + 1; //喚醒休眠的出隊線程 notEmpty.signal(); } finally { //釋放鎖 lock.unlock(); } return true; }
3.2、上浮調(diào)整比較器方法的實現(xiàn)
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { //無符號向左移,目的是找到放入位置的父節(jié)點 int parent = (k - 1) >>> 1; //拿到父節(jié)點的值 Object e = array[parent]; //比較是否大于該元素,不大于就沒比較交換 if (key.compareTo((T) e) >= 0) break; //以下都是元素位置交換 array[k] = e; k = parent; } array[k] = key; }
根據(jù)上面的代碼,可以看出這是完全二叉樹在進(jìn)行上浮調(diào)整。調(diào)整入隊的元素,找出最小的,將元素排列有序化。簡單理解就是:父節(jié)點元素值一定要比它的子節(jié)點得小,如果父節(jié)點大于子節(jié)點了,那就兩者位置進(jìn)行交換。
3.3、入隊圖解
說的可能很模糊,我們先寫個 demo,根據(jù) demo 來進(jìn)行圖解分析:
/** * @Auther: Gentle * @Date: 2019/4/14 15:11 * @Description: PriorityBlockingQueue 簡單演示 demo */ public class TestPriorityBlockingQueue { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(); concurrentLinkedQueue.offer(10); concurrentLinkedQueue.offer(20); concurrentLinkedQueue.offer(5); concurrentLinkedQueue.offer(1); concurrentLinkedQueue.offer(25); concurrentLinkedQueue.offer(30); //輸出元素排列 concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" ")); //取出元素 Integer take = concurrentLinkedQueue.take(); System.out.println(); concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" ")); } }
上面可以看出,我們要入隊的元素是 [10,20,5,1,21,30],接下來我們用圖來演示一步步入隊情況。
隊列初始化時:
這時,我們開始將元素 元素 10 入隊,并用二叉樹輔助理解:
我們在將元素 20 入隊:
將元素 5 入隊后發(fā)現(xiàn)父節(jié)點大于子節(jié)點,這時需要進(jìn)行上浮調(diào)整
開始進(jìn)行上浮調(diào)整,將元素 10 和元素 5進(jìn)行位置調(diào)換,結(jié)果如下:
接著將元素 1 入隊后發(fā)現(xiàn)父節(jié)點大于子節(jié)點,繼續(xù)進(jìn)行調(diào)整:
第一次調(diào)整將元素 20 和元素 1 進(jìn)行位置交換,交換完畢后結(jié)果如下:
交換完畢后,我們發(fā)現(xiàn)父節(jié)點的元素值還是大于子節(jié)點,說明還需要進(jìn)行一次交換,最后交換結(jié)果如下:
接下來將元素 25 和 30 入隊,結(jié)果如下:
注: 最小堆的的頂端一定是元素值最小的那個。
4、出隊方法
4.1、take方法
出隊方法,該方法會阻塞
public E take() throws InterruptedException { //顯示鎖 final ReentrantLock lock = this.lock; //可中斷鎖 lock.lockInterruptibly(); //結(jié)果接受對象 E result; try { //判讀隊列是否為空 while ( (result = dequeue()) == null) //線程阻塞 notEmpty.await(); } finally { lock.unlock(); } return result; }
4.2、dequeue方法
具體出隊方法的實現(xiàn)
private E dequeue() { //長度減少 1 int n = size - 1; //判斷隊列中是否又元素 if (n < 0) return null; else { //隊列對象 Object[] array = queue; //取出第一個元素 E result = (E) array[0]; //拿出最后一個元素 E x = (E) array[n]; //置空 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) //下沉調(diào)整 siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); //成功則減少隊列中的元素數(shù)量 size = n; return result; } }
總體就是找到父節(jié)點與兩個子節(jié)點中最小的一個節(jié)點,然后進(jìn)行交換位置,不斷重復(fù),由上而下的交換。
4.3、下沉調(diào)整比較器方法的實現(xiàn)
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { //判斷隊列長度 if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //找到隊列最后一個元素的父節(jié)點的索引。 //如下圖最大元素是30 父節(jié)點是 10,對于索引是 2 int half = n >>> 1; // loop while a non-leaf while (k < half) { //拿到 k 節(jié)點下的左子節(jié)點 int child = (k << 1) + 1; // assume left child is least //取得子節(jié)點對應(yīng)的值 Object c = array[child]; //取得 k 右子節(jié)點的索引 int right = child + 1; //比較右節(jié)點的索引是否小于隊列長度和左右子節(jié)點的值進(jìn)行比較 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //比較父節(jié)點值是否大于子節(jié)點 if (key.compareTo((T) c) <= 0) break; //下面都是元素替換 array[k] = c; k = child; } array[k] = key; } }
4.4、出隊圖解
這時,我們需要從隊列中取出第一個元素 1,元素 1 取出時會與隊列中最后一個元素進(jìn)行交換,并將最后一個元素置空。(實際上源碼不是這么做的,源代碼中是用變量來保存索引,直到全部下沉調(diào)整完成才進(jìn)行替換)
替換后,結(jié)果就如下圖顯示一樣。我們發(fā)現(xiàn)父節(jié)點大于子節(jié)點了,所以還需要再一次進(jìn)行替換操作。 再一次替換后,將元素 30 下沉到下一個左邊子節(jié)點,子節(jié)點上浮到原父節(jié)點位置。這就完成了下沉調(diào)整了。
四、總結(jié)
PriorityBlockingQueue 真的是個神奇的隊列,可以實現(xiàn)優(yōu)先出隊。最特別的是它只有一個鎖,入隊操作永遠(yuǎn)成功,而出隊只有在空隊列的時候才會進(jìn)行線程阻塞??梢哉f有一定的應(yīng)用場景吧,比如:有任務(wù)要執(zhí)行,可以對任務(wù)加一個優(yōu)先級的權(quán)重,這樣隊列會識別出來,對該任務(wù)優(yōu)先進(jìn)行出隊。
到此這篇關(guān)于Java線程池隊列PriorityBlockingQueue原理分析的文章就介紹到這了,更多相關(guān)Java的PriorityBlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實現(xiàn)的生成二維碼和解析二維碼URL操作示例
這篇文章主要介紹了Java實現(xiàn)的生成二維碼和解析二維碼URL操作,結(jié)合實例形式分析了Java創(chuàng)建與解析二維碼,以及文件讀寫等相關(guān)操作技巧,需要的朋友可以參考下2018-07-07java中的數(shù)學(xué)計算函數(shù)的總結(jié)
這篇文章主要介紹了java中的數(shù)學(xué)計算函數(shù)的總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-07-07IDEA2020.3.2版本自動注釋類和方法注釋模板配置步驟詳解
這篇文章主要介紹了IDEA2020.3.2版本自動注釋類和方法注釋模板配置步驟,本文給大家分享了我自己創(chuàng)建過程通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03MyBatis批量查詢、插入、更新、刪除的實現(xiàn)示例
由于需要處理短時間內(nèi)大量數(shù)據(jù)入庫的問題,想到了Mybatis的批量操作,本文主要介紹了MyBatis批量查詢、插入、更新、刪除的實現(xiàn)示例,感興趣的可以了解一下2023-05-05