欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java AQS信號量Semaphore的使用

 更新時間:2023年02月02日 10:00:24   作者:飛奔的小付  
Semaphore來自于JDK1.5的JUC包,直譯過來就是信號量,被作為一種多線程并發(fā)控制工具來使用。本文將詳解其原理與使用方法,感興趣的可以學習一下

一.什么是Semaphore

Semaphore,俗稱信號量,它是操作系統中PV操作的原語在java的實現,它也是基于AbstractQueuedSynchronizer實現的。

Semaphore的功能非常強大,大小為1的信號量就類似于互斥鎖,通過同時只能有一個線程獲取信號量實現。大小為n(n>0)的信號量可以實現限流的功能,它可以實現只能有n個線程同時獲取信號量。

PV操作是操作系統一種實現進程互斥與同步的有效方法。PV操作與信號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執(zhí)行的正確性。
P操作的主要動作是:
①S減1;
②若S減1后仍大于或等于0,則進程繼續(xù)執(zhí)行;
③若S減1后小于0,則該進程被阻塞后放入等待該信號量的等待隊列中,然后轉進程調度。
V操作的主要動作是:
①S加1;
②若相加后結果大于0,則進程繼續(xù)執(zhí)行;
③若相加后結果小于或等于0,則從該信號的等待隊列中釋放一個等待進程,然后再返回原進程繼續(xù)執(zhí)行或轉進程調度。

二.Semaphore的使用

構造器

  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
   }

permits 表示許可證的數量(資源數)

fair 表示公平性,如果這個設為 true 的話,下次執(zhí)行的線程會是等待最久的線程

常用方法

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)

  • acquire() 表示阻塞并獲取許可
  • tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的線程不會阻塞
  • release() 表示釋放許可
  • int availablePermits():返回此信號量中當前可用的許可證數。
  • int getQueueLength():返回正在等待獲取許可證的線程數。
  • boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
  • void reducePermit(int reduction):減少 reduction 個許可證
  • Collection getQueuedThreads():返回所有等待獲取許可證的線程集合

應用場景

可以用于做流量控制,特別是公用資源有限的應用場景

限流

/**
     * 實現一個同時只能處理5個請求的限流器
     */
    private static Semaphore semaphore = new Semaphore(5);
    /**
     * 定義一個線程池
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
    public static void exec() {
        try {
            semaphore.acquire(1);
            // 模擬真實方法執(zhí)行
            System.out.println("執(zhí)行exec方法" );
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        {
            for (; ; ) {
                Thread.sleep(100);
                // 模擬請求以10個/s的速度
                executor.execute(() -> exec());
            }
        }
    }

三.Semaphore源碼分析

主要關注 Semaphore的加鎖解鎖(共享鎖)邏輯實現,線程競爭鎖失敗入隊阻塞邏輯和獲取鎖的線程釋放鎖喚醒阻塞線程競爭鎖的邏輯實現

	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //嘗試獲取共享鎖,大于等于0則直接獲取鎖成功,小于0時則共享鎖阻塞
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared的實現

   final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // 當減一之后的值小于0 或者 
                // compareAndSetState成功,把state變?yōu)閞emaining,即將狀態(tài)減一
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

入隊阻塞

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入隊,創(chuàng)建節(jié)點 使用共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            //獲取當前節(jié)點的前軀節(jié)點
                final Node p = node.predecessor();
                //如果節(jié)點為head節(jié)點
                if (p == head) {
                	//阻塞動作比較重,通常會再嘗試獲取資源,沒有獲取到返回負數
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //判斷是否可以阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

入隊操作

private Node addWaiter(Node mode) {
		//構建節(jié)點,模式是共享模式
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
        	//設置前一節(jié)點為tail 
            node.prev = pred;
            //設置當前節(jié)點為尾節(jié)點
            if (compareAndSetTail(pred, node)) {
            // 前一節(jié)點的next為當前節(jié)點
                pred.next = node;
                return node;
            }
        }
        //創(chuàng)建隊列
        enq(node);
        return node;
    }

創(chuàng)建隊列,經典的for循環(huán)創(chuàng)建雙向鏈表

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
            	//節(jié)點為空 則new一個節(jié)點 設置頭節(jié)點
                if (compareAndSetHead(new Node()))
                //把這個節(jié)點的頭節(jié)點賦值給尾節(jié)點
                    tail = head;
            } else {
            // 如果尾節(jié)點存在 就將該節(jié)點的前節(jié)點指向tail
                node.prev = t;
                //設置當前節(jié)點為tail
                if (compareAndSetTail(t, node)) {
                //前一個節(jié)點的next指向當前節(jié)點,入隊操作就完成了
                    t.next = node;
                    return t;
                }
            }
        }
    }

設置waitStatus狀態(tài)及獲取waitStatus狀態(tài),waitStatus為-1時可以喚醒后續(xù)節(jié)點

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 狀態(tài)是-1 就可以喚醒后續(xù)節(jié)點
             *
             */
            return true;
        if (ws > 0) {
            /*
             * 前置任務已取消,刪掉節(jié)點
             *
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * cas設置waitstatus狀態(tài),設置為-1
             * 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

阻塞 調用LockSupport.park

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

釋放鎖的邏輯

	public void release() {
        sync.releaseShared(1);
    }
	public final boolean releaseShared(int arg) {
		//cas成功則進行釋放共享鎖
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

state狀態(tài)+1操作,cas成功,返回true

		protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //頭節(jié)點不為空并且不是尾節(jié)點
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //waitstatus為-1
                if (ws == Node.SIGNAL) {
                //將SIGNAL置為0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;         
                    //喚醒   
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)                  
                break;
        }
    }

喚醒操作

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //ws小于0就將其設置為0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //當前節(jié)點的下一個節(jié)點為空或者ws大于0
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //s不為空 則進行喚醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

喚醒下一個線程之后,要把上一個節(jié)點移除隊列

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //下一個線程進來,如果前置節(jié)點是頭節(jié)點,則將前置節(jié)點出隊
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //cas獲取資源成功
                    if (r >= 0) {
                    	//出隊操作
                        setHeadAndPropagate(node, r);
                        //將p.next移除
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

出隊操作

private void setHeadAndPropagate(Node node, int propagate) {
		//設置當前節(jié)點為head節(jié)點,前一節(jié)點的head屬性被刪除
        Node h = head; 
        setHead(node);
        //如果是傳播屬性
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
           //并且是共享模式,可以持續(xù)喚醒下一個,
           //只要資源數充足 就可以一直往下喚醒,提高并發(fā)量
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
 private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

至此,線程的阻塞喚醒核心邏輯就這么多,共享鎖與獨占鎖的區(qū)別是可以喚醒后續(xù)的線程,如果資源數充足的話,可以一直往下喚醒,提高了并發(fā)量。

到此這篇關于Java AQS信號量Semaphore的使用的文章就介紹到這了,更多相關Java信號量Semaphore內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java集合基礎知識 List/Set/Map詳解

    Java集合基礎知識 List/Set/Map詳解

    這篇文章主要介紹了Java集合基礎知識 List/Set/Map,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-03-03
  • 如何通過jstack命令dump線程信息

    如何通過jstack命令dump線程信息

    這篇文章主要介紹了如何通過jstack命令dump線程信息,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-07-07
  • 一文帶你搞懂Java中線程的創(chuàng)建方式

    一文帶你搞懂Java中線程的創(chuàng)建方式

    這篇文章主要為大家詳細介紹了Java中線程的創(chuàng)建方式的相關知識,文中的示例代碼講解詳細,具有一定的借鑒價值,感興趣的小伙伴可以了解一下
    2023-03-03
  • Java操作Redis詳細介紹

    Java操作Redis詳細介紹

    這篇文章主要介紹了Java操作Redis詳細介紹,涉及對key的操作,string數據類型,list數據類型等相關內容,具有一定參考價值,需要的朋友可以了解下。
    2017-11-11
  • Java注解使用及原理解析

    Java注解使用及原理解析

    這篇文章主要介紹了Java注解使用及原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-06-06
  • 全面詳解Spring?Bean生命周期教程示例

    全面詳解Spring?Bean生命周期教程示例

    這篇文章主要為大家介紹了Spring?Bean生命周期的全面詳解教程示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-04-04
  • 教你如何正確了解java三大特性!!!!

    教你如何正確了解java三大特性!!!!

    所有的面向對象編程語言的思路都是差不多的,而這三大特性,則是思路中的支柱點,接下來我就重點講解了一下java三大特性,感興趣的朋友跟隨腳本之家小編一起看看吧
    2021-07-07
  • java版微信公眾平臺消息接口應用示例

    java版微信公眾平臺消息接口應用示例

    這篇文章主要介紹了java版微信公眾平臺消息接口應用,結合實例形式對比分析了PHP與java應用微信公眾平臺接口的相關調用與操作技巧,需要的朋友可以參考下
    2017-07-07
  • 使用Runnable實現數據共享

    使用Runnable實現數據共享

    這篇文章主要為大家詳細介紹了如何使用Runnable實現數據共享,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • Java中的內存模型JMM詳細解讀

    Java中的內存模型JMM詳細解讀

    這篇文章主要介紹了Java中的內存模型JMM詳細解讀,Java?對內存的抽象模型如下,每個線程都有一塊自己的私有內存(也稱為工作內存),當線程使用變量時,會把主內存里面的變量復制到工作內存,線程讀寫變量時操作的是自己工作內存中的變量,需要的朋友可以參考下
    2023-12-12

最新評論