淺析從同步原語看非阻塞同步以及Java中的應(yīng)用
一、從硬件原語上理解同步(非特指Java)
同步機(jī)制是多處理機(jī)系統(tǒng)的重要組成部分,其實(shí)現(xiàn)方式除了關(guān)系到計(jì)算的正確性之外還有效率的問題。同步機(jī)制的實(shí)現(xiàn)通常是在硬件提供的同步指令的基礎(chǔ)上,在通過用戶級(jí)別軟件例程實(shí)現(xiàn)的。上面說到的樂觀策略實(shí)際上就是建立在硬件指令集的基礎(chǔ)上的(我們需要實(shí)際操作和沖突檢測(cè)是原子性的),一般有下面的常用指令:測(cè)試并設(shè)置(test_and_set)、獲取并增加(fetch_and_increment)、原子交換(Atomic_Exchange)、比較并交換(CAS)、加載連接條件存儲(chǔ)(LL/SC),下面我們會(huì)講到這些以及通過這些硬件同步原語實(shí)現(xiàn)的旋轉(zhuǎn)鎖和柵欄同步。
1.1、基本硬件原語
在多處理機(jī)中實(shí)現(xiàn)同步,所需的主要功能是一組能以原子操作讀出并修改存儲(chǔ)單元的硬件原語。如果沒有這種操作,建立基本的同步原語的代價(jià)會(huì)非常大?;居布Z有幾種形式提供選擇,他們都能以原子操作的方式讀改存儲(chǔ)單元,并指出進(jìn)行的操作是否能以原子形式進(jìn)行,這些原語作為基本構(gòu)建提供構(gòu)造各種各樣的用戶及同步操作。
一個(gè)典型的例子就是原子交換(Atomic Exchange),他的功能是將一個(gè)存儲(chǔ)單元中的值和一個(gè)寄存器的值進(jìn)行交換。我們看看這個(gè)原語怎樣構(gòu)造一個(gè)我們通常意義上說的簡(jiǎn)單的鎖。
假設(shè)現(xiàn)在我們構(gòu)造這樣一個(gè)簡(jiǎn)單的鎖:其值為0表示鎖是開的(鎖可用),為1表示上鎖(不可用)。當(dāng)處理器要給該鎖上鎖的時(shí)候,將對(duì)應(yīng)于該鎖的存儲(chǔ)單元的值與存放在某個(gè)寄存器中的1進(jìn)行交換。如果別的處理器已經(jīng)上了鎖,那么交換指令返回的值為1否則為0。返回0的時(shí)候,因?yàn)槭窃咏粨Q,鎖的值就會(huì)從0變?yōu)?表示上鎖成功;返回1,原子交換鎖的值還是1,但是返回1表示已經(jīng)被上了鎖。我們考慮使用這個(gè)鎖:假設(shè)兩個(gè)處理器同時(shí)進(jìn)行交換操作(原子交換),競(jìng)爭(zhēng)的結(jié)果就是,只有一個(gè)處理器會(huì)先執(zhí)行成功而得到返回值0,而另一個(gè)得到的返回值為1表示已經(jīng)被上鎖。從這些我們可以看出,采用原子交換指令是實(shí)現(xiàn)同步的關(guān)鍵:這個(gè)原子交換操作的不可再分的,兩個(gè)交換操作將由寫順序機(jī)制確定先后順序,這也保證了兩個(gè)線程不能同時(shí)獲取同步變量鎖。
除此之外,還有別的原語可以實(shí)現(xiàn)同步(關(guān)鍵都在于能以原子的方式讀-改-寫存儲(chǔ)單元的值)。例如:測(cè)試并置定(test_and_set)(先測(cè)試一個(gè)存儲(chǔ)單元的值,如果符合條件就修改其值),另一個(gè)同步原語是讀取并加1(fetch_and_increment))(返回存儲(chǔ)單元的值并自動(dòng)增加該值)。
那么,上面的基本原語操作又是怎樣實(shí)現(xiàn)的呢,這在一條指令中完成上述操作顯然是困難的(在一條不可中斷的指令中完成一次存儲(chǔ)器的讀改寫,而且要求不允許其他的訪存操作還要避免死鎖)?,F(xiàn)在的計(jì)算機(jī)上采用一對(duì)指令來實(shí)現(xiàn)上述的同步原語。該指令對(duì)由兩條特殊的指令組成,一條是特殊的load指令(LL指令),另一條是特殊的store指令(SC)。指令的執(zhí)行順序是:如果LL指令指明的存儲(chǔ)單元的值在SC對(duì)其進(jìn)行寫之前被其他的指令改寫過,則第二條指令執(zhí)行失敗,如果在兩條指令之間進(jìn)行切換也會(huì)導(dǎo)致執(zhí)行SC失敗,而SC指令將通過返回一個(gè)值來指出該指令操作是否成功(如果返回的1表示執(zhí)行成功,返回0表示失?。?。為什么說這對(duì)指令相當(dāng)于原子操作呢,這指的是是所有其他處理器進(jìn)行的操作或者在這對(duì)指令之前執(zhí)行或者在其后執(zhí)行,不存在兩條指令之間進(jìn)行,所以在這一對(duì)指令之間不存在任何其他處理器改變相應(yīng)存儲(chǔ)單元的值。
下面是一段實(shí)現(xiàn)對(duì)R1指出的存儲(chǔ)單元進(jìn)行的原子交換操作
try:OR R3,R4,R0 //R4中為交換值,將該值送入R3
LL R2,0(R1) //將0(R1)中的值取到R2
SC R3,0(R1) //若0(R1)中的值與R3中的值相同,則置R3的值為1,否則為0
BEQZ R3,try //R3的值為0表示存失敗,轉(zhuǎn)移重新嘗試
MOV R4,R2 //成功,將取出的值送往R4
最終R4和由R1指向的存儲(chǔ)單元值進(jìn)行了原子交換,在LL和SC之間如果有別的處理器插入并且修改了存儲(chǔ)單元的值則SC都會(huì)返回0并存入R3中從而重新執(zhí)行交換操作。下面是實(shí)現(xiàn)各個(gè)講到的讀取并加1(fetch_and_increment)原語的實(shí)現(xiàn)
try:LL R2,0(R1) //將0(R1)中的值送入R2
DADDIU R2,R2,#1 //加1操作(R2+1->R2)
SC R2,0(R1) //如果0(R1)中的值和R2中的值相同就置R2的值為1,否則為0
BEQZ R2,try //R2的值為0表示存失敗,轉(zhuǎn)移到開始出重新執(zhí)行
上面的指令的執(zhí)行需要跟蹤地址,通常LL指令指定一個(gè)寄存器,該寄存器中存放著目的存儲(chǔ)單元的地址,這個(gè)寄存器稱為連接寄存器,如果發(fā)生中斷切換或者與連接寄存器中的地址匹配的cache塊被作廢(被別的SC指令訪問),則將連接寄存器清零,SC指令則檢查它的存儲(chǔ)地址和連接寄存器匯中的內(nèi)容是夠匹配,如果匹配則SC指令繼續(xù)執(zhí)行,否則執(zhí)行失敗。
1.2、用一致性實(shí)現(xiàn)鎖
我們現(xiàn)在用上面的原子交換的同步原語實(shí)現(xiàn)自旋鎖(spin lock)(處理器不停請(qǐng)求獲得鎖的試用權(quán),圍繞該鎖反復(fù)執(zhí)行循環(huán)程序,直到獲得鎖)。自旋鎖適用于這樣的場(chǎng)景:鎖被占用時(shí)間少,在獲得鎖之后加鎖的過程延遲小。
下面我們考慮使用一種簡(jiǎn)單的方法實(shí)現(xiàn):將鎖變量保存在存儲(chǔ)器中,處理器可以不斷通過原子交換操作來請(qǐng)求其使用權(quán),比如使用原子交換操作獲得其返回值從而直達(dá)鎖變量的使用情況。釋放鎖的時(shí)候,處理器只需要將說置為0。如下面的程序:使用原子交換操作堆自旋鎖進(jìn)行加鎖,其中R1中存放的是自旋鎖變量的地址
DADDIU R2,R0,#1
lockit: EXCH R2,0(R1) //原子交換,獲得自旋鎖的值并在下面比較自旋鎖的值為1還是0,為1表示已經(jīng)上鎖
BNEZ R2,lockit //若R2的內(nèi)容不為0,則表示已經(jīng)有其他程序獲得了鎖變量,就繼續(xù)旋轉(zhuǎn)等待
下面我們對(duì)這個(gè)簡(jiǎn)單的自旋鎖實(shí)現(xiàn)進(jìn)行一些改進(jìn)(下面說到的可類比JMM內(nèi)存模型理解)如果計(jì)算機(jī)支持Cache一致性,就可以將鎖調(diào)入Cache中(類比本地內(nèi)存),并通過一致性保證使得鎖的值保持和存儲(chǔ)器中的值一致(類比內(nèi)存可見性和本地內(nèi)存主內(nèi)存的值一致同步)。這樣做有下面的好處:①使得環(huán)繞自旋鎖的線程(自旋請(qǐng)求鎖變量)只對(duì)本地Cache中的鎖(主存中的副本)進(jìn)行操作,而不用再每次請(qǐng)求占用鎖時(shí)候進(jìn)行一次全局的訪存操作(訪問主內(nèi)存存儲(chǔ)器中存放的鎖的值) ②利用訪問鎖的程序局部性原理(處理器最近使用的鎖可能不久后還會(huì)使用),這種情況就可以使得鎖駐留在對(duì)應(yīng)的Cache中,大大減少了獲得鎖所需要的時(shí)間(處于性能考慮,需要減少全局訪存操作)。
在改進(jìn)之前,我們應(yīng)該知道,在上面的簡(jiǎn)單實(shí)現(xiàn)的基礎(chǔ)上(上面的每次循環(huán)交換均需要一次寫操作,因?yàn)橛卸鄠€(gè)處理器會(huì)同時(shí)請(qǐng)求加鎖,這就會(huì)導(dǎo)致一個(gè)處理器請(qǐng)求成功后,其他處理器都會(huì)寫不命中),需要對(duì)這個(gè)程序進(jìn)行改進(jìn),使得它只對(duì)本地副本中的鎖變量進(jìn)行讀取和檢測(cè),直到發(fā)現(xiàn)鎖已經(jīng)被釋放。發(fā)現(xiàn)釋放之后,立刻去進(jìn)行交換操作跟別的處理器競(jìng)爭(zhēng)鎖變量。所有這些進(jìn)程還是以原子交換的方式獲得鎖,也只有一個(gè)進(jìn)程可以獲得成功(獲得鎖變量成功的進(jìn)程交換后看到的鎖變量值為0,交換之后的鎖變量值為1表示上鎖成功;而獲得失敗的進(jìn)程雖然也交換了鎖變量的值,但是因?yàn)榻粨Q后自己看到的鎖變量的值已經(jīng)是1,就表示自己進(jìn)程失敗了),其他的需要繼續(xù)旋轉(zhuǎn)等待。當(dāng)獲得鎖的進(jìn)程使用完之后,將鎖變量置為0表示釋放鎖由其他需要獲取的進(jìn)程去競(jìng)爭(zhēng)它(其他進(jìn)程會(huì)在自己的Cache中發(fā)現(xiàn)鎖變量的值發(fā)生變化,這是上面所說的Cache一致性)。下面是修改后的旋轉(zhuǎn)鎖程序
lockit: LD R2,0(R1) //取得鎖的值
BNEZ R2,lockit //如果鎖還沒有釋放(R2的值還是1)
DADDIU R2,R0,#1 //將R2值置為1(這里面可以這樣想:上面BNEZ執(zhí)行失敗表示R2值為0,那么這個(gè)時(shí)候就+1)
EXCH R2,0(R1) //將R2中的值和0(R1)中的鎖變量進(jìn)行原子交換
BNEZ R2,lockit //上面第一次判斷是當(dāng)前進(jìn)程首先發(fā)現(xiàn)主存中的鎖變量值發(fā)生變化;
//進(jìn)行原子交換結(jié)果判斷和上面一樣,如果狡猾后返回值為0表示成功,為1表示失敗就繼續(xù)旋轉(zhuǎn)等待獲取
1.3、使用上面的旋轉(zhuǎn)鎖實(shí)現(xiàn)我們一個(gè)同步原語——柵欄同步
首先解釋一下什么叫柵欄同步(barrier)。假設(shè)有一個(gè)類似于柵欄的東西,它會(huì)強(qiáng)制所有到達(dá)柵欄的進(jìn)程進(jìn)行等待,直到全部的進(jìn)程都到達(dá)之后釋放所有到達(dá)的進(jìn)程繼續(xù)往下執(zhí)行,從而形成同步。下面我們就通過上面說的旋轉(zhuǎn)鎖來簡(jiǎn)單模擬實(shí)現(xiàn)這樣的一個(gè)同步原語
使用兩個(gè)旋轉(zhuǎn)鎖,一個(gè)表示計(jì)數(shù)器,記錄已經(jīng)到達(dá)該柵欄的進(jìn)程數(shù);另一個(gè)用來封鎖進(jìn)程知道最后一個(gè)進(jìn)程到達(dá)該柵欄。為了實(shí)現(xiàn)柵欄,我們需要一個(gè)變量,到達(dá)并阻塞住的進(jìn)程需要在這個(gè)變量上自旋等待知道滿足它需要的條件(都到達(dá)柵欄然后才能往下執(zhí)行)。我們使用spin表示這個(gè)條件condition。如下的程序所示,其中l(wèi)ock和unlock提供基本的旋轉(zhuǎn)鎖,變量count記錄已經(jīng)到達(dá)柵欄的進(jìn)程數(shù),total表示已經(jīng)到達(dá)柵欄的進(jìn)程總數(shù),對(duì)counterlock加鎖保證了增量操作的原子性,release用來封鎖最后一個(gè)到達(dá)柵欄的進(jìn)程。spin(release==1)表示需要全部進(jìn)程都到達(dá)柵欄。
lock(counterlock); //確保更新的原子性 if(count == 0) release = 0; //第一個(gè)進(jìn)程到達(dá),這時(shí)候重置release為0表示在其值變?yōu)?之前后續(xù)到達(dá)的進(jìn)程都需要等待 count = count + 1; //記錄到達(dá)的進(jìn)程數(shù) unlock(counterlock); //釋放鎖 if(count == total) { //進(jìn)程全部到達(dá) count = 0; //重置計(jì)數(shù)器count release = 1; //將release置為1表示釋放所欲到達(dá)的進(jìn)程 } else { //進(jìn)程還沒有全部到達(dá) spin(release == 1); //已經(jīng)到達(dá)的進(jìn)程旋轉(zhuǎn)等待知道所有的進(jìn)程到達(dá)(言外之意就是release=1) }
但是上面的這種簡(jiǎn)單實(shí)現(xiàn)還是存在問題的,我們考慮下面這種可能發(fā)生的情況:當(dāng)柵欄的使用在循環(huán)當(dāng)中時(shí)候,這時(shí)候所有釋放的進(jìn)程在運(yùn)行一段時(shí)間之后還會(huì)到達(dá)柵欄,假設(shè)其中一個(gè)進(jìn)程在上次釋放的時(shí)候還沒有來得及離開柵欄,而是依舊停留在旋轉(zhuǎn)操作上(可能操作系統(tǒng)重新進(jìn)行進(jìn)程調(diào)度導(dǎo)致那個(gè)進(jìn)程沒有來得及離開柵欄)。如果第二次柵欄使用的時(shí)候,一個(gè)執(zhí)行較快的進(jìn)程到達(dá)柵欄(這個(gè)快的意思是,當(dāng)他到達(dá)柵欄之后上次那個(gè)還沒有離開柵欄的進(jìn)程還在旋轉(zhuǎn)操作上),這個(gè)快的進(jìn)程會(huì)發(fā)現(xiàn)count=0,那么他就會(huì)將release置為0,這時(shí)候就會(huì)導(dǎo)致那個(gè)還在旋轉(zhuǎn)等待的進(jìn)程發(fā)現(xiàn)release值為0,然后那就更不會(huì)再退出這個(gè)旋轉(zhuǎn)操作了,就相當(dāng)于被捆綁在柵欄上出不去(這個(gè)問題會(huì)導(dǎo)致后續(xù)的count計(jì)數(shù)少了一個(gè)進(jìn)程到達(dá),而總是小于total),那這樣的話,由于count總是小于total那不是所有到達(dá)柵欄的進(jìn)程都在spin上一直自旋了嗎。那怎么解決這個(gè)問題呢,一種方法就是在進(jìn)程離開柵欄的時(shí)候也進(jìn)行計(jì)數(shù),在上次使用柵欄的進(jìn)程全部離開柵欄之前不允許執(zhí)行快的進(jìn)程再次使用并初始化柵欄的一些變量值。還有一種方法是使用sense_reversing柵欄,即每個(gè)進(jìn)程只用一個(gè)本地私有變量local_sense并初始化為1,用它和release判斷進(jìn)程是否需要自旋等待。
二、Java中的原子性操作概述
所謂原子操作,就是指執(zhí)行一系列操作的時(shí)候,要么全部執(zhí)行要么全部不執(zhí)行,不存在只執(zhí)行一部分的情況。在設(shè)置計(jì)數(shù)器的時(shí)候一般是讀取當(dāng)前的值,然后+1在更新(讀-改-寫的過程),如果不能保證這這幾個(gè)操作的過程的原子性就可能出現(xiàn)線程安全問題,比如下面的代碼示例,++value在沒有任何額外保證的前提下不是原子操作。
public class ThreadUnSafe{ private Long value; public Long getValue() {return value;} public void increment() {++value;} }
使用Javap -c XX.class查看匯編代碼如下
這是個(gè)復(fù)合操作,是不具備原子性的。而保證這個(gè)操作原子性的方法最簡(jiǎn)單的就是加上synchronized關(guān)鍵字,使用synchronized可以實(shí)現(xiàn)線程安全性,但是這是個(gè)獨(dú)占鎖,沒有獲取內(nèi)部鎖的線程會(huì)被阻塞住(即便是這里的getValue操作,多線程訪問也會(huì)阻塞住),這對(duì)于并發(fā)性能的提高是不好的(而這里也不能簡(jiǎn)單的去掉getValue上的synchronized,因?yàn)樽x操作需要保證value的讀一致性,即需要獲得主內(nèi)存中的值而不是線程工作內(nèi)存中的可能是舊的副本值)。那么除了加鎖之外其他安全的方法?后面講到的原子類(使用CAS實(shí)現(xiàn))就可以作為一個(gè)選擇。
三、Java中的CAS操作概述
Java中提供非阻塞的volatile關(guān)鍵字解決保證共享變量的可見性問題,但是不能解決部分符合操作不具備原子性的問題(比如自增運(yùn)算)。CAS即CompareAndSwap是JDK提供的非阻塞原子操作,通過硬件保證比較更新的原子性。我們通過compareAndSwapLong來簡(jiǎn)單介紹CAS:
compareAndSwapLong(Object obj, long valueOffset, long expect, long update),該方法中compareAndSwap表示比較并交換,方法中有四個(gè)操作數(shù),其中obj表示對(duì)象內(nèi)存的位置,valueOffset表示對(duì)象中存儲(chǔ)變量的偏移量,expect表示變量的預(yù)期值,update表示更新值。操作含義就是,若果對(duì)象obj中內(nèi)存偏移量為valueOffset的變量值為expect則使用心得update值替換舊的值expect,這是處理器提供的一個(gè)原子指令。這些方法有sun.misc.Unsafe類提供。后面我們會(huì)說到Unsafe類
在此之前我們先說一下CAS操作的一個(gè)經(jīng)典的ABA問題:假如線程1 使用CAS修改初始值為A的變量X,那么線程1會(huì)首先回去當(dāng)前變量X的值(A),然后使用CAS操作嘗試修改X的值為B,如果使用CAS修改成功了,那么程序一定執(zhí)行正確了嗎?在往下的假設(shè)看,如果線程I在獲取變量X的值A(chǔ)后,在執(zhí)行CAS之前線程II使用CAS修改變量X的值為B然后由修改回了A。這時(shí)候雖然線程I執(zhí)行CAS時(shí)候X的值依舊是A但是這個(gè)A已經(jīng)不是線程I獲取時(shí)候的A了,這就是ABA問題。ABA產(chǎn)生的原因是變量的狀態(tài)值產(chǎn)生了環(huán)形轉(zhuǎn)換,即變量值從A->B,然后又從B->A。jdk中提供了帶有標(biāo)記的原子類AtomicStampedReference(時(shí)間戳原子引用)通過控制變量的版本保證CAS的正確性。如下所做的測(cè)試ABA問題以及使用AtomicStampedReference來解決這個(gè)問題
3.1、模擬ABA問題
下面的程序輸出結(jié)果會(huì)是這樣的
package test; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class TestAtomicStampedReference { static AtomicReference<Integer> atomicReference = new AtomicReference<>(1); public static void main(String[] args) { Thread t1 = new Thread(new Runnable() { @Override public void run() { atomicReference.compareAndSet(1,2); atomicReference.compareAndSet(2,1); System.out.println(Thread.currentThread() + "線程修改后的變量值" + atomicReference.get()); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { //sleep 1秒,保證線程t1完成1->2->1的模擬ABA操作 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } atomicReference.compareAndSet(1,3); System.out.println(Thread.currentThread() + "線程修改后的變量值" + atomicReference.get()); } }); t1.start(); t2.start(); } }
3.2、使用AtomicStampedReference重新實(shí)現(xiàn)
下面是運(yùn)行結(jié)果
package test; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicStampedReference; public class TestAtomicStampedReference { static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(10,1); //定義初始值和初始版本號(hào) public static void main(String[] args) { Thread t1 = new Thread(new Runnable() { @Override public void run() { //線程1獲得初始版本號(hào)并sleep1秒 int version = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread() + "當(dāng)前線程獲得的版本號(hào)" + version); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "修改變量結(jié)果true/false?:" + atomicStampedReference.compareAndSet(10,11,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1) + "修改后的結(jié)果:" + atomicStampedReference.getReference()); System.out.println(Thread.currentThread() + "修改變量結(jié)果true/false?:" + atomicStampedReference.compareAndSet(11,10,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1) + "修改后的結(jié)果:" + atomicStampedReference.getReference()); } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { //首先獲得初始版本號(hào),sleep2秒讓線程1完成10->11->10的模擬ABA操作 int version = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread() + "當(dāng)前線程獲得的版本號(hào)" + version); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "修改變量結(jié)果true/false?:" + atomicStampedReference.compareAndSet(10,20,version,atomicStampedReference.getStamp()+1) + "修改后的結(jié)果:" + atomicStampedReference.getReference()); } }); t1.start(); t2.start(); } }
四、Java中的Unsafe類
JDK中的rt.jar包中的Unsafe類提供了硬件級(jí)別的原子性操作
Unsafe類中許多方法都是native方法,他們使用JNI的方式訪問本地C++中的實(shí)現(xiàn)庫。下面我們了解一下Unsafe類提供的幾個(gè)主要的方法以及如何使用unsafe類進(jìn)行一些編程操作。
4.1、Unsafe類中的重要方法介紹
(1)public native long objectFieldOffset(Field var1):返回指定的變量在所屬類中的內(nèi)存偏移地址,該偏移地址僅僅在該Unsafe函數(shù)中訪問指定字段時(shí)候使用。如下使用Unsafe類獲取變量value在Atomic對(duì)象中的內(nèi)存偏移量
(2)public native int arrayBaseOffset(Class<?> var1):獲取數(shù)組中第一個(gè)元素的地址
(3)public native int arrayIndexScale(Class<?> var1):獲取數(shù)組中一個(gè)元素占用的字節(jié)
(4)public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5):比較對(duì)象var1中的偏移量為var2的變量的值是否與var4相同,相同則使用var6的值更新,并返回true,否則返回false。
(5)public native long getLongVolatile(Object var1, long var2):獲取對(duì)象var1中偏移量為offset的變量對(duì)應(yīng)volatile語義的值。
(6)public native void putLongVolatile(Object var1, long var2, long var4):設(shè)置var1對(duì)象中offset偏移類型為long的值為var4,支持volatile語義
(7)public native void putOrderedLong(Object var1, long var2, long var4):設(shè)置對(duì)象obj中offset偏移地址對(duì)應(yīng)的long型的field的值為value。這是一個(gè)有延遲的putLongVolatile方法,并且不保證對(duì)應(yīng)的值類型的修改對(duì)其他線程可見,只有變量在只用volatile修飾并且預(yù)計(jì)會(huì)被意外修改的時(shí)候才會(huì)使用該方法、
(8)public native void park(boolean var1, long var2):阻塞當(dāng)前線程,其中參數(shù)var1等于false且var2等于0表示一直阻塞,var2大于0表示等待指定的時(shí)間后阻塞線程會(huì)被喚醒。這個(gè)var的值是相對(duì)的,為一個(gè)增量值,也就是相當(dāng)當(dāng)前時(shí)間累加事假后當(dāng)前線程就會(huì)被喚醒。如果var1位true,并且var2大于0,則表示阻塞的線程到指定的時(shí)間點(diǎn)后就會(huì)被喚醒,這里的時(shí)間var2是個(gè)絕對(duì)時(shí)間,是某個(gè)時(shí)間點(diǎn)換算為ms后的值。
(9)public native void unpark(Object var1):?jiǎn)拘颜{(diào)用park方法之后的線程。
下面是jdk8之后新增加的,我們列出Long類型的方法
(10)getAndSetLong()方法:獲取當(dāng)前對(duì)象var1中偏移量為var2的變量volatile語義的當(dāng)前值,并設(shè)置變量volatile語義的值為var4。
首先使用getLongVolatile獲取當(dāng)前變量的值,然后使用CAS原子操作設(shè)置新的值。這里使用while是當(dāng)CAS失敗時(shí)候進(jìn)行重試。
(11)getAndAddLong()方法:獲取對(duì)象var1中偏移量為var2變量的volatile語義的值,設(shè)置變量值為原始值+var4
4.2、Unsafe類的使用
考慮編寫出下面的程序,并在自己的IDE中運(yùn)行下面的程序,觀察結(jié)果。
package test; import sun.misc.Unsafe; public class TestUnsafe { //獲取Unsafe的實(shí)例 static Unsafe unsafe = Unsafe.getUnsafe(); //記錄變量value在TestUnsafe中的偏移量 static long valueState; //變量 private volatile long value; static { try { //獲取value變量在TestUnsafe類中的偏移量 valueState = unsafe.objectFieldOffset(TestUnsafe.class.getDeclaredField("value")); } catch (Exception e) { System.out.println(e.getMessage()); } } public static void main(String[] args) { TestUnsafe testUnsafe = new TestUnsafe(); System.out.println(unsafe.compareAndSwapInt(testUnsafe,valueState,0,1)); } }
上面的程序中首先獲取Unsafe的一個(gè)實(shí)例,然后使用unsafe的objectFieldOffset方法獲取TestUnsafe類中value變量,計(jì)算在TestUnsafe類中value變量的內(nèi)存偏移地址并保存到valueState中。main中調(diào)用unsafe的compareAndSwapInt方法設(shè)置testUnsafe對(duì)象的value變量的值為1(如果是0的話)。value初始默認(rèn)是0,我們希望代碼能輸出true(即compareAndSwapInt能夠執(zhí)行成功),但是最終運(yùn)行時(shí)下面的結(jié)果
我們看到上面的異常報(bào)錯(cuò)在getUnsafe方法位置,下來我們看一看getUnsafe方法
public static Unsafe getUnsafe() { //(1)獲取調(diào)用getUnsafe類的這個(gè)Class類,按照上面的程序中的TestUnsafa類 Class var0 = Reflection.getCallerClass(); //(2)看下面的那個(gè)方法 if (!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } } /** * (3)判斷是不是啟動(dòng)類加載器加載的類,即看看是不是由BootStrapClassLoader加載的TestUnsafe.class, * 由于我們這是一個(gè)簡(jiǎn)單測(cè)試類,是由應(yīng)用程序類加載器AppClassLoader加載的,所以直接報(bào)出SecurityException異常 */ public static boolean isSystemDomainLoader(ClassLoader var0) { return var0 == null; }
由于Unsafe類rt.jar包提供的,該包下面的類都是通過Bootstrap類加載器加載的,而我們使用的main方法所在的類是由AppClassLoader加載的,所以在main方法中加載Unsafe類的時(shí)候根據(jù)雙親委派機(jī)制會(huì)委托給Bootstrap加載。那么如果想要使用Unsafe類應(yīng)該怎樣使用呢,《深入理解java虛擬機(jī)》中這一塊告訴我們可以使用反射來使用,下面我們來試一下
package test; import sun.misc.Unsafe; import java.lang.reflect.Field; public class TestUnsafe2 { static Unsafe unsafe; static long valueOffset; private volatile long value = 0; static { try { //使用反射獲取Unsafe的成員變量theUnsafe Field field = Unsafe.class.getDeclaredField("theUnsafe"); //設(shè)置為課存取 field.setAccessible(true); //設(shè)置該變量的值 unsafe = (Unsafe) field.get(null); //獲取value偏移量 valueOffset = unsafe.objectFieldOffset(TestUnsafe2.class.getDeclaredField("value")); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } public static void main(String[] args) { TestUnsafe2 test = new TestUnsafe2(); System.out.println("修改變量結(jié)果true/false?:" + unsafe.compareAndSwapInt(test,valueOffset,0,1) + "修改后的結(jié)果:" + test.value); } }
得到下面的結(jié)果:
五、JUC中原子操作類AtomicLong的原理探究
5.1、原操作類概述
JUC包中提供了很多原子操作類,這些類都是通過上面說到的非阻塞CAS算法來實(shí)現(xiàn)的,相比較使用鎖來實(shí)現(xiàn)原子性操作CAS在性能上有很大提高。由于原子操作類的原理都大致相同,所以下面分析AtomicLong類的實(shí)現(xiàn)原理來進(jìn)一步了解原子操作類。
5.2、AtomicLong的源碼
下面是AtomicLong原子類的部分源碼,其中主要包含其成員變量以及一些靜態(tài)代碼塊和構(gòu)造方法
public class AtomicLong extends Number implements java.io.Serializable { //(1)獲取Unsafe實(shí)例 private static final Unsafe unsafe = Unsafe.getUnsafe(); //(2)保存value值的偏移量 private static final long valueOffset; //(3)判斷當(dāng)前JVM是否支持Long類型的無鎖CAS static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8(); private static native boolean VMSupportsCS8(); static { try { //(4)獲取value值在AtomicLong中的偏移量 valueOffset = unsafe.objectFieldOffset (AtomicLong.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //(5)實(shí)際存的變量值value private volatile long value; //構(gòu)造方法 public AtomicLong(long initialValue) { value = initialValue; } }
在上面的部分代碼中,代碼(1)通過Unsafe.getUnsafe()方法獲取到Unsafe類的實(shí)例(AtomicLong類也是rt.jar包下面的,所以AtomicLong也是通過啟動(dòng)類加載器進(jìn)行類加載的)。(2)(4)兩處是計(jì)算并保存AtomicLong類中存儲(chǔ)的變量value的偏移量。(5)中的value被聲明為volatile的,這是為了在多線程下保證內(nèi)存的可見性,而value就是具體存放計(jì)數(shù)的變量。下面我們看看AtomicLong中的主要幾個(gè)函數(shù)
(1)遞增和遞減的源碼
//使用unsafe的方法,原子性的設(shè)置value值為原始值+1,返回值為遞增之后的值 public final long getAndIncrement() { return unsafe.getAndAddLong(this, valueOffset, 1L); } public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } //使用unsafe的方法,原子性的設(shè)置value值為原始值-1,返回值為遞減之后的值 public final long getAndDecrement() { return unsafe.getAndAddLong(this, valueOffset, -1L); } public final long decrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L; }
在上面的代碼中都是通過調(diào)用Unsafe類的getAndAddLong方法來實(shí)現(xiàn)操作的,我們來看看這個(gè)方法,這個(gè)方法是個(gè)原子性操作:其中的第一個(gè)參數(shù)是AtomicLong實(shí)例的引用,第二個(gè)參數(shù)是value變量在AtomicLong中的偏移量,第三個(gè)參數(shù)是要設(shè)置為第二個(gè)變量的值。下面就是getAndAddLong方法的實(shí)現(xiàn),以及一些分析
public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { //public native long getLongVolatile(Object var1, long var2); //該方法就是獲取var1引用指向的內(nèi)存地址中偏移量為var2位置的值,然后賦給var6 var6 = this.getLongVolatile(var1, var2); /**public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); * var1:AtomicXXX類型的一個(gè)引用,指向堆內(nèi)存中的一塊地址 * var2:AtomicXXX源碼中的valueOffset,表示AtomicXXX源碼中實(shí)際存儲(chǔ)的值value在原子類型內(nèi)存中的地址偏移量 * var4:要比較的目標(biāo)值expectValue,如果從內(nèi)存指定地址處(var1和var2決定的那塊地址)的值和該值相等,則CAS成功 * var6:CAS成功后向該內(nèi)存中寫進(jìn)的新值 */ //該方法就是使用CAS的方式,比較指定內(nèi)存地址處(var1指向的內(nèi)存地址塊中偏移量為var2處)的值和上面同一塊地址處取出的var6是否相等, //相等就將var6+var4(這里可以看成var6+1)和指定內(nèi)存地址處(var2引用指向的地址塊中偏移量為var2處)的值交換,并返回true,然后就會(huì)結(jié)束循環(huán) //CAS失敗返回false,然后繼續(xù)執(zhí)行循環(huán)體內(nèi)部的代碼,直到成功(也就是自增運(yùn)算成功就會(huì)跳出循環(huán)并返回自增后的值) } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; }
(2)CompareAndSet方法
下面是compaerAndSet方法的實(shí)現(xiàn),主要還是調(diào)用unsafe類的compareAndSwapLong方法,其原理和上面分析的差不多,都是通過CAS的方式進(jìn)行比較交換值。
public final boolean compareAndSet(long expect, long update) { //public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); return unsafe.compareAndSwapLong(this, valueOffset, expect, update); }
(3)擴(kuò)展,下面是compareAndSwapInt的底層實(shí)現(xiàn),實(shí)際上是通過硬件同步原語來實(shí)現(xiàn)的CAS,下面的cmpxchg就是基于硬件原語實(shí)現(xiàn)的
UNSAFE_ENTRY(jboolean,Usafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UsafeWrapper("Usafe_CompareAndSwapInt"); oop p = JNIHasdles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x,addr,e)) == e; UNSAFE_END
(4)下面是一個(gè)例子,使用AtomicLong來進(jìn)行技術(shù)運(yùn)算
package test; import java.util.concurrent.atomic.AtomicLong; public class TestAtomic1 { //創(chuàng)建AtomicLong類型的計(jì)數(shù)器 private static AtomicLong atomicLong = new AtomicLong(); // private static Long atomicLong = 0L; //創(chuàng)建兩個(gè)數(shù)組,計(jì)算數(shù)組中的0的個(gè)數(shù) private static Integer[] arr1 = {0,1,2,3,0,5,6,0,56,0}; private static Integer[] arr2 = {10,1,2,3,0,5,6,0,56,0}; public static void main(String[] args) throws InterruptedException { //線程1統(tǒng)計(jì)arr1中0的個(gè)數(shù) Thread t1 = new Thread(new Runnable() { @Override public void run() { int size = arr1.length; for (int i = 0; i < size; i++) { if(arr1[i].intValue() == 0) { // atomicLong.getAndIncrement(); atomicLong++; } } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { int size = arr2.length; for (int i = 0; i < size; i++) { if(arr2[i].intValue() == 0) { // atomicLong.getAndIncrement(); atomicLong++; } } } }); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("兩個(gè)數(shù)組中0出現(xiàn)的次數(shù)為: " + atomicLong);//兩個(gè)數(shù)組中0出現(xiàn)的次數(shù)為: 7 } }
如果沒有使用原子類型進(jìn)行計(jì)數(shù)運(yùn)算,那么可能就是下面的結(jié)果
以上就是淺析從同步原語看非阻塞同步以及Java中的應(yīng)用的詳細(xì)內(nèi)容,更多關(guān)于同步 非阻塞同步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis學(xué)習(xí)教程(八)-Mybatis3.x與Spring4.x整合圖文詳解
這篇文章主要介紹了MyBatis學(xué)習(xí)教程(八)-Mybatis3.x與Spring4.x整合圖文詳解的相關(guān)資料,需要的朋友可以參考下2016-05-05簡(jiǎn)單了解Spring Cloud Alibaba相關(guān)知識(shí)
這篇文章主要介紹了簡(jiǎn)單了解Spring Cloud Alibaba相關(guān)知識(shí),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10Java?Stream對(duì)象并行處理方法parallel()代碼示例
在Java中Stream是一種用于處理集合數(shù)據(jù)的流式操作API,它提供了一種簡(jiǎn)潔、靈活、高效的方式來對(duì)集合進(jìn)行各種操作,下面這篇文章主要給大家介紹了關(guān)于Java?Stream對(duì)象并行處理方法parallel()的相關(guān)資料,需要的朋友可以參考下2023-11-11java中建立0-10m的消息(字符串)實(shí)現(xiàn)方法
下面小編就為大家?guī)硪黄猨ava中建立0-10m的消息(字符串)實(shí)現(xiàn)方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-05-05聊聊spring @Transactional 事務(wù)無法使用的可能原因
這篇文章主要介紹了spring @Transactional 事務(wù)無法使用的可能原因,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07細(xì)談java同步之JMM(Java Memory Model)
Java內(nèi)存模型是在硬件內(nèi)存模型上的更高層的抽象,它屏蔽了各種硬件和操作系統(tǒng)訪問的差異性,保證了Java程序在各種平臺(tái)下對(duì)內(nèi)存的訪問都能達(dá)到一致的效果。下面我們來一起學(xué)習(xí)下JMM2019-05-05