Java源碼重讀之ConcurrentHashMap詳解
如果你沒有閱讀過 Java 源碼重讀系列之 HashMap 這篇文章的話,建議你先讀一下。以下所有內容的前提是你已閱讀過以上的文章。
另外,凡是涉及到多線程、并發(fā)的東西從來就沒有簡單的,所以這次我們很難講清楚 ConcurrentHashMap 中的所有內容,只能聚焦到以下幾個內容
- ConcurrentHashMap 的 get 操作
- ConcurrentHashMap 的 put 操作
- ConcurrentHashMap 的 resize() 操作
- ConcurrentHashMap 是如何保證線程安全的
如果你想要了解的內容不在以上范圍內,那就不用繼續(xù)閱讀了,以免浪費時間~
0. 第一個屬性 serialPersistentFields
因為 ConcurrentHashMap 的邏輯比較復雜,所以我們直接從 serialPersistentFields 屬性說起,它之前的這些屬性等用到的時候我們再看就好了,你只要知道 這個屬性之前還有一堆固定的屬性就好了。
serialPersistentFields 屬性是一個 ObjectStreamField 的數(shù)組,而且默認添加了三個元素。
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};我們點到 ObjectStreamField 類中去,它的類頭有一段這樣的描述:
* A description of a Serializable field from a Serializable class. An array
* of ObjectStreamFields is used to declare the Serializable fields of a class.
簡單翻譯一下就是,一個序列化類中可以序列化屬性的描述。ObjectStreamFields 數(shù)組聲明了這個類的可序列化的字段。
好了,這個類我們看到這就可以了,而且也知道了 ConcurrentHashMap 中 serialPersistentFields 屬性的作用。就是聲明了一下 ConcurrentHashMap 里有三個 屬性可以被序列化。這三個屬性分別是segments、segmentMask、segmentShift 。結束~
1. spread()
繼續(xù)往下是 Node 類的定義,沒什么好說的,我們遇到了第一個方法。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}都是一些位運算。解釋一下,這個方法會將 h 和 h 右移 16 位的數(shù)值進行異或(^)操作,得到的結果與 HASH_BITS 進行與(&)操作。和 HASH_BITS 進行與(&)操作,作用就是保證返回的結果的最高位一定是 0,也就是說,返回的結果一定是正數(shù)。(如果你對位運算沒有什么概念的話,也可以不用糾結這個方法,這個方法的作用就是,給一個數(shù),返回另外一個數(shù)。)
2. tabAt()、casTabAt()、setTabAt()
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}這幾個是其實是 ConcurrentHashMap 的核心操作方法。tabAt() 是獲取,casTabAt() 是更新,并且是基于 CAS 方式實現(xiàn)的更新。setTabAt() 是插入。這些實現(xiàn)都使用了大名鼎鼎的 sun.misc.Unsafe 類。
如果你對這個類不熟悉的話,其實可以簡單理解,這個類里的一些方法都是線程的。因為這個類提供的是一些硬件級別的原子操作。簡單來說,sun.misc.Unsafe 類提供的方法都是線程安全的。理解到這里就可以了,再深入的內容,就不再本文范圍內了。繼續(xù)往下。
3. counterCells
繼續(xù)往下的話,就看到了 table 和 nextTable,沒什么說的,這個就是存儲數(shù)據(jù)的數(shù)組了,至于 nextTable,通過注釋可以看到,這個變量應該是只在擴容時使用到了,等用到的時候再說。
繼續(xù)往下呢就是一些int 類型的值了,通過名字和注釋也看不出來什么,直接跳過。等用到的時候再說。繼續(xù)往下的話我們就看到了一個 CounterCell[] 數(shù)組了,點到這個類的定義,可以看到以下代碼。
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}好像也沒有多復雜,就一個使用了 volatile 標記的 數(shù)值。至于 sun.misc.Contended 注解,主要是解決 CPU 偽緩存 問題的,提高性能和效率使用的,可以先不用關注。
但是,如果你閱讀一下注釋的話,就會發(fā)現(xiàn)這里面大有文章。涉及到兩個非常復雜的東西:LongAdder and Striped64。關于 LongAdder and Striped64 的內容也不在本文范圍內,有興趣的話可以搜一下相關的文章。不了解也沒有關系,不影響閱讀。我們繼續(xù)往下看。
4. keySet、values、entrySet
再往下就是幾個 view 變量了。
// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;看名字也應該能猜出來,這些變量應該是跟 HashMap 的 keySet()、values()、entrySet() 幾個方法的作用類似。如果你點到它的定義就會看到,這幾個類都繼承了 CollectionView 這個類。
abstract static class CollectionView<K,V,E>
implements Collection<E>, java.io.Serializable {
private static final long serialVersionUID = 7249069246763182397L;
final ConcurrentHashMap<K,V> map;
CollectionView(ConcurrentHashMap<K,V> map) { this.map = map; }
//... ...只看前面幾行就可以了,內部有一個 ConcurrentHashMap 類型的變量,而且 CollectionView 只有一個帶有 ConcurrentHashMap 參數(shù)的構造方法。盲猜也能猜到,上面的 xxxView 類內部操作的也都是 ConcurrentHashMap 存儲的數(shù)據(jù)。了解這些就可以了,我們繼續(xù)往下看。
5. 構造方法
第一個是個空構造方法沒有什么好說的,先看第二個。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}通過注釋和名稱我們應該能夠知道,這個構造方法可以初始化 Map 的容量。有意思的是,計算 cap 的方法。不知道你還記不記得 HashMap 的初始容量的構造方法是怎么計算容量的。代碼在下面
this.threshold = tableSizeFor(initialCapacity);
而 ConcurrentHashMap 則是將 initialCapacity 加上了 initialCapacity 的一半又加了 1 作為 tableSizeFor 的參數(shù)。其實就是為了解決 HashMap 存在的可能出現(xiàn)兩次擴容的問題。
注意,這里使用的是 >>>,不是 >>。>>> 的含義是無符號右移。它會把最高位表示正負的值也會右移,然后補 0。 所以 >>> 之后,一定是正數(shù)。如果 >>> 之前是正數(shù)的話,結果跟 >> 一致。如果是負數(shù)的話,就會出現(xiàn)一個很奇怪的正數(shù)。這是因為最高位表示負數(shù)的 1 也跟著右移了。由于代碼里已經判斷了小于 0 ,所以我們目前先按照除 2 理解即可。
還有一個點是,從代碼來看,ConcurrentHashMap 的最大容量 好像 是用 sizeCtl 表示的。但是,如果僅僅是表示最大容量,為什么會定義一個這么奇怪的名字呢? Ctl 的后綴應該是 control 的簡寫。具體是怎么控制的呢?
繼續(xù)往下,我們先跳過帶有 Map 參數(shù)的構造方法,因為這個涉及到 putAll() 方法。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}這兩個構造方法其實可以算做一個,我們直接看下面那個復雜的。
先判斷了一下參數(shù)的取值,然后更新了一下 initialCapacity 參數(shù),然后根據(jù)參數(shù)計算 size,考慮到 loadFactor 可能小于 1,導致 int 值越界,所以轉成了 long 類型。
關于 concurrencyLevel,給的注釋是:并發(fā)更新線程的預估數(shù)量。那上面那段判斷更新就不難理解了。假如我預估會有 20 個線程同時更新這個初始容量為 15 的 Map,這個時候的初始容量會自動的改為 20 。
好像沒有什么問題?有意思的是, loadFactor 這個參數(shù)竟然沒有保存!! 加載因子沒有保存,那什么時候觸發(fā)擴容呢?我們繼續(xù)往下看。
6. putAll()
回到帶有 Map 參數(shù)的構造方法。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}沒有什么復雜的,指定了下默認的初始容量(16)就直接 putAll(m); 了。
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}好像也不難,先執(zhí)行 tryPresize(m.size()); 應該是初始擴容, 然后再 for 循環(huán)進行 putVal() 操作。
7. tryPresize()
先看下方法名。嘗試并行重置容量。里面的 P 應該是 parallel(并行) 的縮寫。
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}首先計算了下變量 c,這個是保存入?yún)?size 個元素時需要的最大容量。
然后是一個 whlie 循環(huán),因為我們是通過構造方法進來的,所以 sizeCtl 的值現(xiàn)在是默認值 16 ,table 現(xiàn)在是 null。這個時候就進入到了 if 的代碼里了。
在 if 的條件里是判斷了 compareAndSwapInt() 的結果。這里需要說一下,compareAndSwapInt 方法是 CAS 的一種實現(xiàn)。這個方法內部做了兩件事情,首先是比較 this 這個對象的 SIZECTL 值是否跟 sc 相等,相等的話,把 SIZECTL 的值 改為 -1。而且 Unsafe 類還保證了線程的安全。如果有多個線程同時執(zhí)行這個方法的話,只會有一個線程成功。不會出現(xiàn)兩個線程都比較通過了,然后在賦值的時候產生覆蓋的問題。
好像也不難理解,其實就是把 sizeCtl 值改成了 -1,而且只有一個線程會成功。這里的 sizeCtl 更像是一把鎖,哪個線程改成了 -1 ,哪個線程就獲取到了鎖,那它就可以執(zhí)行后面的流程了。
繼續(xù),因為上面已經對 tab = table 賦值了,所以下面的判斷也能通過。然后,就看到了數(shù)組初始化的過程了。直接 new 了一個長度為 n 的 Node[]。并賦值給了 table。如果你往上追一下 n 的賦值,就會知道,現(xiàn)在的 n 正好是 c。就是方法一開始計算的值。
table 數(shù)組都已經初始化了,是不是結束了?并沒有。這個時候更新了一下 sc。 >>> 2 相當于除 4,其實就是 sc 現(xiàn)在的值是 n 的 3/4 。而且在 finally 塊中,更新了 sizeCtl。這個時候 sizeCtl 就不是 -1 了。根據(jù)我們之前的理解,這里更新 sizeCtl,應該是在釋放鎖。
然后,第一次 while 循環(huán)就結束了。再次進入 while 循環(huán),這次 sc 是 n 的 3/4 了,上一次循環(huán)已經更新了 sizeCtl。
這次 table 就不等于 null 了。而且根據(jù)我們之前的推斷,現(xiàn)在的 sc 應該等于 n 的 3/4 ,而 n 之前又等于 c。所以, c <= sc 這個條件也不成立。
而且 n >= MAXIMUM_CAPACITY 這個條件大概率是在擴容到最大的時候才會成立。所以,就走到了最后一個條件分支里了。因為 sc 現(xiàn)在是大于 0 的,所以直接走到了最后一個分支。
PS: if (sc < 0) 這個分支好像永遠不會執(zhí)行,因為 while 進入的條件要求 sc >= 0,而在判斷sc < 0 之前又沒有任何地方會把 sc 更新為負數(shù),所以好像一直都不會執(zhí)行。如果我理解錯了,希望有人能解惑一下~
8. resizeStamp()
首先執(zhí)行了一下 resizeStamp() 方法。這個方法也是一個位運算的方法。你可以直接使用 main() 方法跑一下,會返回一個很難理解的正數(shù)。簡單說一下這個數(shù)是怎么得出來的。
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}首先, numberOfLeadingZeros() 會返回 n 的二進制串中從最左邊算起連續(xù)的“0”的總個數(shù)。然后再跟 1 左移 15 位的值按位或(|)操作。 最終得到的就是一個在二進制中,第 16 位為 1 的正數(shù)。
繼續(xù)回到代碼,因為現(xiàn)在已經確定 sc 是 n 的 3/4 了(PS:如果這個 3/4 不理解,那換成 0.75 是不是會好點 ,好像跟 HashMap 的擴容因子一樣,其實 sc 的值就是擴容閾值,這個后面會提到,現(xiàn)在不理解沒關系),所以也是大于 0 的。這里又進行了一次 compareAndSwapInt()。這個時候賦值的是把 rs 左移了 16 位。 還記得 resizeStamp() 返回的結果么,第 16 位是 1。所以 rs 右移 16 位之后,最高位就是 1 了,在 int 類型里,二進制的最高位表示正負,1 表示負數(shù)。
所以,這個時候,又把 sizeCtl 更新成負值了。根據(jù)我們之前的理解,這里應該還是獲取鎖的操作。獲取到鎖之后,一般就是需要操作資源了。但是 table 我們不是已經初始化好了嗎?這次又要初始什么呢?
記住,現(xiàn)在 sizeCtl 是一個負值,并且 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2) 后面要用到!
9.transfer()
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
//... ...到這里還是比較好理解的。先初始了一下 stride 和 n 這兩個變量。然后,因為我們是初始化進來的,所以 nextTab 一定等于 null。這個時候會初始化 nextTab。在創(chuàng)建數(shù)組的時候捕獲了一個異常,這個異常出現(xiàn)的唯一情況就是內存不夠了,分配不了 2 倍的 n 的數(shù)組。這個時候,將 sizeCtl 的值改為了 Integer.MAX_VALUE。然后就結束了。如果沒有拋異常,會更新 nextTable 和 transferIndex 的值。
我們需要回頭看下 tryPresize() 方法。如果在拋異常的時候結束,會出現(xiàn)什么情況。根據(jù)代碼,異常結束后,會進入第三次循環(huán),這次循環(huán)會進入第二個分支。因為 c <= sc 一定會成立。這里就會結束循環(huán)。
到這里,我們已經把 tryPresize() 循環(huán)里的三個分支都走完了,下面繼續(xù)看 transfer() 這個方法。
nextTab 初始化之后,我們又看到了一個新的 Node 類:
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
點到這個類的定義里我們會發(fā)現(xiàn),這個類里面只有一個屬性 nextTable 和一個 find() 方法。關于 find() 是在獲取元素時才能用到,我們先不用關注。目前來看 ForwardingNode 其實就是對 nextTab 的一個封裝。然后繼續(xù)看 transfer() 。
兩個boolean 類型的值,默認一個 true,一個 false。
下面的代碼是一個 for 循環(huán),但是這個循環(huán)有差不多 100 多行的代碼(如果我在項目里遇到這種代碼估計會罵人的~)。我們一點點看,首先是一個while 循環(huán)。
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
} else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}首先 i = 0, bound = 0 ,所以,第一次循環(huán)不會進入第一個分支。然后,根據(jù)之前 transferIndex = n; 的賦值,也不會進入第二個分支。
這樣就來到了第三個分支。compareAndSwapInt 會更新 transferIndex 的值,如果 CPU 的個數(shù)是 1,transferIndex 更新成 0 ,否則更新成 nextIndex - stride 。然后更新 bound、i、advance 的值,循環(huán)就結束了。
繼續(xù)往下,現(xiàn)在 i 的值是 n - 1,所以不會命中 if (i < 0 || i >= n || i + n >= nextn) 條件。
然后,因我們是初始化時進入的,素以 tab 里的所有元素都是 null,第二個條件就通過了。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);其實就是把 tab 的 i 位置 初始化了一個 fwd 元素。 到這里,第一次 for 循環(huán)就結束了。
第二次循環(huán)其實也很簡單,首先 advance = false ,不會進入 while 循環(huán),然后就會進入下面的判斷
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed首先,獲取了下 tab[i] 的值,因為上次循環(huán)已經賦值過了,現(xiàn)在 f = fwd。然后,有意思的來了,先看下 MOVED 的定義
static final int MOVED = -1; // hash for forwarding nodes
沒錯,MOVED = -1,按我們正常的理解,一個對象的 hash 值,怎么也不會等于 -1 吧,我們再回頭看下 ForwardingNode 這個類
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
//... ...
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//... ...
}順便貼了下父類的代碼,主要看構造函數(shù),看到了么? fwd 這個對象在初始化的時候,指定了 hash 值,就是 MOVED。OK,回到之前的循環(huán)。
這次循環(huán)就會把 advance 改為 true。第二次循環(huán)就結束了。
經過上面的兩次循環(huán)之后,我們是其實只是執(zhí)行了一行代碼 tab[n-1] = fwd。現(xiàn)在進入第三次循環(huán),之前 i = n - 1 ?,F(xiàn)在又執(zhí)行了一次 if (--i >= bound || finishing),所以現(xiàn)在 i = n - 2 。但是 bound 可能有兩種情況,一種是 bound = n - stride,一種是 bound = 0。我們先假設 bound = n - stride; stride = 16 。所以,第一個條件是成立的,執(zhí)行 advance = false; ,然后 while 循環(huán)結束。
然后,第一個 if (i < 0 || i >= n || i + n >= nextn) 條件不會成立,又執(zhí)行到了賦值操作里。這時 tab[n-2] = fwd。第三次循環(huán)結束~
然后第四次循環(huán)又會把 advance 改為 true。
好好回味下~~
其實,這一頓操作下來就是在執(zhí)行 tab[i] = fwd 這一行代碼。搞了這么多東西,其實就是在支持多線程并發(fā)擴容,簡單說下過程。
首先,while 循環(huán)會確定當前線程擴容的區(qū)間 [ bound,nextIndex ) 左開右閉。然后 while 循環(huán)下面的代碼其實就是在給 tab 和 nextTab 賦值。設想下,如果 while 循環(huán)里的 compareAndSwapInt 執(zhí)行失敗,會是什么情況?沒錯,會空轉!結束只有兩種情況,一種是 transferIndex = 0。說明已經有其他線程把所有的區(qū)間都認領了,另外一種情況是執(zhí)行 compareAndSwapInt。認領 [ bound,nextIndex ) 的區(qū)間,進行擴容。
其實,你可以直接驗證下的,打斷點也好,手寫也罷,假設n = 1024; NCPU = 4。這時 stride = 32。那么第一個線程會先對 tab[1024-32,1024-1]進行賦值。如果這時有其他線程進來,在 while 循環(huán)的時候,就會認領 tab[1024-32-32,1024-32-1] 的區(qū)間進行賦值。如果有更多的線程進來的話,就會加快這個過程。這個就是所謂的 并發(fā)擴容 ,也有叫 輔助擴容 的。
然后,我們來看下通過 synchronized 加鎖的這段代碼。能執(zhí)行到這里的話只能是 tab[i].hash != MOVED。那就說明這里記錄的是一個正常的數(shù)據(jù)。
首先判斷了下 if (tabAt(tab, i) == f) 沒什么說的,肯定成立,不成立就結束了,然后判斷了下 if (fh >= 0)。有點奇怪,正常數(shù)據(jù)的 hash 還能小于 0 ?那我們先看下不成立的情況
else if (f instanceof TreeBin)
明白了吧,當發(fā)生 hash 沖突時,并且鏈表已經轉成紅黑樹了,這時 tab[i] = TreeBin,那我們看下 TreeBin 的定義。
static final class TreeBin<K,V> extends Node<K,V> {
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
//... ...
}
}
static final int TREEBIN = -2; // hash for roots of trees真相了,TreeBin 的 hash 值是 -2,就小于 0。后面的代碼我們就不說了,其實跟HashMap是一樣的,如果當前節(jié)點是鏈表,那就采用高低位鏈表的形式對 nextTab 賦值,如果是 TreeBin 那就采用紅黑樹的方式進行賦值。而且,我們對 tab[i] 加了 synchronized 鎖,也不會有線程競爭,老老實實賦值就好了。
最后,transfer 里的代碼基本上都看完了,就剩下面這段了
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}在 while 循環(huán)里,有這么一行代碼 i = -1; 執(zhí)行了這個之后,就會進入上面的代碼里。其實就是 tab 初始化完成之后,即 nextIndex = 0 的時候,就會執(zhí)行 i = -1; ,然后就會進入上面的代碼了。我們看下上面的代碼。
首先,finishing 現(xiàn)在應該是等于 false 的,直接進入第二個 if。這個也很簡單,首先 sc = sizeCtl,賦值,然后通過 CAS 將 sizeCtl 的值改為 sc - 1。還記得 sizeCtl 的值是多少么?? 我直接粘貼一下 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)。是不是跟 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 這個判斷邏輯一致?如果不相等,說明有其他線程修改了sizeCtl,這同時說明有其他線程還在執(zhí)行擴容的動作,即還在執(zhí)行 tryPresize() 或者是 transfer() 方法。那么,因為當前線程已經執(zhí)行完了,所以直接 return; 結束,讓其他線程繼續(xù)執(zhí)行就好了。
如果相等,執(zhí)行 finishing = advance = true; i = n。進入下一次 for 循環(huán)。
一頓判斷之后,你會發(fā)現(xiàn),還是會進入到上面的代碼,而這時,finishing == true 了!下面的代碼就不難理解了吧,更新 table ,相當于使用了新的數(shù)組了,而 sizeCtl 也更新了一下。都是位運算,如果你看不明白,可以用 main 方法跑一下。我們假設 n = 1024,那么 table 現(xiàn)在的大小也就是之前 nextTab 的大小,就是2048,然后,我們用 main 跑一下 sizeCtl 的值,不出意外的話應該等于 1536 。如果還看不明白,那么你再計算下 1536 / 2048。結果是 0.75 ,這個數(shù)字熟悉吧? HashMap 的默認加載因子!。沒錯,sizeCtl 其實就是下次需要擴容的閾值。
到這里,我們就把 transfer() 方法看完了。然后,我們重點總結下 sizeCtl 這個屬性,不得不承認,這個設計非常的巧妙!
首先,通常情況下 sizeCtl 應該是等于下次的擴容閾值的。但是在擴容期間,有兩個狀態(tài),一個是 -1,一個是非常大的一個負值。等于 -1 很好理解,相當于是一個鎖,哪個線程更新成功,就可以進行數(shù)組的初始化動作。那么,就剩最后一種情況了。直接用下面的 main() 方法跑一下
public static void main(String[] args) throws IllegalAccessException {
int rs = Integer.numberOfLeadingZeros(1024) | (1 << (16 - 1));
int sizeCtl = (rs << 16) + 2;
System.out.println(sizeCtl);
System.out.println((sizeCtl<<16)>>16);
}會得到下面的結果
-2146107390
2
有意思了,(sizeCtl<<16)>>16,這個操作是先左移 16 位,再右移 16 位,相當于把 sizeCtl 的高 16 位都置為 0 了。得到了一個 2,其實,這個 2 就是說當前有 (2 - 1) 個線程在進行擴容操作。(PS: sizeCtl 注釋里有寫~)。具體是為什么,我們繼續(xù)往下看。
transfer() 執(zhí)行完,就回到了 tryPresize()。然后繼續(xù)返回就到了 putAll()。繼續(xù)往下執(zhí)行就是 putVal() 方法了。
10.putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//... ...首先判斷了下 null,然后計算了下 hash 哈希值。就進入 for 循環(huán)了。首先是第一個分支。其實就是 tab 還沒有初始化的時候會進入這個分支。
11.initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}好像也沒有多少復雜的,因為我們之前已經對 sizeCtl 做了充分的解釋,這里如果 sc < 0 的話,說明在擴容或者是初始化中,然后當前線程直接執(zhí)行了 Thread.yield();,就是放棄 CPU 執(zhí)行權等待下次分配 CPU 時間片,如果不小于 0 ,并且 tab = null ,那說明現(xiàn)在還沒有線程執(zhí)行擴容,那當前線程就會更新 sizeCtl,然后自己開始執(zhí)行初始化動作,初始化好后直接返回 tab。
繼續(xù)回到 putVal() 方法,如果執(zhí)行第二個分支,說明 tab[i] == null,這個位置還沒有元素,直接通過 casTabAt() 方法進行賦值。如果這個位置有值,并且 (fh = f.hash) == MOVED 說明在擴容或者是在初始化,這個時候當前線程會進行 輔助擴容。
12.helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}首先在 if 條件里獲取到 nextTab 如果不為 null,就會進入 while 循環(huán),首先 sc < 0 說還在擴容或者是初始化中,while 循環(huán)里的第一個分支是不需要輔助擴容或者是已經達到最大的輔助線程數(shù)量,或者是已經剩最后一個線程在擴容了,其他的線程都結束了。所以直接 break; 就可以了。
第二個分支,首先會執(zhí)行 sizeCtl + 1 的操作,執(zhí)行成功就會執(zhí)行 transfer() 方法,這個方法我們之前已經看過了,就不看了,需要注意的是,我們之前說過,sizeCtl 的低 16 位代表目前正在擴容的線程數(shù)減一。因為這里新加入一個線程參與擴容,所以對 sizeCtl 進行了加一的操作。如果還有線程進來,那么 sizeCtl 還會加一。這里就解釋清楚 sizeCtl 的另外的一個用法了。擴容結束直接 break;。繼續(xù)回到 putVal()。
繼續(xù)下一次 for,如果 tab[i] 還不等于 null,那就說明發(fā)生哈希沖突了,并且當前已經不在擴容了。就走到了最后一個分支,使用 synchronized 加鎖的這一段代碼里,這段代碼其實并不復雜,發(fā)生沖突之后無非就兩種情況,鏈表或者是紅黑樹。你可以看下 TreeBin 的構造方法。它的哈希值是 -2。
static final int TREEBIN = -2; // hash for roots of trees
//... ...
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
//... ...
}所以才有了 if (fh >= 0) 的判斷,如果首節(jié)點的哈希值大于 0 ,那一定是鏈表。
最后還有一段進行樹化的判斷操作。
static final int TREEIFY_THRESHOLD = 8;
//... ...
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}鏈表的節(jié)點數(shù)超過 8 就會進行樹化操作。到這里其實 putVal() 的相關操作基本上已經結束了,就剩最后一個 addCount() 方法了,看名稱也知道這個是更新計數(shù)器的,盲猜也能猜到應該跟元素數(shù)量有關系。不過,好像有點問題啊,你有沒有發(fā)現(xiàn),在整個 putVal() 方法里面,好像沒有觸發(fā)擴容的邏輯??!
13.addCount()
其實這個方法除了操作我們之前見到的 counterCells 屬性外,還會判斷是否需要進行擴容。因為只有在知道具體的元素數(shù)量后,才能判斷出是否需要擴容。我們先看這個方法的第一段代碼。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//... ...首先,我們假設目前是第一次執(zhí)行這個方法,那么 counterCells == null,然后就會使用 CAS 執(zhí)行 baseCount = b + x。失敗之后,就開始執(zhí)行 fullAddCount() 方法了,因為現(xiàn)在 as == null 是成立的。
fullAddCount() 方法與 Striped64.longAccumulate() 方法基本上是一模一樣的,因為之前已經跳過了 Striped64,所以這里也不打算去細看。直接總結下 counterCells 的作用。其實 counterCells 就是在多個線程同時更新 baseCount 失敗時記錄下新增的元素數(shù)量。
舉個例子就明白了,假設 ConcurrentHashMap 初始化完成之后,有 2 個線程,同時執(zhí)行了 addCount(),那么 baseCount 會更新成 1,counterCells 會初始化為一個大小為 2 的數(shù)組,且一個元素是 null,另外一個元素的 counterCells[i].value 值是 1。
如果這個時候又來了一個線程,會有 3 種情況,
- CAS 更新
baseCount成功,baseCount = 2。第三個線程結束 - CAS 失敗且
counterCells[ThreadLocalRandom.getProbe() & m] == null。繼續(xù)初始化counterCells的另一個為null的元素,值為 1。 - CAS 失敗且
counterCells[ThreadLocalRandom.getProbe() & m] != null,那么counterCells[i].value值更新成 2。
ThreadLocalRandom.getProbe() 方法其實就是取了個隨機值。就是說,如果有多個線程同時更新的話,失敗的線程會隨機的從 counterCells 取一個元素,將新增的數(shù)量保存進去。
其實很簡單,能進入到 fullAddCount() 方法的條件只有一種,counterCells == null 并且 CAS 更新 baseCount 失敗,這種情況就是有多個線程同時執(zhí)行了 addCount() 方法,比如,有兩線程同時執(zhí)行 putVal(),那么必然有一個線程在 CAS 更新 baseCount 時會失敗,這個時候就進入到 fullAddCount() 方法。這個方法內部就是在操作 counterCells 數(shù)組。操作的行為基本上就是下面這幾種
要么是初始化 counterCells 數(shù)組
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}要么就是初始化 counterCells 數(shù)組元素
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}要么就是更新 counterCells 數(shù)組元素的值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;還有一種操作就是 擴容 ,對 counterCells 進行擴容。
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}在 fullAddCount() 方法里,每次循環(huán)都會重新隨機取元素 h = ThreadLocalRandom.advanceProbe(h);。如果執(zhí)行循環(huán)了多次,都沒有保存成功,說明 counterCells 的容量不夠用了,就會觸發(fā)擴容。從上面的代碼里也能看到,counterCells 的擴容非常簡單,數(shù)組直接翻倍,元素直接賦值到新數(shù)組里,位置都沒有變。
繼續(xù)回到 addCount() 方法,之后的邏輯就是判斷了下 check 參數(shù)。其實這里的邏輯是,如果有多個線程同時操作,只要沒有發(fā)生哈希沖突,就不進行擴容檢查。你往回翻一下就可以看到 check 參數(shù)其實就是 tab[i] 位置的元素數(shù)量。
如果發(fā)生了哈希沖突,或者說沒有多個線程同時操作(這個時候就進入不了當前的分支,更新 baseCount 不會失敗),就會執(zhí)行 s = sumCount(); 這個方法非常簡單,就是對 baseCount 和 counterCells 里的數(shù)值進行了一下求和,然后就開始執(zhí)行下面的代碼。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}首先 while 循環(huán)判斷了下當前的元素數(shù)量是否超過了 sizeCtl,即便是在擴容期間,sizeCtl 小于 0 的時候,也算成立。然后判斷了下 tab ,基本上也會成立。直接進入循環(huán)內部
第一個分支 if (sc < 0) 說明已經有線程開始執(zhí)行擴容動作了,這個時候更新 sizeCtl 的值加一,當前線程參與 輔助擴容。
第二個分支是目前還沒有線程進行擴容操作,那么當前線程開始執(zhí)行擴容,(rs << RESIZE_STAMP_SHIFT) + 2) 這個數(shù)值我們之前已經看到過了,就不再贅述了。
循環(huán)最后重新計算 s ,擴容結束后 s 就不會小于 sizeCtl,方法結束。
好了,到這里我們基本上就把 ConcurrentHashMap 的 put 操作的邏輯看完了。其實整體上跟 HashMap還是比較類似的,基本上就是把所有對 tab 的操作都使用 Unsafe 包裝了一下,解決多線程操作的問題,而發(fā)生哈希沖突時也是使用了 synchronized 進行了加鎖,解決了多線程操作鏈表的覆蓋問題。比較難的反而是元素數(shù)量的問題。因為 ConcurrentHashMap 一定要保證元素保存到 tab 成功后,元素數(shù)量一定也要加成功!不能因為元素數(shù)量的值更新失敗了,再把保存到 tab 里的元素刪除掉吧。所以呢 ConcurrentHashMap 就使用 counterCells 數(shù)組來保存那些更新 baseCount 失敗的數(shù)量。
14.get()
下面我們看下 get() 方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}好像也不復雜,第一個分支是一次就從 tab[i] 位置找到了元素,直接返回。最后一個 while 循環(huán)是 tab[i] 位置發(fā)生了哈希沖突,且當前位置是鏈表,通過 while 循環(huán)遍歷尋找。
重點說一下第二個分支吧 else if (eh < 0) 有兩種情況,一種是 tab[i] 位置發(fā)生了哈希沖突,且當前位置是紅黑樹,這時 e 是 TreeBin 類型的,因為涉及到紅黑樹,我們直接跳過,有興趣的可以自己研究下。另外一種情況是在擴容期間,當前元素已經轉移到新的 nextTable 上了,這時 e 的類型是 ForwardingNode 類型,我們直接看下 ForwardingNode 類的 find() 代碼
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}也不復雜,就是直接在 nextTable 找元素,如果 nextTable[i] 位置為 null 直接返回,否則就進入了 for (;;) 循環(huán)里,跟之前類似,第一個分支里是直接找到了元素,而 if (eh < 0) 也有兩種情況,一個是擴容時轉移到新的 nextTable,一個就是紅黑樹。最后就是鏈表了。
好了,到這里基本上所有的內容都結束了,最后還剩一點有意思的東西,就是 Traverser 類,這個類其實實現(xiàn)了在擴容期間,也能使 ConcurrentHashMap 可以高效的(不使用鎖)遍歷。代碼不多,有興趣的話可以讀一下~
以下是遺留的一些內容,有機會再補吧
sun.misc.Unsafe類。LongAdderandStriped64sun.misc.Contended注解TreeBin
以上就是Java源碼重讀之ConcurrentHashMap詳解的詳細內容,更多關于Java ConcurrentHashMap的資料請關注腳本之家其它相關文章!
- Java的ConcurrentHashMap原理深入分析
- Java?ConcurrentHashMap實現(xiàn)線程安全的代碼示例
- Java面試常考之ConcurrentHashMap多線程擴容機制詳解
- 淺析Java中ConcurrentHashMap的存儲流程
- Java?ConcurrentHashMap的源碼分析詳解
- Java集合ConcurrentHashMap詳解
- java并發(fā)容器ConcurrentHashMap深入分析
- Java HashTable的原理與實現(xiàn)
- Java中HashMap和Hashtable的區(qū)別小結
- 一文帶你全面了解Java?Hashtable
- Java中Hashtable集合的常用方法詳解
- 詳解Java中的HashTable
- Java容器HashMap與HashTable詳解
- java HashMap和HashTable的區(qū)別詳解
- java面試題——詳解HashMap和Hashtable 的區(qū)別
- Java中HashMap和Hashtable的區(qū)別淺析
- Java中ConcurrentHashMap和Hashtable的區(qū)別
相關文章
詳解XML,Object,Json轉換與Xstream的使用
這篇文章主要介紹了詳解XML,Object,Json轉換與Xstream的使用的相關資料,需要的朋友可以參考下2017-02-02

