ConcurrentHashMap線程安全及實(shí)現(xiàn)原理實(shí)例解析
背景
- 線程不安全的HashMap
因?yàn)槎嗑€程環(huán)境下,使用Hashmap進(jìn)行put操作會引起死循環(huán),導(dǎo)致CPU利用率接近100%(jdk1.7,1.8引入紅黑樹優(yōu)化了),多線程put可能會導(dǎo)致元素丟失。所以在并發(fā)情況下不能使用HashMap。
- 效率低下的HashTable容器
JDK1.7的實(shí)現(xiàn)
在JDK1.7版本中,ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)是由一個(gè)Segment數(shù)組和多個(gè)HashEntry組成,如下圖所示:

Segment數(shù)組的意義就是將一個(gè)大的table分割成多個(gè)小的table來進(jìn)行加鎖,也就是上面的提到的鎖分段技術(shù),而每一個(gè)Segment元素存儲的是HashEntry數(shù)組+鏈表,這個(gè)和HashMap的數(shù)據(jù)存儲結(jié)構(gòu)一樣。Segment(桶)
應(yīng)用場景
當(dāng)有一個(gè)大數(shù)組時(shí)需要在多個(gè)線程共享時(shí)就可以考慮是否把它給分層多個(gè)節(jié)點(diǎn)了,避免大鎖。并可以考慮通過hash算法進(jìn)行一些模塊定位。 其實(shí)不止用于線程,當(dāng)設(shè)計(jì)數(shù)據(jù)表的事務(wù)時(shí)(事務(wù)某種意義上也是同步機(jī)制的體現(xiàn)),可以把一個(gè)表看成一個(gè)需要同步的數(shù)組,如果操作的表數(shù)據(jù)太多時(shí)就可以考慮事務(wù)分離了(這也是為什么要避免大表的出現(xiàn)),比如把數(shù)據(jù)進(jìn)行字段拆分,水平分表等.
初始化
ConcurrentHashMap的初始化是會通過位與運(yùn)算來初始化Segment的大小,用ssize來表示,如下所示
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}如上所示,因?yàn)閟size用位于運(yùn)算來計(jì)算(ssize <<=1),所以Segment的大小取值都是以2的N次方,無關(guān)concurrencyLevel的取值,當(dāng)然concurrencyLevel最大只能用16位的二進(jìn)制來表示,即65536,換句話說,Segment的大小最多65536個(gè),沒有指定concurrencyLevel元素初始化,Segment的大小ssize默認(rèn)為16
每一個(gè)Segment元素下的HashEntry的初始化也是按照位于運(yùn)算來計(jì)算,用cap來表示,如下所示
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;如上所示,HashEntry大小的計(jì)算也是2的N次方(cap <<=1), cap的初始值為1,所以HashEntry最小的容量為2
JDK1.8的實(shí)現(xiàn)
JDK1.8的實(shí)現(xiàn)已經(jīng)摒棄了Segment的概念,而是直接用Node數(shù)組+鏈表+紅黑樹的數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn),并發(fā)控制使用Synchronized和CAS來操作,整個(gè)看起來就像是優(yōu)化過且線程安全的HashMap,雖然在JDK1.8中還能看到Segment的數(shù)據(jù)結(jié)構(gòu),但是已經(jīng)簡化了屬性,只是為了兼容舊版本。

基本屬性:
// 默認(rèn)初始值,必須是2的幕數(shù) private static final int DEFAULT_CAPACITY = 16; // 負(fù)載因子 private static final float LOAD_FACTOR = 0.75f; // 鏈表轉(zhuǎn)紅黑樹閥值,> 8 鏈表轉(zhuǎn)換為紅黑樹 static final int TREEIFY_THRESHOLD = 8; //樹轉(zhuǎn)鏈表閥值,小于等于6 ,僅在擴(kuò)容tranfer時(shí)才可能樹轉(zhuǎn)鏈表 static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大線程數(shù) private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中記錄size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 樹根節(jié)點(diǎn)的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 可用處理器數(shù)量 static final int NCPU = Runtime.getRuntime().availableProcessors(); //存放node的數(shù)組 第一次插入數(shù)據(jù)的時(shí)候初始化,大小是2的冪次方 transient volatile Node<K,V>[] table; /*控制標(biāo)識符,用來控制table的初始化和擴(kuò)容的操作,不同的值有不同的含義 *當(dāng)為負(fù)數(shù)時(shí):-1代表正在初始化,-N代表有N-1個(gè)線程正在 進(jìn)行擴(kuò)容 *當(dāng)為0時(shí):代表當(dāng)時(shí)的table還沒有被初始化 *當(dāng)為正數(shù)時(shí):表示初始化或者下一次進(jìn)行擴(kuò)容的大小 ,類似于擴(kuò)容閾值。它的值始終是當(dāng)前ConcurrentHashMap容量的0.75倍,這與loadfactor是對應(yīng)的。實(shí)際容量>=sizeCtl,則擴(kuò)容。 private transient volatile int sizeCtl;
put操作
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //兩次hash,減少hash沖突,可以均勻分布
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //對這個(gè)table進(jìn)行迭代
Node<K,V> f; int n, i, fh;
//這里就是上面構(gòu)造方法沒有進(jìn)行初始化,在這里進(jìn)行判斷,為null就調(diào)用initTable進(jìn)行初始化,屬于懶漢模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒有數(shù)據(jù),就直接無鎖插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//如果在進(jìn)行擴(kuò)容,則先進(jìn)行擴(kuò)容操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果以上條件都不滿足,那就要進(jìn)行加鎖操作,也就是存在hash沖突,鎖住鏈表或者紅黑樹的頭結(jié)點(diǎn)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示該節(jié)點(diǎn)是鏈表結(jié)構(gòu)
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//這里涉及到相同的key進(jìn)行put就會覆蓋原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入鏈表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//紅黑樹結(jié)構(gòu)
Node<K,V> p;
binCount = 2;
//紅黑樹結(jié)構(gòu)旋轉(zhuǎn)插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //如果鏈表的長度大于8時(shí)就會進(jìn)行紅黑樹的轉(zhuǎn)換
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//統(tǒng)計(jì)size,并且檢查是否需要擴(kuò)容
return null;
}這個(gè)put的過程很清晰,對當(dāng)前的table進(jìn)行無條件自循環(huán)直到put成功,可以分成以下六步流程來概述。
- 如果沒有初始化就先調(diào)用initTable()方法來進(jìn)行初始化過程
- 如果沒有hash沖突就直接CAS插入
- 如果還在進(jìn)行擴(kuò)容操作就先進(jìn)行擴(kuò)容
- 如果存在hash沖突,就加鎖來保證線程安全,這里有兩種情況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結(jié)構(gòu)插入,
- 最后一個(gè)如果該鏈表的數(shù)量大于閾值8,就要先轉(zhuǎn)換成黑紅樹的結(jié)構(gòu),break再一次進(jìn)入循環(huán)
- 如果添加成功就調(diào)用addCount()方法統(tǒng)計(jì)size,并且檢查是否需要擴(kuò)容
現(xiàn)在我們來對每一步的細(xì)節(jié)進(jìn)行源碼分析,在第一步中,符合條件會進(jìn)行初始化操作,我們來看看initTable()方法
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {//空的table才能進(jìn)入初始化操作
if ((sc = sizeCtl) < 0) //sizeCtl<0表示其他線程已經(jīng)在初始化了或者擴(kuò)容了,掛起當(dāng)前線程
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS操作SIZECTL為-1,表示初始化狀態(tài)
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化
table = tab = nt;
sc = n - (n >>> 2);//記錄下次擴(kuò)容的大小
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}在第二步中沒有hash沖突就直接調(diào)用Unsafe的方法CAS插入該元素,進(jìn)入第三步如果容器正在擴(kuò)容,則會調(diào)用helpTransfer()方法幫助擴(kuò)容,現(xiàn)在我們跟進(jìn)helpTransfer()方法看看
/**
*幫助從舊的table的元素復(fù)制到新的table中
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //新的table nextTba已經(jīng)存在前提下才能幫助擴(kuò)容
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);//調(diào)用擴(kuò)容方法
break;
}
}
return nextTab;
}
return table;
}其實(shí)helpTransfer()方法的目的就是調(diào)用多個(gè)工作線程一起幫助進(jìn)行擴(kuò)容,這樣的效率就會更高,而不是只有檢查到要擴(kuò)容的那個(gè)線程進(jìn)行擴(kuò)容操作,其他線程就要等待擴(kuò)容操作完成才能工作。
既然這里涉及到擴(kuò)容的操作,我們也一起來看看擴(kuò)容方法transfer()
擴(kuò)容原理
該方法的執(zhí)行邏輯如下:
- 計(jì)算每個(gè)線程可以處理的桶區(qū)間。默認(rèn) 16.
- 初始化臨時(shí)變量 nextTable,擴(kuò)容 2 倍。
- 死循環(huán),計(jì)算下標(biāo)。完成總體判斷。
如果桶內(nèi)有數(shù)據(jù),同步轉(zhuǎn)移數(shù)據(jù)。通常會像鏈表拆成 2 份。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 將 length / 8 然后除以 CPU核心數(shù)。如果得到的結(jié)果小于 16,那么就使用 16。
// 這里的目的是讓每個(gè) CPU 處理的桶一樣多,避免出現(xiàn)轉(zhuǎn)移任務(wù)不均勻的現(xiàn)象,如果桶較少的話,默認(rèn)一個(gè) CPU(一個(gè)線程)處理 16 個(gè)桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // stride 一步的距離
// 新的 table 尚未初始化
if (nextTab == null) { // initiating
try {
//構(gòu)建一個(gè)nextTable對象,其容量為原來容量的兩倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 擴(kuò)容失敗, sizeCtl 使用 int 最大值。
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更新成員變量
nextTable = nextTab;
// 更新轉(zhuǎn)移下標(biāo),就是 老的 tab 的 length
transferIndex = n;
}
// 新 tab 的 length
int nextn = nextTab.length;
// 創(chuàng)建一個(gè) fwd 節(jié)點(diǎn),用于占位。當(dāng)別的線程發(fā)現(xiàn)這個(gè)槽位中是 fwd 類型的節(jié)點(diǎn),則跳過這個(gè)節(jié)點(diǎn)。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 當(dāng)advance == true時(shí),表明該節(jié)點(diǎn)已經(jīng)處理過了
boolean advance = true;
// 完成狀態(tài),如果是 true,就結(jié)束此方法。
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循環(huán),i 表示下標(biāo),bound 表示當(dāng)前線程可以處理的當(dāng)前桶區(qū)間最小下標(biāo)
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 控制 --i ,遍歷原h(huán)ash表中的節(jié)點(diǎn)
while (advance) {
int nextIndex, nextBound;
// 對 i 減一,判斷是否大于等于 bound (正常情況下,如果大于 bound 不成立,說明該線程上次領(lǐng)取的任務(wù)已經(jīng)完成了。那么,需要在下面繼續(xù)領(lǐng)取任務(wù))
// 如果對 i 減一大于等于 bound(還需要繼續(xù)做任務(wù)),或者完成了,修改推進(jìn)狀態(tài)為 false,不能推進(jìn)了。任務(wù)成功后修改推進(jìn)狀態(tài)為 true。
// 通常,第一次進(jìn)入循環(huán),i-- 這個(gè)判斷會無法通過,從而走下面的 nextIndex 賦值操作(獲取最新的轉(zhuǎn)移下標(biāo))。其余情況都是:如果可以推進(jìn),將 i 減一,然后修改成不可推進(jìn)。如果 i 對應(yīng)的桶處理成功了,改成可以推進(jìn)。
if (--i >= bound || finishing)
advance = false; // 這里設(shè)置 false,是為了防止在沒有成功處理一個(gè)桶的情況下卻進(jìn)行了推進(jìn)
// 這里的目的是:1. 當(dāng)一個(gè)線程進(jìn)入時(shí),會選取最新的轉(zhuǎn)移下標(biāo)。2. 當(dāng)一個(gè)線程處理完自己的區(qū)間時(shí),如果還有剩余區(qū)間的沒有別的線程處理。再次獲取區(qū)間。
else if ((nextIndex = transferIndex) <= 0) {
// 如果小于等于0,說明沒有區(qū)間了 ,i 改成 -1,推進(jìn)狀態(tài)變成 false,不再推進(jìn),表示,擴(kuò)容結(jié)束了,當(dāng)前線程可以退出了
// 這個(gè) -1 會在下面的 if 塊里判斷,從而進(jìn)入完成狀態(tài)判斷
i = -1;
advance = false;
}
// CAS 修改 transferIndex,即 length - 區(qū)間值,留下剩余的區(qū)間值供后面的線程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound; // 這個(gè)值就是當(dāng)前線程可以處理的最小當(dāng)前區(qū)間最小下標(biāo)
i = nextIndex - 1; // 初次對i 賦值,這個(gè)就是當(dāng)前線程可以處理的當(dāng)前區(qū)間的最大下標(biāo)
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 已經(jīng)完成所有節(jié)點(diǎn)復(fù)制了
if (finishing) {
nextTable = null;
table = nextTab; // table 指向nextTable
sizeCtl = (n << 1) - (n >>> 1); // sizeCtl閾值為原來的1.5倍
return; // 跳出死循環(huán),
}
// CAS 更擴(kuò)容閾值,在這里面sizectl值減一,說明新加入一個(gè)線程參與到擴(kuò)容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 遍歷的節(jié)點(diǎn)為null,則放入到ForwardingNode 指針節(jié)點(diǎn)
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// f.hash == -1 表示遍歷到了ForwardingNode節(jié)點(diǎn),意味著該節(jié)點(diǎn)已經(jīng)處理過了
// 這里是控制并發(fā)擴(kuò)容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 節(jié)點(diǎn)加鎖
synchronized (f) {
// 節(jié)點(diǎn)復(fù)制工作
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 ,表示為鏈表節(jié)點(diǎn)
if (fh >= 0) {
// 構(gòu)造兩個(gè)鏈表 一個(gè)是原鏈表 另一個(gè)是原鏈表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置處插上鏈表
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置處插上鏈表
setTabAt(nextTab, i + n, hn);
// 在table i 位置處插上ForwardingNode 表示該節(jié)點(diǎn)已經(jīng)處理過了
setTabAt(tab, i, fwd);
// advance = true 可以執(zhí)行--i動作,遍歷節(jié)點(diǎn)
advance = true;
}
// 如果是TreeBin,則按照紅黑樹進(jìn)行處理,處理邏輯與上面一致
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 擴(kuò)容后樹節(jié)點(diǎn)個(gè)數(shù)若<=6,將樹轉(zhuǎn)鏈表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}擴(kuò)容過程有點(diǎn)復(fù)雜,這里主要涉及到多線程并發(fā)擴(kuò)容,ForwardingNode的作用就是支持?jǐn)U容操作,將已處理的節(jié)點(diǎn)和空節(jié)點(diǎn)置為ForwardingNode,并發(fā)處理時(shí)多個(gè)線程經(jīng)過ForwardingNode就表示已經(jīng)遍歷了,就往后遍歷,下圖是多線程合作擴(kuò)容的過程:

介紹完擴(kuò)容過程,我們再次回到put流程,在第四步中是向鏈表或者紅黑樹里加節(jié)點(diǎn),到第五步,會調(diào)用treeifyBin()方法進(jìn)行鏈表轉(zhuǎn)紅黑樹的過程。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//如果整個(gè)table的數(shù)量小于64,就擴(kuò)容至原來的一倍,不轉(zhuǎn)紅黑樹了
//因?yàn)檫@個(gè)閾值擴(kuò)容可以減少hash沖突,不必要去轉(zhuǎn)紅黑樹
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
//封裝成TreeNode
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//通過TreeBin對象對TreeNode轉(zhuǎn)換成紅黑樹
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}到第六步表示已經(jīng)數(shù)據(jù)加入成功了,現(xiàn)在調(diào)用addCount()方法計(jì)算ConcurrentHashMap的size,在原來的基礎(chǔ)上加一,現(xiàn)在來看看addCount()方法。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//更新baseCount,table的數(shù)量,counterCells表示元素個(gè)數(shù)的變化
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果多個(gè)線程都在執(zhí)行,則CAS失敗,執(zhí)行fullAddCount,全部加入count
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//check>=0表示需要進(jìn)行擴(kuò)容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//當(dāng)前線程發(fā)起庫哦哦讓操作,nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}put的流程現(xiàn)在已經(jīng)分析完了,你可以從中發(fā)現(xiàn),他在并發(fā)處理中使用的是樂觀鎖,當(dāng)有沖突的時(shí)候才進(jìn)行并發(fā)處理,而且流程步驟很清晰,但是細(xì)節(jié)設(shè)計(jì)的很復(fù)雜,畢竟多線程的場景也復(fù)雜。
get操作
我們現(xiàn)在要回到開始的例子中,我們對個(gè)人信息進(jìn)行了新增之后,我們要獲取所新增的信息,使用String name = map.get(“name”)獲取新增的name信息,現(xiàn)在我們依舊用debug的方式來分析下ConcurrentHashMap的獲取方法get()
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //計(jì)算兩次hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//讀取首節(jié)點(diǎn)的Node元素
if ((eh = e.hash) == h) { //如果該節(jié)點(diǎn)就是首節(jié)點(diǎn)就返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//hash值為負(fù)值表示正在擴(kuò)容,這個(gè)時(shí)候查的是ForwardingNode的find方法來定位到nextTable來
//查找,查找到就返回
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//既不是首節(jié)點(diǎn)也不是ForwardingNode,那就往下遍歷
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}ConcurrentHashMap的get操作的流程很簡單,也很清晰,可以分為三個(gè)步驟來描述
- 計(jì)算hash值,定位到該table索引位置,如果是首節(jié)點(diǎn)符合就返回
- 如果遇到擴(kuò)容的時(shí)候,會調(diào)用標(biāo)志正在擴(kuò)容節(jié)點(diǎn)ForwardingNode的find方法,查找該節(jié)點(diǎn),匹配就返回
- 以上都不符合的話,就往下遍歷節(jié)點(diǎn),匹配就返回,否則最后就返回null
size操作
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a; //變化的數(shù)量
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}在JDK1.8版本中,對于size的計(jì)算,在擴(kuò)容和addCount()方法就已經(jīng)有處理了,JDK1.7是在調(diào)用size()方法才去計(jì)算,其實(shí)在并發(fā)集合中去計(jì)算size是沒有多大的意義的,因?yàn)閟ize是實(shí)時(shí)在變的,只能計(jì)算某一刻的大小,但是某一刻太快了,人的感知是一個(gè)時(shí)間段,所以并不是很精確。
總結(jié)與思考
其實(shí)可以看出JDK1.8版本的ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)已經(jīng)接近HashMap,相對而言,ConcurrentHashMap只是增加了同步的操作來控制并發(fā),從JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+紅黑樹,相對而言,總結(jié)如下思考:
- JDK1.8的實(shí)現(xiàn)降低鎖的粒度,JDK1.7版本鎖的粒度是基于Segment的,包含多個(gè)HashEntry,而JDK1.8鎖的粒度就是HashEntry(首節(jié)點(diǎn))
- JDK1.8版本的數(shù)據(jù)結(jié)構(gòu)變得更加簡單,使得操作也更加清晰流暢,因?yàn)橐呀?jīng)使用synchronized來進(jìn)行同步,所以不需要分段鎖的概念,也就不需要Segment這種數(shù)據(jù)結(jié)構(gòu)了,由于粒度的降低,實(shí)現(xiàn)的復(fù)雜度也增加了
- JDK1.8使用紅黑樹來優(yōu)化鏈表,基于長度很長的鏈表的遍歷是一個(gè)很漫長的過程,而紅黑樹的遍歷效率是很快的,代替一定閾值的鏈表,這樣形成一個(gè)最佳拍檔
- JDK1.8為什么使用內(nèi)置鎖synchronized來代替重入鎖ReentrantLock,我覺得有以下幾點(diǎn):
- 因?yàn)榱6冉档土?,在相對而言的低粒度加鎖方式,synchronized并不比ReentrantLock差,在粗粒度加鎖中ReentrantLock可能通過Condition來控制各個(gè)低粒度的邊界,更加的靈活,而在低粒度中,Condition的優(yōu)勢就沒有了
- JVM的開發(fā)團(tuán)隊(duì)從來都沒有放棄synchronized,而且基于JVM的synchronized優(yōu)化空間更大,使用內(nèi)嵌的關(guān)鍵字比使用API更加自然
- 減少內(nèi)存開銷 :在大量的數(shù)據(jù)操作下,對于JVM的內(nèi)存壓力,基于API的ReentrantLock會開銷更多的內(nèi)存,雖然不是瓶頸,但是也是一個(gè)選擇依據(jù)
以上就是ConcurrentHashMap線程安全及實(shí)現(xiàn)原理實(shí)例解析的詳細(xì)內(nèi)容,更多關(guān)于ConcurrentHashMap線程安全的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
spring中websocket定時(shí)任務(wù)實(shí)現(xiàn)實(shí)時(shí)推送
本文主要介紹了spring中websocket定時(shí)任務(wù)實(shí)現(xiàn)實(shí)時(shí)推送,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Java線程使用同步鎖交替執(zhí)行打印奇數(shù)偶數(shù)的方法
這篇文章主要介紹了Java線程使用同步鎖交替執(zhí)行打印奇數(shù)偶數(shù)的方法。小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-01-01
Java如何優(yōu)雅關(guān)閉異步中的ExecutorService
在并發(fā)編程領(lǐng)域,Java的ExecutorService是線程池管理的關(guān)鍵接口,這篇文章主要為大家介紹了如何優(yōu)雅關(guān)閉異步中的ExecutorService,需要的可以了解下2025-02-02
Java全面細(xì)致講解Cookie與Session及kaptcha驗(yàn)證碼的使用
web開發(fā)階段我們主要是瀏覽器和服務(wù)器之間來進(jìn)行交互。瀏覽器和服務(wù)器之間的交互就像人和人之間進(jìn)行交流一樣,但是對于機(jī)器來說,在一次請求之間只是會攜帶著本次請求的數(shù)據(jù)的,但是可能多次請求之間是會有聯(lián)系的,所以提供了會話機(jī)制2022-06-06
Struts2+Hibernate實(shí)現(xiàn)數(shù)據(jù)分頁的方法
這篇文章主要介紹了Struts2+Hibernate實(shí)現(xiàn)數(shù)據(jù)分頁的方法,結(jié)合實(shí)例形式分析了Struts2結(jié)合Hibernate實(shí)現(xiàn)數(shù)據(jù)分頁的原理,步驟與相關(guān)實(shí)現(xiàn)代碼,需要的朋友可以參考下2016-03-03
如何從eureka獲取服務(wù)的ip和端口號進(jìn)行Http的調(diào)用
這篇文章主要介紹了如何從eureka獲取服務(wù)的ip和端口號進(jìn)行Http的調(diào)用,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03

