Java?同步工具與組合類的線程安全性解析
何為線程安全的類?
一般來說,我們要設計一個線程安全的類,要從三個方面去考慮:
- 構成狀態(tài)的所有變量。比如某個域是集合類型,則集合元素也構成該實例的狀態(tài)。
- 某些操作所隱含的不變性條件。
- 變量的所有權,或稱它是否會被發(fā)布。
基于條件的同步策略
不變性條件取決于類的語義,比如說計數器類的 counter
屬性被設置為 Integer
類型,雖然其域值在 Integer.MIN_VALUE
到 Integer.MAX_VALUE
之間,但是它的值必須非負。即:隨著計數的進行,conuter >= 0
總是成立。
除了不變性條件之外,一些操作還需要通過后驗條件,以此判斷狀態(tài)的更改是否有效。比如一個計數器計到 17 時,它的下一個狀態(tài)只可能是 18。這實際涉及到了對原先狀態(tài)的 "讀 - 改 - 寫" 三個連續(xù)的步驟,典型的如自增 ++
等。"無記憶性" 的狀態(tài)是不需要后驗條件的,比如每隔一段時間測量的溫度值。
先驗條件可能是更加關注的問題,因為 "先判斷后執(zhí)行" 的邏輯到處存在。比如說對一個列表執(zhí)行 remove 操作時,首先需要保證列表是非空的,否則就應該拋出異常。
在并發(fā)環(huán)境下,這些條件均可能會隨著其它線程的修改而出現失真。
狀態(tài)發(fā)布與所有權
在許多情況下,所有權和封裝性是相互關聯的。比如對象通過 private
關鍵字封裝了它的狀態(tài),即表明實例獨占對該狀態(tài)的所有權 ( 所有權意味控制權 )。反之,則稱該狀態(tài)被發(fā)布。被發(fā)布的實例狀態(tài)可能會被到處修改,因此它們在多線程環(huán)境中也存在風險。
容器類通常和元素表現出 "所有權" 分離的形式。比如說一個聲明為 final
的列表,客戶端雖然無法修改其本身的引用,但可以自由地修改其元素的狀態(tài)。這些事實上被發(fā)布的元素必須被安全地共享。
這要求元素:
- 自身是事實不可變的實例。
- 線程安全的實例。
- 被鎖保護。
實例封閉
大多數對象都是組合對象,或者說這些狀態(tài)也是對象。對組合類的線程安全性分析大致分為兩類:
- 如果這些狀態(tài)線程不安全,那應該如何安全地使用組合類?
- 即使所有的狀態(tài)都線程安全,是否可以推斷組合類也線程安全?或者說組合類是否還需要額外的同步策略?
對于第一個問題,見下方的 Bank
代碼,它模擬了一個轉賬業(yè)務:
class Bank { private Integer amount_A = 100; private Integer amount_B = 50; public synchronized void transaction(Integer amount){ var log_0 = amount_A + amount_B; amount_A += amount; amount_B -= amount; var log_1 = amount_A + amount_B; assert log_0 == log_1; } }
雖然 amount_A
和 amount_B
本身作為普通的 Integer
類型并不是線程安全的,但是它們具備線程安全的語義:
- 它們是
private
成員,因此不存在被意外共享的問題。 - 它們唯一與外界交互的
transaction()
方法被鎖保護。
也可以理解成:Bank
是為兩個 Integer
狀態(tài)提供線程安全性的容器。在此處,同步策略由 synchronized
內置鎖實現。
編譯器會在 synchronized 的代碼區(qū)前后安插
monitorenter
和monitorexit
字節(jié)碼表示進入 / 退出同步代碼塊。Java 的內置鎖也稱之監(jiān)視器鎖,或者監(jiān)視器。
至于第二個問題,答案是:看情況,具體地說是分析是否存在不變性條件,在這里,它指代在轉賬過程當中,a 和 b 兩個賬戶的余額之和應當不變。如果使用原子類型保護 amount_A
和 amount_B
的狀態(tài),那么是否就可以撤下 transaction()
方法上的內置鎖了?
class UnsafeBank { private final AtomicInteger amount_A = new AtomicInteger(100); private final AtomicInteger amount_B = new AtomicInteger(50); public void transaction(Integer amount){ amount_A.set(amount_A.get() - amount); amount_B.set(amount_B.get() + amount); } }
transaction()
方法現在失去了鎖的保護。這樣,某線程 A 在執(zhí)行交易的過程中,另一個線程 B 也可能會 "趁機" 修改 amount_B
的賬目 —— 這個時機發(fā)生在線程 A 執(zhí)行 amount_B.get()
之后,但在 amount_B.set()
之前。最終,B 線程的修改將被覆蓋而丟失,在它看來,盡管兩個狀態(tài)均是原子變量,但不變性條件仍然被破壞了。
由此得到一個結論 —— 就算所有的可變狀態(tài)都是原子的,我們可能仍需要在封裝類的層面進一步考慮同步策略,最簡單直接的就是找出封裝類內的所有復合操作:
- 對同一個變量 ( 反復 ) 讀-改-寫。
- 修改受某個不變性條件約束的多個變量。
正確地拓展同步策略
在大部分情況下,我們不能通過直接修改類源碼的形式補充同步策略。比如,普通的 List<T>
接口不保證底下的各種實現是線程安全的,但我們可以通過類似代理的方式將線程安全委托給第三方。
比如:
class ThreadSafeArrayList { private final List<Integer> list; public ThreadSafeArrayList(List<Integer> l){list = l;} // 添加新的方法 public synchronized boolean putIfAbsent(Integer a){ if(list.contains(a)) { list.add(a); return true; } return false; } // 代理 add 方法,其它略 public synchronized boolean add(Integer a) { return list.add(a); } // ... }
事實上,Java 類庫已經有了對應的線程安全類。通常,我們應當優(yōu)先重用這些已有的類。在下方的代碼塊中,我們使用 Collection.synchronizedList
工廠方法創(chuàng)建一個線程安全的 list
對象,這樣似乎就只需要為新拓展的 putIfAbsent()
方法加鎖了。
class ThreadUnSafeArrayList { private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); // 添加新的方法 public synchronized boolean putIfAbsent(Integer a){ if(list.contains(a)) { list.add(a); return true; } return false; } public boolean add(Integer a){return list.add(a);} //... }
但是,上述的代碼是錯誤的。為什么?問題在于,我們使用了錯誤的鎖進行了同步。當調用的是 add
方法時,使用的是列表對象的內置鎖;但調用 putIfAbsent
方法時,我們使用的卻是 ThreadUnsafeArrayList
對象的內置鎖。這意味著 putIfAbsent
方法對于其它的方法來說不是原子的,因此無法確保一個線程執(zhí)行 putIfAbsent
方法時,其它線程是否會通過調用其它方法修改列表。
因此,想要讓這個方法正確執(zhí)行,我們必須要在正確的地方上鎖。
class ThreadUnSafeArrayList { private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); public boolean putIfAbsent(Integer a){ synchronized (list){ if(list.contains(a)) { list.add(a); return true; } return false; } } }
同步容器
同步容器是安全的,但在某些情況下仍然需要客戶端加鎖。
常見的操作如:
- 迭代;
- 跳轉 ( 比如,尋找下一個元素 );
- 條件運算,如 "若沒有則 XX 操作" ( 一種常見的復合操作 );
復合操作不受同步容器保護
這里有兩個線程 T1,T2 分別會以不可預測的次序執(zhí)行兩個代碼塊,它們負責刪除和讀取 list
中的末尾元素。我們在這里使用的是庫中的同步列表,因此可以確保 size()
,remove()
,get()
方法全部是原子的。但是,當程序以 x1
,y1
,x2
,y2
的操作次序執(zhí)行時,主程序最終仍然會拋出IndexOutOfBoundsException
異常。
class DemoOfConcurrentFail { public final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); { Collections.addAll(list, 1, 2, 3, 4, 5); } public static void main(String[] args) { var testList = new DemoOfConcurrentFail().list; Runnable t1 = () -> { var last = testList.size() - 1; // x1 testList.remove(last); // x2 }; Runnable t2 = () -> { var last = testList.size() -1; // y1 var r = testList.get(last); // y2 System.out.println(r); }; new Thread(t1).start(); new Thread(t2).start(); } }
究其原因,兩個線程 T1,T2 執(zhí)行的復合操作沒有受鎖保護 ( 實際上就是前文銀行轉賬的例子中犯過的錯誤 )。所以正確的做法是對復合操作整體加鎖。
比如:
var mutex = new Object(); Runnable t1 = () -> { synchronized (mutex){ var last = testList.size() - 1; // x1 testList.remove(last); // x2 } }; Runnable t2 = () -> { synchronized (mutex){ var last = testList.size() -1; // y1 var r = testList.get(last); // y2 System.out.println(r); } }; // ...
同步容器的迭代問題
在迭代操作中,類似的問題也仍然存在。無論是直接的 for 循環(huán)還是 for-each 循環(huán),對容器的遍歷方式是使用 Iterator。而使用迭代器本身也是先判斷 ( hasNext ) 再讀取 ( next ) 的復合過程。Java 對同步容器的迭代處理是:假設某一個線程在迭代的過程中發(fā)現容器被修改過了,則立刻失敗 ( 也稱及時失敗 ),并拋出一個 ConcurrentModificationException
異常。
// 可能需要運行多次才能拋出 ConcurrentModificationException Runnable t1 = () -> { // 刪除中間的元素 int mid = testList.size() / 2; testList.remove(mid); }; Runnable t2 = () -> { for(var item : testList){ System.out.println(item); } }; new Thread(t1).start(); new Thread(t2).start();
類似地,想要不受打擾地迭代容器元素,我們也要在 for 循環(huán)的外面加鎖,但是可能并不是一個好的主意。假如容器的規(guī)模非常大,或者每個元素的處理時間非常長,那么其它等待容器執(zhí)行短作業(yè)的線程會因此陷入長時間的等待,這會帶來活躍性問題。
一個可行的方法就是實現讀寫分離 —— 一旦有寫操作,則重新拷貝一份新的容器副本,而在此期間所有讀操作則仍在原來的容器中進行,實現 "讀-讀共享"。當讀操作遠多于寫操作時,這種做法無疑可以大幅度地提高程序的吞吐量,見后文的并發(fā)容器 CopyOnWriteArrayList
。
警惕隱含迭代的操作
不僅是顯式的 for 循環(huán)會觸發(fā)迭代。比如容器的 toString
方法在底層調用 StringBuilder.append()
方法依次將每一個元素的字符串拼接起來。除此之外,包括 equals
,containsAll
,removeAll
,retainAll
,乃至將容器本身作為參數的構造器,都隱含了對容器的迭代過程。這些間接的迭代錯誤都有可能拋出 ConcurrentModificationException
異常。
并發(fā)容器
考慮到重量級鎖對性能的影響,Java 后續(xù)提供了各種并發(fā)容器來改進同步容器的性能問題。同步容器將所有操作完全串行化。當鎖競爭尤其激烈時,程序的吞吐量將大大降低。因此,使用并發(fā)容器來替代同步容器,在絕大部分情況下都算是一頓 "免費的午餐"。
ConcurrentHashMap
ConcurrentHashMap
使用了更小的封鎖粒度換取了更大程度的共享,這個封鎖機制稱之為分段鎖 ( Lock Stripping )。簡單點說,就是每一個桶由單獨的鎖來保護,操作不同桶的兩個線程不需要相互等待。好處是,在高并發(fā)環(huán)境下,ConcurrentHashMap
帶來了更大的吞吐量,但問題是,封鎖粒度的減小削弱了容器的一致性語義,或稱弱一致性 ( Weakly Consistent )。
比如說需要在整個 Map 上計算的 size()
和 isEmpty()
方法,弱一致性會使得這些方法的計算結果是一個過期值。這考慮到是一個權衡,因為在并發(fā)環(huán)境下,這兩個方法的作用很小,因為其返回值總是不斷變化的。因此,這些操作的需求被弱化了,以換取其它更重要的性能優(yōu)化,比如 get
,put
,cotainsKey
,remove
等。
因此,除非一部分嚴謹的業(yè)務無法容忍弱一致性,否則并發(fā)的 HashMap 是要比同步 HashMap 更優(yōu)的選擇。
CopyOnWriteArrayList
該工具在讀操作遠多于寫操作的場合下能夠提供更好的并發(fā)性能,在迭代時不需要對容器進行加鎖或者復制。當發(fā)生修改時,該容器會創(chuàng)建并重新發(fā)布一個新的容器副本。在新副本創(chuàng)建之前,一切讀操作仍然以舊的容器為準,因此這不會拋出 ConcurrentModificationException
問題。
相對的,如果頻繁調用 add
,remove
,set
等方法,則該容器的吞吐量會大大降低,因為這些操作需要反復調用系統(tǒng)的 copy 方法復制底層的數組 ( 這也是沒有設計 "CopyOnWriteLinkedList" 的原因,因為拷貝的效率會更低 )。同時,寫入時復制的特性使得 CopyOnWriteArrayList
是弱一致性的。
阻塞隊列 & 生產者 — 消費者模式
阻塞隊列,簡單地說,就是當隊列為空時,執(zhí)行 take
操作會進入阻塞狀態(tài);當隊列滿時,執(zhí)行 put
操作也會進入阻塞狀態(tài)。阻塞隊列也可以分有界隊列和無界隊列。無界隊列永遠不會充滿,因此執(zhí)行 put
方法永遠不會進入阻塞狀態(tài)。但是,如果生產者的執(zhí)行效率遠超過消費者,那么無界隊列的無限擴張最終會耗盡內存。有界隊列則可以保證當隊列充滿時,生產者被 put
阻塞,通過這種方式來讓消費者趕上工作進度。
可以用阻塞隊列實現生產者 — 消費者模式,最常見的生產者 — 消費者模式是線程池與工作隊列的組合。這種模式將 "發(fā)布任務" 與 "領取任務" 解耦,最大的便捷是簡化了復雜的負載管理,因為生產者和消費者的執(zhí)行速度并不總是相匹配的。同時,生產者和消費者的角色是相對的。比如處于流水線中游的組件,它們既作為上游的消費者,也作為下游的生產者。
Java 庫已經包含了關于阻塞隊列的多種實現,它自身保證 put
和 take
操作是線程安全的。
LinkedBlockingQueue
和ArrayBlockingQueue
:此兩者的區(qū)別可以參考 Link 和 Array,見:Java中ArrayBlockingQueue和LinkedBlockingQueue。兩者均為 FIFO 的隊列。PriorityBlockingQueue
:優(yōu)先級隊列,當我們希望以一定次序處理任務時,它要比 FIFO 隊列更實用。SynchronousQueue
:譯為同步阻塞隊列。這個隊列事實上沒有緩存空間,而是維護一組可用的線程。當隊列收到消息時,它可以立刻分配一個線程去處理。但是如果沒有多余的工作線程,那么調用put
或者take
會立刻陷入阻塞狀態(tài)。因此,僅當有足夠多的消費者,并且總是有一個消費者準備好獲取交付的工作時,才適合使用同步隊列。
下方的代碼塊是由 SynchronousQueue
實現的簡易 Demo,每個線程會搶占式消費消息。
var chan = new SynchronousQueue<Integer>(); var worker = new Thread(()->{ while(true){ try { final var x = chan.take(); System.out.println("t1 consume: " + x); } catch (InterruptedException e) {e.printStackTrace();} } }); var worker2 = new Thread(()->{ while(true){ try { final var x = chan.take(); System.out.println("t2 consume: " + x); } catch (InterruptedException e) {e.printStackTrace();} } }); worker.start(); worker2.start(); for(var i = 0 ; i < 10; i ++) chan.put(i);
基于所有權的角度去分析,生產者 — 消費者模式和阻塞隊列一起促進了 串行的線程封閉。線程封閉對象只能由單個對象擁有,但可以通過在執(zhí)行的最后發(fā)布該對象 ( 即表示之后不會再使用它 ),以表示 "轉讓" 所有權。
阻塞隊列簡化了轉移的邏輯。除此之外,還可以通過 ConcurrentMap 的原子方法 remove,或者是 AtomicReference 的 compareAndSet ( 即 CAS 機制 ) 實現安全的串行線程封閉。
雙端隊列和工作竊取
Java 6 之后增加了新的容器類型 —— Deque 和 BlockDeque,它們是對 Queue 以及 BlockingQueue 的拓展。Deque 實現了再隊列頭和隊列尾的高效插入和移除,具體實現包括了 ArrayDeque 和 LinkedBlockingDeque。
雙端隊列適用于另一種工作模式 —— 工作竊取 ( Work Stealing )。比如,一個工作線程已經完成清空了自己的任務隊列,它就可以從其它忙碌的工作線程的任務隊列的尾部獲取隊列。這種模式要比生產者 —— 消費者具備更高的可伸縮性,因為工作線程不會在單個共享的任務隊列上發(fā)生競爭。
工作竊取特別適合遞歸的并發(fā)問題,即執(zhí)行一個任務時會產生更多的工作,比如:Web 爬蟲,GC 垃圾回收時的圖搜索算法。
阻塞和中斷方法
線程可能會被阻塞,或者是暫停執(zhí)行,原因有多種:等待 I/O 結束,等待獲得鎖,等待從 Thread.sleep
中喚醒,等待另一個線程的計算結果。被阻塞的線程必須要在這些 "外因" 被解決之后才有機會繼續(xù)執(zhí)行,即恢復到 RUNNABLE ( 也稱就緒 ) 狀態(tài),等待被再次調度 CPU 執(zhí)行。
這段描述其實對應了 JVM 線程的兩個狀態(tài):BLOCKING 和 WAITING。
- BLOCKING,當線程準備進入一段新的同步代碼塊時,因不能獲得鎖而等待。
- WAITING,當線程已經進入同步代碼塊之后,在執(zhí)行的過程中因不滿足某些條件而暫停。這時可以調用
waiting
方法 釋放已占據的鎖。其它工作線程得以搶占此鎖并執(zhí)行,直到滿足先驗條件為真時,其它線程可以通過notifyAll
方法重新令監(jiān)視此鎖的所有 WAITING 線程再次爭鎖并繼續(xù)工作。wait
/notify
/notifyAll
構成了線程之間的協商機制,見下面的代碼塊。
static class Status{public boolean v;} public static void main(String[] args) throws InterruptedException{ var status = new Status(); status.v = false; var mutex = new Object(); new Thread(()->{ synchronized (mutex){ System.out.println("get mutex"); // 此時檢測的狀態(tài)為 false, 進入 WAITING 狀態(tài)。 if(!status.v) try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();} // 被喚醒后重新檢測狀態(tài)為 true。 System.out.println(status.v); } }).start(); new Thread(()->{ synchronized (mutex){ // 將狀態(tài)設置為 true,喚醒上面的線程 status.v = true; mutex.notify(); } }).start(); }
只有處于 RUNNABLE 狀態(tài)的線程才會實際獲得 CPU 使用權。
在 Java 中,一切會發(fā)生阻塞的方法都會被要求處理 InterruptedException
受檢異常。調用阻塞方法的方法也會變成阻塞方法。線程內部有一個 boolean
類型的狀態(tài)位表示中斷,調用 interrupt
方法可以將該狀態(tài)位標識為 true
。
但是這不意味著該線程就會立刻中斷:
- 如果該線程并沒有調用阻塞的方法并一直處于 RUNNABLE 狀態(tài),則標記中斷不會有任何實際效果。
- 如果發(fā)起中斷時目標線程正處于阻塞狀態(tài),則會拋出
InterruptedException
異常。
同步工具類
Java 還提供了諸如信號量 ( Semaphore ),柵欄 ( Barrier ),以及閉鎖 ( Latch ) 作為同步工具類,它們都包含了一定的結構性屬性:這些狀態(tài)將決定執(zhí)行同步工具類的線程是執(zhí)行還是等待。
閉鎖
閉鎖是一種同步工具類,可以延遲線程的進度直到閉鎖打開。在此之前,所有的線程必須等待,而在閉鎖結束之后,這個鎖將永久保持打開狀態(tài)。這個特性適用于 需要確保某個任務的前序任務 ( 比如初始化 ) 全部完成之后才可以執(zhí)行的場合,見下方的代碼:Worker 線程等待另兩個初始化線程準備就緒之后輸出 p
的結果。
// class Point{int x,y;} final var p = new Point(); final var p_latch = new CountDownLatch(2); // Worker new Thread(()->{ try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();} System.out.printf("Point(x=%d,y=%d)",p.x,p.y); }).start(); // Init x new Thread(()->{ p.x = 1; p_latch.countDown(); }).start(); // Init y new Thread(()->{ p.y = 2; p_latch.countDown(); }).start();
FutureTask 也可以拿來做閉鎖,它實現了 Future 的語義,表示一個抽象的可生成結果的計算,一般需要由線程池驅動執(zhí)行,表示一個異步的任務。
Runnable 接口表示無返回值的計算,Callable<T> 代表有返回值的計算。
final var futurePoint = new FutureTask<>(()->new Point(1,2)); new Thread(futurePoint).start(); new Thread(()->{ try { // 在 Callable 計算出結果之前阻塞 var p = futurePoint.get(); System.out.printf("Point(x=%d,y=%d)",p.x,p.y); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }).start();
信號量
計數信號量用于控制某個資源的同時訪問數量,通常用于配置有容量限制的資源池,或稱有界阻塞容器。Semaphore 管理一組許可,線程在需要時首先獲取許可,并在操作結束之后歸還許可。如果許可數量被耗盡,那么線程則必須要阻塞到其它任意線程歸還許可 ( 默認情況下遵循 Non-Fair 策略 ) 為止。特別地,當信號量的許可數為 1 時,則可認為是不可重入的互斥鎖。
下面是一個利用信號量 + 同步容器實現的簡易阻塞隊列:
class BoundedBlockingQueue<E>{ final private List<E> list = Collections.synchronizedList(new LinkedList<>()); final private Semaphore se; public BoundedBlockingQueue(int cap){ se = new Semaphore(cap); } public void enqueue(E e) throws InterruptedException { se.acquire(); list.add(0,e); } public E dequeue(){ final var done = list.remove(0); se.release(); return done; } @Override public String toString() { return "BoundedBlockingQueue{" + "list=" + list + '}'; } }
柵欄
柵欄 ( Barrier ) 類似于閉鎖,同樣都會阻塞到某一個事件發(fā)生。閉鎖強調等待某個事件發(fā)生之后再執(zhí)行動作,而柵欄更強調在某個事件發(fā)生之前等待其它線程。它可用于實現一些協議:"所有人在指定的時間去會議室碰頭,等到所有的人到齊之后再開會",比如數據庫事務的兩階段提交。
Java 提供了一個名為 CyclicBarrier
的柵欄,它指定了 N 個工作線程 反復地 在柵欄位置匯集。在某線程執(zhí)行完畢之后,調用 await()
方法阻塞自身,以等待其它更慢的線程到達柵欄位置。當設定的 N 個線程均調用 await()
之后,柵欄將打開,此時所有的線程將可以繼續(xù)向下執(zhí)行代碼,而柵欄本身的狀態(tài)會重置,以便復用 ( 因而命名為 Cyclic- )。
見下面的代碼,4 個線程并行執(zhí)行初始化工作 ( 以隨機時間的 sleep
模擬延遲 ),并等待所有線程初始化完畢之后同時打印信息。
final int N = 4; final var barrier = new CyclicBarrier(N); final Thread[] workers = new Thread[N]; for(var i : new Integer[]{0,1,2,3}){ var t = new Thread(()->{ try { // 模擬隨機的延時 var rdm = new Random().nextInt(1000); Thread.sleep(rdm); // 在所有其它線程到達之前阻塞 barrier.await(); // 所有線程到達之后執(zhí)行,每個線程打印延時時間 System.out.printf("prepare for %d millis\n",rdm); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); workers[i] = t; t.start(); } // 等待所有的任務并行執(zhí)行完畢。 for(var worker : workers){worker.join();}
在不涉及 IO 操作和數據共享的計算問題當中,線程數量為 NCPU 或者 NCPU + 1 時會獲得最優(yōu)的吞吐量,更多的線程也不會帶來帶來幫助,甚至性能還會下降,因為 CPU 需要頻繁的切換上下文。
一旦線程成功地到達柵欄,則 await()
方法會其標記為 "子線程"。CyclicBarrier
的構造器還接受額外的 Runnable 接口做回調函數,當所有線程全部到達柵欄之后,CyclicBarrier
會從子線程當中挑選出一個領導線程去執(zhí)行它 ( 即,每一輪通過柵欄之后,它都會被執(zhí)行且僅一次 ),我們可以在此實現日志記錄等操作。
final var barrier = new CyclicBarrier(N,()->{ System.out.println("all runners ready"); });
在并行任務中構建高效的緩存
為了用簡單的例子說明問題,我們在這里特別強調并行 ( Parallel ) 任務,這些任務的計算過程是純粹 ( Pure ) 的 —— 這樣的函數被稱之純函數。無論它們何時被調用,被哪個線程調用,同樣的輸入永遠得到同樣的輸出。純函數不和外部環(huán)境交互,因此自然也就不存在競態(tài)條件。
一個非常自然的想法是使用緩存 ( 或稱記憶機制 Memorized ) 避免重復的運算。在純粹的映射關系中,固定的輸入總是對應固定的輸出,因此使用 K-V 鍵值對來記憶結果再好不過了。我們基于 HashMap 給出最簡單的一版實現,然后再探討如何改進它們。
class MapCacheV1 { private final HashMap<Integer,String> cache = new HashMap<>(); public synchronized String getResult(Integer id){ var v = cache.get(id); if (v == null){ // 設定中,這個靜態(tài)方法具有 500ms 左右的延遲。 v = PURE.slowOperation(id); cache.put(id,v); } return v; } }
盡管我們打算將 MapCache 用于無競態(tài)條件的并行任務,但 getResult()
方法仍然加上了同步鎖,因為 HashMap 本身不是線程安全的,cache
需要以安全的方式被并發(fā)訪問。然而,這種做法無疑會使得 getResult()
方法變得十分笨重,因為原本可以并行的慢操作 PURE.slowOperation()
也被鎖在了代碼塊內部。
最先想到的是使用更加高效的 ConcurrentHashMap
類取代線程不安全的 HashMap
,以獲得免費的多線程性能提升:
class MapCacheV2 { private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>(); public String getResult(Integer id){ var v = cache.get(id); if(v == null){ v = PURE.slowOperation(id); cache.put(id,v); } return v; } }
同時,我們這一次取消掉了 getResult()
上的同步鎖。這樣,多線程可以并行地執(zhí)行慢操作,只在修改 cache
時發(fā)生競爭。但這個緩存仍有一些不足 —— 當某個線程 A 在計算新值時 ( 即這 500ms 之內 ),其它線程并不知道。因此,多個線程有可能會計算同一個新值,甚至導致其它的計算任務無法進行。
針對這個問題,我們再一次提出改進。不妨讓 cache
保存 "計算過程",而非值。
這樣,工作線程將有三種行為:
- 緩存中沒有此計算任務,注冊并執(zhí)行。
- 緩存中有此計算任務,但未完畢,當前線程阻塞 ( 將 CPU 讓給其它需要計算的線程 )。
- 緩存中有此計算任務,且已計算完畢,直接返回。
回顧前文在閉鎖中提到的 FutureTask<V>
類型,它適合用于當前的實現,見下方的代碼:
class MapCacheV3 { private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>(); public String getResult(Integer id) throws ExecutionException, InterruptedException { // 獲取一個計算任務,而非值 final var task = cache.get(id); if(task == null){ final var newTask = new FutureTask<>(()-> PURE.slowOperation(id)); // cache.putIfAbsent() cache.put(id,newTask); newTask.run(); // 提交并執(zhí)行任務。 return newTask.get(); }else return task.get(); } }
MapCacheV3
的實現已經近乎完美了。唯一不足的是:我們對 cache
的操作仍然是 "先判斷后執(zhí)行" 的復合操作,但現在 getResult
并沒有同步鎖的保護。兩個線程仍然同時調用 cache.get()
并判空,并開始執(zhí)行重復的計算。
下面的版本給出了最終的解決方案:使用 ConcurrentMap
的 putIfAbsent()
原子方法修復可能重復添加計算任務的問題。
class MapCacheV4 { private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>(); public String getResult(Integer id) throws ExecutionException, InterruptedException { final var task = cache.get(id); if(task == null){ final var newTask = new FutureTask<>(()-> PURE.slowOperation(id)); // put 和 putIfAbsent 方法均會返回此 Key 對應的上一個舊值 Value。 // 如果 put 的是一個新的 Key,則返回值為 null。 final var running = cache.putIfAbsent(id,newTask); if(running == null) {newTask.run();return newTask.get();} else return running.get(); }else return task.get(); } }
值得注意的是,一旦 cache
存儲的是計算任務而非值,那么就可能存在緩存污染的問題。一旦某個 FutureTask 的計算被取消,或者失敗,應當及時將它從緩存中移除以保證將來的計算成功,而不是放任其駐留在緩存內部返回失敗的結果。
緩存思想幾乎應用在各個地方。比如在 Web 服務中,用戶的數據往往不會總是直接來自數據庫,而是 Redis 這樣的消息中間件。在實際的應用環(huán)境下,還有更加復雜的問題需要被考慮到,比如緩存內容過時 ( expired ),或者是定期清理緩存空間等。
到此這篇關于Java 同步工具與組合類的線程安全性解析的文章就介紹到這了,更多相關Java 同步工具內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java中List使用stream流轉成map的幾種方式詳解
Stream是Java8中處理集合的關鍵抽象概念,它可以指定你希望對集合進行的操作,可以執(zhí)行非常復雜的查找、過濾和映射數據等操作,下面這篇文章主要給大家介紹了關于Java中List使用stream流轉成map的幾種方式,需要的朋友可以參考下2023-04-04spring boot整合mybatis利用Mysql實現主鍵UUID的方法
這篇文章主要給大家介紹了關于spring boot整合mybatis利用Mysql實現主鍵UUID的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧。2018-03-03spring cloud consul使用ip注冊服務的方法示例
這篇文章主要介紹了spring cloud consul使用ip注冊服務的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-03-03Mybatis查詢返回Map<String,Object>類型的實現
本文主要介紹了Mybatis查詢返回Map<String,Object>類型的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-07-07