Java并發(fā) 結合源碼分析AQS原理
前言:
如果說J.U.C包下的核心是什么?那我想答案只有一個就是AQS。那么AQS是什么呢?接下來讓我們一起揭開AQS的神秘面紗
AQS是什么?
AQS是AbstractQueuedSynchronizer的簡稱。為什么說它是核心呢?是因為它提供了一個基于FIFO的隊列和state變量來構建鎖和其他同步裝置的基礎框架。下面是其底層的數(shù)據(jù)結構。
AQS的特點
1、其內使用Node實現(xiàn)FIFO(FirstInFirstOut)隊列??捎糜跇嫿ㄦi或者其他同步裝置的基礎框架
2、且利用了一個int類表示狀態(tài)。在AQS中維護了一個volatile int state,通常表示有線程訪問資源的狀態(tài),當state>1的時候表示線程重入的數(shù)量,主要有三個方法控制:getState(),setState(),CompareAndSetState()。后面的源碼分析多用到這幾個方法
3、使用方法是繼承,子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài)(acquire和release)的方法操縱狀態(tài)。
4、同時實現(xiàn)排它鎖和共享鎖模式。實際上AQS功能主要分為兩類:獨占(只有一個線程能執(zhí)行)和共享(多個線程同時執(zhí)行),它的子類要么使用獨占功能要么使用共享功能,而ReentrantLock是通過兩個內部類來實現(xiàn)獨占和共享
CountDownLatch如何借助AQS實現(xiàn)計數(shù)功能?
先來說一下CountDownLatch,CountDownLatch是一個同步輔助類,通過它可以來完成類似阻塞當前線程的功能,即一個或多個線程一起等待,直到其他線程執(zhí)行的操作完成。要實現(xiàn)上面的功能,CountDownLatch是通過一個給定的原子操作的計數(shù)器來實現(xiàn)。調用該類的await()方法的線程會一直處于阻塞狀態(tài),直到其他線程調用countDown()方法使得計數(shù)器的值變?yōu)?之后線程才會執(zhí)行,這個計數(shù)器是不能被重置的。通常這個類會用在程序執(zhí)行需要等待某個條件完成的場景,比如說并行計算,可將一個數(shù)據(jù)量很大的計算拆分成一個個子任務,當子任務完成之后,再將最終的結果匯總。每次訪問CountDownLatch只能有一個線程,但是這個線程在使用完countDown()方法之后能多個線程能繼續(xù)運行,而調用await()方法的線程就一定要計數(shù)器為0才會運行
下面來分析CountDownLatch的源碼以及如何使用AQS框架
public class CountDownLatch {
/**
* CountDownLatch 實現(xiàn)同步控制
* 底層是使用AQS的state來代表count
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//初始化內部類實際上是設置AQS的state
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//嘗試獲取共享是看當前的state是否為0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/*嘗試釋放共享鎖則是遞減計數(shù)直到state==0就返回false代表資源已經釋放完全否則就會使用CAS來讓state減一*/
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* 初始化CountDownLatch,實際上是初始化內部類,實際上是設置AQS的state,count不能小于0
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 這里實際上是調用了AQS里的acquireSharedInterruptibly方法,完成的功能就是先去查看線程是否被中斷,中斷則拋出異常,沒有被中斷就會嘗試獲取共享資源。 * 注意在syn內部類中重寫了tryAcquireShared,也就是當state為0就返回1,這時候就會將當前線程放入AQS的隊列中去,也就是這時候線程可以不再阻塞而是嘗試去獲取鎖
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 原理同上面方法,但是加了一個時間參數(shù)來設置等待的時間
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 這里傳入?yún)?shù)為1,同樣上面內部類一樣重寫了AQS的tryReleaseShared方法,使用這個重寫的方法來讓計數(shù)器原子操作的減一
*/
public void countDown() {
sync.releaseShared(1);
}
/**
* 就是獲取AQS的state
*/
public long getCount() {
return sync.getCount();
}
/**
* 轉換成字符串的方法
*/
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
由上面代碼可看見CountDownLatch實現(xiàn)了AQS的共享鎖,原理是操作state來實現(xiàn)計數(shù),并且重寫了tryAcquireShared(),tryReleaseShared()等方法
Semaphore是如何借助AQS實現(xiàn)控制并發(fā)訪問線程個數(shù)?
Semaphore的功能類似于操作系統(tǒng)的信號量,可以很方便的控制某個資源同時被幾個線程訪問,即做并發(fā)訪問控制,與CountDownLatch類似,同樣是實現(xiàn)獲取和釋放兩個方法。Semaphore的使用場景:常用于僅能提供訪問的資源,比如數(shù)據(jù)庫的連接數(shù)最大只有30,而應用程序的并發(fā)數(shù)可能遠遠大于30,這時候就可以使用Semaphore來控制同時訪問的線程數(shù)。當Semaphore控制線程數(shù)到1的時候就和我們單線程一樣了。同樣Semaphore說是信號量的意思,我們這里就可以把它理解為十字路口的紅綠燈,可以控制車流量(這里是控制線程數(shù))
下面來分析Semaphore的源碼以及如何使用AQS框
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** 所有機制都通過AbstractQueuedSynchronizer子類實現(xiàn) */
private final Sync sync;
/**
* 同樣是通過內部類來實現(xiàn)AQS主要功能,使用state來表示許可證數(shù)量
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
/*
* 不公平的獲取方式,會有一個搶占鎖的情況,即線程執(zhí)行順序會亂
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/*
* 釋放資源
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* 不公平的sync版本,使用的就是sync定義的不公平鎖
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平版本,獲取鎖的線程順序就是線程啟動的順序。具體是使用hasQueuedPredecessors()方法判斷“當前線程”是不是CLH隊列中的第一個線程。 * 若不是的話,則返回-1,是就設置獲取許可證,并檢查許可證數(shù)量是否足夠
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
/**
* 默認使用不公平的版本,如果需要公平的,則需要兩個參數(shù)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 分析同CountDownLatch中的類似方法,具體的實現(xiàn)都是內部類中的獲取方法,這里是獲取一個許可
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
*功能同上,但是這里不會檢測線程是否被中斷
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* 嘗試獲取
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* 在一段時間內一直嘗試獲取許可
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 當前線程釋放一個許可證
*/
public void release() {
sync.releaseShared(1);
}
/**
* 可以規(guī)定一個線程獲得許可證的數(shù)量
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* 同樣可以規(guī)定一個線程釋放許可證的數(shù)量
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/**
* 當前的許可還剩幾個
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* 銷毀所有許可
*/
public int drainPermits() {
return sync.drainPermits();
}
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}}
上面對于Semaphore的一些重要內部類和常用方法進行了解釋,與CountDownLatch很類似,實現(xiàn)的都是共享的功能,即Semaphore允許得到許可證的線程同時執(zhí)行,而CountDownLatch允許調用countDown()方法的線程同時執(zhí)行。并且都是通過內部類實現(xiàn)的。相信看到這里,你能越來越看見AQS為什么被稱作JUC包的核心。下面就來介紹一下ReentrantLock
ReentrantLock是如何借助AQS實現(xiàn)鎖機制
ReentrantLock是可重入鎖,前面博客中寫到synchronized實現(xiàn)的鎖也是可重入的。不過synchronized是基于JVM指令實現(xiàn),而ReentrantLock是使用Java代碼實現(xiàn)的。ReentrantLock重點就是需要我們手動聲明加鎖和釋放鎖,如果手工忘記釋放鎖,很有可能就會導致死鎖,即資源永遠都被鎖住,其他線程無法得到,當前線程也釋放不出去。ReentrantLock實現(xiàn)的是自旋鎖,通過循環(huán)調用CAS操作實現(xiàn)加鎖,避免了線程進入內核態(tài)的阻塞狀態(tài),所以性能較好。ReentrantLock內部同樣實現(xiàn)了公平鎖和非公平鎖。事實上Synchronized能做的ReentrantLock都能做,但是反過來就不一樣了、
經過前面的源碼分析我們發(fā)現(xiàn)核心的都在當前類的內部類里,而當前類的一些方法不過是使用的內部類以及AQS的方法罷了,所以下面我們就來分析ReentrantLock中的三個內部類。
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** 同步的機制都是通過內部類來實現(xiàn)的 */
private final Sync sync;
/**
* 在ReentrantLock中state表示的是線程重入鎖的次數(shù),當state為0時才能釋放鎖
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 這個抽象方法提供給公平鎖和不公平鎖來單獨實現(xiàn),父類不實現(xiàn)
*/
abstract void lock();
/**
* 首先得到當前線程,而后獲取state,如果state為0,也就是沒有線程獲得當前鎖,那么就設置當前線程擁有當前鎖的獨占訪問權,并且返回true。 * 如果state不為0,那么就看當前線程是否是已經獲得過鎖的線程,如果是就讓state+=acquire,acquire一般是1,即表示線程重入并且返回true。 * 上面兩個條件都不滿足就代表是鎖被其他線程獲取了,當前線程獲取不到,所以返回false
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 先判斷當前線程等不等于擁有鎖的線程,不等于就會拋異常,也就是釋放不了。 * 等于之后就看state-releases是否為0,當為0的時候就代表釋放完全。 * 可以設置鎖的狀態(tài)為沒有線程擁有,從而讓鎖能被其他線程競爭,否則就設置state,代表線程重入該鎖,并且線程還沒釋放完全。
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/*
*該方法檢驗當前線程是否是鎖的獨占者
*/
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
/*
*該方法是創(chuàng)建一個條件鎖,本文不做具體分析
*/
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* 使得該類從流中能重構實例,并且會重置為解鎖狀態(tài)
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
/**
* Sync 的不公平版本
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 將state從0更新到1成功的話就讓當前線程獲取鎖,否則就會嘗試獲得鎖和獲取當前節(jié)點的前一節(jié)點,并判斷這一個節(jié)點是否為頭節(jié)點,即當前線程是不是頭節(jié)點的直接后繼。 * 如果兩個中有一個失敗則線程中斷,進入阻塞狀態(tài)
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Sync 的公平版本
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* 嘗試獲得鎖和獲取當前節(jié)點的前一節(jié)點,并判斷這一個節(jié)點是否為頭節(jié)點,即當前線程是不是頭節(jié)點的直接后繼,如果兩個中有一個失敗則線程中斷,進入阻塞狀態(tài)。 * 也就是一定按照隊列中線程的順序來實現(xiàn)
*/
final void lock() {
acquire(1);
}
/**
* 跟不公平的版本相比其實是在state為0的時候檢查當前線程是不是在隊列的頭部節(jié)點的直接后繼,來達到公平的概念
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
ReentrantLock和上面兩個類最不同的莫過于ReentrantLock使用的是獨占功能,即每次只能有一個線程來獲取ReentrantLock類。ReentrantLock類下還有很多方法,這里就不一一介紹,但是本質都是內部類中的實現(xiàn)以及AQS的一些調用
總結:
AQS只是一個基礎的框架,里面最核心的就是維護了state變量和CHL隊列,而其他的類全部都是通過繼承的方法進行擴展,雖然沒有直接說源碼,但是通過上面三個主要類的源碼分析再去看AQS已經不是難事。繼承主要改變的就是獲取和釋放的方法,通過這兩個方法來對state和隊列進行操作達到我們能夠進行的并發(fā)控制的功能,事實上J.U.C包下的類和能夠實現(xiàn)的功能遠不止這三個,后面會選擇重點的來介紹。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
SpringBoot MainApplication類文件的位置詳解
這篇文章主要介紹了SpringBoot MainApplication類文件的位置詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
maven利用tomcat插件部署遠程Linux服務器的步驟詳解
Maven已經是Java的項目管理常用方式,下面這篇文章主要給大家介紹了關于maven利用tomcat插件部署遠程Linux服務器的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面隨著小編來一起學習學習吧。2017-11-11

