一篇帶你解析入門LongAdder源碼
1、LongAdder由來
LongAdder
類是JDK1.8新增的一個原子性操作類。AtomicLon
g通過CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器來說性能已經(jīng)很好了,但是JDK開發(fā)組并不滿足于此,因為經(jīng)常搞并發(fā)的請求下AtomicLong
的性能是不能讓人接受的。
如下AtomicLong
的incrementAndGet
的代碼,雖然AtomicLong
使用CAS算法,但是CAS失敗后還是通過無限循環(huán)的自旋鎖不多的嘗試,這就是高并發(fā)下CAS性能低下的原因所在。源碼如下:
public final long incrementAndGet() { for (;;) { long current = get(); long next = current + 1; if (compareAndSet(current, next)) return next; } }
高并發(fā)下N多線程同時去操作一個變量會造成大量線程CAS失敗,然后處于自旋狀態(tài),導(dǎo)致嚴重浪費CPU資源,降低了并發(fā)性。
2、LongAdder與AtomicLong的簡單介紹
我們知道,volatile
關(guān)鍵字是輕量級鎖,可以解決多線程內(nèi)存不可見問題。對于一寫多讀,可以解決變量同步問題,但是如果是多寫,volatile
無法解決線程安全問題的。例如,count++操作,就應(yīng)該使用如下方式: AtomicInteger count = new AtomicInteger();
、count.addAndGet(1);
而如果是JDK8及以上,推薦使用LongAdder對象替代,因為它的性能比AtomicLong
更好(減少樂觀鎖的重試次數(shù))。
LongAdder其他應(yīng)用場景:
對于Java項目
中計數(shù)統(tǒng)計的一些需求,如果是 JDK8,推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數(shù))
在大多數(shù)項目及開源組件中,計數(shù)統(tǒng)計使用最多的仍然還是AtomicLong
,雖然是阿里巴巴這樣說,但是我們?nèi)匀灰鶕?jù)使用場景來決定是否使用LongAdder
。
今天主要是來講講LongAdder
的實現(xiàn)原理,還是老方式,通過圖文一步步解開LongAdder
神秘的面紗,通過此篇文章你會了解到:
- 為什么AtomicLong在高并發(fā)場景下性能急劇下降?
- LongAdder為什么快?
- LongAdder實現(xiàn)原理(圖文分析)
- AtomicLong是否可以被遺棄或替換?
本文代碼全部基于JDK 1.8,建議邊看文章邊看源碼更加利于消化!
3、AtomicLong
當我們在進行計數(shù)統(tǒng)計的時,通常會使用AtomicLong
來實現(xiàn)。AtomicLong
能保證并發(fā)情況下計數(shù)的準確性,其內(nèi)部通過CAS來解決并發(fā)安全性的問題。
3.1 AtomicLong實現(xiàn)原理
說到線程安全的計數(shù)統(tǒng)計工具類,肯定少不了Atomic
下的幾個原子類。AtomicLong
就是juc包下重要的原子類,在并發(fā)情況下可以對長整形類型數(shù)據(jù)進行原子操作,保證并發(fā)情況下數(shù)據(jù)的安全性。
public class AtomicLong extends Number implements java.io.Serializable { // + 1 public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } // - 1 public final long decrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; } }
我們在計數(shù)的過程中,一般使用incrementAndGet()
和decrementAndGet()
進行加一和減一操作,這里調(diào)用了Unsafe
類中的getAndAddLong()
方法進行操作。
接著看看unsafe.getAndAddLong()方法:
public final class Unsafe { public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); }
這里直接進行CAS+自旋操作更新AtomicLong
中的value
值,進而保證value
值的原子性更新。
3.2 AtomicLong瓶頸分析
如上代碼所示,我們在使用CAS + 自旋的過程中,在高并發(fā)環(huán)境下,N個線程同時進行自旋操作,會出現(xiàn)大量失敗并不斷自旋的情況,此時AtomicLong
的自旋會成為瓶頸。
如上圖所示,高并發(fā)場景下AtomicLong
性能會急劇下降,我們后面也會舉例說明。
那么高并發(fā)下計數(shù)的需求有沒有更好的替代方案呢?在JDK8
中 Doug Lea
大神新寫了一個LongAdder
來解決此問題,我們后面來看LongAdder
是如何優(yōu)化的。
4、LongAdder
4.1 LongAdder和AtomicLong性能測試
我們說了很多LongAdder上性能優(yōu)于AtomicLong,到底是不是呢?一切還是以代碼說話:
/** * Atomic和LongAdder耗時測試 */ public class AtomicLongAdderTest { public static void main(String[] args) throws Exception{ testAtomicLongAdder(1, 10000000); testAtomicLongAdder(10, 10000000); testAtomicLongAdder(100, 10000000); } static void testAtomicLongAdder(int threadCount, int times) throws Exception{ System.out.println("threadCount: " + threadCount + ", times: " + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder 耗時:" + (System.currentTimeMillis() - start) + "ms"); System.out.println("threadCount: " + threadCount + ", times: " + times); long atomicStart = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong 耗時:" + (System.currentTimeMillis() - atomicStart) + "ms"); System.out.println("----------------------------------------"); } static void testAtomicLong(int threadCount, int times) throws Exception{ AtomicLong atomicLong = new AtomicLong(); List<Thread> list = Lists.newArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { atomicLong.incrementAndGet(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("AtomicLong value is : " + atomicLong.get()); } static void testLongAdder(int threadCount, int times) throws Exception{ LongAdder longAdder = new LongAdder(); List<Thread> list = Lists.newArrayList(); for (int i = 0; i < threadCount; i++) { list.add(new Thread(() -> { for (int j = 0; j < times; j++) { longAdder.increment(); } })); } for (Thread thread : list) { thread.start(); } for (Thread thread : list) { thread.join(); } System.out.println("LongAdder value is : " + longAdder.longValue()); } }
執(zhí)行結(jié)果:
這里可以看到隨著并發(fā)的增加,AtomicLong
性能是急劇下降的,耗時是LongAdder
的數(shù)倍。至于原因我們還是接著往后看。
4.2 LongAdder為什么這么快
先看下LongAdder
的操作原理圖:
既然說到LongAdder
可以顯著提升高并發(fā)環(huán)境下的性能,那么它是如何做到的?
1、 設(shè)計思想上,LongAdder
采用"分段"的方式降低CAS失敗的頻次
這里先簡單的說下LongAdder
的思路,后面還會詳述LongAdder
的原理。
我們知道,AtomicLong中有個內(nèi)部變量value保存著實際的long值,所有的操作都是針對該變量進行。也就是說,高并發(fā)環(huán)境下,value
變量其實是一個熱點數(shù)據(jù),也就是N個線程競爭一個熱點。
LongAdder
的基本思路就是分散熱點,將value
值的新增操作分散到一個數(shù)組中,不同線程會命中到數(shù)組的不同槽中,各個線程只對自己槽中的那個value
值進行CAS操作,這樣熱點就被分散了,沖突的概率就小很多。
LongAdde
r有一個全局變量volatile long base
值,當并發(fā)不高的情況下都是通過CAS來直接操作base值,如果CAS失敗,則針對LongAdder
中的Cell[]
數(shù)組中的Cell進行CAS操作,減少失敗的概率。
例如當前類中base = 10
,有三個線程進行CAS
原子性的**+1操作**,線程一執(zhí)行成功,此時base=11,線程二、線程三執(zhí)行失敗后開始針對于Cell[]
數(shù)組中的Cell
元素進行**+1操作**,同樣也是CAS
操作,此時數(shù)組index=1
和index=2
中Cell
的value
都被設(shè)置為了1.
執(zhí)行完成后,統(tǒng)計累加數(shù)據(jù):sum = 11 + 1 + 1 = 13,利用LongAdder進行累加的操作就執(zhí)行完了,流程圖如下:
如果要獲取真正的long
值,只要將各個槽中的變量值累加返回。這種分段的做法類似于JDK7中ConcurrentHashMap
的分段鎖。
2、使用Contended注解來消除偽共享
在 LongAdder
的父類 Striped64
中存在一個 volatile Cell[] cells;
數(shù)組,其長度是2 的冪次方,每個Cell
都使用 @Contended
注解進行修飾,而@Contended
注解可以進行緩存行填充,從而解決偽共享問題。偽共享會導(dǎo)致緩存行失效,緩存一致性開銷變大。
@sun.misc.Contended static final class Cell { }
偽共享指的是多個線程同時讀寫同一個緩存行的不同變量時導(dǎo)致的 CPU緩存失效
。盡管這些變量之間沒有任何關(guān)系,但由于在主內(nèi)存中鄰近,存在于同一個緩存行之中,它們的相互覆蓋會導(dǎo)致頻繁的緩存未命中,引發(fā)性能下降。這里對于偽共享我只是提一下概念,并不會深入去講解,大家可以自行查閱一些資料。
解決偽共享的方法一般都是使用直接填充,我們只需要保證不同線程的變量存在于不同的 CacheLine
即可,使用多余的字節(jié)來填充可以做點這一點,這樣就不會出現(xiàn)偽共享問題。例如在Disruptor
隊列的設(shè)計中就有類似設(shè)計。
在Striped64
類中我們可以看看Doug Lea
在Cell
上加的注釋也有說明這一點:
框中的翻譯如下:
Cell
類是AtomicLong
添加了padded(via@sun.misc.compended)
來消除偽共享的變種版本。緩存行填充對于大多數(shù)原子來說是繁瑣的,因為它們通常不規(guī)則地分散在內(nèi)存中,因此彼此之間不會有太大的干擾。但是,駐留在數(shù)組中的原子對象往往彼此相鄰,因此在沒有這種預(yù)防措施的情況下,通常會共享緩存行數(shù)據(jù)(對性能有巨大的負面影響)。
3、惰性求值
LongAdder
只有在使用longValue()
獲取當前累加值時才會真正的去結(jié)算計數(shù)的數(shù)據(jù),longValue()
方法底層就是調(diào)用sum()
方法,對base
和Cell
數(shù)組的數(shù)據(jù)累加然后返回,做到數(shù)據(jù)寫入和讀取分離。
而AtomicLong
使用incrementAndGet()
每次都會返回long
類型的計數(shù)值,每次遞增后還會伴隨著數(shù)據(jù)返回,增加了額外的開銷。
4.3 LongAdder實現(xiàn)原理
之前說了,AtomicLong
是多個線程針對單個熱點值value
進行原子操作。而LongAdder
是每個線程擁有自己的槽,各個線程一般只對自己槽中的那個值進行CAS操作。
比如有三個線程同時對value增加1,那么value = 1 + 1 + 1 = 3
但是對于LongAdder
來說,內(nèi)部有一個base
變量,一個Cell[]
數(shù)組。
base
變量:非競爭條件下,直接累加到該變量上
Cell[]
數(shù)組:競爭條件下,累加個各個線程自己的槽Cell[i]
中
最終結(jié)果的計算是下面這個形式:
4.4 ongAdder源碼剖析
前面已經(jīng)用圖分析了LongAdder
高性能的原理,我們繼續(xù)看下LongAdder
實現(xiàn)的源碼:
public class LongAdder extends Striped64 implements Serializable { public void increment() { add(1L); } public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } }
一般我們進行計數(shù)時都會使用increment()
方法,每次進行**+1操作**,increment()
會直接調(diào)用add()
方法。
變量說明:
- as 表示cells引用
- b 表示獲取的base值
- v 表示 期望值,
- m 表示 cells 數(shù)組的長度
- a 表示當前線程命中的cell單元格
條件分析:
條件一:as == null || (m = as.length - 1) < 0
此條件成立說明cells數(shù)組未初始化。如果不成立則說明cells數(shù)組已經(jīng)完成初始化,對應(yīng)的線程需要找到Cell數(shù)組中的元素去寫值。
條件二:(a = as[getProbe() & m]) == null
getProbe()
獲取當前線程的hash值,m表示cells長度-1,cells長度是2的冪次方數(shù),原因之前也講到過,與數(shù)組長度取??梢赞D(zhuǎn)化為按位與運算,提升計算性能。
當條件成立時說明當前線程通過hash計算出來數(shù)組位置處的cell為空,進一步去執(zhí)行longAccumulate()
方法。如果不成立則說明對應(yīng)的cell
不為空,下一步將要將x值通過CAS操作添加到cell中。
條件三:!(uncontended = a.cas(v = a.value, v + x)
主要看a.cas(v = a.value, v + x)
,接著條件二,說明當前線程hash與數(shù)組長度取模計算出的位置的cell有值,此時直接嘗試一次CAS操作,如果成功則退出if條件,失敗則繼續(xù)往下執(zhí)行longAccumulate()
方法。
接著往下看核心的longAccumulate()
方法,代碼很長,后面會一步步分析,先上代碼:
java.util.concurrent.atomic.Striped64.:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
代碼很長,if else
分支很多,除此看肯定會很頭疼。這里一點點分析,然后結(jié)合畫圖一步步了解其中實現(xiàn)原理。
我們首先要清楚執(zhí)行這個方法的前置條件,它們是或的關(guān)系,如上面條件一、二、三
- cells數(shù)組沒有初始化
- cells數(shù)組已經(jīng)初始化,但是當前線程對應(yīng)的cell數(shù)據(jù)為空
- cells數(shù)組已經(jīng)初始化, 當前線程對應(yīng)的cell數(shù)據(jù)為空,且CAS操作+1失敗
longAccumulate()方法的入?yún)ⅲ?/strong>
- long x 需要增加的值,一般默認都是1
- LongBinaryOperator fn 默認傳遞的是null
- wasUncontended競爭標識,如果是false則代表有競爭。只有cells初始化之后,并且當前線程CAS競爭修改失敗,才會是false
然后再看下Striped64中一些變量或者方法的定義:
- base: 類似于AtomicLong中全局的value值。在沒有競爭情況下數(shù)據(jù)直接累加到base上,或者cells擴容時,也需要將數(shù)據(jù)寫入到base上
- collide:表示擴容意向,false 一定不會擴容,true可能會擴容。
- cellsBusy:初始化cells或者擴容cells需要獲取鎖,
- 0:表示無鎖狀態(tài) 1:表示其他線程已經(jīng)持有了鎖casCellsBusy(): 通過CAS操作修改cellsBusy的值,CAS成功代表獲取鎖,返回true
- NCPU:當前計算機CPU數(shù)量,Cell數(shù)組擴容時會使用到
- getProbe(): 獲取當前線程的hash值
- advanceProbe(): 重置當前線程的hash值
接著開始正式解析longAccumulate()源碼:
private static final long PROBE; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
我們上面說過getProbe()
方法是為了獲取當前線程的hash
值,具體實現(xiàn)是通過UNSAFE.getInt()
實現(xiàn)的,PROBE
是在初始化時候獲取當前線程threadLocalRandomProbe
的值。
注:Unsafe.getInt()有三個重載方法getInt(Object o, long offset)、getInt(long address)和getIntVolatile(long address),都是從指定的位置獲取變量的值,只不過第一個的offset是相對于對象o的相對偏移量,第二個address是絕對地址偏移量。如果第一個方法中o為null是,offset也會被作為絕對偏移量。第三個則是帶有volatile語義的load讀操作。
如果當前線程的hash值h=getProbe()為0,0與任何數(shù)取模都是0,會固定到數(shù)組第一個位置,所以這里做了優(yōu)化,使用ThreadLocalRandom
為當前線程重新計算一個hash
值。最后設(shè)置wasUncontended = true
,這里含義是重新計算了當前線程的hash
后認為此次不算是一次競爭。hash值被重置就好比一個全新的線程一樣,所以設(shè)置了競爭狀態(tài)為true。
接著執(zhí)行for循環(huán)
,我們可以把for循環(huán)
代碼拆分一下,每個if條件
算作一個CASE
來分析:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) } }
如上所示,第一個if語句代表CASE1
,里面再有if判斷
會以CASE1.1
這種形式來講解,下面接著的else if
為CASE2
, 最后一個為CASE3
CASE1執(zhí)行條件:
if ((as = cells) != null && (n = as.length) > 0) { }
cells數(shù)組不為空,且數(shù)組長度大于0的情況會執(zhí)行CASE1,CASE1的實現(xiàn)細節(jié)代碼較多,放到最后面講解。
CASE2執(zhí)行條件和實現(xiàn)原理:
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
CASE2
標識cells數(shù)組還未初始化,因為判斷cells == as
,這個代表當前線程到了這里獲取的cells還是之前的一致。我們可以先看這個case
,最后再回頭看最為麻煩的CASE1
實現(xiàn)邏輯。
cellsBusy
上面說了是加鎖的狀態(tài),初始化cells數(shù)組
和擴容的時候都要獲取加鎖的狀態(tài),這個是通過CAS
來實現(xiàn)的,為0代表無鎖狀態(tài),為1代表其他線程已經(jīng)持有鎖了。cells==as
代表當前線程持有的數(shù)組未進行修改過,casCellsBusy()
通過CAS操作去獲取鎖。但是里面的if條件
又再次判斷了cell==as
,這一點是不是很奇怪?通過畫圖來說明下問題:
cells==as雙重判斷說明.png
如果上面條件都執(zhí)行成功就會執(zhí)行數(shù)組的初始化及賦值操作, Cell[] rs = new Cell[2
]表示數(shù)組的長度為2,rs[h & 1] = new Cell(x)
表示創(chuàng)建一個新的Cell元素
,value是x值,默認為1。
h & 1
類似于我們之前HashMap
或者ThreadLocal
里面經(jīng)常用到的計算散列桶index
的算法,通常都是hash & (table.len - 1)
,這里就不做過多解釋了。執(zhí)行完成后直接退出for循環(huán)
。
CASE3執(zhí)行條件和實現(xiàn)原理:
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
進入到這里說明cells正在或者已經(jīng)初始化過了,執(zhí)行caseBase()
方法,通過CAS
操作來修改base
的值,如果修改成功則跳出循環(huán),這個CASE只有在初始化Cell數(shù)組
的時候,多個線程嘗試CAS
修改cellsBusy
加鎖的時候,失敗的線程會走到這個分支,然后直接CAS修改base
數(shù)據(jù)。
CASE1 實現(xiàn)原理:
分析完了CASE2和CASE3
,我們再折頭回看一下CASE1
,進入CASE1
的前提是:cells數(shù)組
不為空,已經(jīng)完成了初始化賦值操作。
接著還是一點點往下拆分代碼,首先看第一個判斷分支CASE1.1
:
if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; }
這個if條件中(a = as[(n - 1) & h]) == null
代表當前線程對應(yīng)的數(shù)組下標位置的cell
數(shù)據(jù)為null
,代表沒有線程在此處創(chuàng)建Cell
對象。
接著判斷cellsBusy==0
,代表當前鎖未被占用。然后新創(chuàng)建Cell對象,接著又判斷了一遍cellsBusy == 0
,然后執(zhí)行casCellsBusy()
嘗試通過CAS操作修改cellsBusy=1
,加鎖成功后修改擴容意向collide = false
;
for (;;) { if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } if (created) break; continue; }
上面代碼判斷當前線程hash
后指向的數(shù)據(jù)位置元素是否為空,如果為空則將cell
數(shù)據(jù)放入數(shù)組中,跳出循環(huán)。如果不為空則繼續(xù)循環(huán)。
繼續(xù)往下看代碼,CASE1.2:
else if (!wasUncontended) wasUncontended = true; h = advanceProbe(h);
wasUncontended
表示cells
初始化后,當前線程競爭修改失敗wasUncontended =false
,這里只是重新設(shè)置了這個值為true
,緊接著執(zhí)行advanceProbe(h)
重置當前線程的hash
,重新循環(huán)。
接著看CASE1.3:
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
進入CASE1.3
說明當前線程對應(yīng)的數(shù)組中有了數(shù)據(jù),也重置過hash
值,這時通過CAS操作嘗試對當前數(shù)中的value
值進行累加x操作,x默認為1,如果CAS
成功則直接跳出循環(huán)。
接著看CASE1.4:
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;
如果cells數(shù)組
的長度達到了CPU核心數(shù)
,或者cells
擴容了,設(shè)置擴容意向collide為false
并通過下面的h = advanceProbe(h)
方法修改線程的probe再重新嘗試
至于這里為什么要提出和CPU數(shù)量
做判斷的問題:每個線程會通過線程對cells[threadHash%cells.length]
位置的Cell
對象中的value
做累加,這樣相當于將線程綁定到了cells
中的某個cell
對象上,如果超過CPU數(shù)量
的時候就不再擴容是因為CPU
的數(shù)量代表了機器處理能力,當超過CPU
數(shù)量時,多出來的cells
數(shù)組元素沒有太大作用。
接著看CASE1.5:
else if (n >= NCPU || cells != as) collide = false;
如果擴容意向collide
是false
則修改它為true
,然后重新計算當前線程的hash值繼續(xù)循環(huán),在CASE1.4
中,如果當前數(shù)組的長度已經(jīng)大于了CPU
的核數(shù),就會再次設(shè)置擴容意向collide=false
,這里的意義是保證擴容意向為false
后不再繼續(xù)往后執(zhí)行CASE1.6
的擴容操作。
接著看CASE1.6分支:
else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; }
這里面執(zhí)行的其實是擴容邏輯,首先是判斷通過CAS
改變cellsBusy
來嘗試加鎖,如果CAS
成功則代表獲取鎖成功,繼續(xù)向下執(zhí)行,判斷當前的cells
數(shù)組和最先賦值的as
是同一個,代表沒有被其他線程擴容過,然后進行擴容,擴容大小為之前的容量的兩倍,這里用的按位左移1位來操作的。
Cell[] rs = new Cell[n << 1];
到了這里,我們已經(jīng)分析完了longAccumulate()
所有的邏輯,邏輯分支挺多,仔細分析看看其實還是挺清晰的,流程圖如下:
我們再舉一些線程執(zhí)行的例子里面場景覆蓋不全,大家可以按照這種模式自己模擬場景分析代碼流程:
如有問題也請及時指出,我會第一時間更正,不勝感激!
4.5 LongAdder的sum方法
當我們最終獲取計數(shù)器值時,我們可以使用LongAdder.longValue()
方法,其內(nèi)部就是使用sum
方法來匯總數(shù)據(jù)的。
java.util.concurrent.atomic.LongAdder.sum():
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
實現(xiàn)很簡單,base +,遍歷cells數(shù)組中的值,然后累加。
4.6 AtomicLong可以棄用了嗎?
看上去LongAdder
的性能全面超越了AtomicLong
,而且阿里巴巴開發(fā)手冊也提及到 推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數(shù)),但是我們真的就可以舍棄掉LongAdder
了嗎?
當然不是,我們需要看場景來使用,如果是并發(fā)不太高的系統(tǒng),使用AtomicLong
可能會更好一些,而且內(nèi)存需求也會小一些。
我們看過sum()
方法后可以知道LongAdder
在統(tǒng)計的時候如果有并發(fā)更新,可能導(dǎo)致統(tǒng)計的數(shù)據(jù)有誤差。
而在高并發(fā)統(tǒng)計計數(shù)的場景下,才更適合使用LongAdder
。
5、總結(jié)
LongAdder中最核心的思想就是利用空間來換時間,將熱點value分散成一個Cell列表來承接并發(fā)的CAS,以此來提升性能。
LongAdder的原理及實現(xiàn)都很簡單,但其設(shè)計的思想值得我們品味和學(xué)習。
也希望大家多多關(guān)注腳本之家的其他內(nèi)容!
相關(guān)文章
關(guān)于Java的ArrayList數(shù)組自動擴容機制
這篇文章主要介紹了關(guān)于Java的ArrayList數(shù)組自動擴容機制,ArrayList底層是基于數(shù)組實現(xiàn)的,是一個動態(tài)數(shù)組,自動擴容,不是線程安全的,只能用在單線程環(huán)境下,需要的朋友可以參考下2023-05-05帶有@Transactional和@Async的循環(huán)依賴問題的解決
這篇文章主要介紹了帶有@Transactional和@Async的循環(huán)依賴問題的解決,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧2020-04-04SpringIOC?BeanDefinition的加載流程詳解
這篇文章主要為大家介紹了SpringIOC?BeanDefinition的加載流程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10IDEA設(shè)置生成帶注釋的getter和setter的圖文教程
通常我們用idea默認生成的getter和setter方法是不帶注釋的,當然,我們同樣可以設(shè)置idea像MyEclipse一樣生成帶有Javadoc的模板,具體設(shè)置方法,大家參考下本文2018-05-05java字符串數(shù)組進行大小排序的簡單實現(xiàn)
下面小編就為大家?guī)硪黄猨ava字符串數(shù)組進行大小排序的簡單實現(xiàn)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09Java將json對象轉(zhuǎn)換為map鍵值對案例詳解
這篇文章主要介紹了Java將json對象轉(zhuǎn)換為map鍵值對案例詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-09-09