欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

深入理解java線程通信

 更新時(shí)間:2019年05月29日 10:38:46   作者:crossoverJie  
開發(fā)中不免會(huì)遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場(chǎng)景?;蛘呤蔷€程 A 在執(zhí)行到某個(gè)條件通知線程 B 執(zhí)行某個(gè)操作。下面我們來一起學(xué)習(xí)如何解決吧

前言

開發(fā)中不免會(huì)遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場(chǎng)景。

或者是線程 A 在執(zhí)行到某個(gè)條件通知線程 B 執(zhí)行某個(gè)操作。

可以通過以下幾種方式實(shí)現(xiàn):

等待通知機(jī)制

等待通知模式是 Java 中比較經(jīng)典的線程通信方式。
兩個(gè)線程通過對(duì)同一對(duì)象調(diào)用等待 wait() 和通知 notify() 方法來進(jìn)行通訊。

如兩個(gè)線程交替打印奇偶數(shù):

public class TwoThreadWaitNotify {
private int start = 1;
private boolean flag = false;
public static void main(String[] args) {
TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();
Thread t1 = new Thread(new OuNum(twoThread));
t1.setName("A");
Thread t2 = new Thread(new JiNum(twoThread));
t2.setName("B");
t1.start();
t2.start();
}
/**
* 偶數(shù)線程
*/
public static class OuNum implements Runnable {
private TwoThreadWaitNotify number;
public OuNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println("偶數(shù)線程搶到鎖了");
if (number.flag) {
System.out.println(Thread.currentThread().getName() + "+-+偶數(shù)" + number.start);
number.start++;
number.flag = false;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
/**
* 奇數(shù)線程
*/
public static class JiNum implements Runnable {
private TwoThreadWaitNotify number;
public JiNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println("奇數(shù)線程搶到鎖了");
if (!number.flag) {
System.out.println(Thread.currentThread().getName() + "+-+奇數(shù)" + number.start);
number.start++;
number.flag = true;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}


輸出結(jié)果:

t2+-+奇數(shù)93
t1+-+偶數(shù)94
t2+-+奇數(shù)95
t1+-+偶數(shù)96
t2+-+奇數(shù)97
t1+-+偶數(shù)98
t2+-+奇數(shù)99
t1+-+偶數(shù)100

這里的線程 A 和線程 B 都對(duì)同一個(gè)對(duì)象 TwoThreadWaitNotify.class 獲取鎖,A 線程調(diào)用了同步對(duì)象的 wait() 方法釋放了鎖并進(jìn)入 WAITING 狀態(tài)。

B 線程調(diào)用了 notify() 方法,這樣 A 線程收到通知之后就可以從 wait() 方法中返回。

這里利用了 TwoThreadWaitNotify.class 對(duì)象完成了通信。

有一些需要注意:

  • wait() 、nofify() 、nofityAll() 調(diào)用的前提都是獲得了對(duì)象的鎖(也可稱為對(duì)象監(jiān)視器)。
  • 調(diào)用 wait() 方法后線程會(huì)釋放鎖,進(jìn)入 WAITING 狀態(tài),該線程也會(huì)被移動(dòng)到等待隊(duì)列中。
  • 調(diào)用 notify() 方法會(huì)將等待隊(duì)列中的線程移動(dòng)到同步隊(duì)列中,線程狀態(tài)也會(huì)更新為 BLOCKED
  • 從 wait() 方法返回的前提是調(diào)用 notify() 方法的線程釋放鎖,wait() 方法的線程獲得鎖。

等待通知有著一個(gè)經(jīng)典范式:

線程 A 作為消費(fèi)者:

1.獲取對(duì)象的鎖。

2.進(jìn)入 while(判斷條件),并調(diào)用 wait() 方法。

3.當(dāng)條件滿足跳出循環(huán)執(zhí)行具體處理邏輯。

線程 B 作為生產(chǎn)者:

1.獲取對(duì)象鎖。

2.更改與線程 A 共用的判斷條件。

3.調(diào)用 notify() 方法。

偽代碼如下:

//Thread A
synchronized(Object){
while(條件){
Object.wait();
}
//do something
}
//Thread B
synchronized(Object){
條件=false;//改變條件
Object.notify();
}

join() 方法

private static void join() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
t1.start();
t2.start();
//等待線程1終止
t1.join();
//等待線程2終止
t2.join();
LOGGER.info("main over");
}

輸出結(jié)果:

2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over

t1.join() 時(shí)會(huì)一直阻塞到 t1 執(zhí)行完畢,所以最終主線程會(huì)等待 t1 和 t2 線程執(zhí)行完畢。

其實(shí)從源碼可以看出,join() 也是利用的等待通知機(jī)制:

核心邏輯:

while (isAlive()) {
wait(0);
}

在 join 線程完成后會(huì)調(diào)用 notifyAll() 方法,是在 JVM 實(shí)現(xiàn)中調(diào)用,所以這里看不出來。

volatile 共享內(nèi)存

因?yàn)?Java 是采用共享內(nèi)存的方式進(jìn)行線程通信的,所以可以采用以下方式用主線程關(guān)閉 A 線程:

public class Volatile implements Runnable{
private static volatile boolean flag = true ;
@Override
public void run() {
while (flag){
System.out.println(Thread.currentThread().getName() + "正在運(yùn)行。。。");
}
System.out.println(Thread.currentThread().getName() +"執(zhí)行完畢");
}
public static void main(String[] args) throws InterruptedException {
Volatile aVolatile = new Volatile();
new Thread(aVolatile,"thread A").start();
System.out.println("main 線程正在運(yùn)行") ;
TimeUnit.MILLISECONDS.sleep(100) ;
aVolatile.stopThread();
}
private void stopThread(){
flag = false ;
}
}

輸出結(jié)果:

thread A正在運(yùn)行。。。
thread A正在運(yùn)行。。。
thread A正在運(yùn)行。。。
thread A正在運(yùn)行。。。
thread A執(zhí)行完畢

這里的 flag 存放于主內(nèi)存中,所以主線程和線程 A 都可以看到。

flag 采用 volatile 修飾主要是為了內(nèi)存可見性,更多內(nèi)容可以查看這里。

CountDownLatch 并發(fā)工具

CountDownLatch 可以實(shí)現(xiàn) join 相同的功能,但是更加的靈活。

private static void countDownLatch() throws Exception{
int thread = 3 ;
long start = System.currentTimeMillis();
final CountDownLatch countDown = new CountDownLatch(thread);
for (int i= 0 ;i<thread ; i++){
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
Thread.sleep(2000);
countDown.countDown();
LOGGER.info("thread end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
countDown.await();
long stop = System.currentTimeMillis();
LOGGER.info("main over total time={}",stop-start);
}

輸出結(jié)果:

2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012

CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 實(shí)現(xiàn)的,更多實(shí)現(xiàn)參考 ReentrantLock 實(shí)現(xiàn)原理

  • 初始化一個(gè) CountDownLatch 時(shí)告訴并發(fā)的線程,然后在每個(gè)線程處理完畢之后調(diào)用 countDown() 方法。
  • 該方法會(huì)將 AQS 內(nèi)置的一個(gè) state 狀態(tài) -1 。
  • 最終在主線程調(diào)用 await() 方法,它會(huì)阻塞直到state == 0的時(shí)候返回。

CyclicBarrier 并發(fā)工具

private static void cyclicBarrier() throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
Thread.sleep(5000);
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
LOGGER.info("main thread");
}

CyclicBarrier 中文名叫做屏障或者是柵欄,也可以用于線程間通信。

它可以等待 N 個(gè)線程都達(dá)到某個(gè)狀態(tài)后繼續(xù)運(yùn)行的效果。

1.首先初始化線程參與者。

2.調(diào)用await()將會(huì)在所有參與者線程都調(diào)用之前等待。

3.直到所有參與者都調(diào)用了await()后,所有線程從await()返回繼續(xù)后續(xù)邏輯。

運(yùn)行結(jié)果:

2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something

可以看出由于其中一個(gè)線程休眠了五秒,所有其余所有的線程都得等待這個(gè)線程調(diào)用 await() 。

該工具可以實(shí)現(xiàn) CountDownLatch 同樣的功能,但是要更加靈活。甚至可以調(diào)用 reset() 方法重置 CyclicBarrier (需要自行捕獲 BrokenBarrierException 處理) 然后重新執(zhí)行。

線程響應(yīng)中斷

public class StopThread implements Runnable {
@Override
public void run() {
while ( !Thread.currentThread().isInterrupted()) {
// 線程執(zhí)行具體邏輯
System.out.println(Thread.currentThread().getName() + "運(yùn)行中。。");
}
System.out.println(Thread.currentThread().getName() + "退出。。");
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new StopThread(), "thread A");
thread.start();
System.out.println("main 線程正在運(yùn)行") ;
TimeUnit.MILLISECONDS.sleep(10) ;
thread.interrupt();
}
}

輸出結(jié)果:

thread A運(yùn)行中。。
thread A運(yùn)行中。。
thread A退出。。

可以采用中斷線程的方式來通信,調(diào)用了 thread.interrupt() 方法其實(shí)就是將 thread 中的一個(gè)標(biāo)志屬性置為了 true。

并不是說調(diào)用了該方法就可以中斷線程,如果不對(duì)這個(gè)標(biāo)志進(jìn)行響應(yīng)其實(shí)是沒有什么作用(這里對(duì)這個(gè)標(biāo)志進(jìn)行了判斷)。

但是如果拋出了 InterruptedException 異常,該標(biāo)志就會(huì)被 JVM 重置為 false。

線程池 awaitTermination() 方法

如果是用線程池來管理線程,可以使用以下方式來讓主線程等待線程池中所有任務(wù)執(zhí)行完畢:

private static void executorService() throws Exception{
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.shutdown();
while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
LOGGER.info("線程還在執(zhí)行。。。");
}
LOGGER.info("main over");
}

輸出結(jié)果:

2018-03-16 20:18:01.273 [pool-1-thread-2] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - 線程還在執(zhí)行。。。
2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - 線程還在執(zhí)行。。。
2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over

使用這個(gè) awaitTermination() 方法的前提需要關(guān)閉線程池,如調(diào)用了 shutdown() 方法。

調(diào)用了 shutdown() 之后線程池會(huì)停止接受新任務(wù),并且會(huì)平滑的關(guān)閉線程池中現(xiàn)有的任務(wù)。

管道通信

public static void piped() throws IOException {
//面向于字符 PipedInputStream 面向于字節(jié)
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
//輸入輸出流建立連接
writer.connect(reader);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
for (int i = 0; i < 10; i++) {
writer.write(i+"");
Thread.sleep(10);
}
} catch (Exception e) {
} finally {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
int msg = 0;
try {
while ((msg = reader.read()) != -1) {
LOGGER.info("msg={}", (char) msg);
}
} catch (Exception e) {
}
}
});
t1.start();
t2.start();
}

輸出結(jié)果:

2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9

Java 雖說是基于內(nèi)存通信的,但也可以使用管道通信。

需要注意的是,輸入流和輸出流需要首先建立連接。這樣線程 B 就可以收到線程 A 發(fā)出的消息了。

實(shí)際開發(fā)中可以靈活根據(jù)需求選擇最適合的線程通信方式。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論