欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java?同步工具與組合類(lèi)的線程安全性解析

 更新時(shí)間:2022年09月02日 09:36:03   作者:花花子???????  
這篇文章主要介紹了Java?同步工具與組合類(lèi)的線程安全性解析,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下

何為線程安全的類(lèi)?

一般來(lái)說(shuō),我們要設(shè)計(jì)一個(gè)線程安全的類(lèi),要從三個(gè)方面去考慮:

  • 構(gòu)成狀態(tài)的所有變量。比如某個(gè)域是集合類(lèi)型,則集合元素也構(gòu)成該實(shí)例的狀態(tài)。
  • 某些操作所隱含的不變性條件。
  • 變量的所有權(quán),或稱(chēng)它是否會(huì)被發(fā)布。

基于條件的同步策略

不變性條件取決于類(lèi)的語(yǔ)義,比如說(shuō)計(jì)數(shù)器類(lèi)的 counter 屬性被設(shè)置為 Integer 類(lèi)型,雖然其域值在 Integer.MIN_VALUE 到 Integer.MAX_VALUE 之間,但是它的值必須非負(fù)。即:隨著計(jì)數(shù)的進(jìn)行,conuter >= 0 總是成立。

除了不變性條件之外,一些操作還需要通過(guò)后驗(yàn)條件,以此判斷狀態(tài)的更改是否有效。比如一個(gè)計(jì)數(shù)器計(jì)到 17 時(shí),它的下一個(gè)狀態(tài)只可能是 18。這實(shí)際涉及到了對(duì)原先狀態(tài)的 "讀 - 改 - 寫(xiě)" 三個(gè)連續(xù)的步驟,典型的如自增 ++ 等。"無(wú)記憶性" 的狀態(tài)是不需要后驗(yàn)條件的,比如每隔一段時(shí)間測(cè)量的溫度值。

先驗(yàn)條件可能是更加關(guān)注的問(wèn)題,因?yàn)?"先判斷后執(zhí)行" 的邏輯到處存在。比如說(shuō)對(duì)一個(gè)列表執(zhí)行 remove 操作時(shí),首先需要保證列表是非空的,否則就應(yīng)該拋出異常。

在并發(fā)環(huán)境下,這些條件均可能會(huì)隨著其它線程的修改而出現(xiàn)失真。

狀態(tài)發(fā)布與所有權(quán)

在許多情況下,所有權(quán)和封裝性是相互關(guān)聯(lián)的。比如對(duì)象通過(guò) private 關(guān)鍵字封裝了它的狀態(tài),即表明實(shí)例獨(dú)占對(duì)該狀態(tài)的所有權(quán) ( 所有權(quán)意味控制權(quán) )。反之,則稱(chēng)該狀態(tài)被發(fā)布。被發(fā)布的實(shí)例狀態(tài)可能會(huì)被到處修改,因此它們?cè)诙嗑€程環(huán)境中也存在風(fēng)險(xiǎn)。

容器類(lèi)通常和元素表現(xiàn)出 "所有權(quán)" 分離的形式。比如說(shuō)一個(gè)聲明為 final 的列表,客戶端雖然無(wú)法修改其本身的引用,但可以自由地修改其元素的狀態(tài)。這些事實(shí)上被發(fā)布的元素必須被安全地共享。

這要求元素:

  • 自身是事實(shí)不可變的實(shí)例。
  • 線程安全的實(shí)例。
  • 被鎖保護(hù)。

實(shí)例封閉

大多數(shù)對(duì)象都是組合對(duì)象,或者說(shuō)這些狀態(tài)也是對(duì)象。對(duì)組合類(lèi)的線程安全性分析大致分為兩類(lèi):

  • 如果這些狀態(tài)線程不安全,那應(yīng)該如何安全地使用組合類(lèi)?
  • 即使所有的狀態(tài)都線程安全,是否可以推斷組合類(lèi)也線程安全?或者說(shuō)組合類(lèi)是否還需要額外的同步策略?

對(duì)于第一個(gè)問(wèn)題,見(jiàn)下方的 Bank 代碼,它模擬了一個(gè)轉(zhuǎn)賬業(yè)務(wù):

class Bank {
    private Integer amount_A = 100;
    private Integer amount_B = 50;
    public synchronized void transaction(Integer amount){
        var log_0 = amount_A + amount_B;
        amount_A += amount;
        amount_B -= amount;
        var log_1 = amount_A + amount_B;
        assert log_0 == log_1;
    }
}

雖然 amount_A 和 amount_B 本身作為普通的 Integer 類(lèi)型并不是線程安全的,但是它們具備線程安全的語(yǔ)義:

  • 它們是 private 成員,因此不存在被意外共享的問(wèn)題。
  • 它們唯一與外界交互的 transaction() 方法被鎖保護(hù)。

也可以理解成:Bank 是為兩個(gè) Integer 狀態(tài)提供線程安全性的容器。在此處,同步策略由 synchronized 內(nèi)置鎖實(shí)現(xiàn)。

編譯器會(huì)在 synchronized 的代碼區(qū)前后安插 monitorenter 和 monitorexit 字節(jié)碼表示進(jìn)入 / 退出同步代碼塊。Java 的內(nèi)置鎖也稱(chēng)之監(jiān)視器鎖,或者監(jiān)視器。

至于第二個(gè)問(wèn)題,答案是:看情況,具體地說(shuō)是分析是否存在不變性條件,在這里,它指代在轉(zhuǎn)賬過(guò)程當(dāng)中,a 和 b 兩個(gè)賬戶的余額之和應(yīng)當(dāng)不變。如果使用原子類(lèi)型保護(hù) amount_A 和 amount_B 的狀態(tài),那么是否就可以撤下 transaction() 方法上的內(nèi)置鎖了?

class UnsafeBank {
    private final AtomicInteger amount_A = new AtomicInteger(100);
    private final AtomicInteger amount_B = new AtomicInteger(50);
    public void transaction(Integer amount){
        amount_A.set(amount_A.get() - amount);
        amount_B.set(amount_B.get() + amount);
    }
}

transaction() 方法現(xiàn)在失去了鎖的保護(hù)。這樣,某線程 A 在執(zhí)行交易的過(guò)程中,另一個(gè)線程 B 也可能會(huì) "趁機(jī)" 修改 amount_B 的賬目 —— 這個(gè)時(shí)機(jī)發(fā)生在線程 A 執(zhí)行 amount_B.get() 之后,但在 amount_B.set() 之前。最終,B 線程的修改將被覆蓋而丟失,在它看來(lái),盡管兩個(gè)狀態(tài)均是原子變量,但不變性條件仍然被破壞了。

由此得到一個(gè)結(jié)論 —— 就算所有的可變狀態(tài)都是原子的,我們可能仍需要在封裝類(lèi)的層面進(jìn)一步考慮同步策略,最簡(jiǎn)單直接的就是找出封裝類(lèi)內(nèi)的所有復(fù)合操作:

  • 對(duì)同一個(gè)變量 ( 反復(fù) ) 讀-改-寫(xiě)。
  • 修改受某個(gè)不變性條件約束的多個(gè)變量。

正確地拓展同步策略

在大部分情況下,我們不能通過(guò)直接修改類(lèi)源碼的形式補(bǔ)充同步策略。比如,普通的 List<T> 接口不保證底下的各種實(shí)現(xiàn)是線程安全的,但我們可以通過(guò)類(lèi)似代理的方式將線程安全委托給第三方。

比如:

class ThreadSafeArrayList {
    private final List<Integer> list;
    public ThreadSafeArrayList(List<Integer> l){list =  l;}
    
    // 添加新的方法
    public synchronized boolean putIfAbsent(Integer a){
        if(list.contains(a)) {
            list.add(a);
            return true;
        }
        return false;
    }
    // 代理 add 方法,其它略
    public synchronized boolean add(Integer a) {
        return list.add(a);
    }

    // ...
}

事實(shí)上,Java 類(lèi)庫(kù)已經(jīng)有了對(duì)應(yīng)的線程安全類(lèi)。通常,我們應(yīng)當(dāng)優(yōu)先重用這些已有的類(lèi)。在下方的代碼塊中,我們使用 Collection.synchronizedList 工廠方法創(chuàng)建一個(gè)線程安全的 list 對(duì)象,這樣似乎就只需要為新拓展的 putIfAbsent() 方法加鎖了。

class ThreadUnSafeArrayList {
    private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());

    // 添加新的方法
    public synchronized boolean putIfAbsent(Integer a){
        if(list.contains(a)) {
            list.add(a);
            return true;
        }
        return false;
    }
    
    public boolean add(Integer a){return list.add(a);}
    //...
}

但是,上述的代碼是錯(cuò)誤的。為什么?問(wèn)題在于,我們使用了錯(cuò)誤的鎖進(jìn)行了同步。當(dāng)調(diào)用的是 add 方法時(shí),使用的是列表對(duì)象的內(nèi)置鎖;但調(diào)用 putIfAbsent 方法時(shí),我們使用的卻是 ThreadUnsafeArrayList 對(duì)象的內(nèi)置鎖。這意味著 putIfAbsent 方法對(duì)于其它的方法來(lái)說(shuō)不是原子的,因此無(wú)法確保一個(gè)線程執(zhí)行 putIfAbsent 方法時(shí),其它線程是否會(huì)通過(guò)調(diào)用其它方法修改列表。

因此,想要讓這個(gè)方法正確執(zhí)行,我們必須要在正確的地方上鎖。

class ThreadUnSafeArrayList {
    private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
    public boolean putIfAbsent(Integer a){
        synchronized (list){
            if(list.contains(a)) {
                list.add(a);
                return true;
            }
            return false;
        }
    }
}

同步容器

同步容器是安全的,但在某些情況下仍然需要客戶端加鎖。

常見(jiàn)的操作如:

  • 迭代;
  • 跳轉(zhuǎn) ( 比如,尋找下一個(gè)元素 );
  • 條件運(yùn)算,如 "若沒(méi)有則 XX 操作" ( 一種常見(jiàn)的復(fù)合操作 );

復(fù)合操作不受同步容器保護(hù)

這里有兩個(gè)線程 T1,T2 分別會(huì)以不可預(yù)測(cè)的次序執(zhí)行兩個(gè)代碼塊,它們負(fù)責(zé)刪除和讀取 list 中的末尾元素。我們?cè)谶@里使用的是庫(kù)中的同步列表,因此可以確保 size(),remove()get() 方法全部是原子的。但是,當(dāng)程序以 x1,y1,x2,y2 的操作次序執(zhí)行時(shí),主程序最終仍然會(huì)拋出IndexOutOfBoundsException 異常。

class DemoOfConcurrentFail {
    public final List<Integer> list = Collections.synchronizedList(new ArrayList<>());

    {
        Collections.addAll(list, 1, 2, 3, 4, 5);
    }
    public static void main(String[] args) {
        var testList = new DemoOfConcurrentFail().list;
        Runnable t1 = () -> {
            var last = testList.size() - 1;  // x1
            testList.remove(last);  // x2
        };
        Runnable t2 = () -> {
            var last = testList.size() -1;  // y1
            var  r = testList.get(last);  // y2
            System.out.println(r);
        };
        new Thread(t1).start();
        new Thread(t2).start();

    }
}

究其原因,兩個(gè)線程 T1,T2 執(zhí)行的復(fù)合操作沒(méi)有受鎖保護(hù) ( 實(shí)際上就是前文銀行轉(zhuǎn)賬的例子中犯過(guò)的錯(cuò)誤 )。所以正確的做法是對(duì)復(fù)合操作整體加鎖。

比如:

var mutex = new Object();
Runnable t1 = () -> {
    synchronized (mutex){
        var last = testList.size() - 1;  // x1
        testList.remove(last);  // x2
    }
};
Runnable t2 = () -> {
    synchronized (mutex){
        var last = testList.size() -1;  // y1
        var  r = testList.get(last);  // y2
        System.out.println(r);
    }
};

// ...

同步容器的迭代問(wèn)題

在迭代操作中,類(lèi)似的問(wèn)題也仍然存在。無(wú)論是直接的 for 循環(huán)還是 for-each 循環(huán),對(duì)容器的遍歷方式是使用 Iterator。而使用迭代器本身也是先判斷 ( hasNext ) 再讀取 ( next ) 的復(fù)合過(guò)程。Java 對(duì)同步容器的迭代處理是:假設(shè)某一個(gè)線程在迭代的過(guò)程中發(fā)現(xiàn)容器被修改過(guò)了,則立刻失敗 ( 也稱(chēng)及時(shí)失敗 ),并拋出一個(gè) ConcurrentModificationException 異常。

// 可能需要運(yùn)行多次才能拋出 ConcurrentModificationException
Runnable t1 = () -> {
    // 刪除中間的元素
    int mid =  testList.size() / 2;
    testList.remove(mid);
};
Runnable t2 = () -> {
    for(var item : testList){
        System.out.println(item);
    }
};
new Thread(t1).start();
new Thread(t2).start();

類(lèi)似地,想要不受打擾地迭代容器元素,我們也要在 for 循環(huán)的外面加鎖,但是可能并不是一個(gè)好的主意。假如容器的規(guī)模非常大,或者每個(gè)元素的處理時(shí)間非常長(zhǎng),那么其它等待容器執(zhí)行短作業(yè)的線程會(huì)因此陷入長(zhǎng)時(shí)間的等待,這會(huì)帶來(lái)活躍性問(wèn)題。

一個(gè)可行的方法就是實(shí)現(xiàn)讀寫(xiě)分離 —— 一旦有寫(xiě)操作,則重新拷貝一份新的容器副本,而在此期間所有讀操作則仍在原來(lái)的容器中進(jìn)行,實(shí)現(xiàn) "讀-讀共享"。當(dāng)讀操作遠(yuǎn)多于寫(xiě)操作時(shí),這種做法無(wú)疑可以大幅度地提高程序的吞吐量,見(jiàn)后文的并發(fā)容器 CopyOnWriteArrayList。

警惕隱含迭代的操作

不僅是顯式的 for 循環(huán)會(huì)觸發(fā)迭代。比如容器的 toString 方法在底層調(diào)用 StringBuilder.append() 方法依次將每一個(gè)元素的字符串拼接起來(lái)。除此之外,包括 equalscontainsAll,removeAll,retainAll,乃至將容器本身作為參數(shù)的構(gòu)造器,都隱含了對(duì)容器的迭代過(guò)程。這些間接的迭代錯(cuò)誤都有可能拋出 ConcurrentModificationException 異常。

并發(fā)容器

考慮到重量級(jí)鎖對(duì)性能的影響,Java 后續(xù)提供了各種并發(fā)容器來(lái)改進(jìn)同步容器的性能問(wèn)題。同步容器將所有操作完全串行化。當(dāng)鎖競(jìng)爭(zhēng)尤其激烈時(shí),程序的吞吐量將大大降低。因此,使用并發(fā)容器來(lái)替代同步容器,在絕大部分情況下都算是一頓 "免費(fèi)的午餐"。

ConcurrentHashMap

ConcurrentHashMap 使用了更小的封鎖粒度換取了更大程度的共享,這個(gè)封鎖機(jī)制稱(chēng)之為分段鎖 ( Lock Stripping )。簡(jiǎn)單點(diǎn)說(shuō),就是每一個(gè)桶由單獨(dú)的鎖來(lái)保護(hù),操作不同桶的兩個(gè)線程不需要相互等待。好處是,在高并發(fā)環(huán)境下,ConcurrentHashMap 帶來(lái)了更大的吞吐量,但問(wèn)題是,封鎖粒度的減小削弱了容器的一致性語(yǔ)義,或稱(chēng)弱一致性 ( Weakly Consistent )。

比如說(shuō)需要在整個(gè) Map 上計(jì)算的 size() 和 isEmpty() 方法,弱一致性會(huì)使得這些方法的計(jì)算結(jié)果是一個(gè)過(guò)期值。這考慮到是一個(gè)權(quán)衡,因?yàn)樵诓l(fā)環(huán)境下,這兩個(gè)方法的作用很小,因?yàn)槠浞祷刂悼偸遣粩嘧兓?。因此,這些操作的需求被弱化了,以換取其它更重要的性能優(yōu)化,比如 get,put,cotainsKey,remove 等。

因此,除非一部分嚴(yán)謹(jǐn)?shù)臉I(yè)務(wù)無(wú)法容忍弱一致性,否則并發(fā)的 HashMap 是要比同步 HashMap 更優(yōu)的選擇。

CopyOnWriteArrayList

該工具在讀操作遠(yuǎn)多于寫(xiě)操作的場(chǎng)合下能夠提供更好的并發(fā)性能,在迭代時(shí)不需要對(duì)容器進(jìn)行加鎖或者復(fù)制。當(dāng)發(fā)生修改時(shí),該容器會(huì)創(chuàng)建并重新發(fā)布一個(gè)新的容器副本。在新副本創(chuàng)建之前,一切讀操作仍然以舊的容器為準(zhǔn),因此這不會(huì)拋出 ConcurrentModificationException 問(wèn)題。

相對(duì)的,如果頻繁調(diào)用 add,remove,set 等方法,則該容器的吞吐量會(huì)大大降低,因?yàn)檫@些操作需要反復(fù)調(diào)用系統(tǒng)的 copy 方法復(fù)制底層的數(shù)組 ( 這也是沒(méi)有設(shè)計(jì) "CopyOnWriteLinkedList" 的原因,因?yàn)榭截惖男蕰?huì)更低 )。同時(shí),寫(xiě)入時(shí)復(fù)制的特性使得 CopyOnWriteArrayList 是弱一致性的。

阻塞隊(duì)列 & 生產(chǎn)者 — 消費(fèi)者模式

阻塞隊(duì)列,簡(jiǎn)單地說(shuō),就是當(dāng)隊(duì)列為空時(shí),執(zhí)行 take 操作會(huì)進(jìn)入阻塞狀態(tài);當(dāng)隊(duì)列滿時(shí),執(zhí)行 put 操作也會(huì)進(jìn)入阻塞狀態(tài)。阻塞隊(duì)列也可以分有界隊(duì)列和無(wú)界隊(duì)列。無(wú)界隊(duì)列永遠(yuǎn)不會(huì)充滿,因此執(zhí)行 put 方法永遠(yuǎn)不會(huì)進(jìn)入阻塞狀態(tài)。但是,如果生產(chǎn)者的執(zhí)行效率遠(yuǎn)超過(guò)消費(fèi)者,那么無(wú)界隊(duì)列的無(wú)限擴(kuò)張最終會(huì)耗盡內(nèi)存。有界隊(duì)列則可以保證當(dāng)隊(duì)列充滿時(shí),生產(chǎn)者被 put 阻塞,通過(guò)這種方式來(lái)讓消費(fèi)者趕上工作進(jìn)度。

可以用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者 — 消費(fèi)者模式,最常見(jiàn)的生產(chǎn)者 — 消費(fèi)者模式是線程池與工作隊(duì)列的組合。這種模式將 "發(fā)布任務(wù)" 與 "領(lǐng)取任務(wù)" 解耦,最大的便捷是簡(jiǎn)化了復(fù)雜的負(fù)載管理,因?yàn)樯a(chǎn)者和消費(fèi)者的執(zhí)行速度并不總是相匹配的。同時(shí),生產(chǎn)者和消費(fèi)者的角色是相對(duì)的。比如處于流水線中游的組件,它們既作為上游的消費(fèi)者,也作為下游的生產(chǎn)者。

Java 庫(kù)已經(jīng)包含了關(guān)于阻塞隊(duì)列的多種實(shí)現(xiàn),它自身保證 put 和 take 操作是線程安全的。

  • LinkedBlockingQueue 和 ArrayBlockingQueue:此兩者的區(qū)別可以參考 Link 和 Array,見(jiàn):Java中ArrayBlockingQueue和LinkedBlockingQueue。兩者均為 FIFO 的隊(duì)列。
  • PriorityBlockingQueue:優(yōu)先級(jí)隊(duì)列,當(dāng)我們希望以一定次序處理任務(wù)時(shí),它要比 FIFO 隊(duì)列更實(shí)用。
  • SynchronousQueue:譯為同步阻塞隊(duì)列。這個(gè)隊(duì)列事實(shí)上沒(méi)有緩存空間,而是維護(hù)一組可用的線程。當(dāng)隊(duì)列收到消息時(shí),它可以立刻分配一個(gè)線程去處理。但是如果沒(méi)有多余的工作線程,那么調(diào)用 put 或者 take 會(huì)立刻陷入阻塞狀態(tài)。因此,僅當(dāng)有足夠多的消費(fèi)者,并且總是有一個(gè)消費(fèi)者準(zhǔn)備好獲取交付的工作時(shí),才適合使用同步隊(duì)列。

下方的代碼塊是由 SynchronousQueue 實(shí)現(xiàn)的簡(jiǎn)易 Demo,每個(gè)線程會(huì)搶占式消費(fèi)消息。

var chan = new SynchronousQueue<Integer>();
var worker = new Thread(()->{
    while(true){
        try {
            final var x = chan.take();
            System.out.println("t1 consume: " + x);
        } catch (InterruptedException e) {e.printStackTrace();}
    }
});
var worker2 = new Thread(()->{
    while(true){
        try {
            final var x = chan.take();
            System.out.println("t2 consume: " + x);
        } catch (InterruptedException e) {e.printStackTrace();}
    }
});
worker.start();
worker2.start();

for(var i = 0 ; i < 10; i ++) chan.put(i);

基于所有權(quán)的角度去分析,生產(chǎn)者 — 消費(fèi)者模式和阻塞隊(duì)列一起促進(jìn)了 串行的線程封閉。線程封閉對(duì)象只能由單個(gè)對(duì)象擁有,但可以通過(guò)在執(zhí)行的最后發(fā)布該對(duì)象 ( 即表示之后不會(huì)再使用它 ),以表示 "轉(zhuǎn)讓" 所有權(quán)。

阻塞隊(duì)列簡(jiǎn)化了轉(zhuǎn)移的邏輯。除此之外,還可以通過(guò) ConcurrentMap 的原子方法 remove,或者是 AtomicReference 的 compareAndSet ( 即 CAS 機(jī)制 ) 實(shí)現(xiàn)安全的串行線程封閉。

雙端隊(duì)列和工作竊取

Java 6 之后增加了新的容器類(lèi)型 —— Deque 和 BlockDeque,它們是對(duì) Queue 以及 BlockingQueue 的拓展。Deque 實(shí)現(xiàn)了再隊(duì)列頭和隊(duì)列尾的高效插入和移除,具體實(shí)現(xiàn)包括了 ArrayDeque 和 LinkedBlockingDeque。

雙端隊(duì)列適用于另一種工作模式 —— 工作竊取 ( Work Stealing )。比如,一個(gè)工作線程已經(jīng)完成清空了自己的任務(wù)隊(duì)列,它就可以從其它忙碌的工作線程的任務(wù)隊(duì)列的尾部獲取隊(duì)列。這種模式要比生產(chǎn)者 —— 消費(fèi)者具備更高的可伸縮性,因?yàn)楣ぷ骶€程不會(huì)在單個(gè)共享的任務(wù)隊(duì)列上發(fā)生競(jìng)爭(zhēng)。

工作竊取特別適合遞歸的并發(fā)問(wèn)題,即執(zhí)行一個(gè)任務(wù)時(shí)會(huì)產(chǎn)生更多的工作,比如:Web 爬蟲(chóng),GC 垃圾回收時(shí)的圖搜索算法。

阻塞和中斷方法

線程可能會(huì)被阻塞,或者是暫停執(zhí)行,原因有多種:等待 I/O 結(jié)束,等待獲得鎖,等待從 Thread.sleep 中喚醒,等待另一個(gè)線程的計(jì)算結(jié)果。被阻塞的線程必須要在這些 "外因" 被解決之后才有機(jī)會(huì)繼續(xù)執(zhí)行,即恢復(fù)到 RUNNABLE ( 也稱(chēng)就緒 ) 狀態(tài),等待被再次調(diào)度 CPU 執(zhí)行。

這段描述其實(shí)對(duì)應(yīng)了 JVM 線程的兩個(gè)狀態(tài):BLOCKING 和 WAITING。

  • BLOCKING,當(dāng)線程準(zhǔn)備進(jìn)入一段新的同步代碼塊時(shí),因不能獲得鎖而等待。
  • WAITING,當(dāng)線程已經(jīng)進(jìn)入同步代碼塊之后,在執(zhí)行的過(guò)程中因不滿足某些條件而暫停。這時(shí)可以調(diào)用 waiting 方法 釋放已占據(jù)的鎖。其它工作線程得以搶占此鎖并執(zhí)行,直到滿足先驗(yàn)條件為真時(shí),其它線程可以通過(guò) notifyAll 方法重新令監(jiān)視此鎖的所有 WAITING 線程再次爭(zhēng)鎖并繼續(xù)工作。wait / notify / notifyAll 構(gòu)成了線程之間的協(xié)商機(jī)制,見(jiàn)下面的代碼塊。
static class Status{public boolean v;}
public static void main(String[] args) throws InterruptedException{
    var status = new Status();
    status.v = false;
    var mutex = new Object();
    new Thread(()->{
        synchronized (mutex){
            System.out.println("get mutex");
            // 此時(shí)檢測(cè)的狀態(tài)為 false, 進(jìn)入 WAITING 狀態(tài)。
            if(!status.v) try {mutex.wait();} catch (InterruptedException e) {e.printStackTrace();}
            // 被喚醒后重新檢測(cè)狀態(tài)為 true。
            System.out.println(status.v);
        }
    }).start();
    new Thread(()->{
        synchronized (mutex){
            // 將狀態(tài)設(shè)置為 true,喚醒上面的線程
            status.v = true;
            mutex.notify(); 
        }
    }).start();
}

只有處于 RUNNABLE 狀態(tài)的線程才會(huì)實(shí)際獲得 CPU 使用權(quán)。

Java些操使線程釋放鎖資源

Java JVM中線程狀態(tài)詳解

在 Java 中,一切會(huì)發(fā)生阻塞的方法都會(huì)被要求處理 InterruptedException 受檢異常。調(diào)用阻塞方法的方法也會(huì)變成阻塞方法。線程內(nèi)部有一個(gè) boolean 類(lèi)型的狀態(tài)位表示中斷,調(diào)用 interrupt 方法可以將該狀態(tài)位標(biāo)識(shí)為 true。

但是這不意味著該線程就會(huì)立刻中斷:

  • 如果該線程并沒(méi)有調(diào)用阻塞的方法并一直處于 RUNNABLE 狀態(tài),則標(biāo)記中斷不會(huì)有任何實(shí)際效果。
  • 如果發(fā)起中斷時(shí)目標(biāo)線程正處于阻塞狀態(tài),則會(huì)拋出 InterruptedException 異常。

同步工具類(lèi)

Java 還提供了諸如信號(hào)量 ( Semaphore ),柵欄 ( Barrier ),以及閉鎖 ( Latch ) 作為同步工具類(lèi),它們都包含了一定的結(jié)構(gòu)性屬性:這些狀態(tài)將決定執(zhí)行同步工具類(lèi)的線程是執(zhí)行還是等待。

閉鎖

閉鎖是一種同步工具類(lèi),可以延遲線程的進(jìn)度直到閉鎖打開(kāi)。在此之前,所有的線程必須等待,而在閉鎖結(jié)束之后,這個(gè)鎖將永久保持打開(kāi)狀態(tài)。這個(gè)特性適用于 需要確保某個(gè)任務(wù)的前序任務(wù) ( 比如初始化 ) 全部完成之后才可以執(zhí)行的場(chǎng)合,見(jiàn)下方的代碼:Worker 線程等待另兩個(gè)初始化線程準(zhǔn)備就緒之后輸出 p 的結(jié)果。

// class Point{int x,y;}
final var p = new Point();
final var p_latch = new CountDownLatch(2);
// Worker
new Thread(()->{
    try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();}
    System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
}).start();

// Init x
new Thread(()->{
    p.x = 1;
    p_latch.countDown();
}).start();

// Init y
new Thread(()->{
    p.y = 2;
    p_latch.countDown();
}).start();

FutureTask 也可以拿來(lái)做閉鎖,它實(shí)現(xiàn)了 Future 的語(yǔ)義,表示一個(gè)抽象的可生成結(jié)果的計(jì)算,一般需要由線程池驅(qū)動(dòng)執(zhí)行,表示一個(gè)異步的任務(wù)。

Runnable 接口表示無(wú)返回值的計(jì)算,Callable<T> 代表有返回值的計(jì)算。

final var futurePoint = new FutureTask<>(()->new Point(1,2));
new Thread(futurePoint).start();
new Thread(()->{
    try {
        // 在 Callable 計(jì)算出結(jié)果之前阻塞
        var p = futurePoint.get();
        System.out.printf("Point(x=%d,y=%d)",p.x,p.y);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).start();

信號(hào)量

計(jì)數(shù)信號(hào)量用于控制某個(gè)資源的同時(shí)訪問(wèn)數(shù)量,通常用于配置有容量限制的資源池,或稱(chēng)有界阻塞容器。Semaphore 管理一組許可,線程在需要時(shí)首先獲取許可,并在操作結(jié)束之后歸還許可。如果許可數(shù)量被耗盡,那么線程則必須要阻塞到其它任意線程歸還許可 ( 默認(rèn)情況下遵循 Non-Fair 策略 ) 為止。特別地,當(dāng)信號(hào)量的許可數(shù)為 1 時(shí),則可認(rèn)為是不可重入的互斥鎖。

下面是一個(gè)利用信號(hào)量 + 同步容器實(shí)現(xiàn)的簡(jiǎn)易阻塞隊(duì)列:

class BoundedBlockingQueue<E>{
    final private List<E> list = Collections.synchronizedList(new LinkedList<>());
    final private Semaphore se;

    public BoundedBlockingQueue(int cap){
        se = new Semaphore(cap);
    }
    public void enqueue(E e) throws InterruptedException {
        se.acquire();
        list.add(0,e);
    }
    public E dequeue(){
        final var done = list.remove(0);
        se.release();
        return done;
    }
    @Override
    public String toString() {
        return "BoundedBlockingQueue{" +
                "list=" + list +
                '}';
    }
}

柵欄

柵欄 ( Barrier ) 類(lèi)似于閉鎖,同樣都會(huì)阻塞到某一個(gè)事件發(fā)生。閉鎖強(qiáng)調(diào)等待某個(gè)事件發(fā)生之后再執(zhí)行動(dòng)作,而柵欄更強(qiáng)調(diào)在某個(gè)事件發(fā)生之前等待其它線程。它可用于實(shí)現(xiàn)一些協(xié)議:"所有人在指定的時(shí)間去會(huì)議室碰頭,等到所有的人到齊之后再開(kāi)會(huì)",比如數(shù)據(jù)庫(kù)事務(wù)的兩階段提交。

Java 提供了一個(gè)名為 CyclicBarrier 的柵欄,它指定了 N 個(gè)工作線程 反復(fù)地 在柵欄位置匯集。在某線程執(zhí)行完畢之后,調(diào)用 await() 方法阻塞自身,以等待其它更慢的線程到達(dá)柵欄位置。當(dāng)設(shè)定的 N 個(gè)線程均調(diào)用 await() 之后,柵欄將打開(kāi),此時(shí)所有的線程將可以繼續(xù)向下執(zhí)行代碼,而柵欄本身的狀態(tài)會(huì)重置,以便復(fù)用 ( 因而命名為 Cyclic- )。

見(jiàn)下面的代碼,4 個(gè)線程并行執(zhí)行初始化工作 ( 以隨機(jī)時(shí)間的 sleep 模擬延遲 ),并等待所有線程初始化完畢之后同時(shí)打印信息。

final int N = 4;
final var barrier =  new CyclicBarrier(N);
final Thread[] workers = new Thread[N];

for(var i : new Integer[]{0,1,2,3}){
    var t = new Thread(()->{
        try {
            // 模擬隨機(jī)的延時(shí)
            var rdm = new Random().nextInt(1000);
            Thread.sleep(rdm);

            // 在所有其它線程到達(dá)之前阻塞
            barrier.await();

            // 所有線程到達(dá)之后執(zhí)行,每個(gè)線程打印延時(shí)時(shí)間
            System.out.printf("prepare for %d millis\n",rdm);
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
    workers[i] = t;
    t.start();
}
// 等待所有的任務(wù)并行執(zhí)行完畢。
for(var worker : workers){worker.join();}

在不涉及 IO 操作和數(shù)據(jù)共享的計(jì)算問(wèn)題當(dāng)中,線程數(shù)量為 NCPU 或者 NCPU + 1 時(shí)會(huì)獲得最優(yōu)的吞吐量,更多的線程也不會(huì)帶來(lái)帶來(lái)幫助,甚至性能還會(huì)下降,因?yàn)?CPU 需要頻繁的切換上下文。

一旦線程成功地到達(dá)柵欄,則 await() 方法會(huì)其標(biāo)記為 "子線程"。CyclicBarrier 的構(gòu)造器還接受額外的 Runnable 接口做回調(diào)函數(shù),當(dāng)所有線程全部到達(dá)柵欄之后,CyclicBarrier 會(huì)從子線程當(dāng)中挑選出一個(gè)領(lǐng)導(dǎo)線程去執(zhí)行它 ( 即,每一輪通過(guò)柵欄之后,它都會(huì)被執(zhí)行且僅一次 ),我們可以在此實(shí)現(xiàn)日志記錄等操作。

final var barrier =  new CyclicBarrier(N,()->{
    System.out.println("all runners ready");
});

在并行任務(wù)中構(gòu)建高效的緩存

為了用簡(jiǎn)單的例子說(shuō)明問(wèn)題,我們?cè)谶@里特別強(qiáng)調(diào)并行 ( Parallel ) 任務(wù),這些任務(wù)的計(jì)算過(guò)程是純粹 ( Pure ) 的 —— 這樣的函數(shù)被稱(chēng)之純函數(shù)。無(wú)論它們何時(shí)被調(diào)用,被哪個(gè)線程調(diào)用,同樣的輸入永遠(yuǎn)得到同樣的輸出。純函數(shù)不和外部環(huán)境交互,因此自然也就不存在競(jìng)態(tài)條件。

一個(gè)非常自然的想法是使用緩存 ( 或稱(chēng)記憶機(jī)制 Memorized ) 避免重復(fù)的運(yùn)算。在純粹的映射關(guān)系中,固定的輸入總是對(duì)應(yīng)固定的輸出,因此使用 K-V 鍵值對(duì)來(lái)記憶結(jié)果再好不過(guò)了。我們基于 HashMap 給出最簡(jiǎn)單的一版實(shí)現(xiàn),然后再探討如何改進(jìn)它們。

class MapCacheV1 {
    private final HashMap<Integer,String> cache = new HashMap<>();
    public synchronized String getResult(Integer id){
        var v = cache.get(id);
        if (v == null){
            // 設(shè)定中,這個(gè)靜態(tài)方法具有 500ms 左右的延遲。
            v = PURE.slowOperation(id);
            cache.put(id,v);
        }
        return v;
    }
}

盡管我們打算將 MapCache 用于無(wú)競(jìng)態(tài)條件的并行任務(wù),但 getResult() 方法仍然加上了同步鎖,因?yàn)?HashMap 本身不是線程安全的,cache 需要以安全的方式被并發(fā)訪問(wèn)。然而,這種做法無(wú)疑會(huì)使得 getResult() 方法變得十分笨重,因?yàn)樵究梢圆⑿械穆僮?nbsp;PURE.slowOperation() 也被鎖在了代碼塊內(nèi)部。

最先想到的是使用更加高效的 ConcurrentHashMap 類(lèi)取代線程不安全的 HashMap以獲得免費(fèi)的多線程性能提升:

class MapCacheV2 {
    private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>();
    public String getResult(Integer id){
        var v = cache.get(id);
        if(v == null){
            v = PURE.slowOperation(id);
            cache.put(id,v);
        }
        return v;
    }
}

同時(shí),我們這一次取消掉了 getResult() 上的同步鎖。這樣,多線程可以并行地執(zhí)行慢操作,只在修改 cache 時(shí)發(fā)生競(jìng)爭(zhēng)。但這個(gè)緩存仍有一些不足 —— 當(dāng)某個(gè)線程 A 在計(jì)算新值時(shí) ( 即這 500ms 之內(nèi) ),其它線程并不知道。因此,多個(gè)線程有可能會(huì)計(jì)算同一個(gè)新值,甚至導(dǎo)致其它的計(jì)算任務(wù)無(wú)法進(jìn)行。

針對(duì)這個(gè)問(wèn)題,我們?cè)僖淮翁岢龈倪M(jìn)。不妨讓 cache 保存 "計(jì)算過(guò)程",而非值。

這樣,工作線程將有三種行為:

  • 緩存中沒(méi)有此計(jì)算任務(wù),注冊(cè)并執(zhí)行。
  • 緩存中有此計(jì)算任務(wù),但未完畢,當(dāng)前線程阻塞 ( 將 CPU 讓給其它需要計(jì)算的線程 )。
  • 緩存中有此計(jì)算任務(wù),且已計(jì)算完畢,直接返回。

回顧前文在閉鎖中提到的 FutureTask<V> 類(lèi)型,它適合用于當(dāng)前的實(shí)現(xiàn),見(jiàn)下方的代碼:

class MapCacheV3 {
    private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>();
    public String getResult(Integer id) throws ExecutionException, InterruptedException {
        // 獲取一個(gè)計(jì)算任務(wù),而非值
        final var task = cache.get(id);
        if(task == null){
            final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));
            // cache.putIfAbsent()
            cache.put(id,newTask);
            newTask.run();
            // 提交并執(zhí)行任務(wù)。
            return newTask.get();
        }else return task.get();
    }
}

MapCacheV3 的實(shí)現(xiàn)已經(jīng)近乎完美了。唯一不足的是:我們對(duì) cache 的操作仍然是 "先判斷后執(zhí)行" 的復(fù)合操作,但現(xiàn)在 getResult 并沒(méi)有同步鎖的保護(hù)。兩個(gè)線程仍然同時(shí)調(diào)用 cache.get() 并判空,并開(kāi)始執(zhí)行重復(fù)的計(jì)算。

下面的版本給出了最終的解決方案:使用 ConcurrentMap 的 putIfAbsent() 原子方法修復(fù)可能重復(fù)添加計(jì)算任務(wù)的問(wèn)題。

class MapCacheV4 {
    private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>();
    public String getResult(Integer id) throws ExecutionException, InterruptedException {
        final var task = cache.get(id);
        if(task == null){
            final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));
            // put 和 putIfAbsent 方法均會(huì)返回此 Key 對(duì)應(yīng)的上一個(gè)舊值 Value。
            // 如果 put 的是一個(gè)新的 Key,則返回值為 null。
            final var running = cache.putIfAbsent(id,newTask);
            if(running == null) {newTask.run();return newTask.get();} else return running.get();
        }else return task.get();
    }
}

值得注意的是,一旦 cache 存儲(chǔ)的是計(jì)算任務(wù)而非值,那么就可能存在緩存污染的問(wèn)題。一旦某個(gè) FutureTask 的計(jì)算被取消,或者失敗,應(yīng)當(dāng)及時(shí)將它從緩存中移除以保證將來(lái)的計(jì)算成功,而不是放任其駐留在緩存內(nèi)部返回失敗的結(jié)果。

緩存思想幾乎應(yīng)用在各個(gè)地方。比如在 Web 服務(wù)中,用戶的數(shù)據(jù)往往不會(huì)總是直接來(lái)自數(shù)據(jù)庫(kù),而是 Redis 這樣的消息中間件。在實(shí)際的應(yīng)用環(huán)境下,還有更加復(fù)雜的問(wèn)題需要被考慮到,比如緩存內(nèi)容過(guò)時(shí) ( expired ),或者是定期清理緩存空間等。

到此這篇關(guān)于Java 同步工具與組合類(lèi)的線程安全性解析的文章就介紹到這了,更多相關(guān)Java 同步工具內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java字符串技巧之刪除標(biāo)點(diǎn)或最后字符的方法

    Java字符串技巧之刪除標(biāo)點(diǎn)或最后字符的方法

    這篇文章主要介紹了Java字符串技巧之刪除標(biāo)點(diǎn)或最后字符的方法,是Java入門(mén)學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-11-11
  • Java中List使用stream流轉(zhuǎn)成map的幾種方式詳解

    Java中List使用stream流轉(zhuǎn)成map的幾種方式詳解

    Stream是Java8中處理集合的關(guān)鍵抽象概念,它可以指定你希望對(duì)集合進(jìn)行的操作,可以執(zhí)行非常復(fù)雜的查找、過(guò)濾和映射數(shù)據(jù)等操作,下面這篇文章主要給大家介紹了關(guān)于Java中List使用stream流轉(zhuǎn)成map的幾種方式,需要的朋友可以參考下
    2023-04-04
  • Java中==與equals的區(qū)別小結(jié)

    Java中==與equals的區(qū)別小結(jié)

    這篇文章主要介紹了Java中==與equals的區(qū)別小結(jié),本文總結(jié)結(jié)論:== 與 equals()比較的內(nèi)容是不同的,equals()方式是String類(lèi)中的方法,它用于比較兩個(gè)對(duì)象引用所指的內(nèi)容是否相等,而 == 比較的是兩個(gè)對(duì)象引用的地址是否相等,需要的朋友可以參考下
    2015-06-06
  • java的arrays數(shù)組排序示例分享

    java的arrays數(shù)組排序示例分享

    排序算法,基本的高級(jí)語(yǔ)言都有一些提供。C語(yǔ)言有qsort()函數(shù),C++有sort()函數(shù),java語(yǔ)言有Arrays類(lèi)(不是Array)。用這些排序時(shí),都可以寫(xiě)自己的排序規(guī)則
    2014-02-02
  • Java集合包中的fail fast機(jī)制詳解

    Java集合包中的fail fast機(jī)制詳解

    這篇文章主要介紹了Java集合包中的fail fast機(jī)制詳解,當(dāng)我們使用iterator迭代器遍歷一個(gè)集合的過(guò)程中,如果其它線程,或者它自己向這個(gè)集合新增或刪除了一個(gè)key-value,那么當(dāng)前線程就會(huì)拋出ConcurrentModificationException異常,需要的朋友可以參考下
    2023-12-12
  • spring boot整合mybatis利用Mysql實(shí)現(xiàn)主鍵UUID的方法

    spring boot整合mybatis利用Mysql實(shí)現(xiàn)主鍵UUID的方法

    這篇文章主要給大家介紹了關(guān)于spring boot整合mybatis利用Mysql實(shí)現(xiàn)主鍵UUID的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-03-03
  • spring cloud consul使用ip注冊(cè)服務(wù)的方法示例

    spring cloud consul使用ip注冊(cè)服務(wù)的方法示例

    這篇文章主要介紹了spring cloud consul使用ip注冊(cè)服務(wù)的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • Mybatis查詢返回Map<String,Object>類(lèi)型的實(shí)現(xiàn)

    Mybatis查詢返回Map<String,Object>類(lèi)型的實(shí)現(xiàn)

    本文主要介紹了Mybatis查詢返回Map<String,Object>類(lèi)型的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • 圖文結(jié)合講解Java單例模式

    圖文結(jié)合講解Java單例模式

    這篇文章主要以圖文結(jié)合的方式為大家詳細(xì)介紹了Java單例模式的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • 又一波Java專(zhuān)業(yè)人士必備書(shū)籍來(lái)襲

    又一波Java專(zhuān)業(yè)人士必備書(shū)籍來(lái)襲

    又一波Java專(zhuān)業(yè)人士必備書(shū)籍來(lái)襲,這篇文章主要向大家推薦了Java專(zhuān)業(yè)人士必讀的書(shū),感興趣的小伙伴們不要錯(cuò)過(guò)
    2016-09-09

最新評(píng)論