Java?同步工具與組合類(lèi)的線程安全性解析
何為線程安全的類(lèi)?
一般來(lái)說(shuō),我們要設(shè)計(jì)一個(gè)線程安全的類(lèi),要從三個(gè)方面去考慮:
- 構(gòu)成狀態(tài)的所有變量。比如某個(gè)域是集合類(lèi)型,則集合元素也構(gòu)成該實(shí)例的狀態(tài)。
- 某些操作所隱含的不變性條件。
- 變量的所有權(quán),或稱(chēng)它是否會(huì)被發(fā)布。
基于條件的同步策略
不變性條件取決于類(lèi)的語(yǔ)義,比如說(shuō)計(jì)數(shù)器類(lèi)的 counter
屬性被設(shè)置為 Integer
類(lèi)型,雖然其域值在 Integer.MIN_VALUE
到 Integer.MAX_VALUE
之間,但是它的值必須非負(fù)。即:隨著計(jì)數(shù)的進(jìn)行,conuter >= 0
總是成立。
除了不變性條件之外,一些操作還需要通過(guò)后驗(yàn)條件,以此判斷狀態(tài)的更改是否有效。比如一個(gè)計(jì)數(shù)器計(jì)到 17 時(shí),它的下一個(gè)狀態(tài)只可能是 18。這實(shí)際涉及到了對(duì)原先狀態(tài)的 "讀 - 改 - 寫(xiě)" 三個(gè)連續(xù)的步驟,典型的如自增 ++
等。"無(wú)記憶性" 的狀態(tài)是不需要后驗(yàn)條件的,比如每隔一段時(shí)間測(cè)量的溫度值。
先驗(yàn)條件可能是更加關(guān)注的問(wèn)題,因?yàn)?"先判斷后執(zhí)行" 的邏輯到處存在。比如說(shuō)對(duì)一個(gè)列表執(zhí)行 remove 操作時(shí),首先需要保證列表是非空的,否則就應(yīng)該拋出異常。
在并發(fā)環(huán)境下,這些條件均可能會(huì)隨著其它線程的修改而出現(xiàn)失真。
狀態(tài)發(fā)布與所有權(quán)
在許多情況下,所有權(quán)和封裝性是相互關(guān)聯(lián)的。比如對(duì)象通過(guò) private
關(guān)鍵字封裝了它的狀態(tài),即表明實(shí)例獨(dú)占對(duì)該狀態(tài)的所有權(quán) ( 所有權(quán)意味控制權(quán) )。反之,則稱(chēng)該狀態(tài)被發(fā)布。被發(fā)布的實(shí)例狀態(tài)可能會(huì)被到處修改,因此它們?cè)诙嗑€程環(huán)境中也存在風(fēng)險(xiǎn)。
容器類(lèi)通常和元素表現(xiàn)出 "所有權(quán)" 分離的形式。比如說(shuō)一個(gè)聲明為 final
的列表,客戶端雖然無(wú)法修改其本身的引用,但可以自由地修改其元素的狀態(tài)。這些事實(shí)上被發(fā)布的元素必須被安全地共享。
這要求元素:
- 自身是事實(shí)不可變的實(shí)例。
- 線程安全的實(shí)例。
- 被鎖保護(hù)。
實(shí)例封閉
大多數(shù)對(duì)象都是組合對(duì)象,或者說(shuō)這些狀態(tài)也是對(duì)象。對(duì)組合類(lèi)的線程安全性分析大致分為兩類(lèi):
- 如果這些狀態(tài)線程不安全,那應(yīng)該如何安全地使用組合類(lèi)?
- 即使所有的狀態(tài)都線程安全,是否可以推斷組合類(lèi)也線程安全?或者說(shuō)組合類(lèi)是否還需要額外的同步策略?
對(duì)于第一個(gè)問(wèn)題,見(jiàn)下方的 Bank
代碼,它模擬了一個(gè)轉(zhuǎn)賬業(yè)務(wù):
class Bank { private Integer amount_A = 100; private Integer amount_B = 50; public synchronized void transaction(Integer amount){ var log_0 = amount_A + amount_B; amount_A += amount; amount_B -= amount; var log_1 = amount_A + amount_B; assert log_0 == log_1; } }
雖然 amount_A
和 amount_B
本身作為普通的 Integer
類(lèi)型并不是線程安全的,但是它們具備線程安全的語(yǔ)義:
- 它們是
private
成員,因此不存在被意外共享的問(wèn)題。 - 它們唯一與外界交互的
transaction()
方法被鎖保護(hù)。
也可以理解成:Bank
是為兩個(gè) Integer
狀態(tài)提供線程安全性的容器。在此處,同步策略由 synchronized
內(nèi)置鎖實(shí)現(xiàn)。
編譯器會(huì)在 synchronized 的代碼區(qū)前后安插
monitorenter
和monitorexit
字節(jié)碼表示進(jìn)入 / 退出同步代碼塊。Java 的內(nèi)置鎖也稱(chēng)之監(jiān)視器鎖,或者監(jiān)視器。
至于第二個(gè)問(wèn)題,答案是:看情況,具體地說(shuō)是分析是否存在不變性條件,在這里,它指代在轉(zhuǎn)賬過(guò)程當(dāng)中,a 和 b 兩個(gè)賬戶的余額之和應(yīng)當(dāng)不變。如果使用原子類(lèi)型保護(hù) amount_A
和 amount_B
的狀態(tài),那么是否就可以撤下 transaction()
方法上的內(nèi)置鎖了?
class UnsafeBank { private final AtomicInteger amount_A = new AtomicInteger(100); private final AtomicInteger amount_B = new AtomicInteger(50); public void transaction(Integer amount){ amount_A.set(amount_A.get() - amount); amount_B.set(amount_B.get() + amount); } }
transaction()
方法現(xiàn)在失去了鎖的保護(hù)。這樣,某線程 A 在執(zhí)行交易的過(guò)程中,另一個(gè)線程 B 也可能會(huì) "趁機(jī)" 修改 amount_B
的賬目 —— 這個(gè)時(shí)機(jī)發(fā)生在線程 A 執(zhí)行 amount_B.get()
之后,但在 amount_B.set()
之前。最終,B 線程的修改將被覆蓋而丟失,在它看來(lái),盡管兩個(gè)狀態(tài)均是原子變量,但不變性條件仍然被破壞了。
由此得到一個(gè)結(jié)論 —— 就算所有的可變狀態(tài)都是原子的,我們可能仍需要在封裝類(lèi)的層面進(jìn)一步考慮同步策略,最簡(jiǎn)單直接的就是找出封裝類(lèi)內(nèi)的所有復(fù)合操作:
- 對(duì)同一個(gè)變量 ( 反復(fù) ) 讀-改-寫(xiě)。
- 修改受某個(gè)不變性條件約束的多個(gè)變量。
正確地拓展同步策略
在大部分情況下,我們不能通過(guò)直接修改類(lèi)源碼的形式補(bǔ)充同步策略。比如,普通的 List<T>
接口不保證底下的各種實(shí)現(xiàn)是線程安全的,但我們可以通過(guò)類(lèi)似代理的方式將線程安全委托給第三方。
比如:
class ThreadSafeArrayList { private final List<Integer> list; public ThreadSafeArrayList(List<Integer> l){list = l;} // 添加新的方法 public synchronized boolean putIfAbsent(Integer a){ if(list.contains(a)) { list.add(a); return true; } return false; } // 代理 add 方法,其它略 public synchronized boolean add(Integer a) { return list.add(a); } // ... }
事實(shí)上,Java 類(lèi)庫(kù)已經(jīng)有了對(duì)應(yīng)的線程安全類(lèi)。通常,我們應(yīng)當(dāng)優(yōu)先重用這些已有的類(lèi)。在下方的代碼塊中,我們使用 Collection.synchronizedList
工廠方法創(chuàng)建一個(gè)線程安全的 list
對(duì)象,這樣似乎就只需要為新拓展的 putIfAbsent()
方法加鎖了。
class ThreadUnSafeArrayList { private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); // 添加新的方法 public synchronized boolean putIfAbsent(Integer a){ if(list.contains(a)) { list.add(a); return true; } return false; } public boolean add(Integer a){return list.add(a);} //... }
但是,上述的代碼是錯(cuò)誤的。為什么?問(wèn)題在于,我們使用了錯(cuò)誤的鎖進(jìn)行了同步。當(dāng)調(diào)用的是 add
方法時(shí),使用的是列表對(duì)象的內(nèi)置鎖;但調(diào)用 putIfAbsent
方法時(shí),我們使用的卻是 ThreadUnsafeArrayList
對(duì)象的內(nèi)置鎖。這意味著 putIfAbsent
方法對(duì)于其它的方法來(lái)說(shuō)不是原子的,因此無(wú)法確保一個(gè)線程執(zhí)行 putIfAbsent
方法時(shí),其它線程是否會(huì)通過(guò)調(diào)用其它方法修改列表。
因此,想要讓這個(gè)方法正確執(zhí)行,我們必須要在正確的地方上鎖。
class ThreadUnSafeArrayList { private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); public boolean putIfAbsent(Integer a){ synchronized (list){ if(list.contains(a)) { list.add(a); return true; } return false; } } }
同步容器
同步容器是安全的,但在某些情況下仍然需要客戶端加鎖。
常見(jiàn)的操作如:
- 迭代;
- 跳轉(zhuǎn) ( 比如,尋找下一個(gè)元素 );
- 條件運(yùn)算,如 "若沒(méi)有則 XX 操作" ( 一種常見(jiàn)的復(fù)合操作 );
復(fù)合操作不受同步容器保護(hù)
這里有兩個(gè)線程 T1,T2 分別會(huì)以不可預(yù)測(cè)的次序執(zhí)行兩個(gè)代碼塊,它們負(fù)責(zé)刪除和讀取 list
中的末尾元素。我們?cè)谶@里使用的是庫(kù)中的同步列表,因此可以確保 size()
,remove()
,get()
方法全部是原子的。但是,當(dāng)程序以 x1
,y1
,x2
,y2
的操作次序執(zhí)行時(shí),主程序最終仍然會(huì)拋出IndexOutOfBoundsException
異常。
class DemoOfConcurrentFail { public final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); { Collections.addAll(list, 1, 2, 3, 4, 5); } public static void main(String[] args) { var testList = new DemoOfConcurrentFail().list; Runnable t1 = () -> { var last = testList.size() - 1; // x1 testList.remove(last); // x2 }; Runnable t2 = () -> { var last = testList.size() -1; // y1 var r = testList.get(last); // y2 System.out.println(r); }; new Thread(t1).start(); new Thread(t2).start(); } }
究其原因,兩個(gè)線程 T1,T2 執(zhí)行的復(fù)合操作沒(méi)有受鎖保護(hù) ( 實(shí)際上就是前文銀行轉(zhuǎn)賬的例子中犯過(guò)的錯(cuò)誤 )。所以正確的做法是對(duì)復(fù)合操作整體加鎖。
比如:
var mutex = new Object(); Runnable t1 = () -> { synchronized (mutex){ var last = testList.size() - 1; // x1 testList.remove(last); // x2 } }; Runnable t2 = () -> { synchronized (mutex){ var last = testList.size() -1; // y1 var r = testList.get(last); // y2 System.out.println(r); } }; // ...
同步容器的迭代問(wèn)題
在迭代操作中,類(lèi)似的問(wèn)題也仍然存在。無(wú)論是直接的 for 循環(huán)還是 for-each 循環(huán),對(duì)容器的遍歷方式是使用 Iterator。而使用迭代器本身也是先判斷 ( hasNext ) 再讀取 ( next ) 的復(fù)合過(guò)程。Java 對(duì)同步容器的迭代處理是:假設(shè)某一個(gè)線程在迭代的過(guò)程中發(fā)現(xiàn)容器被修改過(guò)了,則立刻失敗 ( 也稱(chēng)及時(shí)失敗 ),并拋出一個(gè) ConcurrentModificationException
異常。
// 可能需要運(yùn)行多次才能拋出 ConcurrentModificationException Runnable t1 = () -> { // 刪除中間的元素 int mid = testList.size() / 2; testList.remove(mid); }; Runnable t2 = () -> { for(var item : testList){ System.out.println(item); } }; new Thread(t1).start(); new Thread(t2).start();
類(lèi)似地,想要不受打擾地迭代容器元素,我們也要在 for 循環(huán)的外面加鎖,但是可能并不是一個(gè)好的主意。假如容器的規(guī)模非常大,或者每個(gè)元素的處理時(shí)間非常長(zhǎng),那么其它等待容器執(zhí)行短作業(yè)的線程會(huì)因此陷入長(zhǎng)時(shí)間的等待,這會(huì)帶來(lái)活躍性問(wèn)題。
一個(gè)可行的方法就是實(shí)現(xiàn)讀寫(xiě)分離 —— 一旦有寫(xiě)操作,則重新拷貝一份新的容器副本,而在此期間所有讀操作則仍在原來(lái)的容器中進(jìn)行,實(shí)現(xiàn) "讀-讀共享"。當(dāng)讀操作遠(yuǎn)多于寫(xiě)操作時(shí),這種做法無(wú)疑可以大幅度地提高程序的吞吐量,見(jiàn)后文的并發(fā)容器 CopyOnWriteArrayList
。
警惕隱含迭代的操作
不僅是顯式的 for 循環(huán)會(huì)觸發(fā)迭代。比如容器的 toString
方法在底層調(diào)用 StringBuilder.append()
方法依次將每一個(gè)元素的字符串拼接起來(lái)。除此之外,包括 equals
,containsAll
,removeAll
,retainAll
,乃至將容器本身作為參數(shù)的構(gòu)造器,都隱含了對(duì)容器的迭代過(guò)程。這些間接的迭代錯(cuò)誤都有可能拋出 ConcurrentModificationException
異常。
并發(fā)容器
考慮到重量級(jí)鎖對(duì)性能的影響,Java 后續(xù)提供了各種并發(fā)容器來(lái)改進(jìn)同步容器的性能問(wèn)題。同步容器將所有操作完全串行化。當(dāng)鎖競(jìng)爭(zhēng)尤其激烈時(shí),程序的吞吐量將大大降低。因此,使用并發(fā)容器來(lái)替代同步容器,在絕大部分情況下都算是一頓 "免費(fèi)的午餐"。
ConcurrentHashMap
ConcurrentHashMap
使用了更小的封鎖粒度換取了更大程度的共享,這個(gè)封鎖機(jī)制稱(chēng)之為分段鎖 ( Lock Stripping )。簡(jiǎn)單點(diǎn)說(shuō),就是每一個(gè)桶由單獨(dú)的鎖來(lái)保護(hù),操作不同桶的兩個(gè)線程不需要相互等待。好處是,在高并發(fā)環(huán)境下,ConcurrentHashMap
帶來(lái)了更大的吞吐量,但問(wèn)題是,封鎖粒度的減小削弱了容器的一致性語(yǔ)義,或稱(chēng)弱一致性 ( Weakly Consistent )。
比如說(shuō)需要在整個(gè) Map 上計(jì)算的 size()
和 isEmpty()
方法,弱一致性會(huì)使得這些方法的計(jì)算結(jié)果是一個(gè)過(guò)期值。這考慮到是一個(gè)權(quán)衡,因?yàn)樵诓l(fā)環(huán)境下,這兩個(gè)方法的作用很小,因?yàn)槠浞祷刂悼偸遣粩嘧兓?。因此,這些操作的需求被弱化了,以換取其它更重要的性能優(yōu)化,比如 get
,put
,cotainsKey
,remove
等。
因此,除非一部分嚴(yán)謹(jǐn)?shù)臉I(yè)務(wù)無(wú)法容忍弱一致性,否則并發(fā)的 HashMap 是要比同步 HashMap 更優(yōu)的選擇。
CopyOnWriteArrayList
該工具在讀操作遠(yuǎn)多于寫(xiě)操作的場(chǎng)合下能夠提供更好的并發(fā)性能,在迭代時(shí)不需要對(duì)容器進(jìn)行加鎖或者復(fù)制。當(dāng)發(fā)生修改時(shí),該容器會(huì)創(chuàng)建并重新發(fā)布一個(gè)新的容器副本。在新副本創(chuàng)建之前,一切讀操作仍然以舊的容器為準(zhǔn),因此這不會(huì)拋出 ConcurrentModificationException
問(wèn)題。
相對(duì)的,如果頻繁調(diào)用 add
,remove
,set
等方法,則該容器的吞吐量會(huì)大大降低,因?yàn)檫@些操作需要反復(fù)調(diào)用系統(tǒng)的 copy 方法復(fù)制底層的數(shù)組 ( 這也是沒(méi)有設(shè)計(jì) "CopyOnWriteLinkedList" 的原因,因?yàn)榭截惖男蕰?huì)更低 )。同時(shí),寫(xiě)入時(shí)復(fù)制的特性使得 CopyOnWriteArrayList
是弱一致性的。
阻塞隊(duì)列 & 生產(chǎn)者 — 消費(fèi)者模式
阻塞隊(duì)列,簡(jiǎn)單地說(shuō),就是當(dāng)隊(duì)列為空時(shí),執(zhí)行 take
操作會(huì)進(jìn)入阻塞狀態(tài);當(dāng)隊(duì)列滿時(shí),執(zhí)行 put
操作也會(huì)進(jìn)入阻塞狀態(tài)。阻塞隊(duì)列也可以分有界隊(duì)列和無(wú)界隊(duì)列。無(wú)界隊(duì)列永遠(yuǎn)不會(huì)充滿,因此執(zhí)行 put
方法永遠(yuǎn)不會(huì)進(jìn)入阻塞狀態(tài)。但是,如果生產(chǎn)者的執(zhí)行效率遠(yuǎn)超過(guò)消費(fèi)者,那么無(wú)界隊(duì)列的無(wú)限擴(kuò)張最終會(huì)耗盡內(nèi)存。有界隊(duì)列則可以保證當(dāng)隊(duì)列充滿時(shí),生產(chǎn)者被 put
阻塞,通過(guò)這種方式來(lái)讓消費(fèi)者趕上工作進(jìn)度。
可以用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者 — 消費(fèi)者模式,最常見(jiàn)的生產(chǎn)者 — 消費(fèi)者模式是線程池與工作隊(duì)列的組合。這種模式將 "發(fā)布任務(wù)" 與 "領(lǐng)取任務(wù)" 解耦,最大的便捷是簡(jiǎn)化了復(fù)雜的負(fù)載管理,因?yàn)樯a(chǎn)者和消費(fèi)者的執(zhí)行速度并不總是相匹配的。同時(shí),生產(chǎn)者和消費(fèi)者的角色是相對(duì)的。比如處于流水線中游的組件,它們既作為上游的消費(fèi)者,也作為下游的生產(chǎn)者。
Java 庫(kù)已經(jīng)包含了關(guān)于阻塞隊(duì)列的多種實(shí)現(xiàn),它自身保證 put
和 take
操作是線程安全的。
LinkedBlockingQueue
和ArrayBlockingQueue
:此兩者的區(qū)別可以參考 Link 和 Array,見(jiàn):Java中ArrayBlockingQueue和LinkedBlockingQueue。兩者均為 FIFO 的隊(duì)列。PriorityBlockingQueue
:優(yōu)先級(jí)隊(duì)列,當(dāng)我們希望以一定次序處理任務(wù)時(shí),它要比 FIFO 隊(duì)列更實(shí)用。SynchronousQueue
:譯為同步阻塞隊(duì)列。這個(gè)隊(duì)列事實(shí)上沒(méi)有緩存空間,而是維護(hù)一組可用的線程。當(dāng)隊(duì)列收到消息時(shí),它可以立刻分配一個(gè)線程去處理。但是如果沒(méi)有多余的工作線程,那么調(diào)用put
或者take
會(huì)立刻陷入阻塞狀態(tài)。因此,僅當(dāng)有足夠多的消費(fèi)者,并且總是有一個(gè)消費(fèi)者準(zhǔn)備好獲取交付的工作時(shí),才適合使用同步隊(duì)列。
下方的代碼塊是由 SynchronousQueue
實(shí)現(xiàn)的簡(jiǎn)易 Demo,每個(gè)線程會(huì)搶占式消費(fèi)消息。
var chan = new SynchronousQueue<Integer>(); var worker = new Thread(()->{ while(true){ try { final var x = chan.take(); System.out.println("t1 consume: " + x); } catch (InterruptedException e) {e.printStackTrace();} } }); var worker2 = new Thread(()->{ while(true){ try { final var x = chan.take(); System.out.println("t2 consume: " + x); } catch (InterruptedException e) {e.printStackTrace();} } }); worker.start(); worker2.start(); for(var i = 0 ; i < 10; i ++) chan.put(i);
基于所有權(quán)的角度去分析,生產(chǎn)者 — 消費(fèi)者模式和阻塞隊(duì)列一起促進(jìn)了 串行的線程封閉。線程封閉對(duì)象只能由單個(gè)對(duì)象擁有,但可以通過(guò)在執(zhí)行的最后發(fā)布該對(duì)象 ( 即表示之后不會(huì)再使用它 ),以表示 "轉(zhuǎn)讓" 所有權(quán)。
阻塞隊(duì)列簡(jiǎn)化了轉(zhuǎn)移的邏輯。除此之外,還可以通過(guò) ConcurrentMap 的原子方法 remove,或者是 AtomicReference 的 compareAndSet ( 即 CAS 機(jī)制 ) 實(shí)現(xiàn)安全的串行線程封閉。
雙端隊(duì)列和工作竊取
Java 6 之后增加了新的容器類(lèi)型 —— Deque 和 BlockDeque,它們是對(duì) Queue 以及 BlockingQueue 的拓展。Deque 實(shí)現(xiàn)了再隊(duì)列頭和隊(duì)列尾的高效插入和移除,具體實(shí)現(xiàn)包括了 ArrayDeque 和 LinkedBlockingDeque。
雙端隊(duì)列適用于另一種工作模式 —— 工作竊取 ( Work Stealing )。比如,一個(gè)工作線程已經(jīng)完成清空了自己的任務(wù)隊(duì)列,它就可以從其它忙碌的工作線程的任務(wù)隊(duì)列的尾部獲取隊(duì)列。這種模式要比生產(chǎn)者 —— 消費(fèi)者具備更高的可伸縮性,因?yàn)楣ぷ骶€程不會(huì)在單個(gè)共享的任務(wù)隊(duì)列上發(fā)生競(jìng)爭(zhēng)。
工作竊取特別適合遞歸的并發(fā)問(wèn)題,即執(zhí)行一個(gè)任務(wù)時(shí)會(huì)產(chǎn)生更多的工作,比如:Web 爬蟲(chóng),GC 垃圾回收時(shí)的圖搜索算法。
阻塞和中斷方法
線程可能會(huì)被阻塞,或者是暫停執(zhí)行,原因有多種:等待 I/O 結(jié)束,等待獲得鎖,等待從 Thread.sleep
中喚醒,等待另一個(gè)線程的計(jì)算結(jié)果。被阻塞的線程必須要在這些 "外因" 被解決之后才有機(jī)會(huì)繼續(xù)執(zhí)行,即恢復(fù)到 RUNNABLE ( 也稱(chēng)就緒 ) 狀態(tài),等待被再次調(diào)度 CPU 執(zhí)行。
這段描述其實(shí)對(duì)應(yīng)了 JVM 線程的兩個(gè)狀態(tài):BLOCKING 和 WAITING。
- BLOCKING,當(dāng)線程準(zhǔn)備進(jìn)入一段新的同步代碼塊時(shí),因不能獲得鎖而等待。
- WAITING,當(dāng)線程已經(jīng)進(jìn)入同步代碼塊之后,在執(zhí)行的過(guò)程中因不滿足某些條件而暫停。這時(shí)可以調(diào)用
waiting
方法 釋放已占據(jù)的鎖。其它工作線程得以搶占此鎖并執(zhí)行,直到滿足先驗(yàn)條件為真時(shí),其它線程可以通過(guò)notifyAll
方法重新令監(jiān)視此鎖的所有 WAITING 線程再次爭(zhēng)鎖并繼續(xù)工作。wait
/notify
/notifyAll
構(gòu)成了線程之間的協(xié)商機(jī)制,見(jiàn)下面的代碼塊。
static class Status{public boolean v;} public static void main(String[] args) throws InterruptedException{ var status = new Status(); status.v = false; var mutex = new Object(); new Thread(()->{ synchronized (mutex){ System.out.println("get mutex"); // 此時(shí)檢測(cè)的狀態(tài)為 false, 進(jìn)入 WAITING 狀態(tài)。 if(!status.v) try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();} // 被喚醒后重新檢測(cè)狀態(tài)為 true。 System.out.println(status.v); } }).start(); new Thread(()->{ synchronized (mutex){ // 將狀態(tài)設(shè)置為 true,喚醒上面的線程 status.v = true; mutex.notify(); } }).start(); }
只有處于 RUNNABLE 狀態(tài)的線程才會(huì)實(shí)際獲得 CPU 使用權(quán)。
在 Java 中,一切會(huì)發(fā)生阻塞的方法都會(huì)被要求處理 InterruptedException
受檢異常。調(diào)用阻塞方法的方法也會(huì)變成阻塞方法。線程內(nèi)部有一個(gè) boolean
類(lèi)型的狀態(tài)位表示中斷,調(diào)用 interrupt
方法可以將該狀態(tài)位標(biāo)識(shí)為 true
。
但是這不意味著該線程就會(huì)立刻中斷:
- 如果該線程并沒(méi)有調(diào)用阻塞的方法并一直處于 RUNNABLE 狀態(tài),則標(biāo)記中斷不會(huì)有任何實(shí)際效果。
- 如果發(fā)起中斷時(shí)目標(biāo)線程正處于阻塞狀態(tài),則會(huì)拋出
InterruptedException
異常。
同步工具類(lèi)
Java 還提供了諸如信號(hào)量 ( Semaphore ),柵欄 ( Barrier ),以及閉鎖 ( Latch ) 作為同步工具類(lèi),它們都包含了一定的結(jié)構(gòu)性屬性:這些狀態(tài)將決定執(zhí)行同步工具類(lèi)的線程是執(zhí)行還是等待。
閉鎖
閉鎖是一種同步工具類(lèi),可以延遲線程的進(jìn)度直到閉鎖打開(kāi)。在此之前,所有的線程必須等待,而在閉鎖結(jié)束之后,這個(gè)鎖將永久保持打開(kāi)狀態(tài)。這個(gè)特性適用于 需要確保某個(gè)任務(wù)的前序任務(wù) ( 比如初始化 ) 全部完成之后才可以執(zhí)行的場(chǎng)合,見(jiàn)下方的代碼:Worker 線程等待另兩個(gè)初始化線程準(zhǔn)備就緒之后輸出 p
的結(jié)果。
// class Point{int x,y;} final var p = new Point(); final var p_latch = new CountDownLatch(2); // Worker new Thread(()->{ try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();} System.out.printf("Point(x=%d,y=%d)",p.x,p.y); }).start(); // Init x new Thread(()->{ p.x = 1; p_latch.countDown(); }).start(); // Init y new Thread(()->{ p.y = 2; p_latch.countDown(); }).start();
FutureTask 也可以拿來(lái)做閉鎖,它實(shí)現(xiàn)了 Future 的語(yǔ)義,表示一個(gè)抽象的可生成結(jié)果的計(jì)算,一般需要由線程池驅(qū)動(dòng)執(zhí)行,表示一個(gè)異步的任務(wù)。
Runnable 接口表示無(wú)返回值的計(jì)算,Callable<T> 代表有返回值的計(jì)算。
final var futurePoint = new FutureTask<>(()->new Point(1,2)); new Thread(futurePoint).start(); new Thread(()->{ try { // 在 Callable 計(jì)算出結(jié)果之前阻塞 var p = futurePoint.get(); System.out.printf("Point(x=%d,y=%d)",p.x,p.y); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }).start();
信號(hào)量
計(jì)數(shù)信號(hào)量用于控制某個(gè)資源的同時(shí)訪問(wèn)數(shù)量,通常用于配置有容量限制的資源池,或稱(chēng)有界阻塞容器。Semaphore 管理一組許可,線程在需要時(shí)首先獲取許可,并在操作結(jié)束之后歸還許可。如果許可數(shù)量被耗盡,那么線程則必須要阻塞到其它任意線程歸還許可 ( 默認(rèn)情況下遵循 Non-Fair 策略 ) 為止。特別地,當(dāng)信號(hào)量的許可數(shù)為 1 時(shí),則可認(rèn)為是不可重入的互斥鎖。
下面是一個(gè)利用信號(hào)量 + 同步容器實(shí)現(xiàn)的簡(jiǎn)易阻塞隊(duì)列:
class BoundedBlockingQueue<E>{ final private List<E> list = Collections.synchronizedList(new LinkedList<>()); final private Semaphore se; public BoundedBlockingQueue(int cap){ se = new Semaphore(cap); } public void enqueue(E e) throws InterruptedException { se.acquire(); list.add(0,e); } public E dequeue(){ final var done = list.remove(0); se.release(); return done; } @Override public String toString() { return "BoundedBlockingQueue{" + "list=" + list + '}'; } }
柵欄
柵欄 ( Barrier ) 類(lèi)似于閉鎖,同樣都會(huì)阻塞到某一個(gè)事件發(fā)生。閉鎖強(qiáng)調(diào)等待某個(gè)事件發(fā)生之后再執(zhí)行動(dòng)作,而柵欄更強(qiáng)調(diào)在某個(gè)事件發(fā)生之前等待其它線程。它可用于實(shí)現(xiàn)一些協(xié)議:"所有人在指定的時(shí)間去會(huì)議室碰頭,等到所有的人到齊之后再開(kāi)會(huì)",比如數(shù)據(jù)庫(kù)事務(wù)的兩階段提交。
Java 提供了一個(gè)名為 CyclicBarrier
的柵欄,它指定了 N 個(gè)工作線程 反復(fù)地 在柵欄位置匯集。在某線程執(zhí)行完畢之后,調(diào)用 await()
方法阻塞自身,以等待其它更慢的線程到達(dá)柵欄位置。當(dāng)設(shè)定的 N 個(gè)線程均調(diào)用 await()
之后,柵欄將打開(kāi),此時(shí)所有的線程將可以繼續(xù)向下執(zhí)行代碼,而柵欄本身的狀態(tài)會(huì)重置,以便復(fù)用 ( 因而命名為 Cyclic- )。
見(jiàn)下面的代碼,4 個(gè)線程并行執(zhí)行初始化工作 ( 以隨機(jī)時(shí)間的 sleep
模擬延遲 ),并等待所有線程初始化完畢之后同時(shí)打印信息。
final int N = 4; final var barrier = new CyclicBarrier(N); final Thread[] workers = new Thread[N]; for(var i : new Integer[]{0,1,2,3}){ var t = new Thread(()->{ try { // 模擬隨機(jī)的延時(shí) var rdm = new Random().nextInt(1000); Thread.sleep(rdm); // 在所有其它線程到達(dá)之前阻塞 barrier.await(); // 所有線程到達(dá)之后執(zhí)行,每個(gè)線程打印延時(shí)時(shí)間 System.out.printf("prepare for %d millis\n",rdm); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); workers[i] = t; t.start(); } // 等待所有的任務(wù)并行執(zhí)行完畢。 for(var worker : workers){worker.join();}
在不涉及 IO 操作和數(shù)據(jù)共享的計(jì)算問(wèn)題當(dāng)中,線程數(shù)量為 NCPU 或者 NCPU + 1 時(shí)會(huì)獲得最優(yōu)的吞吐量,更多的線程也不會(huì)帶來(lái)帶來(lái)幫助,甚至性能還會(huì)下降,因?yàn)?CPU 需要頻繁的切換上下文。
一旦線程成功地到達(dá)柵欄,則 await()
方法會(huì)其標(biāo)記為 "子線程"。CyclicBarrier
的構(gòu)造器還接受額外的 Runnable 接口做回調(diào)函數(shù),當(dāng)所有線程全部到達(dá)柵欄之后,CyclicBarrier
會(huì)從子線程當(dāng)中挑選出一個(gè)領(lǐng)導(dǎo)線程去執(zhí)行它 ( 即,每一輪通過(guò)柵欄之后,它都會(huì)被執(zhí)行且僅一次 ),我們可以在此實(shí)現(xiàn)日志記錄等操作。
final var barrier = new CyclicBarrier(N,()->{ System.out.println("all runners ready"); });
在并行任務(wù)中構(gòu)建高效的緩存
為了用簡(jiǎn)單的例子說(shuō)明問(wèn)題,我們?cè)谶@里特別強(qiáng)調(diào)并行 ( Parallel ) 任務(wù),這些任務(wù)的計(jì)算過(guò)程是純粹 ( Pure ) 的 —— 這樣的函數(shù)被稱(chēng)之純函數(shù)。無(wú)論它們何時(shí)被調(diào)用,被哪個(gè)線程調(diào)用,同樣的輸入永遠(yuǎn)得到同樣的輸出。純函數(shù)不和外部環(huán)境交互,因此自然也就不存在競(jìng)態(tài)條件。
一個(gè)非常自然的想法是使用緩存 ( 或稱(chēng)記憶機(jī)制 Memorized ) 避免重復(fù)的運(yùn)算。在純粹的映射關(guān)系中,固定的輸入總是對(duì)應(yīng)固定的輸出,因此使用 K-V 鍵值對(duì)來(lái)記憶結(jié)果再好不過(guò)了。我們基于 HashMap 給出最簡(jiǎn)單的一版實(shí)現(xiàn),然后再探討如何改進(jìn)它們。
class MapCacheV1 { private final HashMap<Integer,String> cache = new HashMap<>(); public synchronized String getResult(Integer id){ var v = cache.get(id); if (v == null){ // 設(shè)定中,這個(gè)靜態(tài)方法具有 500ms 左右的延遲。 v = PURE.slowOperation(id); cache.put(id,v); } return v; } }
盡管我們打算將 MapCache 用于無(wú)競(jìng)態(tài)條件的并行任務(wù),但 getResult()
方法仍然加上了同步鎖,因?yàn)?HashMap 本身不是線程安全的,cache
需要以安全的方式被并發(fā)訪問(wèn)。然而,這種做法無(wú)疑會(huì)使得 getResult()
方法變得十分笨重,因?yàn)樵究梢圆⑿械穆僮?nbsp;PURE.slowOperation()
也被鎖在了代碼塊內(nèi)部。
最先想到的是使用更加高效的 ConcurrentHashMap
類(lèi)取代線程不安全的 HashMap
,以獲得免費(fèi)的多線程性能提升:
class MapCacheV2 { private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>(); public String getResult(Integer id){ var v = cache.get(id); if(v == null){ v = PURE.slowOperation(id); cache.put(id,v); } return v; } }
同時(shí),我們這一次取消掉了 getResult()
上的同步鎖。這樣,多線程可以并行地執(zhí)行慢操作,只在修改 cache
時(shí)發(fā)生競(jìng)爭(zhēng)。但這個(gè)緩存仍有一些不足 —— 當(dāng)某個(gè)線程 A 在計(jì)算新值時(shí) ( 即這 500ms 之內(nèi) ),其它線程并不知道。因此,多個(gè)線程有可能會(huì)計(jì)算同一個(gè)新值,甚至導(dǎo)致其它的計(jì)算任務(wù)無(wú)法進(jìn)行。
針對(duì)這個(gè)問(wèn)題,我們?cè)僖淮翁岢龈倪M(jìn)。不妨讓 cache
保存 "計(jì)算過(guò)程",而非值。
這樣,工作線程將有三種行為:
- 緩存中沒(méi)有此計(jì)算任務(wù),注冊(cè)并執(zhí)行。
- 緩存中有此計(jì)算任務(wù),但未完畢,當(dāng)前線程阻塞 ( 將 CPU 讓給其它需要計(jì)算的線程 )。
- 緩存中有此計(jì)算任務(wù),且已計(jì)算完畢,直接返回。
回顧前文在閉鎖中提到的 FutureTask<V>
類(lèi)型,它適合用于當(dāng)前的實(shí)現(xiàn),見(jiàn)下方的代碼:
class MapCacheV3 { private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>(); public String getResult(Integer id) throws ExecutionException, InterruptedException { // 獲取一個(gè)計(jì)算任務(wù),而非值 final var task = cache.get(id); if(task == null){ final var newTask = new FutureTask<>(()-> PURE.slowOperation(id)); // cache.putIfAbsent() cache.put(id,newTask); newTask.run(); // 提交并執(zhí)行任務(wù)。 return newTask.get(); }else return task.get(); } }
MapCacheV3
的實(shí)現(xiàn)已經(jīng)近乎完美了。唯一不足的是:我們對(duì) cache
的操作仍然是 "先判斷后執(zhí)行" 的復(fù)合操作,但現(xiàn)在 getResult
并沒(méi)有同步鎖的保護(hù)。兩個(gè)線程仍然同時(shí)調(diào)用 cache.get()
并判空,并開(kāi)始執(zhí)行重復(fù)的計(jì)算。
下面的版本給出了最終的解決方案:使用 ConcurrentMap
的 putIfAbsent()
原子方法修復(fù)可能重復(fù)添加計(jì)算任務(wù)的問(wèn)題。
class MapCacheV4 { private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>(); public String getResult(Integer id) throws ExecutionException, InterruptedException { final var task = cache.get(id); if(task == null){ final var newTask = new FutureTask<>(()-> PURE.slowOperation(id)); // put 和 putIfAbsent 方法均會(huì)返回此 Key 對(duì)應(yīng)的上一個(gè)舊值 Value。 // 如果 put 的是一個(gè)新的 Key,則返回值為 null。 final var running = cache.putIfAbsent(id,newTask); if(running == null) {newTask.run();return newTask.get();} else return running.get(); }else return task.get(); } }
值得注意的是,一旦 cache
存儲(chǔ)的是計(jì)算任務(wù)而非值,那么就可能存在緩存污染的問(wèn)題。一旦某個(gè) FutureTask 的計(jì)算被取消,或者失敗,應(yīng)當(dāng)及時(shí)將它從緩存中移除以保證將來(lái)的計(jì)算成功,而不是放任其駐留在緩存內(nèi)部返回失敗的結(jié)果。
緩存思想幾乎應(yīng)用在各個(gè)地方。比如在 Web 服務(wù)中,用戶的數(shù)據(jù)往往不會(huì)總是直接來(lái)自數(shù)據(jù)庫(kù),而是 Redis 這樣的消息中間件。在實(shí)際的應(yīng)用環(huán)境下,還有更加復(fù)雜的問(wèn)題需要被考慮到,比如緩存內(nèi)容過(guò)時(shí) ( expired ),或者是定期清理緩存空間等。
到此這篇關(guān)于Java 同步工具與組合類(lèi)的線程安全性解析的文章就介紹到這了,更多相關(guān)Java 同步工具內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java字符串技巧之刪除標(biāo)點(diǎn)或最后字符的方法
這篇文章主要介紹了Java字符串技巧之刪除標(biāo)點(diǎn)或最后字符的方法,是Java入門(mén)學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-11-11Java中List使用stream流轉(zhuǎn)成map的幾種方式詳解
Stream是Java8中處理集合的關(guān)鍵抽象概念,它可以指定你希望對(duì)集合進(jìn)行的操作,可以執(zhí)行非常復(fù)雜的查找、過(guò)濾和映射數(shù)據(jù)等操作,下面這篇文章主要給大家介紹了關(guān)于Java中List使用stream流轉(zhuǎn)成map的幾種方式,需要的朋友可以參考下2023-04-04spring boot整合mybatis利用Mysql實(shí)現(xiàn)主鍵UUID的方法
這篇文章主要給大家介紹了關(guān)于spring boot整合mybatis利用Mysql實(shí)現(xiàn)主鍵UUID的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2018-03-03spring cloud consul使用ip注冊(cè)服務(wù)的方法示例
這篇文章主要介紹了spring cloud consul使用ip注冊(cè)服務(wù)的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03Mybatis查詢返回Map<String,Object>類(lèi)型的實(shí)現(xiàn)
本文主要介紹了Mybatis查詢返回Map<String,Object>類(lèi)型的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07又一波Java專(zhuān)業(yè)人士必備書(shū)籍來(lái)襲
又一波Java專(zhuān)業(yè)人士必備書(shū)籍來(lái)襲,這篇文章主要向大家推薦了Java專(zhuān)業(yè)人士必讀的書(shū),感興趣的小伙伴們不要錯(cuò)過(guò)2016-09-09