Java并發(fā)之條件阻塞Condition的應(yīng)用代碼示例
本文研究的主要是Java并發(fā)之條件阻塞Condition的應(yīng)用示例代碼,具體如下。
Condition將Object監(jiān)視器方法(wait、notify 和 notifyAll)分解成截然不同的對(duì)象,以便通過(guò)將這些對(duì)象與任意Lock實(shí)現(xiàn)組合使用,為每個(gè)對(duì)象提供多個(gè)等待 set(wait-set)。其中,Lock 替代了synchronized方法和語(yǔ)句的使用,Condition替代了Object監(jiān)視器方法的使用。
1. Condition的基本使用
由于Condition可以用來(lái)替代wait、notify等方法,所以可以對(duì)比著之前寫(xiě)過(guò)的線程間通信的代碼來(lái)看,再來(lái)看一下原來(lái)那個(gè)問(wèn)題:
有兩個(gè)線程,子線程先執(zhí)行10次,然后主線程執(zhí)行5次,然后再切換到子線程執(zhí)行10,再主線程執(zhí)行5次……如此往返執(zhí)行50次。
之前用wait和notify來(lái)實(shí)現(xiàn)的,現(xiàn)在用Condition來(lái)改寫(xiě)一下,代碼如下:
public class ConditionCommunication {
public static void main(String[] args) {
Business bussiness = new Business();
new Thread(new Runnable() {
// 開(kāi)啟一個(gè)子線程
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
bussiness.sub(i);
}
}
}
).start();
// main方法主線程
for (int i = 1; i <= 50; i++) {
bussiness.main(i);
}
}
}
class Business {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//Condition是在具體的lock之上的
private Boolean bShouldSub = true;
public void sub(int i) {
lock.lock();
try {
while (!bShouldSub) {
try {
condition.await();
//用condition來(lái)調(diào)用await方法
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("sub thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = false;
condition.signal();
//用condition來(lái)發(fā)出喚醒信號(hào),喚醒某一個(gè)
}
finally {
lock.unlock();
}
}
public void main(int i) {
lock.lock();
try {
while (bShouldSub) {
try {
condition.await();
//用condition來(lái)調(diào)用await方法
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("main thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = true;
condition.signal();
//用condition來(lái)發(fā)出喚醒信號(hào)么,喚醒某一個(gè)
}
finally {
lock.unlock();
}
}
}
從代碼來(lái)看,Condition的使用時(shí)和Lock一起的,沒(méi)有Lock就沒(méi)法使用Condition,因?yàn)镃ondition是通過(guò)Lock來(lái)new出來(lái)的,這種用法很簡(jiǎn)單,只要掌握了synchronized和wait、notify的使用,完全可以掌握Lock和Condition的使用。
2. Condition的拔高
2.1 緩沖區(qū)的阻塞隊(duì)列
上面使用Lock和Condition來(lái)代替synchronized和Object監(jiān)視器方法實(shí)現(xiàn)了兩個(gè)線程之間的通信,現(xiàn)在再來(lái)寫(xiě)個(gè)稍微高級(jí)點(diǎn)應(yīng)用:模擬緩沖區(qū)的阻塞隊(duì)列。
什么叫緩沖區(qū)呢?舉個(gè)例子,現(xiàn)在有很多人要發(fā)消息,我是中轉(zhuǎn)站,我要幫別人把消息發(fā)出去,那么現(xiàn)在我 就需要做兩件事,一件事是接收用戶發(fā)過(guò)來(lái)的消息,并按順序放到緩沖區(qū),另一件事是從緩沖區(qū)中按順序取出用戶發(fā)過(guò)來(lái)的消息,并發(fā)送出去。
現(xiàn)在把這個(gè)實(shí)際的問(wèn)題抽象一下:緩沖區(qū)即一個(gè)數(shù)組,我們可以向數(shù)組中寫(xiě)入數(shù)據(jù),也可以從數(shù)組中把數(shù)據(jù)取走,我要做的兩件事就是開(kāi)啟兩個(gè)線程,一個(gè)存數(shù)據(jù),一個(gè)取數(shù)據(jù)。但是問(wèn)題來(lái)了,如果緩沖區(qū)滿了,說(shuō)明接收的消息太多了,即發(fā)送過(guò)來(lái)的消息太快了,我另一個(gè)線程還來(lái)不及發(fā)完,導(dǎo)致現(xiàn)在緩沖區(qū)沒(méi)地方放了,那么此時(shí)就得阻塞存數(shù)據(jù)這個(gè)線程,讓其等待;相反,如果我轉(zhuǎn)發(fā)的太快,現(xiàn)在緩沖區(qū)所有內(nèi)容都被我發(fā)完了,還沒(méi)有用戶發(fā)新的消息來(lái),那么此時(shí)就得阻塞取數(shù)據(jù)這個(gè)線程。
好了,分析完了這個(gè)緩沖區(qū)的阻塞隊(duì)列,下面就用Condition技術(shù)來(lái)實(shí)現(xiàn)一下:
class Buffer {
final Lock lock = new ReentrantLock();
//定義一個(gè)鎖
final Condition notFull = lock.newCondition();
//定義阻塞隊(duì)列滿了的Condition
final Condition notEmpty = lock.newCondition();
//定義阻塞隊(duì)列空了的Condition
final Object[] items = new Object[10];
//為了下面模擬,設(shè)置阻塞隊(duì)列的大小為10,不要設(shè)太大
int putptr, takeptr, count;
//數(shù)組下標(biāo),用來(lái)標(biāo)定位置的
//往隊(duì)列中存數(shù)據(jù)
public void put(Object x) throws InterruptedException {
lock.lock();
//上鎖
try {
while (count == items.length) {
System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時(shí)無(wú)法存數(shù)據(jù)!");
notFull.await();
//如果隊(duì)列滿了,那么阻塞存數(shù)據(jù)這個(gè)線程,等待被喚醒
}
//如果沒(méi)滿,按順序往數(shù)組中存
items[putptr] = x;
if (++putptr == items.length) //這是到達(dá)數(shù)組末端的判斷,如果到了,再回到始端
putptr = 0;
++count;
//消息數(shù)量
System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
notEmpty.signal();
//好了,現(xiàn)在隊(duì)列中有數(shù)據(jù)了,喚醒隊(duì)列空的那個(gè)線程,可以取數(shù)據(jù)啦
}
finally {
lock.unlock();
//放鎖
}
}
//從隊(duì)列中取數(shù)據(jù)
public Object take() throws InterruptedException {
lock.lock();
//上鎖
try {
while (count == 0) {
System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!");
notEmpty.await();
//如果隊(duì)列是空,那么阻塞取數(shù)據(jù)這個(gè)線程,等待被喚醒
}
//如果沒(méi)空,按順序從數(shù)組中取
Object x = items[takeptr];
if (++takeptr == items.length) //判斷是否到達(dá)末端,如果到了,再回到始端
takeptr = 0;
--count;
//消息數(shù)量
System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
notFull.signal();
//好了,現(xiàn)在隊(duì)列中有位置了,喚醒隊(duì)列滿的那個(gè)線程,可以存數(shù)據(jù)啦
return x;
}
finally {
lock.unlock();
//放鎖
}
}
}
這個(gè)程序很經(jīng)典,我從官方JDK文檔中拿出來(lái)的,然后加了注釋。程序中定義了兩個(gè)Condition,分別針對(duì)兩個(gè)線程,等待和喚醒分別用不同的Condition來(lái)執(zhí)行,思路很清晰,程序也很健壯??梢钥紤]一個(gè)問(wèn)題,為啥要用兩個(gè)Codition呢?之所以這么設(shè)計(jì)肯定是有原因的,如果用一個(gè)Condition,現(xiàn)在假設(shè)隊(duì)列滿了,但是有2個(gè)線程A和B同時(shí)存數(shù)據(jù),那么都進(jìn)入了睡眠,好,現(xiàn)在另一個(gè)線程取走一個(gè)了,然后喚醒了其中一個(gè)線程A,那么A可以存了,存完后,A又喚醒一個(gè)線程,如果B被喚醒了,那就出問(wèn)題了,因?yàn)榇藭r(shí)隊(duì)列是滿的,B不能存的,B存的話就會(huì)覆蓋原來(lái)還沒(méi)被取走的值,就因?yàn)槭褂昧艘粋€(gè)Condition,存和取都用這個(gè)Condition來(lái)睡眠和喚醒,就亂了套。到這里,就能體會(huì)到這個(gè)Condition的用武之地了,現(xiàn)在來(lái)測(cè)試一下上面的阻塞隊(duì)列的效果:
public class BoundedBuffer {
public static void main(String[] args) {
Buffer buffer = new Buffer();
for (int i = 0; i < 5; i ++) {
//開(kāi)啟5個(gè)線程往緩沖區(qū)存數(shù)據(jù)
new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.put(new Random().nextint(1000));
//隨機(jī)存數(shù)據(jù)
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
).start();
}
for (int i = 0; i < 10; i ++) {
//開(kāi)啟10個(gè)線程從緩沖區(qū)中取數(shù)據(jù)
new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.take();
//從緩沖區(qū)取數(shù)據(jù)
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
).start();
}
}
}
我故意只開(kāi)啟5個(gè)線程存數(shù)據(jù),10個(gè)線程取數(shù)據(jù),就是想讓它出現(xiàn)取數(shù)據(jù)被阻塞的情況發(fā)生,看運(yùn)行的結(jié)果:
Thread-5 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
Thread-10 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
Thread-1 存好了值: 755
Thread-0 存好了值: 206
Thread-2 存好了值: 741
Thread-3 存好了值: 381
Thread-14 取出了值: 755
Thread-4 存好了值: 783
Thread-6 取出了值: 206
Thread-7 取出了值: 741
Thread-8 取出了值: 381
Thread-9 取出了值: 783
Thread-5 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
Thread-11 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
Thread-12 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
Thread-10 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
Thread-13 被阻塞了,暫時(shí)無(wú)法取數(shù)據(jù)!
從結(jié)果中可以看出,線程5和10搶先執(zhí)行,發(fā)現(xiàn)隊(duì)列中沒(méi)有,于是就被阻塞了,睡在那了,直到隊(duì)列中有新的值存入才可以取,但是它們兩運(yùn)氣不好,存的數(shù)據(jù)又被其他線程給搶先取走了,哈哈……可以多運(yùn)行幾次。如果想要看到存數(shù)據(jù)被阻塞,可以將取數(shù)據(jù)的線程設(shè)置少一點(diǎn),這里我就不設(shè)了。
2.2 兩個(gè)以上線程之間的喚醒
還是原來(lái)那個(gè)題目,現(xiàn)在讓三個(gè)線程來(lái)執(zhí)行,看一下題目:
有三個(gè)線程,子線程1先執(zhí)行10次,然后子線程2執(zhí)行10次,然后主線程執(zhí)行5次,然后再切換到子線程1執(zhí)行10次,子線程2執(zhí)行10次,主線程執(zhí)行5次……如此往返執(zhí)行50次。
如過(guò)不用Condition,還真不好弄,但是用Condition來(lái)做的話,就非常方便了,原理很簡(jiǎn)單,定義三個(gè)Condition,子線程1執(zhí)行完喚醒子線程2,子線程2執(zhí)行完喚醒主線程,主線程執(zhí)行完喚醒子線程1。喚醒機(jī)制和上面那個(gè)緩沖區(qū)道理差不多,下面看看代碼吧,很容易理解。
public class ThreeConditionCommunication {
public static void main(String[] args) {
Business bussiness = new Business();
new Thread(new Runnable() {
// 開(kāi)啟一個(gè)子線程
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
bussiness.sub1(i);
}
}
}
).start();
new Thread(new Runnable() {
// 開(kāi)啟另一個(gè)子線程
@Override
public void run() {
for (int i = 1; i <= 50; i++) {
bussiness.sub2(i);
}
}
}
).start();
// main方法主線程
for (int i = 1; i <= 50; i++) {
bussiness.main(i);
}
}
static class Business {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
//Condition是在具體的lock之上的
Condition condition2 = lock.newCondition();
Condition conditionMain = lock.newCondition();
private int bShouldSub = 0;
public void sub1(int i) {
lock.lock();
try {
while (bShouldSub != 0) {
try {
condition1.await();
//用condition來(lái)調(diào)用await方法
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("sub1 thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = 1;
condition2.signal();
//讓線程2執(zhí)行
}
finally {
lock.unlock();
}
}
public void sub2(int i) {
lock.lock();
try {
while (bShouldSub != 1) {
try {
condition2.await();
//用condition來(lái)調(diào)用await方法
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 10; j++) {
System.out.println("sub2 thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = 2;
conditionMain.signal();
//讓主線程執(zhí)行
}
finally {
lock.unlock();
}
}
public void main(int i) {
lock.lock();
try {
while (bShouldSub != 2) {
try {
conditionMain.await();
//用condition來(lái)調(diào)用await方法
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int j = 1; j <= 5; j++) {
System.out.println("main thread sequence of " + j
+ ", loop of " + i);
}
bShouldSub = 0;
condition1.signal();
//讓線程1執(zhí)行
}
finally {
lock.unlock();
}
}
}
}
代碼看似有點(diǎn)長(zhǎng),但是是假象,邏輯非常簡(jiǎn)單。關(guān)于線程中的Condition技術(shù)就總結(jié)這么多吧。
總結(jié)
以上就是本文關(guān)于Java并發(fā)之條件阻塞Condition的應(yīng)用代碼示例的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對(duì)本站的支持!
相關(guān)文章
SpringBoot項(xiàng)目中Date類型數(shù)據(jù)在接口返回的時(shí)間不正確的問(wèn)題解決
如果接口返回的Date類型時(shí)間與數(shù)據(jù)庫(kù)中datetime不一致,可能是由于沒(méi)有正確配置時(shí)區(qū)導(dǎo)致的,解決方法是在yaml配置文件中指定正確的日期格式和時(shí)區(qū)配置,修改配置并重啟項(xiàng)目后,可以獲得正確的時(shí)間,下面就來(lái)介紹一下2024-09-09
Spring Security內(nèi)存中認(rèn)證的實(shí)現(xiàn)
本文主要介紹了Spring Security內(nèi)存中認(rèn)證的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-11-11
Spring RedirectAttributes參數(shù)跳轉(zhuǎn)代碼實(shí)例
這篇文章主要介紹了Spring RedirectAttributes參數(shù)跳轉(zhuǎn)代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
詳解eclipse項(xiàng)目中.classpath文件的使用
這篇文章主要介紹了詳解eclipse項(xiàng)目中.classpath文件的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
Spring Batch讀取txt文件并寫(xiě)入數(shù)據(jù)庫(kù)的方法教程
這篇文章主要給大家介紹了Spring Batch讀取txt文件并寫(xiě)入數(shù)據(jù)庫(kù)的方法,SpringBatch 是一個(gè)輕量級(jí)、全面的批處理框架。這里我們用它來(lái)實(shí)現(xiàn)文件的讀取并將讀取的結(jié)果作處理,處理之后再寫(xiě)入數(shù)據(jù)庫(kù)中的功能。需要的朋友可以參考借鑒,下面來(lái)一起看看吧。2017-04-04

