Java多線程之異步Future機(jī)制的原理和實(shí)現(xiàn)
項(xiàng)目中經(jīng)常有些任務(wù)需要異步(提交到線程池中)去執(zhí)行,而主線程往往需要知道異步執(zhí)行產(chǎn)生的結(jié)果,這時(shí)我們要怎么做呢?用runnable是無(wú)法實(shí)現(xiàn)的,我們需要用callable看下面的代碼:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AddTask implements Callable<Integer> { private int a,b; public AddTask(int a, int b) { this.a = a; this.b = b; } @Override public Integer call throws Exception { Integer result = a + b; return result; } public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor; //JDK目前為止返回的都是FutureTask的實(shí)例 Future<Integer> future = executor.submit(new AddTask(1, 2)); Integer result = future.get;// 只有當(dāng)future的狀態(tài)是已完成時(shí)(future.isDone = true),get方法才會(huì)返回 } }
雖然可以實(shí)現(xiàn)獲取異步執(zhí)行結(jié)果的需求,但是我們發(fā)現(xiàn)這個(gè)Future其實(shí)很不好用,因?yàn)樗鼪]有提供通知的機(jī)制,也就是說(shuō)我們不知道future什么時(shí)候完成(如果我們需要輪詢isDone()來(lái)判斷的話感覺就沒有用這個(gè)的必要了)。看下java.util.concurrent.future.Future 的接口方法:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled; boolean isDone; V get throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
由此可見JDK的Future機(jī)制其實(shí)并不好用,如果能給這個(gè)future加個(gè)監(jiān)聽器,讓它在完成時(shí)通知監(jiān)聽器的話就比較好用了,就像下面這個(gè)IFuture:
package future; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * The result of an asynchronous operation. * * @author lixiaohui * @param <V> 執(zhí)行結(jié)果的類型參數(shù) */ public interface IFuture<V> extends Future<V> { boolean isSuccess; // 是否成功 V getNow; //立即返回結(jié)果(不管Future是否處于完成狀態(tài)) Throwable cause; //若執(zhí)行失敗時(shí)的原因 boolean isCancellable; //是否可以取消 IFuture<V> await throws InterruptedException; //等待future的完成 boolean await(long timeoutMillis) throws InterruptedException; // 超時(shí)等待future的完成 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException; IFuture<V> awaitUninterruptibly; //等待future的完成,不響應(yīng)中斷 boolean awaitUninterruptibly(long timeoutMillis);//超時(shí)等待future的完成,不響應(yīng)中斷 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit); IFuture<V> addListener(IFutureListener<V> l); //當(dāng)future完成時(shí),會(huì)通知這些加進(jìn)來(lái)的監(jiān)聽器 IFuture<V> removeListener(IFutureListener<V> l); }
接下來(lái)就一起來(lái)實(shí)現(xiàn)這個(gè)IFuture,在這之前要說(shuō)明下Object.wait,Object.notifyAll方法,因?yàn)檎麄€(gè)Future實(shí)現(xiàn)的原���的核心就是這兩個(gè)方法.看看JDK里面的解釋:
public class Object { /** * Causes the current thread to wait until another thread invokes the * {@link java.lang.Object#notify} method or the * {@link java.lang.Object#notifyAll} method for this object. * In other words, this method behaves exactly as if it simply * performs the call {@code wait(0)}. * 調(diào)用該方法后,當(dāng)前線程會(huì)釋放對(duì)象監(jiān)視器鎖,并讓出CPU使用權(quán)。直到別的線程調(diào)用notify/notifyAll */ public final void wait throws InterruptedException { wait(0); } /** * Wakes up all threads that are waiting on this object's monitor. A * thread waits on an object's monitor by calling one of the * {@code wait} methods. * <p> * The awakened threads will not be able to proceed until the current * thread relinquishes the lock on this object. The awakened threads * will compete in the usual manner with any other threads that might * be actively competing to synchronize on this object; for example, * the awakened threads enjoy no reliable privilege or disadvantage in * being the next thread to lock this object. */ public final native void notifyAll; }
知道這個(gè)后,我們要自己實(shí)現(xiàn)Future也就有了思路,當(dāng)線程調(diào)用了IFuture.await等一系列的方法時(shí),如果Future還未完成,那么就調(diào)用future.wait 方法使線程進(jìn)入WAITING狀態(tài)。而當(dāng)別的線程設(shè)置Future為完成狀態(tài)(注意這里的完成狀態(tài)包括正常結(jié)束和異常結(jié)束)時(shí),就需要調(diào)用future.notifyAll方法來(lái)喚醒之前因?yàn)檎{(diào)用過(guò)wait方法而處于WAITING狀態(tài)的那些線程。完整的實(shí)現(xiàn)如下(代碼應(yīng)該沒有很難理解的地方,我是參考netty的Future機(jī)制的。有興趣的可以去看看netty的源碼):
package future; import java.util.Collection; import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * <pre> * 正常結(jié)束時(shí), 若執(zhí)行的結(jié)果不為null, 則result為執(zhí)行結(jié)果; 若執(zhí)行結(jié)果為null, 則result = {@link AbstractFuture#SUCCESS_SIGNAL} * 異常結(jié)束時(shí), result為 {@link CauseHolder} 的實(shí)例;若是被取消而導(dǎo)致的異常結(jié)束, 則result為 {@link CancellationException} 的實(shí)例, 否則為其它異常的實(shí)例 * 以下情況會(huì)使異步操作由未完成狀態(tài)轉(zhuǎn)至已完成狀態(tài), 也就是在以下情況發(fā)生時(shí)調(diào)用notifyAll方法: * <ul> * <li>異步操作被取消時(shí)(cancel方法)</li> * <li>異步操作正常結(jié)束時(shí)(setSuccess方法)</li> * <li>異步操作異常結(jié)束時(shí)(setFailure方法)</li> * </ul> * </pre> * * @author lixiaohui * * @param <V> * 異步執(zhí)行結(jié)果的類型 */ public class AbstractFuture<V> implements IFuture<V> { protected volatile Object result; // 需要保證其可見性 /** * 監(jiān)聽器集 */ protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>; /** * 當(dāng)任務(wù)正常執(zhí)行結(jié)果為null時(shí), 即客戶端調(diào)用{@link AbstractFuture#setSuccess(null)}時(shí), * result引用該對(duì)象 */ private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal; @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone) { // 已完成了不能取消 return false; } synchronized (this) { if (isDone) { // double check return false; } result = new CauseHolder(new CancellationException); notifyAll; // isDone = true, 通知等待在該對(duì)象的wait的線程 } notifyListeners; // 通知監(jiān)聽器該異步操作已完成 return true; } @Override public boolean isCancellable { return result == null; } @Override public boolean isCancelled { return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } @Override public boolean isDone { return result != null; } @Override public V get throws InterruptedException, ExecutionException { await; // 等待執(zhí)行結(jié)果 Throwable cause = cause; if (cause == null) { // 沒有發(fā)生異常,異步操作正常結(jié)束 return getNow; } if (cause instanceof CancellationException) { // 異步操作被取消了 throw (CancellationException) cause; } throw new ExecutionException(cause); // 其他異常 } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) {// 超時(shí)等待執(zhí)行結(jié)果 Throwable cause = cause; if (cause == null) {// 沒有發(fā)生異常,異步操作正常結(jié)束 return getNow; } if (cause instanceof CancellationException) {// 異步操作被取消了 throw (CancellationException) cause; } throw new ExecutionException(cause);// 其他異常 } // 時(shí)間到了異步操作還沒有結(jié)束, 拋出超時(shí)異常 throw new TimeoutException; } @Override public boolean isSuccess { return result == null ? false : !(result instanceof CauseHolder); } @SuppressWarnings("unchecked") @Override public V getNow { return (V) (result == SUCCESS_SIGNAL ? null : result); } @Override public Throwable cause { if (result != null && result instanceof CauseHolder) { return ((CauseHolder) result).cause; } return null; } @Override public IFuture<V> addListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (isDone) { // 若已完成直接通知該監(jiān)聽器 notifyListener(listener); return this; } synchronized (this) { if (!isDone) { listeners.add(listener); return this; } } notifyListener(listener); return this; } @Override public IFuture<V> removeListener(IFutureListener<V> listener) { if (listener == null) { throw new NullPointerException("listener"); } if (!isDone) { listeners.remove(listener); } return this; } @Override public IFuture<V> await throws InterruptedException { return await0(true); } private IFuture<V> await0(boolean interruptable) throws InterruptedException { if (!isDone) { // 若已完成就直接返回了 // 若允許終端且被中斷了則拋出中斷異常 if (interruptable && Thread.interrupted) { throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted."); } boolean interrupted = false; synchronized (this) { while (!isDone) { try { wait; // 釋放鎖進(jìn)入waiting狀態(tài),等待其它線程調(diào)用本對(duì)象的notify/notifyAll方法 } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } } } if (interrupted) { // 為什么這里要設(shè)中斷標(biāo)志位?因?yàn)閺膚ait方法返回后, 中斷標(biāo)志是被clear了的, // 這里重新設(shè)置以便讓其它代碼知道這里被中斷了。 Thread.currentThread.interrupt; } } return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true); } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (isDone) { return true; } if (timeoutNanos <= 0) { return isDone; } if (interruptable && Thread.interrupted) { throw new InterruptedException(toString); } long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime; long waitTime = timeoutNanos; boolean interrupted = false; try { synchronized (this) { if (isDone) { return true; } if (waitTime <= 0) { return isDone; } for (;;) { try { wait(waitTime / 1000000, (int) (waitTime % 1000000)); } catch (InterruptedException e) { if (interruptable) { throw e; } else { interrupted = true; } } if (isDone) { return true; } else { waitTime = timeoutNanos - (System.nanoTime - startTime); if (waitTime <= 0) { return isDone; } } } } } finally { if (interrupted) { Thread.currentThread.interrupt; } } } @Override public IFuture<V> awaitUninterruptibly { try { return await0(false); } catch (InterruptedException e) { // 這里若拋異常了就無(wú)法處理了 throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeoutMillis) { try { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try { return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) { throw new java.lang.InternalError; } } protected IFuture<V> setFailure(Throwable cause) { if (setFailure0(cause)) { notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setFailure0(Throwable cause) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } result = new CauseHolder(cause); notifyAll; } return true; } protected IFuture<V> setSuccess(Object result) { if (setSuccess0(result)) { // 設(shè)置成功后通知監(jiān)聽器 notifyListeners; return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(Object result) { if (isDone) { return false; } synchronized (this) { if (isDone) { return false; } if (result == null) { // 異步操作正常執(zhí)行完畢的結(jié)果是null this.result = SUCCESS_SIGNAL; } else { this.result = result; } notifyAll; } return true; } private void notifyListeners { for (IFutureListener<V> l : listeners) { notifyListener(l); } } private void notifyListener(IFutureListener<V> l) { try { l.operationCompleted(this); } catch (Exception e) { e.printStackTrace; } } private static class SuccessSignal { } private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) { this.cause = cause; } } }
那么要怎么使用這個(gè)呢,有了上面的骨架實(shí)現(xiàn),我們就可以定制各種各樣的異步結(jié)果了。下面模擬一下一個(gè)延時(shí)的任務(wù):
package future.test; import future.IFuture; import future.IFutureListener; /** * 延時(shí)加法 * @author lixiaohui * */ public class DelayAdder { public static void main(String[] args) { new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> { @Override public void operationCompleted(IFuture<Integer> future) throws Exception { System.out.println(future.getNow); } }); } /** * 延遲加 * @param delay 延時(shí)時(shí)長(zhǎng) milliseconds * @param a 加數(shù) * @param b 加數(shù) * @return 異步結(jié)果 */ public DelayAdditionFuture add(long delay, int a, int b) { DelayAdditionFuture future = new DelayAdditionFuture; new Thread(new DelayAdditionTask(delay, a, b, future)).start; return future; } private class DelayAdditionTask implements Runnable { private long delay; private int a, b; private DelayAdditionFuture future; public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) { super; this.delay = delay; this.a = a; this.b = b; this.future = future; } @Override public void run { try { Thread.sleep(delay); Integer i = a + b; // TODO 這里設(shè)置future為完成狀態(tài)(正常執(zhí)行完畢) future.setSuccess(i); } catch (InterruptedException e) { // TODO 這里設(shè)置future為完成狀態(tài)(異常執(zhí)行完畢) future.setFailure(e.getCause); } } } } package future.test; import future.AbstractFuture; import future.IFuture; //只是把兩個(gè)方法對(duì)外暴露 public class DelayAdditionFuture extends AbstractFuture<Integer> { @Override public IFuture<Integer> setSuccess(Object result) { return super.setSuccess(result); } @Override public IFuture<Integer> setFailure(Throwable cause) { return super.setFailure(cause); } }
可以看到客戶端不用主動(dòng)去詢問future是否完成,而是future完成時(shí)自動(dòng)回調(diào)operationcompleted方法,客戶端只需在回調(diào)里實(shí)現(xiàn)邏輯即可。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
arthas?jprofiler做復(fù)雜鏈路的調(diào)用分析
這篇文章主要為大家介紹了arthas?jprofiler做復(fù)雜鏈路的調(diào)用分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Java continue break制作簡(jiǎn)單聊天室程序
這篇文章主要為大家詳細(xì)介紹了Java continue break制作簡(jiǎn)單聊天室程序,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10Java反射機(jī)制,反射相關(guān)API,反射API使用方式(反射獲取實(shí)體類字段名和注解值)
這篇文章主要介紹了Java反射機(jī)制,反射相關(guān)API,反射API使用方式(反射獲取實(shí)體類字段名和注解值),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07Spring Boot詳解各類請(qǐng)求和響應(yīng)的處理方法
平時(shí)只是在用SpringBoot框架,但并沒有詳細(xì)研究過(guò)請(qǐng)求和響應(yīng)執(zhí)行的一個(gè)具體過(guò)程,所以本文主要來(lái)梳理一下SpringBoot請(qǐng)求和響應(yīng)的處理過(guò)程2022-07-07Netty分布式server啟動(dòng)流程N(yùn)io創(chuàng)建源碼分析
這篇文章主要介紹了Netty分布式server啟動(dòng)流程N(yùn)io創(chuàng)建源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03

Java中compareTo()和compare()方法使用及區(qū)別詳解