詳解Java并發(fā)編程中的優(yōu)先級隊列PriorityBlockingQueue
PriorityBlockingQueue是Java中實現(xiàn)了堆數(shù)據結構的線程安全的有界阻塞隊列。它可以在多線程場景下安全地進行元素添加、刪除和獲取操作,而且可以根據元素的優(yōu)先級進行排序。本篇博客將會深入解讀PriorityBlockingQueue的源碼實現(xiàn)。
一、PriorityBlockingQueue概述
PriorityBlockingQueue類實現(xiàn)了BlockingQueue接口,它是一個線程安全的隊列,繼承自AbstractQueue類,而AbstractQueue類又實現(xiàn)了Queue接口。PriorityBlockingQueue是一個有界的隊列,其容量可以在構造函數(shù)中進行指定,若不指定則默認大小為Integer.MAX_VALUE。同時,PriorityBlockingQueue也支持根據元素的優(yōu)先級進行排序,這是由于PriorityBlockingQueue內部實現(xiàn)了一個堆數(shù)據結構。
二、PriorityBlockingQueue源碼解析
1.容器
PriorityBlockingQueue內部使用了一個Object類型的數(shù)組queue來存儲元素,同時使用了一個int類型的變量size來記錄元素的數(shù)量。下面是PriorityBlockingQueue類中的定義:
private transient Object[] queue; private transient int size;
2.比較器
PriorityBlockingQueue可以根據元素的優(yōu)先級進行排序,這是由于PriorityBlockingQueue內部維護了一個小根堆或大根堆。在構造函數(shù)中,我們可以選擇使用元素自身的比較方式或是自定義比較器來進行元素的排序。若未指定比較器,則PriorityBlockingQueue將使用元素自身的比較方式進行排序。
private final Comparator<? super E> comparator;
3.構造函數(shù)
PriorityBlockingQueue提供了多個構造函數(shù),我們可以選擇使用無參構造函數(shù)來創(chuàng)建一個默認容量為Integer.MAX_VALUE的PriorityBlockingQueue,或是使用帶有初始容量或自定義比較器的構造函數(shù)。下面是PriorityBlockingQueue類的兩個構造函數(shù):
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.queue = new Object[initialCapacity]; this.comparator = comparator; }
4.添加元素
PriorityBlockingQueue中添加元素的方法為offer()方法,它會首先檢查容量是否足夠,如果容量不足則會進行擴容操作,擴容的方式是將原數(shù)組長度增加一半。接著,它會將新元素添加到隊列的末尾,并通過siftUp()方法將元素上濾到合適的位置,以維護堆的性質。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (n == 0) { array[0] = e; } else { siftUp(n, e, array, cmp); } size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
5.獲取元素
PriorityBlockingQueue中獲取元素的方法為take()方法,它會首先檢查隊列是否為空,如果隊列為空則會將當前線程阻塞,直到有元素被添加到隊列中。接著,它會獲取隊列的頭部元素,并通過siftDown()方法將隊列的末尾元素移動到頭部,以維護堆的性質。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while (size == 0) notEmpty.await(); result = extract(); } finally { lock.unlock(); } return result; } private E extract() { final Object[] array = queue; final E result = (E) array[0]; final int n = --size; final E x = (E) array[n]; array[n] = null; if (n != 0) siftDown(0, x, array, comparator); return result; }
6.維護堆性質
PriorityBlockingQueue使用小根堆或大根堆來維護元素的優(yōu)先級,這里我們以小根堆為例。小根堆的特點是父節(jié)點的值小于等于左右子節(jié)點的值,PriorityBlockingQueue中的堆是通過數(shù)組來實現(xiàn)的。當添加元素時,會將新元素添加到隊列的末尾,并通過siftUp()方法將元素上濾到合適的位置,以維護堆的性質。當獲取元素時,會獲取隊列的頭部元素,并通過siftDown()方法將隊列的末尾元素移動到頭部,以維護堆的性質。下面是siftUp()和siftDown()方法的代碼實現(xiàn):
private static <T> void siftUp(int k, T x, Object[] array, Comparator<? super T> cmp) { if (cmp != null) siftUpUsingComparator(k, x, array, cmp); else siftUpComparable(k, x, array); } @SuppressWarnings("unchecked") private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } @SuppressWarnings("unchecked") private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } private static <T> void siftDown(int k, T x, Object[] array, Comparator<? super T> cmp) { if (cmp != null) siftDownUsingComparator(k, x, array, cmp); else siftDownComparable(k, x, array); } @SuppressWarnings("unchecked") private static <T> void siftDownUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < size && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } @SuppressWarnings("unchecked") private static <T> void siftDownComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < size && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; }
siftUp()方法和siftDown()方法都使用了siftUpUsingComparator()方法和siftDownUsingComparator()方法,它們是使用Comparator來實現(xiàn)堆的上濾和下濾的。當PriorityBlockingQueue沒有指定Comparator時,會使用元素自身的Comparable來實現(xiàn)堆的上濾和下濾。
總結
PriorityBlockingQueue是Java并發(fā)包中的一個線程安全的、支持優(yōu)先級隊列的類,它使用小根堆或大根堆來維護元素的優(yōu)先級。PriorityBlockingQueue的內部實現(xiàn)使用了ReentrantLock和Condition來實現(xiàn)線程安全的操作,同時使用了數(shù)組來實現(xiàn)堆。PriorityBlockingQueue的核心方法包括添加元素的offer()方法,獲取元素的take()方法,以及維護堆性質的siftUp()方法和siftDown()方法。PriorityBlockingQueue的使用方式與普通隊列類似,但可以根據元素的優(yōu)先級進行排序和處理。
以上就是詳解Java并發(fā)編程中的優(yōu)先級隊列PriorityBlockingQueue的詳細內容,更多關于Java PriorityBlockingQueue的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot使用RabbitMQ延時隊列(小白必備)
這篇文章主要介紹了SpringBoot使用RabbitMQ延時隊列(小白必備),詳細的介紹延遲隊列的使用場景及其如何使用,需要的小伙伴可以一起來了解一下2019-12-12Java后臺Controller實現(xiàn)文件下載操作
這篇文章主要介紹了Java后臺Controller實現(xiàn)文件下載操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10java中PO、VO、BO、POJO、DAO、DTO、TO、QO、Bean、conn的理解
這篇文章主要介紹了java中PO、VO、BO、POJO、DAO、DTO、TO、QO、Bean、conn的理解,需要的朋友可以參考下2020-02-02