欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程之鎖的強(qiáng)化學(xué)習(xí)

 更新時(shí)間:2023年02月26日 14:31:14   作者:劉架構(gòu)  
Java多線程的鎖都是基于對(duì)象的,Java中的每一個(gè)對(duì)象都可以作為一個(gè)鎖。這篇文章主要來(lái)通過(guò)一下示例為大家強(qiáng)化一下鎖的相關(guān)知識(shí)的掌握,希望對(duì)大家有所幫助

首先強(qiáng)調(diào)一點(diǎn):Java多線程的鎖都是基于對(duì)象的,Java中的每一個(gè)對(duì)象都可以作為一個(gè)鎖。同時(shí),類(lèi)鎖也是對(duì)象鎖,類(lèi)是Class對(duì)象

Java8鎖

核心思想

關(guān)鍵字在實(shí)例方法上,鎖為當(dāng)前實(shí)例

關(guān)鍵字在靜態(tài)方法上,鎖為當(dāng)前Class對(duì)象

關(guān)鍵字在代碼塊上,鎖為括號(hào)里面的對(duì)象

在進(jìn)行線程執(zhí)行順序的時(shí)候,如果添加了線程睡眠,那么就要看鎖的對(duì)象是誰(shuí),同一把鎖 / 非同一把鎖是不一樣的

Synchronized

synchronized 是Java提供的關(guān)鍵字,用來(lái)保證原子性的

synchronized的作用域如下

  • 作用在普通方法上,此方法為原子方法:也就是說(shuō)同一個(gè)時(shí)刻只有一個(gè)線程可以進(jìn)入,其他線程必須在方法外等待,此時(shí)鎖是對(duì)象
  • 作用在靜態(tài)方法上,此方法為原子方法:也就是說(shuō)同一個(gè)時(shí)刻只有一個(gè)線程可以進(jìn)入,其他線程必須在方法外等待,此時(shí)鎖是當(dāng)前的Class對(duì)象
  • 作用在代碼塊上,此代碼塊是原子操作:也就是說(shuō)同一個(gè)時(shí)刻只有線程可以進(jìn)入,其他線程必須在方法外等待,鎖是 synchronized(XXX) 里面的 XXX

先看一段簡(jiǎn)單的代碼

public class SynchronizedTest {
    public static void main(String[] args) {
        test1();
        test2();
    }

    // 使用synchronized修飾的方法
    public synchronized static void test1() {
        System.out.println("SynchronizedTest.test1");
    }

    // 使用synchronized修飾的代碼塊
    public static void test2() {
        synchronized (SynchronizedTest.class) {
            System.out.println("SynchronizedTest.test2");
        }
    }
}

執(zhí)行之后,對(duì)其進(jìn)行執(zhí)行javap -v命令反編譯

// 省略啰嗦的代碼
public class cn.zq.sync.SynchronizedTest
  minor version: 0
  major version: 52
  flags: ACC_PUBLIC, ACC_SUPER
{
  // 源碼
  public cn.zq.sync.SynchronizedTest();
    descriptor: ()V
    flags: ACC_PUBLIC
  // main 方法
  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC

  // synchronized 修飾的靜態(tài)方法 test1()
  public static synchronized void test1();
    descriptor: ()V
    // 在這里我們可以看到 flags 中有一個(gè) ACC_SYNCHRONIZED
    // 這個(gè)就是一個(gè)標(biāo)記符這是 保證原子性的關(guān)鍵
    // 當(dāng)方法調(diào)用的時(shí)候,調(diào)用指令將會(huì)檢查方法的 ACC_SYNCHRONIZED 訪問(wèn)標(biāo)記符是否被設(shè)置
    // 如果設(shè)置了,線程將先獲取 monitor,獲取成功之后才會(huì)執(zhí)行方法體,方法執(zhí)行之后,釋放monitor
    // 在方法執(zhí)行期間,其他任何線程都無(wú)法在獲得一個(gè) monitor 對(duì)象,本質(zhì)上沒(méi)區(qū)別。
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=0, args_size=0
         0: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #5                  // String SynchronizedTest.test1
         5: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
      LineNumberTable:
        line 17: 0
        line 18: 8

  // 代碼塊使用的 synchronized
  public static void test2();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=2, args_size=0
         0: ldc           #7                  // class cn/zq/sync/SynchronizedTest
         2: dup
         3: astore_0
         // 這個(gè) monitorenter 是一個(gè)指令
         // 每個(gè)對(duì)象都有一個(gè)監(jiān)視器鎖(monitor),當(dāng)monitor被占用的時(shí)候就會(huì)處于鎖定狀態(tài)
         // 線程執(zhí)行monitorenter的時(shí)候,嘗試獲取monitor的鎖。過(guò)程如下
         // 1.任何monitor進(jìn)入數(shù)為0,則線程進(jìn)入并設(shè)置為1,此線程就是monitor的擁有者
         // 2.如果線程已經(jīng)占用,當(dāng)前線程再次進(jìn)入的時(shí)候,會(huì)將monitor的次數(shù)+1
         // 3.如何其他的線程已經(jīng)占用了monitor,則線程進(jìn)阻塞狀態(tài),直到monitor的進(jìn)入數(shù)為0
         // 4.此時(shí)其他線程才能獲取當(dāng)前代碼塊的執(zhí)行權(quán)
         4: monitorenter
         5: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
         8: ldc           #8                  // String SynchronizedTest.test2
        10: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        13: aload_0
        // 執(zhí)行monitorexit這條指令的線程必須是擁有monitor的
        // 執(zhí)行的之后,monitor的進(jìn)入數(shù)-1.如果為0,那么線程就退出 monitor,不再是此代碼塊的執(zhí)行者
        // 此時(shí)再由其他的線程獲得所有權(quán)
        // 其實(shí) wait/notify 等方法也依賴(lài)于monitor對(duì)象,
        // 所以只有在同步方法或者同步代碼塊中才可以使用,否則會(huì)報(bào)錯(cuò) java.lang.IllegalMonitorstateException 異常
        14: monitorexit
        15: goto          23
        18: astore_1
        19: aload_0
        20: monitorexit
        21: aload_1
        22: athrow
        23: return
      Exception table:
         from    to  target type
             5    15    18   any
            18    21    18   any
      LineNumberTable:
        line 21: 0
        line 22: 5
        line 23: 13
        line 24: 23
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 18
          locals = [ class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4
}
SourceFile: "SynchronizedTest.java"

總結(jié):

使用synchronized修飾的同步方法

  • 通過(guò)反編譯我們可以看到,被synchronized修飾的方法,其中的 flags中有一個(gè)標(biāo)記:ACC_SYNCHRONIZED
  • 當(dāng)線程執(zhí)行方法的時(shí)候,會(huì)先去檢查是否有這樣的一個(gè)標(biāo)記,如果有的話,說(shuō)明就是一個(gè)同步方法,此時(shí)會(huì)為當(dāng)前線程設(shè)置 monitor ,獲取成功之后才會(huì)去執(zhí)行方法體,執(zhí)行完畢之后釋放monitor

使用synchronized修飾的代碼塊

  • 通過(guò)反編譯我們看到,在代碼塊的兩側(cè)有JVM指令,在進(jìn)入代碼塊之前指令是 monitorenter
  • 當(dāng)線程執(zhí)行到代碼塊的時(shí)候,會(huì)先拿到monitor(初始值為0),然后線程將其設(shè)置為1,此時(shí)當(dāng)前線程獨(dú)占monitor
  • 如果當(dāng)前持有monitor的線程再次進(jìn)入monitor,則monitor的值+1,當(dāng)其退出的時(shí)候,monitor的次數(shù)-1
  • 當(dāng)線程線程退出一次monitor的時(shí)候,會(huì)執(zhí)行monitorexit指令,但是只有持有monitor的線程才能獲取并執(zhí)行monitorexit指令,當(dāng)當(dāng)前線程monitor為0的時(shí)候,當(dāng)前線程退出持有鎖
  • 此時(shí)其他線程再來(lái)爭(zhēng)搶
  • 但是為什么要有兩個(gè) monitorexit呢?

這個(gè)時(shí)候我們會(huì)發(fā)現(xiàn)synchronized是可重入鎖,其實(shí)現(xiàn)原理就是monitor的個(gè)數(shù)增加和減少

同時(shí)wait / notify方法的執(zhí)行也會(huì)依賴(lài) monitor,所以wait和notify方法必須放在同步代碼塊中,否則會(huì)報(bào)錯(cuò) java.lang.IllegalMonitorstateException

因?yàn)榉椒▍^(qū)域很大,所以設(shè)置一個(gè)標(biāo)記,現(xiàn)在執(zhí)行完判斷之后,就全部鎖起來(lái),而代碼塊不確定大小,就需要細(xì)化monitor的范圍

ReentrantLock

ReentrantLock是Lock接口的一個(gè)實(shí)現(xiàn)類(lèi)

在ReentrantLock內(nèi)部有一個(gè)抽象靜態(tài)內(nèi)部類(lèi)Sync

其中一個(gè)是 NonfairSync(非公平鎖),另外一個(gè)是 FairSync (公平鎖),二者都實(shí)現(xiàn)了此抽象內(nèi)部類(lèi)Sync,ReentrantLock默認(rèn)使用的是 非公平鎖 ,我們看一下源碼:

public class ReentrantLock implements Lock, java.io.Serializable {

    // 鎖的類(lèi)型
    private final Sync sync;
    
    // 抽象靜態(tài)類(lèi)Sync繼承了AbstractQueueSynchroniser [這個(gè)在下面進(jìn)行解釋]
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        // 抽象加鎖方法
        abstract void lock();

        // 不公平的 tryLock 也就是不公平的嘗試獲取
        final boolean nonfairTryAcquire(int acquires) {
            // 獲取當(dāng)前線程對(duì)象
            final Thread current = Thread.currentThread();
            // 獲取線程的狀態(tài)
            int c = getState();
            // 根據(jù)線程的不同狀態(tài)執(zhí)行不同的邏輯
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 獲取獨(dú)占模式的線程的當(dāng)前鎖的狀態(tài)
            else if (current == getExclusiveOwnerThread()) {
                // 獲取新的層級(jí)大小
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 設(shè)置鎖的狀態(tài)
                setState(nextc);
                return true;
            }
            return false;
        }

        // 嘗試釋放方法
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        // 返回當(dāng)前線程是不是獨(dú)占的
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // 返回 ConditionObject 對(duì)象
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // 獲得獨(dú)占的線程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // 獲得獨(dú)占線程的狀態(tài)
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        // 判斷是否是加鎖的
        final boolean isLocked() {
            return getState() != 0;
        }

        // 序列化
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); 
        }
    }

    // 非公平鎖繼承了Sync
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // 加鎖操作
        final void lock() {
            // 判斷是不是第一次加鎖 底層調(diào)用 Unsafe的compareAndSwapInt()方法
            if (compareAndSetState(0, 1))
                // 設(shè)置為獨(dú)占鎖
                setExclusiveOwnerThread(Thread.currentThread());
            // 如果不是第一次加鎖,則調(diào)用 acquire 方法在加一層鎖
            else
                acquire(1);
        }

        // 返回嘗試加鎖是否成功
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    // 公平鎖
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        // 加鎖操作,直接設(shè)置為1
        final void lock() {
            acquire(1);
        }

        // 嘗試加鎖
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

Lock接口

public interface Lock {
    // 加鎖
    void lock();
    // 不斷加鎖
    void lockInterruptibly() throws InterruptedException;
    // 嘗試加鎖
    boolean tryLock();
    // 嘗試加鎖,具有超時(shí)時(shí)間
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 釋放鎖
    void unlock();
    // Condition 對(duì)象
    Condition newCondition();
}

Condition接口

public interface Condition {
    // 等待
    void await() throws InterruptedException;
    // 超時(shí)等待
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 超時(shí)納秒等待
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 可中斷等待
    void awaitUninterruptibly();
    // 等待死亡
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // 指定喚醒
    void signal();
    // 喚醒所有
    void signalAll();
}

為什么官方提供的是非公平鎖,因?yàn)槿绻枪芥i,假如一個(gè)線程需要執(zhí)行很久,那執(zhí)行效率會(huì)大大降低

ReentrantLock的其他方法

public class ReentrantLock implements Lock, java.io.Serializable {

    // 鎖的類(lèi)型
    private final Sync sync;

    // 默認(rèn)是非公平鎖
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    // 有參構(gòu)造,可以設(shè)置鎖的類(lèi)型
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    // 加鎖
    public void lock() {
        sync.lock();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    // 解鎖 調(diào)用release() 因?yàn)槭侵厝腈i,所以需要減少重入的層數(shù)
    public void unlock() {
        sync.release(1);
    }

    // 返回Condition對(duì)象 ,用來(lái)執(zhí)行線程的喚醒等待等操作
    public Condition newCondition() {
        return sync.newCondition();
    }

    // 獲取鎖的層數(shù)
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    // 是否加鎖
    public boolean isLocked() {
        return sync.isLocked();
    }
    // 是否是公平鎖
    public final boolean isFair() {
        return sync instanceof FairSync;
    }
    // 獲取獨(dú)占鎖
    protected Thread getOwner() {
        return sync.getOwner();
    }
    // 查詢(xún)是否有任何線程正在等待獲取此鎖
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    // 查詢(xún)給定線程是否正在等待獲取此鎖
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }
    // 獲取隊(duì)列的長(zhǎng)度
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    // 返回一個(gè)包含可能正在等待獲取該鎖的線程的集合
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    // 判斷是否等待
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    // 獲得等待隊(duì)列的長(zhǎng)度
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    // 獲取正在等待的線程集合
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    // toString()
    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ?
                                   "[Unlocked]" :
                                   "[Locked by thread " + o.getName() + "]");
    }
}

總結(jié):

1.ReentrantLock是獨(dú)占鎖

2.ReentrantLock是可重入鎖

3.底層使用AbstractQueuedSynchronizer實(shí)現(xiàn)

4.synchronized 和 ReentrantLock的區(qū)別

  • synchronized是是關(guān)鍵字,可以作用在靜態(tài)方法、普通方法、靜態(tài)代碼塊,底層使用monitor實(shí)現(xiàn),synchronized是內(nèi)置鎖,是悲觀鎖,其發(fā)生異常會(huì)中斷鎖,所以不會(huì)發(fā)生死鎖。是非中斷鎖
  • ReentrantLock是類(lèi),作用在方法中,其比synchronized更加靈活,但是必須手動(dòng)加鎖釋放鎖,是樂(lè)觀鎖,發(fā)生異常不會(huì)中斷鎖,必須在finally中釋放鎖,是可中斷的,使用Lock的讀鎖可以提供效率

AQS

AQS:AbstractQueueSynchronizer => 抽象隊(duì)列同步器

AQS定義了一套多線程訪問(wèn)共享資源的同步器框架,很多同步器的實(shí)現(xiàn)都依賴(lài)AQS。如ReentrantLock、Semaphore、CountDownLatch …

首先看一下AQS隊(duì)列的框架

它維護(hù)了一個(gè)volatile int state (代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭(zhēng)搶資源被阻塞的時(shí)候會(huì)先進(jìn)進(jìn)入此隊(duì)列),這里的volatile是核心。在下個(gè)部分進(jìn)行講解~

state的訪問(wèn)方式有三種

  • getState()
  • setState()
  • compareAndSetState()

AQS定義了兩種資源共享方式:Exclusive(獨(dú)占,只有一個(gè)線程可以執(zhí)行,如ReentrantLock)和Share(共享,多個(gè)線程可同時(shí)執(zhí)行,如Semaphore、CountdownLatch)

不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義的同步器在實(shí)現(xiàn)的時(shí)候只需要實(shí)現(xiàn)共享資源的獲取和釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì))AQS在頂層已經(jīng)實(shí)現(xiàn)好了。

自定義同步器時(shí)需要實(shí)現(xiàn)以下方法即可

  • isHeldExclusively():該線程是否正在獨(dú)占資源。只有用的Condition才需要去實(shí)現(xiàn)它
  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功返回true,否則返回false
  • tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功返回true,否則返回false
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗,0表示成功但沒(méi)有剩余可用資源,正數(shù)表示成功,且還有剩余資源
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待節(jié)點(diǎn)返回true,否則返回fasle

以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()時(shí),會(huì)調(diào)用tryAcquire()獨(dú)占該鎖,然后將state+1,此后其他線程在調(diào)用tryAcquire()就會(huì)失敗,直到A線程unlock()到state為0為止,其他線程才有機(jī)會(huì)獲取該鎖。當(dāng)前在A釋放鎖之前,A線程是可以重復(fù)獲取此鎖的(state)會(huì)累加。這就是可重入,但是獲取多少次,就要釋放多少次。

再和CountdownLock為例,任務(wù)分為N個(gè)子線程去執(zhí)行,state也初始化為N(注意N要與線程的個(gè)數(shù)一致)。這N個(gè)子線程是并行執(zhí)行的,每個(gè)子線程執(zhí)行完之后countDown一次。state會(huì)CAS-1。等到所有的子線程都執(zhí)行完后(即state=0),會(huì)upark()主調(diào)用線程,然后主調(diào)用線程就會(huì)從await()函數(shù)返回,繼續(xù)剩余動(dòng)作

一般來(lái)說(shuō),自定義同步器要么是獨(dú)占方法,要么是共享方式,也只需要實(shí)現(xiàn)tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一組即可,但是AQS也支持自定義同步器同時(shí)實(shí)現(xiàn)獨(dú)占鎖和共享鎖兩種方式,如:ReentrantReadWriteLock

AQS的源碼

AbstractQueueSynchronizer 繼承了 AbstractOwnableSynchronizer

AbstractOwnableSynchronizer類(lèi)

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    // 獨(dú)占模式當(dāng)前的擁有者
    private transient Thread exclusiveOwnerThread;

    // 設(shè)置獨(dú)占模式當(dāng)前的擁有者
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    // 得到獨(dú)占模式當(dāng)前的擁有者
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractQueueSynchronizer類(lèi)

public abstract class AbstractQueuedSynchronizer    extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() { }

    // AbstractQueueSynchronizer 中的靜態(tài)內(nèi)部類(lèi) Node 節(jié)點(diǎn)
    static final class Node {

        // 指示節(jié)點(diǎn)正在以共享模式等待的標(biāo)記
        static final Node SHARED = new Node();

        // 指示節(jié)點(diǎn)正在以獨(dú)占模式等待的標(biāo)記
        static final Node EXCLUSIVE = null;

        // 表示線程已經(jīng)取消
        static final int CANCELLED =  1;

        // 表示線程之后需要釋放
        static final int SIGNAL    = -1;

        // 表示線程正在等待條件
        static final int CONDITION = -2;

        // 指示下一個(gè) acquireShared 應(yīng)該無(wú)條件傳播
        static final int PROPAGATE = -3;

        // 狀態(tài)標(biāo)記
        volatile int waitStatus;

        // 隊(duì)列的前一個(gè)節(jié)點(diǎn)
        volatile Node prev;

        // 隊(duì)列的后一個(gè)節(jié)點(diǎn)
        volatile Node next;

        // 線程
        volatile Thread thread;

        // 下一個(gè)正在等待的節(jié)點(diǎn)
        Node nextWaiter;

        // 判斷是否時(shí)共享的
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 返回上一個(gè)節(jié)點(diǎn),不能為null,為null拋出空指針異常
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        // 構(gòu)造
        Node() {    // Used to establish initial head or SHARED marker
        }

        // 有參構(gòu)造,用來(lái)添加線程的隊(duì)列
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        // 有參構(gòu)造,根據(jù)等待條件使用
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    // 頭節(jié)點(diǎn)
    private transient volatile Node head;

    // 尾節(jié)點(diǎn)
    private transient volatile Node tail;
    // 狀態(tài)
    private volatile int state;

    // 獲取當(dāng)前的狀態(tài)
    protected final int getState() {
        return state;
    }

    //設(shè)置當(dāng)前的狀態(tài)
    protected final void setState(int newState) {
        state = newState;
    }

    // 比較設(shè)置當(dāng)前的狀態(tài)
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // 納秒數(shù),使之更快的旋轉(zhuǎn)
    static final long spinForTimeoutThreshold = 1000L;

    // 將節(jié)點(diǎn)插入隊(duì)列
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // 加一個(gè)等待節(jié)點(diǎn)
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    // 設(shè)置頭節(jié)點(diǎn)
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    // 如果存在后繼節(jié)點(diǎn),就喚醒
    private void unparkSuccessor(Node node) {
        // 獲得節(jié)點(diǎn)的狀態(tài)
        int ws = node.waitStatus;
        // 如果為負(fù)數(shù),就執(zhí)行比較并設(shè)置方法設(shè)置狀態(tài)
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 喚醒后面的節(jié)點(diǎn)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    // 共享模式的釋放動(dòng)作,并且向后繼節(jié)點(diǎn)發(fā)出信號(hào)
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    // 設(shè)置隊(duì)列的頭,并檢查后繼者能否在共享模式下等待,如果可以,就是否傳播設(shè)置為>0或者propagate狀態(tài)
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    // 取消正在進(jìn)行的嘗試
    private void cancelAcquire(Node node) {
        // 節(jié)點(diǎn)為null,直接返回
        if (node == null)
            return;

        node.thread = null;

        // 跳過(guò)已經(jīng)取消的前一個(gè)節(jié)點(diǎn)
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    // 還有好多方法... 其實(shí)本質(zhì)就是基于 隊(duì)列的判斷和操作,AQS提供了獨(dú)占鎖和共享鎖的設(shè)計(jì)
    // 在AQS中,使用到了Unsafe類(lèi),所以AQS其實(shí)就是基于CAS算法的,
    // AQS的一些方法就是直接調(diào)用 Unsafe 的方法 如下所示

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    // 比較并設(shè)置頭
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    // 比較并設(shè)置尾
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    // 比較并設(shè)置狀態(tài)
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    // 比較并設(shè)置下一個(gè)節(jié)點(diǎn)
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    // 除此之外 AQS 還有一個(gè)實(shí)現(xiàn)了Condition的類(lèi) 如下
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;

        // 條件隊(duì)列的第一個(gè)節(jié)點(diǎn)
        private transient Node firstWaiter;

        // 條件隊(duì)列的最后一個(gè)節(jié)點(diǎn)
        private transient Node lastWaiter;

        public ConditionObject() { }

        // 在等待隊(duì)列中添加一個(gè)新的節(jié)點(diǎn)
        private Node addConditionWaiter() {
            // 獲取最后一個(gè)節(jié)點(diǎn)
            Node t = lastWaiter;
            // 如果最后一個(gè)節(jié)點(diǎn)被取消了,就清除它
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
              return node;
        }

        // 刪除并轉(zhuǎn)移節(jié)點(diǎn)直到它沒(méi)有取消或者不為null
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        // 刪除所有的節(jié)點(diǎn)
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        // 取消節(jié)點(diǎn)的連接
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // 將等待最長(zhǎng)的線程,喚醒
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        // 喚醒所有的等待線程
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        // 實(shí)現(xiàn)不間斷的條件等待
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        // 模式意味著在退出等待時(shí)重新中斷
        private static final int REINTERRUPT =  1;

        // 模式的含義是在退出等待時(shí)拋出InterruptedException異常
        private static final int THROW_IE    = -1;

        // 檢查中斷,如果在信號(hào)通知之前被中斷,則返回THROW_IE;
        // 如果在信號(hào)通知之后,則返回REINTERRUPT;如果未被中斷,則返回 0
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
        }

        // 拋出InterruptedException,重新中斷當(dāng)前線程,
        // 或不執(zhí)行任何操作,具體取決于模式。
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        // 實(shí)現(xiàn)不可中斷的條件等待
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        // 納秒級(jí)別的等待
        public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

        // 絕對(duì)定時(shí)等待
        public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // 超時(shí)等待
        public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // 判斷是不是獨(dú)占的
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        // 返回是否有正在等待的
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        // 獲得等待隊(duì)列的長(zhǎng)度
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        // 獲取所有正在等待的線程集合
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }
}

總結(jié):

1.AQS為我們提供了很多實(shí)現(xiàn)。AQS內(nèi)部有兩個(gè)內(nèi)部類(lèi),ConditionObject和Node節(jié)點(diǎn)

2.和開(kāi)頭說(shuō)的一樣,其維護(hù)了一個(gè)state和一個(gè)隊(duì)列,也提供了獨(dú)占和共享的實(shí)現(xiàn)

3.總結(jié)一下流程

  • 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功就直接返回
  • 沒(méi)成功,則addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式
  • acquireQueued()使得線程在隊(duì)列中休息,有機(jī)會(huì)(輪到自己,會(huì)被unpark())會(huì)去嘗試獲取資源。獲取到資源之后才會(huì)返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),就返回true,否則返回false
  • 如果線程在等待過(guò)程中被中斷過(guò),它不是響應(yīng)的。只是獲取資源之后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上

4.release() 是獨(dú)占模式下線程共享資源的底層入口,它會(huì)釋放指定量的資源,如果徹底釋放了(state = 0)

5.如果獲取鎖的線程在release時(shí)異常了,沒(méi)有unpark隊(duì)列中的其他結(jié)點(diǎn),這時(shí)隊(duì)列中的其他結(jié)點(diǎn)會(huì)怎么辦?是不是沒(méi)法再被喚醒了?

這時(shí),隊(duì)列中等待鎖的線程將永遠(yuǎn)處于park狀態(tài),無(wú)法再被喚醒!

6.獲取鎖的線程在什么情形下會(huì)release拋出異常呢 ?

  • 線程突然死掉了?可以通過(guò)thread.stop來(lái)停止線程的執(zhí)行,但該函數(shù)的執(zhí)行條件要嚴(yán)苛的多,而且函數(shù)注明是非線程安全的,已經(jīng)標(biāo)明Deprecated;
  • 線程被interupt了?線程在運(yùn)行態(tài)是不響應(yīng)中斷的,所以也不會(huì)拋出異常;

7.acquireShared()的流程

  • tryAcquireShared()嘗試獲取資源,成功則直接返回;
  • 失敗則通過(guò)doAcquireShared()進(jìn)入等待隊(duì)列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個(gè)等待過(guò)程也是忽略中斷的。

8.releaseShared()

釋放掉資源之后,喚醒和后繼

7.不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了。自定義同步器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:

  • isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。

volatile

volatile是Java提供的關(guān)鍵字,是輕量級(jí)的同步機(jī)制 JSR133提出,Java5增強(qiáng)了語(yǔ)義

volatile關(guān)鍵字有三個(gè)重要的特點(diǎn)

  • 保證內(nèi)存可見(jiàn)性
  • 不保證原子性
  • 禁止指令重排序

提到volatile,就要提到JMM - 什么是JMM

JMM:Java Memory Model

本身就是一種抽象的概念,并不真實(shí)存在,它描述的是一組規(guī)范和規(guī)則,通過(guò)這種規(guī)則定義了程序的各個(gè)變量(包括實(shí)例字段、靜態(tài)字段、和構(gòu)造數(shù)組對(duì)象的元素)的訪問(wèn)方式

JMM關(guān)于同步的規(guī)定

  • 線程解鎖前,必須把共享變量的值刷新到主內(nèi)存
  • 線程加鎖前,必須讀取主內(nèi)存的最新的值到自己的工作內(nèi)存
  • 加鎖和解鎖必須是同一把鎖

happens-before 規(guī)則

前一個(gè)操作對(duì)下一個(gè)操作是完全可見(jiàn)的,如果下一個(gè)操作對(duì)下下一個(gè)操作完全可見(jiàn),那么前一個(gè)操作也對(duì)下下個(gè)操作可見(jiàn)

重排序

JVM對(duì)指令的執(zhí)行,會(huì)進(jìn)行優(yōu)化重新排序,可以發(fā)生在編譯重排序、CPU重排序

什么是內(nèi)存屏障?

內(nèi)存屏障分為2種

  • 讀屏障(LoadBarrier)
  • 寫(xiě)屏障(Store Barrier)

內(nèi)存屏障的作用

  • 阻止屏障兩側(cè)的指令重排序
  • 強(qiáng)制把緩沖區(qū) / 高速緩存中的臟數(shù)據(jù)寫(xiě)回主內(nèi)存,或者讓緩存中相應(yīng)的的數(shù)據(jù)失效

編譯器生成字節(jié)碼的時(shí)候,會(huì)在指令序列中插入內(nèi)存屏障來(lái)禁止特定類(lèi)型的處理器重排序。編譯器選擇了一個(gè)比較保守的JMM內(nèi)存屏障插入策略,這樣就可以保證在任何處理器平臺(tái),任何程序中都有正確的volatile語(yǔ)義

  • 在每個(gè)volatile寫(xiě)操作之前插入一個(gè)StoreStore屏障
  • 在每個(gè)volatile寫(xiě)操作之后入一個(gè)StoreLoad屏障
  • 在每個(gè)volatile讀操作之前插入一個(gè)LoadLoad屏障
  • 在每個(gè)volatile讀操作之前插入一個(gè)LoadStore屏障

原子性

  • 問(wèn):i++為什么不是線程安全的?
  • 因?yàn)?i++ 不是原子操作,i++有三個(gè)操作

如何解決?

  • 使用 synchronized
  • 使用AtomicInteger [JUC下的原子類(lèi)]

有序性

1.計(jì)算機(jī)在執(zhí)行程序的時(shí)候,為了提高性能,編譯器和處理器通常會(huì)對(duì)指令重排序,一般分為3種-

  • 源代碼 -> 編譯器優(yōu)化的重排 -> 指令并行的重排 -> 內(nèi)存系統(tǒng)的重排 -> 最終執(zhí)行的指令
  • 單線程環(huán)境里面確保程序最終執(zhí)行結(jié)果和代碼順序執(zhí)行的結(jié)果一致
  • 處理器在執(zhí)行重排序之前必須考慮指令之間的數(shù)據(jù)依賴(lài)性
  • 多線程環(huán)境種線程交替執(zhí)行,由于編譯器優(yōu)化重排序的存在,兩個(gè)線程中使用的變量能否保證一致性是無(wú)法確定的,結(jié)果無(wú)法預(yù)測(cè)

2.指令重排序

多線程環(huán)境種線程交替執(zhí)行,由于編譯器優(yōu)化重排序的存在,兩個(gè)線程中使用的變量能否保證一致性是無(wú)法確定的,結(jié)果無(wú)法預(yù)測(cè)此時(shí)使用volatile禁用指令重排序,就可以解決這個(gè)問(wèn)題

volatile的使用

單例設(shè)計(jì)模式中的 安全的雙重檢查鎖

volatile的底層實(shí)現(xiàn)

根據(jù)JMM,所有線程拿到的都是主內(nèi)存的副本,然后存儲(chǔ)到各自線程的空間,當(dāng)某一線程修改之后,立即修改主內(nèi)存,然后主內(nèi)存通知其他線程修改

Java代碼 instance = new Singleton();//instance 是 volatile 變量 匯編代碼:0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp); 有 volatile 變量修飾的共享變量進(jìn)行寫(xiě)操作的時(shí)候會(huì)多第二行匯編代碼,通過(guò)查 IA-32 架構(gòu)軟件開(kāi)發(fā)者手冊(cè)可知,lock 前綴的指令在多核處理器下會(huì)引發(fā)了兩件事情。將當(dāng)前處理器緩存行的數(shù)據(jù)會(huì)寫(xiě)回到系統(tǒng)內(nèi)存。這個(gè)寫(xiě)回內(nèi)存的操作會(huì)引起在其他 CPU 里緩存了該內(nèi)存地址的數(shù)據(jù)無(wú)效。

如果對(duì)聲明了volatile變量進(jìn)行寫(xiě)操作,JVM就會(huì)向處理器發(fā)送一條Lock前綴的指令,將這個(gè)變量所在緩存行的數(shù)據(jù)寫(xiě)回到系統(tǒng)內(nèi)存。但是就算寫(xiě)回到內(nèi)存,如果其他處理器緩存的值還是舊的,再執(zhí)行計(jì)算操作就會(huì)有問(wèn)題,所以在多處理器下,為了保證各個(gè)處理器的緩存是一致的,就會(huì)實(shí)現(xiàn)緩存一致性協(xié)議,每個(gè)處理器通過(guò)嗅探在總線上傳播的數(shù)據(jù)來(lái)檢查自己緩存的值是不是過(guò)期了,當(dāng)處理器發(fā)現(xiàn)自己緩存行對(duì)應(yīng)的內(nèi)存地址被修改,就會(huì)將當(dāng)前處理器的緩存行設(shè)置成無(wú)效狀態(tài),當(dāng)處理器要對(duì)這個(gè)數(shù)據(jù)進(jìn)行修改操作的時(shí)候,會(huì)強(qiáng)制重新從系統(tǒng)內(nèi)存里把數(shù)據(jù)讀到處理器緩存里。

自旋鎖 ,自旋鎖的其他種類(lèi)

CAS 自旋鎖

  • CAS(Compare And Swap)比較并替換,是線程并發(fā)運(yùn)行時(shí)用到的一種技術(shù)
  • CAS是原子操作,保證并發(fā)安全,而不能保證并發(fā)同步
  • CAS是CPU的一個(gè)指令(需要JNI調(diào)用Native方法,才能調(diào)用CPU的指令)
  • CAS是非阻塞的、輕量級(jí)的樂(lè)觀鎖

我們可以實(shí)現(xiàn)通過(guò)手寫(xiě)代碼完成CAS自旋鎖

CAS包括三個(gè)操作數(shù)

  • 內(nèi)存位置 - V
  • 期望值- A
  • 新值 - B

如果內(nèi)存位置的值與期望值匹配,那么處理器會(huì)自動(dòng)將該位置的值設(shè)置為新值,否則不做改變。無(wú)論是哪種情況,都會(huì)在CAS指令之前返回該位置的值。

public class Demo {
    volatile static int count = 0;

    public static void request() throws Exception {
        TimeUnit.MILLISECONDS.sleep(5);
        // 表示期望值
        int expectedCount;
        while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) {
        }
    }

    public static synchronized boolean compareAndSwap(int expectedCount, int newValue) {
        if (expectedCount == getCount()) {
            count = newValue;
            return true;
        }
        return false;
    }

    public static int getCount() {
        return count;
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        int threadSize = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i = 0; i < threadSize; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 10; j++) {
                        request();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("count :" + count + " 耗時(shí):" + (end - start));
    }
}

上述是我們自己書(shū)寫(xiě)的CAS自旋鎖,但是JDK已經(jīng)提供了響應(yīng)的方法

Java提供了 CAS 的支持,在 sun.misc.Unsafe 類(lèi)中,如下

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

參數(shù)說(shuō)明

  • var1:表示要操作的對(duì)象
  • var2:表示要操作對(duì)象中屬性地址的偏移量
  • var4:表示需要修改數(shù)據(jù)的期望的值
  • var5:表示需要修改為的新值

CAS的實(shí)現(xiàn)原理

CAS通過(guò)調(diào)用JNI的代碼實(shí)現(xiàn),JNI:Java Native Interface ,允許Java調(diào)用其他語(yǔ)言

而CompareAndSwapXxx系列的方法就是借助“C語(yǔ)言”CPU底層指令實(shí)現(xiàn)的

以常用的 Inter x86來(lái)說(shuō),最后映射到CPU的指令為“cmpxchg”,這個(gè)是一個(gè)原子指令,CPU執(zhí)行此命令的時(shí)候,實(shí)現(xiàn)比較并替換的操作

cmpxchg 如何保證多核心下的線程安全

系統(tǒng)底層進(jìn)行CAS操作的時(shí)候,會(huì)判斷當(dāng)前操作系統(tǒng)是否為多核心,如果是,就給“總線”加鎖,只有一個(gè)線程對(duì)總線加鎖,保證只有一個(gè)線程進(jìn)行操作,加鎖之后會(huì)執(zhí)行CAS操作,也就是說(shuō)CAS的原子性是平臺(tái)級(jí)別的

CAS這么強(qiáng),有沒(méi)有什么問(wèn)題?

高并發(fā)情況下,CAS會(huì)一直重試,會(huì)損耗性能

CAS的ABA問(wèn)題

CAS需要在操作值得時(shí)候檢查下值有沒(méi)有變化,如果沒(méi)有發(fā)生變化就更新,但是如果原來(lái)一個(gè)值為A,經(jīng)過(guò)一輪的操作之后,變成了B,然后又是一輪的操作,又變成了A,此時(shí)這個(gè)位置有沒(méi)有發(fā)生改變?改變了的,因?yàn)椴皇且恢笔茿,這就是ABA問(wèn)題

如何解決ABA問(wèn)題?

解決ABA問(wèn)題就是給值增加一個(gè)修改版本號(hào),每次值的變化,都會(huì)修改它的版本號(hào),CAS在操作的時(shí)候都會(huì)去對(duì)比此版本號(hào)。

下面給出一個(gè)ABA的案例

public class CasAbaDemo {
    public static AtomicInteger a = new AtomicInteger(1);

    public static void main(String[] args) {
        Thread main = new Thread(() -> {
            System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get());
            try {
                int executedNum = a.get();
                int newNum = executedNum + 1;
                TimeUnit.SECONDS.sleep(3);
                boolean isCasSuccess = a.compareAndSet(executedNum, newNum);
                System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主線程");

        Thread thread = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                a.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get());
                a.decrementAndGet();
                System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "干擾線程");

        main.start();
        thread.start();
    }
}

Java中ABA解決辦法(AtomicStampedReference)

AtomicStampedReference 主要包含一個(gè)引用對(duì)象以及一個(gè)自動(dòng)更新的整數(shù) “stamp”的pair對(duì)象來(lái)解決ABA問(wèn)題

public class AtomicStampedReference<V> {

    private static class Pair<T> {
        // 數(shù)據(jù)引用
        final T reference;
        // 版本號(hào)
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

    private volatile Pair<V> pair;

    /**
     * 期望引用
     * @param expectedReference the expected value of the reference
     * 新值引用
     * @param newReference the new value for the reference
     * 期望引用的版本號(hào)
     * @param expectedStamp the expected value of the stamp
     * 新值的版本號(hào)
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            // 期望引用與當(dāng)前引用一致
            expectedReference == current.reference &&
            // 期望版本與當(dāng)前版本一致
            expectedStamp == current.stamp &&
            // 數(shù)據(jù)一致
            ((newReference == current.reference &&
              newStamp == current.stamp) 
             ||
             // 數(shù)據(jù)不一致
             casPair(current, Pair.of(newReference, newStamp)));
    }   
}

修改之后完成ABA問(wèn)題

public class CasAbaDemo02 {
    public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1);

    public static void main(String[] args) {
        Thread main = new Thread(() -> {
            System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference());
            try {
                Integer executedReference = a.getReference();
                Integer newReference = executedReference + 1;
                Integer expectStamp = a.getStamp();
                Integer newStamp = expectStamp + 1;
                TimeUnit.SECONDS.sleep(3);
                boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp);
                System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主線程");

        Thread thread = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference());
                a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1);
                System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "干擾線程");

        main.start();
        thread.start();
    }
}

到此這篇關(guān)于Java多線程之鎖的強(qiáng)化學(xué)習(xí)的文章就介紹到這了,更多相關(guān)Java多線程 鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論