Java并發(fā)工具類LongAdder原理實(shí)例解析
LongAdder實(shí)現(xiàn)原理圖


高并發(fā)下N多線程同時(shí)去操作一個(gè)變量會造成大量線程CAS失敗,然后處于自旋狀態(tài),導(dǎo)致嚴(yán)重浪費(fèi)CPU資源,降低了并發(fā)性。既然AtomicLong性能問題是由于過多線程同時(shí)去競爭同一個(gè)變量的更新而降低的,那么如果把一個(gè)變量分解為多個(gè)變量,讓同樣多的線程去競爭多個(gè)資源。
LongAdder則是內(nèi)部維護(hù)一個(gè)Cells數(shù)組,每個(gè)Cell里面有一個(gè)初始值為0的long型變量,在同等并發(fā)量的情況下,爭奪單個(gè)變量的線程會減少,這是變相的減少了爭奪共享資源的并發(fā)量,另外多個(gè)線程在爭奪同一個(gè)原子變量時(shí)候,如果失敗并不是自旋CAS重試,而是嘗試獲取其他原子變量的鎖,最后當(dāng)獲取當(dāng)前值時(shí)候是把所有變量的值累加后再加上base的值返回的。
LongAdder維護(hù)了要給延遲初始化的原子性更新數(shù)組和一個(gè)基值變量base數(shù)組的大小保持是2的N次方大小,數(shù)組表的下標(biāo)使用每個(gè)線程的hashcode值的掩碼表示,數(shù)組里面的變量實(shí)體是Cell類型。
Cell 類型是Atomic的一個(gè)改進(jìn),用來減少緩存的爭用,對于大多數(shù)原子操作字節(jié)填充是浪費(fèi)的,因?yàn)樵硬僮鞫际菬o規(guī)律的分散在內(nèi)存中進(jìn)行的,多個(gè)原子性操作彼此之間是沒有接觸的,但是原子性數(shù)組元素彼此相鄰存放將能經(jīng)常共享緩存行,也就是偽共享。所以這在性能上是一個(gè)提升。
另外由于Cells占用內(nèi)存是相對比較大的,所以一開始并不創(chuàng)建,而是在需要時(shí)候再創(chuàng)建,也就是惰性加載,當(dāng)一開始沒有空間時(shí)候,所有的更新都是操作base變量。
java.util.concurrency.atomic.LongAdder是Java8新增的一個(gè)類,提供了原子累計(jì)值的方法。根據(jù)文檔的描述其性能要優(yōu)于AtomicLong
這里測試時(shí)基于JDK1.8進(jìn)行的,AtomicLong 從Java8開始針對x86平臺進(jìn)行了優(yōu)化,使用XADD替換了CAS操作,我們知道JUC下面提供的原子類都是基于Unsafe類實(shí)現(xiàn)的,并由Unsafe來提供CAS的能力。CAS (compare-and-swap)本質(zhì)上是由現(xiàn)代CPU在硬件級實(shí)現(xiàn)的原子指令,允許進(jìn)行無阻塞,多線程的數(shù)據(jù)操作同時(shí)兼顧了安全性以及效率。大部分情況下,CAS都能夠提供不錯(cuò)的性能,但是在高競爭的情況下開銷可能會成倍增長,具體的研究可以參考這篇文章, 我們直接看下代碼:
public class AtomicLong {
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
}
public final class Unsafe {
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
}
getAndAddLong方法會以volatile的語義去讀需要自增的域的最新值,然后通過CAS去嘗試更新,正常情況下會直接成功后返回,但是在高并發(fā)下可能會同時(shí)有很多線程同時(shí)嘗試這個(gè)過程,也就是說線程A讀到的最新值可能實(shí)際已經(jīng)過期了,因此需要在while循環(huán)中不斷的重試,造成很多不必要的開銷,而xadd的相對來說會更高效一點(diǎn),偽碼如下,最重要的是下面這段代碼是原子的,也就是說其他線程不能打斷它的執(zhí)行或者看到中間值,這條指令是在硬件級直接支持的:
function FetchAndAdd(address location, int inc) {
int value := *location
*location := value + inc
return value
}
而LongAdder的性能比上面那種還要好很多,于是就研究了一下。首先它有一個(gè)基礎(chǔ)的值base,在發(fā)生競爭的情況下,會有一個(gè)Cell數(shù)組用于將不同線程的操作離散到不同的節(jié)點(diǎn)上去(會根據(jù)需要擴(kuò)容,最大為CPU核數(shù)),sum()會將所有Cell數(shù)組中的value和base累加作為返回值。核心的思想就是將AtomicLong一個(gè)value的更新壓力分散到多個(gè)value中去,從而降低更新熱點(diǎn)。

public class LongAdder extends Striped64 implements Serializable {
//...
}
LongAdder繼承自Striped64,Striped64內(nèi)部維護(hù)了一個(gè)懶加載的數(shù)組以及一個(gè)額外的base實(shí)例域,數(shù)組的大小是2的N次方,使用每個(gè)線程Thread內(nèi)部的哈希值訪問。
abstract class Striped64 extends Number {
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
}
數(shù)組的元素是Cell類,可以看到Cell類用Contended注解修飾,這里主要是解決false sharing(偽共享的問題),不過個(gè)人認(rèn)為偽共享翻譯的不是很好,或者應(yīng)該是錯(cuò)誤的共享,比如兩個(gè)volatile變量被分配到了同一個(gè)緩存行,但是這兩個(gè)的更新在高并發(fā)下會競爭,比如線程A去更新變量a,線程B去更新變量b,但是這兩個(gè)變量被分配到了同一個(gè)緩存行,因此會造成每個(gè)線程都去爭搶緩存行的所有權(quán),例如A獲取了所有權(quán)然后執(zhí)行更新這時(shí)由于volatile的語義會造成其刷新到主存,但是由于變量b也被緩存到同一個(gè)緩存行,因此就會造成cache miss,這樣就會造成極大的性能損失,因此有一些類庫的作者,例如JUC下面的、Disruptor等都利用了插入dummy 變量的方式,使得緩存行被其獨(dú)占,比如下面這種代碼:
static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
但是這種方式畢竟不通用,例如32、64位操作系統(tǒng)的緩存行大小不一樣,因此JAVA8中就增加了一個(gè)注@sun.misc.Contended解用于解決這個(gè)問題,由JVM去插入這些變量,具體可以參考o(jì)penjdk.java.net/jeps/142 ,但是通常來說對象是不規(guī)則的分配到內(nèi)存中的,但是數(shù)組由于是連續(xù)的內(nèi)存,因此可能會共享緩存行,因此這里加一個(gè)Contended注解以防cells數(shù)組發(fā)生偽共享的情況。
/**
* 底競爭下直接更新base,類似AtomicLong
* 高并發(fā)下,會將每個(gè)線程的操作hash到不同的
* cells數(shù)組中,從而將AtomicLong中更新
* 一個(gè)value的行為優(yōu)化之后,分散到多個(gè)value中
* 從而降低更新熱點(diǎn),而需要得到當(dāng)前值的時(shí)候,直接
* 將所有cell中的value與base相加即可,但是跟
* AtomicLong(compare and change -> xadd)的CAS不同,
* incrementAndGet操作及其變種
* 可以返回更新后的值,而LongAdder返回的是void
*/
public class LongAdder {
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* 如果是第一次執(zhí)行,則直接case操作base
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/**
* as數(shù)組為空(null或者size為0)
* 或者當(dāng)前線程取模as數(shù)組大小為空
* 或者cas更新Cell失敗
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
public long sum() {
//通過累加base與cells數(shù)組中的value從而獲得sum
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
}
/**
* openjdk.java.net/jeps/142
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
abstract class Striped64 extends Number {
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
/**
* 若getProbe為0,說明需要初始化
*/
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
/**
* 失敗重試
*/
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
/**
* 若as數(shù)組已經(jīng)初始化,(n-1) & h 即為取模操作,相對 % 效率要更高
*/
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {//這里casCellsBusy的作用其實(shí)就是一個(gè)spin lock
//可能會有多個(gè)線程執(zhí)行了`Cell r = new Cell(x);`,
//因此這里進(jìn)行cas操作,避免線程安全的問題,同時(shí)前面在判斷一次
//避免正在初始化的時(shí)其他線程再進(jìn)行額外的cas操作
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
//重新檢查一下是否已經(jīng)創(chuàng)建成功了
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot 現(xiàn)在是非空了,continue到下次循環(huán)重試
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;//若cas更新成功則跳出循環(huán),否則繼續(xù)重試
else if (n >= NCPU || cells != as) // 最大只能擴(kuò)容到CPU數(shù)目, 或者是已經(jīng)擴(kuò)容成功,這里只有的本地引用as已經(jīng)過期了
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // 擴(kuò)容
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重新計(jì)算hash(異或)從而嘗試找到下一個(gè)空的slot
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
/**
* 默認(rèn)size為2
*/
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x : // 若已經(jīng)有另一個(gè)線程在初始化,那么嘗試直接更新base
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
static final int getProbe() {
/**
* 通過Unsafe獲取Thread中threadLocalRandomProbe的值
*/
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
//返回Field在內(nèi)存中相對于對象內(nèi)存地址的偏移量
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
}
由于Cell相對來說比較占內(nèi)存,因此這里采用懶加載的方式,在無競爭的情況下直接更新base域,在第一次發(fā)生競爭的時(shí)候(CAS失敗)就會創(chuàng)建一個(gè)大小為2的cells數(shù)組,每次擴(kuò)容都是加倍,只到達(dá)到CPU核數(shù)。同時(shí)我們知道擴(kuò)容數(shù)組等行為需要只能有一個(gè)線程同時(shí)執(zhí)行,因此需要一個(gè)鎖,這里通過CAS更新cellsBusy來實(shí)現(xiàn)一個(gè)簡單的spin lock。
數(shù)組訪問索引是通過Thread里的threadLocalRandomProbe域取模實(shí)現(xiàn)的,這個(gè)域是ThreadLocalRandom更新的,cells的數(shù)組大小被限制為CPU的核數(shù),因?yàn)榧词褂谐^核數(shù)個(gè)線程去更新,但是每個(gè)線程也只會和一個(gè)CPU綁定,更新的時(shí)候頂多會有cpu核數(shù)個(gè)線程,因此我們只需要通過hash將不同線程的更新行為離散到不同的slot即可。
我們知道線程、線程池會被關(guān)閉或銷毀,這個(gè)時(shí)候可能這個(gè)線程之前占用的slot就會變成沒人用的,但我們也不能清除掉,因?yàn)橐话鉾eb應(yīng)用都是長時(shí)間運(yùn)行的,線程通常也會動態(tài)創(chuàng)建、銷毀,很可能一段時(shí)間后又會被其他線程占用,而對于短時(shí)間運(yùn)行的,例如單元測試,清除掉有啥意義呢?
總結(jié)
總的來說,LongAdder從性能上來說要遠(yuǎn)遠(yuǎn)好于AtomicLong,一般情況下是可以直接替代AtomicLong使用的,Netty也通過一個(gè)接口封裝了這兩個(gè)類,在Java8下直接采用LongAdder,但是AtomicLong的一系列方法不僅僅可以自增,還可以獲取更新后的值,如果是例如獲取一個(gè)全局唯一的ID還是采用AtomicLong會方便一點(diǎn)。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
如何使用IDEA開發(fā)Spark SQL程序(一文搞懂)
Spark SQL 是一個(gè)用來處理結(jié)構(gòu)化數(shù)據(jù)的spark組件。它提供了一個(gè)叫做DataFrames的可編程抽象數(shù)據(jù)模型,并且可被視為一個(gè)分布式的SQL查詢引擎。這篇文章主要介紹了如何使用IDEA開發(fā)Spark SQL程序(一文搞懂),需要的朋友可以參考下2021-08-08
springboot 配置文件配置項(xiàng)前綴為0的數(shù)字特殊處理方式
這篇文章主要介紹了springboot 配置文件配置項(xiàng)前綴為0的數(shù)字特殊處理方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
使用springboot+druid雙數(shù)據(jù)源動態(tài)配置操作
這篇文章主要介紹了使用springboot+druid雙數(shù)據(jù)源動態(tài)配置的操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Java中的三種標(biāo)準(zhǔn)注解和四種元注解說明
這篇文章主要介紹了Java中的三種標(biāo)準(zhǔn)注解和四種元注解說明,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
MyBatis查詢數(shù)據(jù),賦值給List集合時(shí),數(shù)據(jù)缺少的問題及解決
這篇文章主要介紹了MyBatis查詢數(shù)據(jù),賦值給List集合時(shí),數(shù)據(jù)缺少的問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
Java實(shí)現(xiàn)FTP文件上傳下載功能的詳細(xì)指南
本文將詳細(xì)解釋如何用Java實(shí)現(xiàn)FTP協(xié)議下的文件上傳和下載功能,涵蓋連接設(shè)置、文件操作以及異常處理等方面,介紹了 java.net 和 org.apache.commons.net.ftp 庫,以及如何使用這些庫提供的工具和方法進(jìn)行文件傳輸,需要的朋友可以參考下2025-07-07

