解析ConcurrentHashMap: transfer方法源碼分析(難點)
- 上一篇文章介紹過put方法以及其相關的方法,接下來,本篇就介紹一下transfer這個方法(比較難),最好能動手結合著源碼進行分析,并仔細理解前面幾篇文章的內(nèi)容~
- 注:代碼分析的注釋中的CASE0、CASE1… ,這些并沒有直接關聯(lián)關系,只是為了給每個if邏輯判斷加一個標識,方便在其他邏輯判斷的地方進行引用而已。
再復習一下并發(fā)Map的結構圖:
1、transfer方法
transfer
方法的作用是:遷移元素,擴容時table容量變?yōu)樵瓉淼膬杀叮巡糠衷剡w移到其它桶nextTable中。該方法遷移數(shù)據(jù)的流程就是在發(fā)生擴容時,從散列表原table中,按照桶位下標,依次將每個桶中的元素(結點、鏈表、樹)遷移到新表nextTable中。
另外還有與其相關的方法helpTransfer
:協(xié)助擴容(遷移元素),當線程添加元素時發(fā)現(xiàn)table正在擴容,且當前元素所在的桶元素已經(jīng)遷移完成了,則協(xié)助遷移其它桶的元素。
下圖為并發(fā)擴容流程圖,在分析源碼前熟悉一下流程:
鏈表遷移示意圖:
下面就開始分析代碼把:
/** * 遷移元素,擴容時table容量變?yōu)樵瓉淼膬杀?,并把部分元素遷移到其它桶nextTable中: */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // n 表示擴容之前table數(shù)組的長度 // stride 表示分配給線程任務的步長 int n = tab.length, stride; // 這里為了方便分析,根據(jù)CUP核數(shù),stride 固定為MIN_TRANSFER_STRIDE 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 控制線程遷移數(shù)據(jù)的最小步長(桶位的跨度~) stride = MIN_TRANSFER_STRIDE; // subdivide range // ------------------------------------------------------------------------------ // CASE0:nextTab == null // 條件成立:表示當前線程為觸發(fā)本次擴容的線程,需要做一些擴容準備工作:(在if代碼塊中) // 條件不成立:表示當前線程是協(xié)助擴容的線程... if (nextTab == null) { // initiating try { // 創(chuàng)建一個比擴容之前容量n大一倍的新table @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 賦值nextTab對象給 nextTable ,方便協(xié)助擴容線程 拿到新表 nextTable = nextTab; // 記錄遷移數(shù)據(jù)整體位置的一個標記,初始值是原table數(shù)組的長度。 // 可以理解為:全局范圍內(nèi)散列表的數(shù)據(jù)任務處理到哪個桶的位置了 transferIndex = n; } // 表示新數(shù)組的長度 int nextn = nextTab.length; // fwd節(jié)點,當某個桶位數(shù)據(jù)遷移處理完畢后,將此桶位設置為fwd節(jié)點,其它寫線程或讀線程看到后,會有不同邏輯。fwd結點指向新表nextTab的引用 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 推進標記(后面講數(shù)據(jù)遷移的時候再分析~) boolean advance = true; // 完成標記(后面講數(shù)據(jù)遷移的時候再分析~) boolean finishing = false; // to ensure sweep before committing nextTab // i 表示分配給當前線程的數(shù)據(jù)遷移任務,任務執(zhí)行到的桶位下標(任務進度~表示當前線程的任務已經(jīng)干到哪里了~) // bound 表示分配給當前線程任務的下界限制。(線程的數(shù)據(jù)遷移任務先從高位開始遷移,遷移到下界位置結束) int i = 0, bound = 0; // 自旋~ 非常長的一個for循環(huán)... for (;;) { // f 桶位的頭結點 // fh 頭結點的hash Node<K,V> f; int fh; /** * while循環(huán)的任務如下:(循環(huán)的條件為推進標記advance為true) * 1.給當前線程分配任務區(qū)間 * 2.維護當前線程任務進度(i 表示當前處理的桶位) * 3.維護map對象全局范圍內(nèi)的進度 */ // 整個while循環(huán)就是在算i的值,過程太復雜,不用太關心 // i的值會從n-1依次遞減,感興趣的可以打下斷點就知道了 // 其中n是舊桶數(shù)組的大小,也就是說i從15開始一直減到1這樣去遷移元素 while (advance) { // nextIndex~nextBound分配任務的區(qū)間: // 分配任務的開始下標 // 分配任務的結束下標 int nextIndex, nextBound; // ----------------------------------------------------------------------- // CASE1: // 條件一:--i >= bound // 成立:表示當前線程的任務尚未完成,還有相應的區(qū)間的桶位要處理,--i 就讓當前線程處理下一個桶位. // 不成立:表示當前線程任務已完成 或者 未分配 if (--i >= bound || finishing) // 推進標記設置為flase advance = false; // ----------------------------------------------------------------------- // CASE2: (nextIndex = transferIndex) <= 0 // transferIndex:可以理解為,全局范圍內(nèi)散列表的數(shù)據(jù)任務處理到哪個桶的位置了~ // 前置條件:當前線程任務已完成 或 者未分配,即沒有經(jīng)過CASE1 // 條件成立:表示對象全局范圍內(nèi)的桶位都分配完畢了,沒有區(qū)間可分配了,設置當前線程的i變量為-1 跳出循環(huán),然后執(zhí)行退出遷移任務相關的程序 // 條件不成立:表示對象全局范圍內(nèi)的桶位尚未分配完畢,還有區(qū)間可分配~ else if ((nextIndex = transferIndex) <= 0) { i = -1; // 推進標記設置為flase advance = false; } // ----------------------------------------------------------------------- // CASE3,其實就是移動i的一個過程:CAS更新成功,則i從當前位置-1,再復習一下i的含義: // i 表示分配給當前線程的數(shù)據(jù)遷移任務,任務執(zhí)行到的桶位下標(任務進度~表示當前線程的任務已經(jīng)干到哪里了~) // CASE3前置條件(未經(jīng)過CASE1-2):1、當前線程需要分配任務區(qū)間 2.全局范圍內(nèi)還有桶位尚未遷移 // 條件成立:說明給當前線程分配任務成功 // 條件失?。赫f明分配任務給當前線程失敗,應該是和其它線程發(fā)生了競爭~ else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } /** 處理線程任務完成后,線程退出transfer方法的邏輯: **/ // ------------------------------------------------------------------------- // CASE4: // 條件一:i < 0 // 成立:表示當前線程未分配到任務,或已完成了任務 // 條件二、三:一般情況下不會成里~ if (i < 0 || i >= n || i + n >= nextn) { // 如果一次遍歷完成了 // 也就是整個map所有桶中的元素都遷移完成了 // 保存sizeCtl的變量 int sc; // 擴容結束條件判斷 if (finishing) { // 如果全部桶中數(shù)據(jù)遷移完成了,則替換舊桶數(shù)組 // 并設置下一次擴容門檻為新桶數(shù)組容量的0.75倍 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } // ------------------------------------------------------------------------- // CASE5:當前線程擴容完成,把并發(fā)擴容的線程數(shù)-1,該操作基于CAS,修改成功返回true // 條件成立:說明,更新 sizeCtl低16位 -1 操作成功,當前線程可以正常退出了~ // 如果條件不成立:說明CAS更新操作時,與其他線程發(fā)生競爭了~ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 條件成立:說明當前線程不是最后一個退出transfer任務的線程 // eg: // 1000 0000 0001 1011 0000 0000 0000 0000 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 擴容完成兩邊肯定相等 // 正常退出 return; // 完成標記finishing置為true,,finishing為true才會走到上面的if擴容結束條件判斷 // 推進標記advance置為true finishing = advance = true; // i重新賦值為n // 這樣會再重新遍歷一次桶數(shù)組,看看是不是都遷移完成了 // 也就是第二次遍歷都會走到下面的(fh = f.hash) == MOVED這個條件 i = n; // recheck before commit } } /** 線程處理一個桶位數(shù)據(jù)的遷移工作,處理完畢后, 設置advance為true: 表示繼續(xù)推薦,然后就會回到for,繼續(xù)循環(huán) **/ // ------------------------------------------------------------------------- // CASE6: // 【CASE6~CASE8】前置條件:當前線程任務尚未處理完,正在進行中! // 條件成立:說明當前桶位未存放數(shù)據(jù),只需要將此處設置為fwd節(jié)點即可。 else if ((f = tabAt(tab, i)) == null) // 如果桶中無數(shù)據(jù),直接放入FWD,標記該桶已遷移: // casTabAt通過CAS的方式去向Node數(shù)組指定位置i設置節(jié)點值,設置成功返回true,否則返回false // 即:將tab數(shù)組下標為i的結點設置為FWD結點 advance = casTabAt(tab, i, null, fwd); // ------------------------------------------------------------------------- // CASE7: (fh = f.hash) == MOVED 如果桶中第一個元素的hash值為MOVED // 條件成立:說明頭節(jié)f它是ForwardingNode節(jié)點, // 則,當前桶位已經(jīng)遷移過了,當前線程不用再處理了,直接再次更新當前線程任務索引,再次處理下一個桶位 或者 其它操作~ else if ((fh = f.hash) == MOVED) advance = true; // already processed // ------------------------------------------------------------------------- // CASE8: // 前置條件:**當前桶位有數(shù)據(jù)**,而且node節(jié)點 不是 fwd節(jié)點,說明這些數(shù)據(jù)需要遷移。 else { // 鎖定該桶并遷移元素:(sync 鎖加在當前桶位的頭結點) synchronized (f) { // 再次判斷當前桶第一個元素是否有修改,(可能出現(xiàn)其它線程搶先一步遷移/修改了元素) // 防止在你加鎖頭對象之前,當前桶位的頭對象被其它寫線程修改過,導致你目前加鎖對象錯誤... if (tabAt(tab, i) == f) { // 把一個鏈表拆分成兩個鏈表(規(guī)則按照是桶中各元素的hash與桶大小n進行與操作): // 等于0的放到低位鏈表(low)中,不等于0的放到高位鏈表(high)中 // 其中低位鏈表遷移到新桶中的位置相對舊桶不變 // 高位鏈表遷移到新桶中位置正好是其在舊桶的位置加n // 這也正是為什么擴容時容量在變成兩倍的原因 (鏈表遷移原理參考上面的圖) // ln 表示低位鏈表引用 // hn 表示高位鏈表引用 Node<K,V> ln, hn; // --------------------------------------------------------------- // CASE9: // 條件成立:表示當前桶位是鏈表桶位 if (fh >= 0) { // 第一個元素的hash值大于等于0,說明該桶中元素是以鏈表形式存儲的 // 這里與HashMap遷移算法基本類似,唯一不同的是多了一步尋找lastRun // 這里的lastRun是提取出鏈表后面不用處理再特殊處理的子鏈表 // 比如所有元素的hash值與桶大小n與操作后的值分別為 0 0 4 4 0 0 0 // 則最后后面三個0對應的元素肯定還是在同一個桶中 // 這時lastRun對應的就是倒數(shù)第三個節(jié)點 // 至于為啥要這樣處理,我也沒太搞明白 // lastRun機制: // 可以獲取出 當前鏈表 末尾連續(xù)高位不變的 node int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 看看最后這幾個元素歸屬于低位鏈表還是高位鏈表 // 條件成立:說明lastRun引用的鏈表為 低位鏈表,那么就讓 ln 指向 低位鏈表 if (runBit == 0) { ln = lastRun; hn = null; } // 否則,說明lastRun引用的鏈表為 高位鏈表,就讓 hn 指向 高位鏈表 else { hn = lastRun; ln = null; } // 遍歷鏈表,把hash&n為0的放在低位鏈表中,不為0的放在高位鏈表中 // 循環(huán)跳出條件:當前循環(huán)結點!=lastRun for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 低位鏈表的位置不變 setTabAt(nextTab, i, ln); // 高位鏈表的位置是:原位置 + n setTabAt(nextTab, i + n, hn); // 標記當前桶已遷移 setTabAt(tab, i, fwd); // advance為true,返回上面進行--i操作 advance = true; } // --------------------------------------------------------------- // CASE10: // 條件成立:表示當前桶位是 紅黑樹 代理結點TreeBin else if (f instanceof TreeBin) { // 如果第一個元素是樹節(jié)點,也是一樣,分化成兩顆樹 // 也是根據(jù)hash&n為0放在低位樹中,不為0放在高位樹中 // 轉(zhuǎn)換頭結點為 treeBin引用 t TreeBin<K,V> t = (TreeBin<K,V>)f; // 低位雙向鏈表 lo 指向低位鏈表的頭 loTail 指向低位鏈表的尾巴 TreeNode<K,V> lo = null, loTail = null; // 高位雙向鏈表 lo 指向高位鏈表的頭 loTail 指向高位鏈表的尾巴 TreeNode<K,V> hi = null, hiTail = null; // lc 表示低位鏈表元素數(shù)量 // hc 表示高位鏈表元素數(shù)量 int lc = 0, hc = 0; // 遍歷整顆樹(TreeBin中的雙向鏈表,從頭結點至尾節(jié)點),根據(jù)hash&n是否為0分化成兩顆樹: for (Node<K,V> e = t.first; e != null; e = e.next) { // h 表示循環(huán)處理當前元素的 hash int h = e.hash; // 使用當前節(jié)點 構建出來的新的TreeNode TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // --------------------------------------------------- // CASE11: // 條件成立:表示當前循環(huán)節(jié)點 屬于低位鏈節(jié)點 if ((h & n) == 0) { // 條件成立:說明當前低位鏈表還沒有數(shù)據(jù) if ((p.prev = loTail) == null) lo = p; // 說明低位鏈表已經(jīng)有數(shù)據(jù)了,此時當前元素追加到低位鏈表的末尾就行了 else loTail.next = p; // 將低位鏈表尾指針指向 p 節(jié)點 loTail = p; ++lc; } // --------------------------------------------------- // CASE12: // 條件成立:當前節(jié)點屬于高位鏈節(jié)點 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果分化的樹中元素個數(shù)小于等于6,則退化成鏈表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位樹的位置不變 setTabAt(nextTab, i, ln); // 高位樹的位置是原位置加n setTabAt(nextTab, i + n, hn); // 標記該桶已遷移 setTabAt(tab, i, fwd); // advance為true,返回上面進行--i操作 advance = true; } } } } } }
補充,lastRun機制示意圖:
小節(jié):
(1)新桶數(shù)組大小是舊桶數(shù)組的兩倍;
(2)遷移元素先從靠后的桶開始;
(3)遷移完成的桶在里面放置一ForwardingNode類型的元素,標記該桶遷移完成;
(4)遷移時根據(jù)hash&n是否等于0把桶中元素分化成兩個鏈表或樹;
(5)低位鏈表(樹)存儲在原來的位置;
(6)高們鏈表(樹)存儲在原來的位置加n的位置;
(7)遷移元素時會鎖住當前桶,也是分段鎖的思想;
2、helpTransfer方法
helpTransfer
:協(xié)助擴容(遷移元素),當線程添加元素時發(fā)現(xiàn)table正在擴容,且當前元素所在的桶元素已經(jīng)遷移完成了,則協(xié)助遷移其它桶的元素。當前桶元素遷移完成了才去協(xié)助遷移其它桶元素!
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { // nextTab 引用的是 fwd.nextTable == map.nextTable 理論上是這樣。 // sc 保存map.sizeCtl Node<K,V>[] nextTab; int sc; // CASE0: 如果桶數(shù)組不為空,并且當前桶第一個元素為ForwardingNode類型,并且nextTab不為空 // 說明當前桶已經(jīng)遷移完畢了,才去幫忙遷移其它桶的元素 // 擴容時會把舊桶的第一個元素置為ForwardingNode,并讓其nextTab指向新桶數(shù)組 // 條件一:tab != null 恒成立 true // 條件二:(f instanceof ForwardingNode) 恒成立 true // 條件三:((ForwardingNode<K,V>)f).nextTable) != null 恒成立 true if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // 根據(jù)前表的長度tab.length去獲取擴容唯一標識戳,假設 16 -> 32 擴容:1000 0000 0001 1011 int rs = resizeStamp(tab.length); // 條件一:nextTab == nextTable // 成立:表示當前擴容正在進行中 // 不成立:1.nextTable被設置為Null了,擴容完畢后,會被設為Null // 2.再次出發(fā)擴容了...咱們拿到的nextTab 也已經(jīng)過期了... // 條件二:table == tab // 成立:說明 擴容正在進行中,還未完成 // 不成立:說明擴容已經(jīng)結束了,擴容結束之后,最后退出的線程 會設置 nextTable 為 table // 條件三:(sc = sizeCtl) < 0 // 成立:說明擴容正在進行中 // 不成立:說明sizeCtl當前是一個大于0的數(shù),此時代表下次擴容的閾值,當前擴容已經(jīng)結束。 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs // true -> 說明當前線程獲取到的擴容唯一標識戳 非 本批次擴容 // false -> 說明當前線程獲取到的擴容唯一標識戳 是 本批次擴容 // 條件二:JDK1.8 中有bug jira已經(jīng)提出來了 其實想表達的是 = sc == (rs << 16 ) + 1 // true -> 表示擴容完畢,當前線程不需要再參與進來了 // false -> 擴容還在進行中,當前線程可以參與 // 條件三:JDK1.8 中有bug jira已經(jīng)提出來了 其實想表達的是 = sc == (rs<<16) + MAX_RESIZERS // true -> 表示當前參與并發(fā)擴容的線程達到了最大值 65535 - 1 // false -> 表示當前線程可以參與進來 // 條件四:transferIndex <= 0 // true -> 說明map對象全局范圍內(nèi)的任務已經(jīng)分配完了,當前線程進去也沒活干.. // false -> 還有任務可以分配。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 擴容線程數(shù)加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 當前線程幫忙遷移元素 transfer(tab, nextTab); break; } } return nextTab; } return table; }
3、總結
這塊代碼邏輯很難理解,可以結合著ConcurrentHashMap面試題去鞏固一下重新吸收~下一章是get、remove方法分析,也希望大家多多關注腳本之家的其他內(nèi)容!
相關文章
quartz實現(xiàn)定時功能實例詳解(servlet定時器配置方法)
Quartz是一個完全由java編寫的開源作業(yè)調(diào)度框架,下面提供一個小例子供大家參考,還有在servlet配置的方法2013-12-12IDEA導入eclipse項目并且部署到tomcat的步驟詳解
這篇文章主要給大家介紹了關于IDEA導入eclipse項目并且部署到tomcat的相關資料,文中通過圖文介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-02-02java實現(xiàn)將數(shù)字轉(zhuǎn)換成人民幣大寫
前面給大家介紹過使用javascript,php,c#,python等語言實現(xiàn)人民幣大寫格式化,這篇文章主要介紹了java實現(xiàn)將數(shù)字轉(zhuǎn)換成人民幣大寫的代碼,非常的簡單實用,分享給大家,需要的朋友可以參考下2015-04-04Java Listener監(jiān)聽器使用規(guī)范詳細介紹
監(jiān)聽器是一個專門用于對其他對象身上發(fā)生的事件或狀態(tài)改變進行監(jiān)聽和相應處理的對象,當被監(jiān)視的對象發(fā)生情況時,立即采取相應的行動。監(jiān)聽器其實就是一個實現(xiàn)特定接口的普通java程序,這個程序?qū)iT用于監(jiān)聽另一個java對象的方法調(diào)用或?qū)傩愿淖?/div> 2023-01-01SpringMVC實現(xiàn)獲取請求參數(shù)方法詳解
Spring MVC 是 Spring 提供的一個基于 MVC 設計模式的輕量級 Web 開發(fā)框架,本質(zhì)上相當于 Servlet,Spring MVC 角色劃分清晰,分工明細,這篇文章主要介紹了SpringMVC實現(xiàn)獲取請求參數(shù)方法2022-09-09最新評論