Java并發(fā)編程之原子性-Atomic的使用
線程安全
當(dāng)多個(gè)線程訪問(wèn)某個(gè)類(lèi)時(shí),不管運(yùn)行時(shí)環(huán)境采用何種調(diào)度方式或者這些進(jìn)程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)調(diào),這個(gè)類(lèi)都能表現(xiàn)出正確的行為,那么就稱這個(gè)類(lèi)時(shí)線程安全的。
線程安全主要體現(xiàn)在以下三個(gè)方面
- 原子性:提供了互斥訪問(wèn),同一時(shí)刻只能有一個(gè)線程對(duì)它進(jìn)行操作
- 可見(jiàn)性:一個(gè)線程對(duì)主內(nèi)存的修改可以及時(shí)的被其他線程觀察到
- 有序性:一個(gè)線程觀察其他線程中的指令執(zhí)行順序,由于指令重排序的存在,該觀察結(jié)果一般雜亂無(wú)序
JUC中的Atomic包詳解
Atomic包中提供了很多Atomicxxx的類(lèi):
它們都是CAS(compareAndSwap)來(lái)實(shí)現(xiàn)原子性。
先寫(xiě)一個(gè)簡(jiǎn)單示例如下:
@Slf4j public class AtomicExample1 { // 請(qǐng)求總數(shù) public static int clientTotal = 5000; // 同時(shí)并發(fā)執(zhí)行的線程數(shù) public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); } }
可以發(fā)下每次的運(yùn)行結(jié)果總是我們想要的預(yù)期結(jié)果5000。說(shuō)明該計(jì)數(shù)方法是線程安全的。
我們查看下count.incrementAndGet()方法,它的第一個(gè)參數(shù)為對(duì)象本身,第二個(gè)參數(shù)為valueOffset是用來(lái)記錄value本身在內(nèi)存的編譯地址的,這個(gè)記錄,也主要是為了在更新操作在內(nèi)存中找到value的位置,方便比較,第三個(gè)參數(shù)為常量1
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; ... 此處省略多個(gè)方法... /** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } }
AtomicInteger源碼里使用了一個(gè)Unsafe的類(lèi),它提供了一個(gè)getAndAddInt的方法,我們繼續(xù)點(diǎn)看查看它的源碼:
public final class Unsafe { private static final Unsafe theUnsafe; ....此處省略很多方法及成員變量.... public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public native int getIntVolatile(Object var1, long var2); }
可以看到這里使用了一個(gè)do while語(yǔ)句來(lái)做主體實(shí)現(xiàn)的。而在while語(yǔ)句里它的核心是調(diào)用了一個(gè)compareAndSwapInt()的方法。它是一個(gè)native方法,它是一個(gè)底層的方法,不是使用Java來(lái)實(shí)現(xiàn)的。
假設(shè)我們要執(zhí)行0+1=0的操作,下面是單線程情況下各參數(shù)的值:
更新后:
compareAndSwapInt()方法的第一個(gè)參數(shù)(var1)是當(dāng)前的對(duì)象,就是代碼示例中的count。此時(shí)它的值為0(期望值)。第二個(gè)值(var2)是傳遞的valueOffset值,它的值為12。第三個(gè)參數(shù)(var4)就為常量1。方法中的變量參數(shù)(var5)是根據(jù)參數(shù)一和參數(shù)二valueOffset,調(diào)用底層getIntVolatile方法得到的值,此時(shí)它的值為0 。compareAndSwapInt()想要達(dá)到的目標(biāo)是對(duì)于count這個(gè)對(duì)象,如果當(dāng)前的期望值var1里的value跟底層的返回的值(var5)相同的話,那么把它更新成var5+var4這個(gè)值。不同的話重新循環(huán)取期望值(var5)直至當(dāng)前值與期望值相同才做更新。compareAndSwap方法的核心也就是我們通常所說(shuō)的CAS。
Atomic包下其他的類(lèi)如AtomicLong等的實(shí)現(xiàn)原理基本與上述一樣。
這里再介紹下LongAdder這個(gè)類(lèi),通過(guò)上述的分析,我們已經(jīng)知道了AtomicLong使用CAS:在一個(gè)死循環(huán)內(nèi)不斷嘗試修改目標(biāo)值直到修改成功。如果在競(jìng)爭(zhēng)不激烈的情況下,它修改成功概率很高。反之,如果在競(jìng)爭(zhēng)激烈的情況下,修改失敗的概率會(huì)很高,它就會(huì)進(jìn)行多次的循環(huán)嘗試,因此性能會(huì)受到一些影響。
對(duì)于普通類(lèi)型的long和double變量,jvm允許將64位的讀操作或?qū)懖僮鞑鸪蓛蓚€(gè)32位的操作。LongAdder的核心思想是將熱點(diǎn)數(shù)據(jù)分離,它可以將AtomicLong內(nèi)部核心數(shù)據(jù)value分離成一個(gè)數(shù)組,每個(gè)線程訪問(wèn)時(shí)通過(guò)hash等算法映射到其中一個(gè)數(shù)字進(jìn)行計(jì)數(shù)。而最終的計(jì)數(shù)結(jié)果則為這個(gè)數(shù)組的求和累加,其中熱點(diǎn)數(shù)據(jù)value,它會(huì)被分離成多個(gè)單元的cell,每個(gè)cell獨(dú)自維護(hù)內(nèi)部的值,當(dāng)前對(duì)象的實(shí)際值由所有的cell累計(jì)合成。這樣,熱點(diǎn)就進(jìn)行了有效的分離,提高了并行度。LongAdder相當(dāng)于在AtomicLong的基礎(chǔ)上將單點(diǎn)的更新壓力分散到各個(gè)節(jié)點(diǎn)上,在低并發(fā)的時(shí)候?qū)ase的直接更新可以很好的保障跟Atomic的性能基本一致。而在高并發(fā)的時(shí)候,通過(guò)分散提高了性能。但是如果在統(tǒng)計(jì)的時(shí)候有并發(fā)更新,可能會(huì)導(dǎo)致統(tǒng)計(jì)的數(shù)據(jù)有誤差。
在實(shí)際高并發(fā)計(jì)數(shù)的時(shí)候,可以優(yōu)先使用LongAdder。在低并行度或者需要準(zhǔn)確數(shù)值的時(shí)候可以優(yōu)先使用AtomicLong,這樣反而效率更高。
下面簡(jiǎn)單的演示下Atomic包下AtomicReference簡(jiǎn)單的用法:
@Slf4j public class AtomicExample4 { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0, 2); count.compareAndSet(0, 1); log.info("count:{}", count.get()); } }
compareAndSet()分別傳入的是預(yù)期值跟更新值,只有當(dāng)預(yù)期值跟當(dāng)前值相等時(shí),才會(huì)將值更新為更新值;
上面的第一個(gè)方法可以將值更新為2,而第二個(gè)步中無(wú)法將值更新為1。
下面簡(jiǎn)單介紹下AtomicIntegerFieldUpdater 用法(利用原子性去更新某個(gè)類(lèi)的實(shí)例):
@Slf4j public class AtomicExample5 { private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count"); @Getter private volatile int count = 100; public static void main(String[] args) { AtomicExample5 example5 = new AtomicExample5(); if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 1, {}", example5.getCount()); } if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 2, {}", example5.getCount()); } else { log.info("update failed, {}", example5.getCount()); } } }
它可以更新某個(gè)類(lèi)中指定成員變量的值。
注意:修改的成員變量需要用volatile關(guān)鍵字來(lái)修飾,并且不能是static描述的字段。
AtomicStampReference這個(gè)類(lèi)它的核心是要解決CAS的ABA問(wèn)題(CAS操作的時(shí)候,其他線程將變量的值A(chǔ)改成了B,接著又改回了A,等線程使用期望值A(chǔ)與當(dāng)前變量進(jìn)行比較的時(shí)候,發(fā)現(xiàn)A變量沒(méi)有變,于是CAS就將A值進(jìn)行了交換操作。
實(shí)際上該值已經(jīng)被其他線程改變過(guò))。
ABA問(wèn)題的解決思路就是每次變量變更的時(shí)候,就將版本號(hào)加一。
看一下它的一個(gè)核心方法compareAndSet():
public class AtomicStampedReference<V> { private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } ... 此處省略多個(gè)方法 .... public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } }
可以看到它多了一個(gè)stamp的比較,stamp的值是由每次更新的時(shí)候進(jìn)行維護(hù)的。
再介紹下AtomicLongArray,它維護(hù)了一個(gè)數(shù)組。在該數(shù)組下,我們可以選擇性的已原子性操作更新某個(gè)索引對(duì)應(yīng)的值。
public class AtomicLongArray implements java.io.Serializable { private static final long serialVersionUID = -2308431214976778248L; private static final Unsafe unsafe = Unsafe.getUnsafe(); ...此處省略.... /** * Atomically sets the element at position {@code i} to the given value * and returns the old value. * * @param i the index * @param newValue the new value * @return the previous value */ public final long getAndSet(int i, long newValue) { return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue); } /** * Atomically sets the element at position {@code i} to the given * updated value if the current value {@code ==} the expected value. * * @param i the index * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int i, long expect, long update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); } }
最后再寫(xiě)一個(gè)AtomcBoolean的簡(jiǎn)單使用:
@Slf4j public class AtomicExample6 { private static AtomicBoolean isHappened = new AtomicBoolean(false); // 請(qǐng)求總數(shù) public static int clientTotal = 5000; // 同時(shí)并發(fā)執(zhí)行的線程數(shù) public static int threadTotal = 200; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); test(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappened:{}", isHappened.get()); } private static void test() { if (isHappened.compareAndSet(false, true)) { log.info("execute"); } } }
總結(jié)
以上就是Atomic包的基本原理及主要的使用方法。它是使用CAS來(lái)保證原子性操作,從而達(dá)到線程安全的目的。
僅為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
add方法理解ArrayList的擴(kuò)容機(jī)制
這篇文章主要為大家介紹了add方法理解ArrayList的擴(kuò)容機(jī)制示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03eclipse項(xiàng)目在IDEA中打開(kāi)并運(yùn)行的詳細(xì)圖文教程
這篇文章主要給大家介紹了關(guān)于eclipse項(xiàng)目在IDEA中打開(kāi)并運(yùn)行的詳細(xì)圖文教程,至從使用IDEA開(kāi)發(fā)工具以來(lái),不少次有使用IDEA運(yùn)行Eclipse項(xiàng)目或非Maven項(xiàng)目,所以這里給大家總結(jié)下,需要的朋友可以參考下2023-09-09