解析ConcurrentHashMap: transfer方法源碼分析(難點)
- 上一篇文章介紹過put方法以及其相關(guān)的方法,接下來,本篇就介紹一下transfer這個方法(比較難),最好能動手結(jié)合著源碼進行分析,并仔細理解前面幾篇文章的內(nèi)容~
- 注:代碼分析的注釋中的CASE0、CASE1… ,這些并沒有直接關(guān)聯(lián)關(guān)系,只是為了給每個if邏輯判斷加一個標識,方便在其他邏輯判斷的地方進行引用而已。
再復習一下并發(fā)Map的結(jié)構(gòu)圖:

1、transfer方法
transfer方法的作用是:遷移元素,擴容時table容量變?yōu)樵瓉淼膬杀?,并把部分元素遷移到其它桶nextTable中。該方法遷移數(shù)據(jù)的流程就是在發(fā)生擴容時,從散列表原table中,按照桶位下標,依次將每個桶中的元素(結(jié)點、鏈表、樹)遷移到新表nextTable中。
另外還有與其相關(guān)的方法helpTransfer:協(xié)助擴容(遷移元素),當線程添加元素時發(fā)現(xiàn)table正在擴容,且當前元素所在的桶元素已經(jīng)遷移完成了,則協(xié)助遷移其它桶的元素。
下圖為并發(fā)擴容流程圖,在分析源碼前熟悉一下流程:

鏈表遷移示意圖:

下面就開始分析代碼把:
/**
* 遷移元素,擴容時table容量變?yōu)樵瓉淼膬杀?,并把部分元素遷移到其它桶nextTable中:
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n 表示擴容之前table數(shù)組的長度
// stride 表示分配給線程任務的步長
int n = tab.length, stride;
// 這里為了方便分析,根據(jù)CUP核數(shù),stride 固定為MIN_TRANSFER_STRIDE 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 控制線程遷移數(shù)據(jù)的最小步長(桶位的跨度~)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// ------------------------------------------------------------------------------
// CASE0:nextTab == null
// 條件成立:表示當前線程為觸發(fā)本次擴容的線程,需要做一些擴容準備工作:(在if代碼塊中)
// 條件不成立:表示當前線程是協(xié)助擴容的線程...
if (nextTab == null) { // initiating
try {
// 創(chuàng)建一個比擴容之前容量n大一倍的新table
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 賦值nextTab對象給 nextTable ,方便協(xié)助擴容線程 拿到新表
nextTable = nextTab;
// 記錄遷移數(shù)據(jù)整體位置的一個標記,初始值是原table數(shù)組的長度。
// 可以理解為:全局范圍內(nèi)散列表的數(shù)據(jù)任務處理到哪個桶的位置了
transferIndex = n;
}
// 表示新數(shù)組的長度
int nextn = nextTab.length;
// fwd節(jié)點,當某個桶位數(shù)據(jù)遷移處理完畢后,將此桶位設(shè)置為fwd節(jié)點,其它寫線程或讀線程看到后,會有不同邏輯。fwd結(jié)點指向新表nextTab的引用
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 推進標記(后面講數(shù)據(jù)遷移的時候再分析~)
boolean advance = true;
// 完成標記(后面講數(shù)據(jù)遷移的時候再分析~)
boolean finishing = false; // to ensure sweep before committing nextTab
// i 表示分配給當前線程的數(shù)據(jù)遷移任務,任務執(zhí)行到的桶位下標(任務進度~表示當前線程的任務已經(jīng)干到哪里了~)
// bound 表示分配給當前線程任務的下界限制。(線程的數(shù)據(jù)遷移任務先從高位開始遷移,遷移到下界位置結(jié)束)
int i = 0, bound = 0;
// 自旋~ 非常長的一個for循環(huán)...
for (;;) {
// f 桶位的頭結(jié)點
// fh 頭結(jié)點的hash
Node<K,V> f; int fh;
/**
* while循環(huán)的任務如下:(循環(huán)的條件為推進標記advance為true)
* 1.給當前線程分配任務區(qū)間
* 2.維護當前線程任務進度(i 表示當前處理的桶位)
* 3.維護map對象全局范圍內(nèi)的進度
*/
// 整個while循環(huán)就是在算i的值,過程太復雜,不用太關(guān)心
// i的值會從n-1依次遞減,感興趣的可以打下斷點就知道了
// 其中n是舊桶數(shù)組的大小,也就是說i從15開始一直減到1這樣去遷移元素
while (advance) {
// nextIndex~nextBound分配任務的區(qū)間:
// 分配任務的開始下標
// 分配任務的結(jié)束下標
int nextIndex, nextBound;
// -----------------------------------------------------------------------
// CASE1:
// 條件一:--i >= bound
// 成立:表示當前線程的任務尚未完成,還有相應的區(qū)間的桶位要處理,--i 就讓當前線程處理下一個桶位.
// 不成立:表示當前線程任務已完成 或者 未分配
if (--i >= bound || finishing)
// 推進標記設(shè)置為flase
advance = false;
// -----------------------------------------------------------------------
// CASE2: (nextIndex = transferIndex) <= 0
// transferIndex:可以理解為,全局范圍內(nèi)散列表的數(shù)據(jù)任務處理到哪個桶的位置了~
// 前置條件:當前線程任務已完成 或 者未分配,即沒有經(jīng)過CASE1
// 條件成立:表示對象全局范圍內(nèi)的桶位都分配完畢了,沒有區(qū)間可分配了,設(shè)置當前線程的i變量為-1 跳出循環(huán),然后執(zhí)行退出遷移任務相關(guān)的程序
// 條件不成立:表示對象全局范圍內(nèi)的桶位尚未分配完畢,還有區(qū)間可分配~
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
// 推進標記設(shè)置為flase
advance = false;
}
// -----------------------------------------------------------------------
// CASE3,其實就是移動i的一個過程:CAS更新成功,則i從當前位置-1,再復習一下i的含義:
// i 表示分配給當前線程的數(shù)據(jù)遷移任務,任務執(zhí)行到的桶位下標(任務進度~表示當前線程的任務已經(jīng)干到哪里了~)
// CASE3前置條件(未經(jīng)過CASE1-2):1、當前線程需要分配任務區(qū)間 2.全局范圍內(nèi)還有桶位尚未遷移
// 條件成立:說明給當前線程分配任務成功
// 條件失敗:說明分配任務給當前線程失敗,應該是和其它線程發(fā)生了競爭~
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
處理線程任務完成后,線程退出transfer方法的邏輯:
**/
// -------------------------------------------------------------------------
// CASE4:
// 條件一:i < 0
// 成立:表示當前線程未分配到任務,或已完成了任務
// 條件二、三:一般情況下不會成里~
if (i < 0 || i >= n || i + n >= nextn) {
// 如果一次遍歷完成了
// 也就是整個map所有桶中的元素都遷移完成了
// 保存sizeCtl的變量
int sc;
// 擴容結(jié)束條件判斷
if (finishing) {
// 如果全部桶中數(shù)據(jù)遷移完成了,則替換舊桶數(shù)組
// 并設(shè)置下一次擴容門檻為新桶數(shù)組容量的0.75倍
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// -------------------------------------------------------------------------
// CASE5:當前線程擴容完成,把并發(fā)擴容的線程數(shù)-1,該操作基于CAS,修改成功返回true
// 條件成立:說明,更新 sizeCtl低16位 -1 操作成功,當前線程可以正常退出了~
// 如果條件不成立:說明CAS更新操作時,與其他線程發(fā)生競爭了~
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 條件成立:說明當前線程不是最后一個退出transfer任務的線程
// eg:
// 1000 0000 0001 1011 0000 0000 0000 0000
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 擴容完成兩邊肯定相等
// 正常退出
return;
// 完成標記finishing置為true,,finishing為true才會走到上面的if擴容結(jié)束條件判斷
// 推進標記advance置為true
finishing = advance = true;
// i重新賦值為n
// 這樣會再重新遍歷一次桶數(shù)組,看看是不是都遷移完成了
// 也就是第二次遍歷都會走到下面的(fh = f.hash) == MOVED這個條件
i = n; // recheck before commit
}
}
/**
線程處理一個桶位數(shù)據(jù)的遷移工作,處理完畢后,
設(shè)置advance為true: 表示繼續(xù)推薦,然后就會回到for,繼續(xù)循環(huán)
**/
// -------------------------------------------------------------------------
// CASE6:
// 【CASE6~CASE8】前置條件:當前線程任務尚未處理完,正在進行中!
// 條件成立:說明當前桶位未存放數(shù)據(jù),只需要將此處設(shè)置為fwd節(jié)點即可。
else if ((f = tabAt(tab, i)) == null)
// 如果桶中無數(shù)據(jù),直接放入FWD,標記該桶已遷移:
// casTabAt通過CAS的方式去向Node數(shù)組指定位置i設(shè)置節(jié)點值,設(shè)置成功返回true,否則返回false
// 即:將tab數(shù)組下標為i的結(jié)點設(shè)置為FWD結(jié)點
advance = casTabAt(tab, i, null, fwd);
// -------------------------------------------------------------------------
// CASE7: (fh = f.hash) == MOVED 如果桶中第一個元素的hash值為MOVED
// 條件成立:說明頭節(jié)f它是ForwardingNode節(jié)點,
// 則,當前桶位已經(jīng)遷移過了,當前線程不用再處理了,直接再次更新當前線程任務索引,再次處理下一個桶位 或者 其它操作~
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// -------------------------------------------------------------------------
// CASE8:
// 前置條件:**當前桶位有數(shù)據(jù)**,而且node節(jié)點 不是 fwd節(jié)點,說明這些數(shù)據(jù)需要遷移。
else {
// 鎖定該桶并遷移元素:(sync 鎖加在當前桶位的頭結(jié)點)
synchronized (f) {
// 再次判斷當前桶第一個元素是否有修改,(可能出現(xiàn)其它線程搶先一步遷移/修改了元素)
// 防止在你加鎖頭對象之前,當前桶位的頭對象被其它寫線程修改過,導致你目前加鎖對象錯誤...
if (tabAt(tab, i) == f) {
// 把一個鏈表拆分成兩個鏈表(規(guī)則按照是桶中各元素的hash與桶大小n進行與操作):
// 等于0的放到低位鏈表(low)中,不等于0的放到高位鏈表(high)中
// 其中低位鏈表遷移到新桶中的位置相對舊桶不變
// 高位鏈表遷移到新桶中位置正好是其在舊桶的位置加n
// 這也正是為什么擴容時容量在變成兩倍的原因 (鏈表遷移原理參考上面的圖)
// ln 表示低位鏈表引用
// hn 表示高位鏈表引用
Node<K,V> ln, hn;
// ---------------------------------------------------------------
// CASE9:
// 條件成立:表示當前桶位是鏈表桶位
if (fh >= 0) {
// 第一個元素的hash值大于等于0,說明該桶中元素是以鏈表形式存儲的
// 這里與HashMap遷移算法基本類似,唯一不同的是多了一步尋找lastRun
// 這里的lastRun是提取出鏈表后面不用處理再特殊處理的子鏈表
// 比如所有元素的hash值與桶大小n與操作后的值分別為 0 0 4 4 0 0 0
// 則最后后面三個0對應的元素肯定還是在同一個桶中
// 這時lastRun對應的就是倒數(shù)第三個節(jié)點
// 至于為啥要這樣處理,我也沒太搞明白
// lastRun機制:
// 可以獲取出 當前鏈表 末尾連續(xù)高位不變的 node
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 看看最后這幾個元素歸屬于低位鏈表還是高位鏈表
// 條件成立:說明lastRun引用的鏈表為 低位鏈表,那么就讓 ln 指向 低位鏈表
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 否則,說明lastRun引用的鏈表為 高位鏈表,就讓 hn 指向 高位鏈表
else {
hn = lastRun;
ln = null;
}
// 遍歷鏈表,把hash&n為0的放在低位鏈表中,不為0的放在高位鏈表中
// 循環(huán)跳出條件:當前循環(huán)結(jié)點!=lastRun
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 低位鏈表的位置不變
setTabAt(nextTab, i, ln);
// 高位鏈表的位置是:原位置 + n
setTabAt(nextTab, i + n, hn);
// 標記當前桶已遷移
setTabAt(tab, i, fwd);
// advance為true,返回上面進行--i操作
advance = true;
}
// ---------------------------------------------------------------
// CASE10:
// 條件成立:表示當前桶位是 紅黑樹 代理結(jié)點TreeBin
else if (f instanceof TreeBin) {
// 如果第一個元素是樹節(jié)點,也是一樣,分化成兩顆樹
// 也是根據(jù)hash&n為0放在低位樹中,不為0放在高位樹中
// 轉(zhuǎn)換頭結(jié)點為 treeBin引用 t
TreeBin<K,V> t = (TreeBin<K,V>)f;
// 低位雙向鏈表 lo 指向低位鏈表的頭 loTail 指向低位鏈表的尾巴
TreeNode<K,V> lo = null, loTail = null;
// 高位雙向鏈表 lo 指向高位鏈表的頭 loTail 指向高位鏈表的尾巴
TreeNode<K,V> hi = null, hiTail = null;
// lc 表示低位鏈表元素數(shù)量
// hc 表示高位鏈表元素數(shù)量
int lc = 0, hc = 0;
// 遍歷整顆樹(TreeBin中的雙向鏈表,從頭結(jié)點至尾節(jié)點),根據(jù)hash&n是否為0分化成兩顆樹:
for (Node<K,V> e = t.first; e != null; e = e.next) {
// h 表示循環(huán)處理當前元素的 hash
int h = e.hash;
// 使用當前節(jié)點 構(gòu)建出來的新的TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// ---------------------------------------------------
// CASE11:
// 條件成立:表示當前循環(huán)節(jié)點 屬于低位鏈節(jié)點
if ((h & n) == 0) {
// 條件成立:說明當前低位鏈表還沒有數(shù)據(jù)
if ((p.prev = loTail) == null)
lo = p;
// 說明低位鏈表已經(jīng)有數(shù)據(jù)了,此時當前元素追加到低位鏈表的末尾就行了
else
loTail.next = p;
// 將低位鏈表尾指針指向 p 節(jié)點
loTail = p;
++lc;
}
// ---------------------------------------------------
// CASE12:
// 條件成立:當前節(jié)點屬于高位鏈節(jié)點
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果分化的樹中元素個數(shù)小于等于6,則退化成鏈表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位樹的位置不變
setTabAt(nextTab, i, ln);
// 高位樹的位置是原位置加n
setTabAt(nextTab, i + n, hn);
// 標記該桶已遷移
setTabAt(tab, i, fwd);
// advance為true,返回上面進行--i操作
advance = true;
}
}
}
}
}
}
補充,lastRun機制示意圖:

小節(jié):
(1)新桶數(shù)組大小是舊桶數(shù)組的兩倍;
(2)遷移元素先從靠后的桶開始;
(3)遷移完成的桶在里面放置一ForwardingNode類型的元素,標記該桶遷移完成;
(4)遷移時根據(jù)hash&n是否等于0把桶中元素分化成兩個鏈表或樹;
(5)低位鏈表(樹)存儲在原來的位置;
(6)高們鏈表(樹)存儲在原來的位置加n的位置;
(7)遷移元素時會鎖住當前桶,也是分段鎖的思想;
2、helpTransfer方法
helpTransfer:協(xié)助擴容(遷移元素),當線程添加元素時發(fā)現(xiàn)table正在擴容,且當前元素所在的桶元素已經(jīng)遷移完成了,則協(xié)助遷移其它桶的元素。當前桶元素遷移完成了才去協(xié)助遷移其它桶元素!
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
// nextTab 引用的是 fwd.nextTable == map.nextTable 理論上是這樣。
// sc 保存map.sizeCtl
Node<K,V>[] nextTab; int sc;
// CASE0: 如果桶數(shù)組不為空,并且當前桶第一個元素為ForwardingNode類型,并且nextTab不為空
// 說明當前桶已經(jīng)遷移完畢了,才去幫忙遷移其它桶的元素
// 擴容時會把舊桶的第一個元素置為ForwardingNode,并讓其nextTab指向新桶數(shù)組
// 條件一:tab != null 恒成立 true
// 條件二:(f instanceof ForwardingNode) 恒成立 true
// 條件三:((ForwardingNode<K,V>)f).nextTable) != null 恒成立 true
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根據(jù)前表的長度tab.length去獲取擴容唯一標識戳,假設(shè) 16 -> 32 擴容:1000 0000 0001 1011
int rs = resizeStamp(tab.length);
// 條件一:nextTab == nextTable
// 成立:表示當前擴容正在進行中
// 不成立:1.nextTable被設(shè)置為Null了,擴容完畢后,會被設(shè)為Null
// 2.再次出發(fā)擴容了...咱們拿到的nextTab 也已經(jīng)過期了...
// 條件二:table == tab
// 成立:說明 擴容正在進行中,還未完成
// 不成立:說明擴容已經(jīng)結(jié)束了,擴容結(jié)束之后,最后退出的線程 會設(shè)置 nextTable 為 table
// 條件三:(sc = sizeCtl) < 0
// 成立:說明擴容正在進行中
// 不成立:說明sizeCtl當前是一個大于0的數(shù),此時代表下次擴容的閾值,當前擴容已經(jīng)結(jié)束。
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
// true -> 說明當前線程獲取到的擴容唯一標識戳 非 本批次擴容
// false -> 說明當前線程獲取到的擴容唯一標識戳 是 本批次擴容
// 條件二:JDK1.8 中有bug jira已經(jīng)提出來了 其實想表達的是 = sc == (rs << 16 ) + 1
// true -> 表示擴容完畢,當前線程不需要再參與進來了
// false -> 擴容還在進行中,當前線程可以參與
// 條件三:JDK1.8 中有bug jira已經(jīng)提出來了 其實想表達的是 = sc == (rs<<16) + MAX_RESIZERS
// true -> 表示當前參與并發(fā)擴容的線程達到了最大值 65535 - 1
// false -> 表示當前線程可以參與進來
// 條件四:transferIndex <= 0
// true -> 說明map對象全局范圍內(nèi)的任務已經(jīng)分配完了,當前線程進去也沒活干..
// false -> 還有任務可以分配。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 擴容線程數(shù)加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 當前線程幫忙遷移元素
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
3、總結(jié)
這塊代碼邏輯很難理解,可以結(jié)合著ConcurrentHashMap面試題去鞏固一下重新吸收~下一章是get、remove方法分析,也希望大家多多關(guān)注腳本之家的其他內(nèi)容!
相關(guān)文章
quartz實現(xiàn)定時功能實例詳解(servlet定時器配置方法)
Quartz是一個完全由java編寫的開源作業(yè)調(diào)度框架,下面提供一個小例子供大家參考,還有在servlet配置的方法2013-12-12
IDEA導入eclipse項目并且部署到tomcat的步驟詳解
這篇文章主要給大家介紹了關(guān)于IDEA導入eclipse項目并且部署到tomcat的相關(guān)資料,文中通過圖文介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-02-02
java實現(xiàn)將數(shù)字轉(zhuǎn)換成人民幣大寫
前面給大家介紹過使用javascript,php,c#,python等語言實現(xiàn)人民幣大寫格式化,這篇文章主要介紹了java實現(xiàn)將數(shù)字轉(zhuǎn)換成人民幣大寫的代碼,非常的簡單實用,分享給大家,需要的朋友可以參考下2015-04-04
Java Listener監(jiān)聽器使用規(guī)范詳細介紹
監(jiān)聽器是一個專門用于對其他對象身上發(fā)生的事件或狀態(tài)改變進行監(jiān)聽和相應處理的對象,當被監(jiān)視的對象發(fā)生情況時,立即采取相應的行動。監(jiān)聽器其實就是一個實現(xiàn)特定接口的普通java程序,這個程序?qū)iT用于監(jiān)聽另一個java對象的方法調(diào)用或?qū)傩愿淖?/div> 2023-01-01
SpringMVC實現(xiàn)獲取請求參數(shù)方法詳解
Spring MVC 是 Spring 提供的一個基于 MVC 設(shè)計模式的輕量級 Web 開發(fā)框架,本質(zhì)上相當于 Servlet,Spring MVC 角色劃分清晰,分工明細,這篇文章主要介紹了SpringMVC實現(xiàn)獲取請求參數(shù)方法2022-09-09最新評論

