Java的Netty進階之Future和Promise詳解
一、java.util.concurrent.Future源碼解析
java.util.concurrent.Future代表異步計算的結(jié)果,是JDK自帶接口;提供了檢查計算是否完成、等待計算完成以及檢索計算結(jié)果的方法,只有當(dāng)計算完成時,才能使用get方法獲取結(jié)果,必要時進行阻塞,直到它準備好為止。通過cancel方法執(zhí)行取消,額外提供了其它方法來確定任務(wù)時正常完成還是被取消,一旦計算完成,就不能取消計算。如果為了可取消性而想使用Future,但不提供可用的結(jié)果,則可以聲明Future<?>形式的類型并且作為基礎(chǔ)任務(wù)的結(jié)果返回null。
示例用法(源碼提供):
interface ArchiveSearcher { String search(String target); } class App { ExecutorService executor = ... ArchiveSearcher searcher = ... void showSearch(String target) throws InterruptedException { Callable<String> task = () -> searcher.search(target); Future<String> future = executor.submit(task); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
FutureTask類是Future、Runnable接口的一種實現(xiàn),因此可以被Executor執(zhí)行,例如:上面submit提交方法可以用下面的代碼替換:
FutureTask<String> future = new FutureTask<>(task); executor.execute(future);
public interface Future<V> { /** * 嘗試關(guān)閉執(zhí)行中的任務(wù),如果任務(wù)已經(jīng)執(zhí)行完成,則嘗試將會失敗, * @param mayInterruptIfRunning {@code true} 如果任務(wù)正在執(zhí)行,是否應(yīng)該中斷任務(wù) */ boolean cancel(boolean mayInterruptIfRunning); /** * 如果此任務(wù)在正常完成之前被取消,則返回true */ boolean isCancelled(); /** * 如果當(dāng)前任務(wù)完成,返回true */ boolean isDone(); /** * 如果需要,等待計算完成,然后獲取其結(jié)果 * @return 計算結(jié)果 * @throws CancellationException 如果計算被取消 * @throws ExecutionException 如果計算拋出異常 * @throws InterruptedException 如果等待時當(dāng)前線程被打斷 */ V get() throws InterruptedException, ExecutionException; /** * 如果需要,最多等待給定的時間以完成計算,然后獲取其結(jié)果。 * @param timeout 最大等待時間 * @param unit 時間單位 * @return 計算結(jié)果 * @throws CancellationException 如果計算被取消 * @throws ExecutionException 如果計算拋出異常 * @throws InterruptedException 如果在等待時當(dāng)前線程被打斷 * @throws TimeoutException 如果等待時間超時 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
二、io.netty.util.concurrent.Future源碼解析
異步操作結(jié)果
public interface Future<V> extends java.util.concurrent.Future<V> { /** * 當(dāng)且僅當(dāng)I/O操作完成時,返回true */ boolean isSuccess(); /** * 當(dāng)且僅當(dāng)可以通過cancel方法取消操作時,返回true */ boolean isCancellable(); /** * 如果I/O操作失敗,則返回I/O操作失敗的原因。 */ Throwable cause(); /** * 添加指定的監(jiān)聽器到Future,當(dāng)future異步計算完成會通知指定的監(jiān)聽器。 */ Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); /** * 添加指定的多個監(jiān)聽器到Future,當(dāng)future異步計算完成會通知指定的監(jiān)聽器。 */ Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** * 刪除future異步計算中第一次出現(xiàn)的監(jiān)聽器,被刪除的監(jiān)聽器在future異步計算完成后將不會被通知 */ Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); /** * 刪除future異步計算中前面出現(xiàn)的多個監(jiān)聽器,被刪除的監(jiān)聽器在future異步計算完成后將不會被通知 */ Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); /** * 等待future異步計算完成,如果future異步計算失敗,則拋出失敗原因 */ Future<V> sync() throws InterruptedException; /** * 等待future異步計算完成,如果future異步計算失敗,則拋出失敗原因 */ Future<V> syncUninterruptibly(); /** * 等待future異步計算完成 */ Future<V> await() throws InterruptedException; /** * 等待future異步計算順利完成,此方法如果捕獲InterruptedException異常將會默認丟棄 */ Future<V> awaitUninterruptibly(); /** * 在指定的時間內(nèi)等待future異步計算完成 */ boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** * 在指定的時間內(nèi)等待future異步計算完成 */ boolean await(long timeoutMillis) throws InterruptedException; /** * 在指定的時間內(nèi)等待future異步計算完成,如果發(fā)生InterruptedException異常將會默默丟棄掉 */ boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** * 在指定的時間內(nèi)等待future異步計算完成,如果發(fā)生InterruptedException異常將會默默丟棄掉 */ boolean awaitUninterruptibly(long timeoutMillis); /** * 無阻塞返回結(jié)果,如果future異步計算還未完成,則返回null */ V getNow(); /** * {@inheritDoc} * * 如果取消任務(wù)成功,future異步計算將會拋出CancellationException異常 */ @Override boolean cancel(boolean mayInterruptIfRunning); }
三、io.netty.channel.ChannelFuture源碼解析
ChannelFuture是異步Channel I/O操作的結(jié)果。
Netty中的所有I/O操作都是異步的,這就意味著任何I/O操作都將立即返回,并且不能保證I/O操作在調(diào)用結(jié)束時已經(jīng)完成,將會返回一個代表I/O操作結(jié)果或狀態(tài)信息的ChannelFuture實例。
ChannelFuture代表完成或未完成的異步計算,當(dāng)一個I/O操作開始時,將會創(chuàng)建一個future實例對象。這個新的future對象是未完成初始化的,它是處于即未完成、失敗,也沒有被關(guān)閉的狀態(tài),因為I/O操作還未完成。如果I/O操作完成,并且成功、或者失敗、或者被關(guān)閉任務(wù),future異步計算將會被更具體的信息標(biāo)記,例如故障的原因。
請注意,即使失敗和取消也屬于已完成狀態(tài)。
+---------------------------+ | Completed successfully | +---------------------------+ +----> isDone() = true | +--------------------------+ | | isSuccess() = true | | Uncompleted | | +===========================+ +--------------------------+ | | Completed with failure | | isDone() = false | | +---------------------------+ | isSuccess() = false |----+----> isDone() = true | | isCancelled() = false | | | cause() = non-null | | cause() = null | | +===========================+ +--------------------------+ | | Completed by cancellation | | +---------------------------+ +----> isDone() = true | | isCancelled() = true | +---------------------------+
ChannelFuture提供了各種方法,可以檢查I/O操作是否已完成,等待完成,并檢索I/O操作的結(jié)果。它還允許您添加ChannelFutureListener監(jiān)聽器,以便在I/O操作完成是受到通知。首選是addListener(GenericFutureListener) 而不是await()方法。
建議盡可能選擇addListener(GenericFutureListener)而不是await(),以便在I/O操作完成時得到通知并執(zhí)行任何后續(xù)任務(wù)。
addListener(GenericFutureListener) 是非阻塞的。它只需要將指定的ChannelFutureListener添加到ChannelFuture中,當(dāng)I/O操作關(guān)聯(lián)的future異步計算完成時將會通知監(jiān)聽器。ChannelFutureListener產(chǎn)生了最佳的性能和資源利用率,因為它根本不阻塞。但是如果您不習(xí)慣事件驅(qū)動的編程,那么實現(xiàn)順序邏輯可能會很棘手。
相比之下,await()是一個阻塞操作。一旦被調(diào)用,調(diào)用方線程就會阻塞,直到操作完成。使用await()更容易實現(xiàn)順序邏輯,但是調(diào)用方線程在I/O操作完成之前會產(chǎn)生線程不必要的阻塞,并且線程間通知的成本相對較高。此外在特定情況下可能會出現(xiàn)死鎖,如下所述。不要在ChannelHandler內(nèi)部調(diào)用await()。
ChannelHandler中的事件處理程序方法通常由I/O線程調(diào)用。如果await()被事件處理程序調(diào)用,也就是被I/O操作調(diào)用的事件處理程序,I/O操作可能永遠不會完成,因為await()可以阻塞它所調(diào)用的時間處理程序,這是一個死鎖。
// BAD - NEVER DO THIS @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelFuture future = ctx.channel().close(); future.awaitUninterruptibly(); // Perform post-closure operation // ... } // GOOD @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelFuture future = ctx.channel().close(); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { // Perform post-closure operation // ... } }); }
盡管存在上述缺點,但是在某些情況下調(diào)用await()更方便。在這種情況下,請確保不要在I/O線程中調(diào)用await()。否則,將引發(fā)BlockingOperationException以防止死鎖。
不要混淆I/O超時和await等待超時。
使用 await(long), await(long, TimeUnit), awaitUninterruptibly(long), 或 awaitUninterruptibly(long, TimeUnit) 指定的超時值與I/O超時完全無關(guān)。如果I/O操作超時,則未來將標(biāo)記為“已完成但出現(xiàn)故障”,例如:應(yīng)該通過特定于傳輸?shù)倪x項配置連接超時:
// BAD - NEVER DO THIS Bootstrap b = ...; ChannelFuture f = b.connect(...); f.awaitUninterruptibly(10, TimeUnit.SECONDS); if (f.isCancelled()) { // Connection attempt cancelled by user } else if (!f.isSuccess()) { // You might get a NullPointerException here because the future // might not be completed yet. f.cause().printStackTrace(); } else { // Connection established successfully } // GOOD Bootstrap b = ...; // Configure the connect timeout option. b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); ChannelFuture f = b.connect(...); f.awaitUninterruptibly(); // Now we are sure the future is completed. assert f.isDone(); if (f.isCancelled()) { // Connection attempt cancelled by user } else if (!f.isSuccess()) { f.cause().printStackTrace(); } else { // Connection established successfully }
public interface ChannelFuture extends Future<Void> { /** * 返回一個Channel信道,在該通道中執(zhí)行與此future異步計算關(guān)聯(lián)的I/O操作 */ Channel channel(); @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly(); /** * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the * following methods: * <ul> * <li>{@link #addListener(GenericFutureListener)}</li> * <li>{@link #addListeners(GenericFutureListener[])}</li> * <li>{@link #await()}</li> * <li>{@link #await(long, TimeUnit)} ()}</li> * <li>{@link #await(long)} ()}</li> * <li>{@link #awaitUninterruptibly()}</li> * <li>{@link #sync()}</li> * <li>{@link #syncUninterruptibly()}</li> * </ul> */ boolean isVoid(); }
四、io.netty.util.concurrent.Promise源碼解析
可寫的特殊Future異步計算
public interface Promise<V> extends Future<V> { /** * 將此Future標(biāo)記為成功,并通知所有的監(jiān)聽器 * * 如果它已經(jīng)成功或失敗,它將拋出{@link IllegalStateException}. */ Promise<V> setSuccess(V result); /** * 將此Future標(biāo)記為成功,并通知所有的監(jiān)聽器 * * @return {@code true} 當(dāng)且僅當(dāng)將當(dāng)前future標(biāo)記為了成功; * {@code false} 因為當(dāng)前future已經(jīng)被標(biāo)記為了成功或者失??; */ boolean trySuccess(V result); /** * 標(biāo)記此future為失敗,并通知所有的監(jiān)聽器 * * 如果它已經(jīng)成功或者失敗,它將拋出 {@link IllegalStateException}. */ Promise<V> setFailure(Throwable cause); /** * 標(biāo)記此future為失敗,并通知所有的監(jiān)聽器 * * @return {@code true} 當(dāng)且僅當(dāng)成功的將這個future標(biāo)記為失敗; * {@code false} 因為這個future已經(jīng)被標(biāo)記為成功或者失敗 */ boolean tryFailure(Throwable cause); /** * 標(biāo)記future異步計算無法取消任務(wù) * * @return {@code true} 當(dāng)且僅當(dāng)成功標(biāo)記future異步計算無法取消任務(wù)或者已經(jīng)被標(biāo)記為無法取消任務(wù); * {@code false} 如果當(dāng)前future異步計算任務(wù)已經(jīng)被取消; */ boolean setUncancellable(); @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly(); }
五、io.netty.channel.ChannelPromise源碼解析
ChannelPromise接口擴展了Promise和ChannelFuture,綁定了Channel,可以進行異步I/O操作,也可以監(jiān)聽Channel的I/O操作。
public interface ChannelPromise extends ChannelFuture, Promise<Void> { @Override Channel channel(); @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause); @Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); /** * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself. */ ChannelPromise unvoid(); }
六、io.netty.util.concurrent.AbstractFuture源碼解析
public abstract class AbstractFuture<V> implements Future<V> { @Override public V get() throws InterruptedException, ExecutionException { //等待future異步計算完成 await(); //如果I/O操作已經(jīng)失敗,則返回I/O操作失敗的原因 Throwable cause = cause(); if (cause == null) { //無阻塞返回執(zhí)行結(jié)果,如果future還未執(zhí)行完,則返回null return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { //等待future異步計算指定的時間計算完成 if (await(timeout, unit)) { //如果I/O操作已經(jīng)失敗,則返回I/O操作失敗的原因 Throwable cause = cause(); if (cause == null) { //無阻塞返回執(zhí)行結(jié)果,如果future還未執(zhí)行完,則返回null return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); } throw new TimeoutException(); } }
到此這篇關(guān)于Java的Netty進階之Future和Promise詳解的文章就介紹到這了,更多相關(guān)Future和Promise詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java之while與do-while循環(huán)的用法詳解
在上一篇文章中,給大家講解了循環(huán)的概念,并重點給大家講解了for循環(huán)的使用。但在Java中,除了for循環(huán)之外,還有while、do-while、foreach等循環(huán)形式。這篇文章給大家講解while循環(huán)的使用2023-05-05使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解
這篇文章主要介紹了使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解,web結(jié)果由html+js+css組成,html結(jié)構(gòu)都有一定的規(guī)范,數(shù)據(jù)動態(tài)交互可以通過js實現(xiàn)。,需要的朋友可以參考下2019-06-06spring基于注解配置實現(xiàn)事務(wù)控制操作
這篇文章主要介紹了spring基于注解配置實現(xiàn)事務(wù)控制操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09詳解Java CompletableFuture使用方法以及與FutureTask的區(qū)別
CompletableFuture實現(xiàn)了CompletionStage接口和Future接口,前者是對后者的一個擴展,增加了異步回調(diào)、流式處理、多個Future組合處理的能力,使Java在處理多任務(wù)的協(xié)同工作時更加順暢便利2021-10-10詳解Spring Cloud Finchley版中Consul多實例注冊的問題處理
這篇文章主要介紹了詳解Spring Cloud Finchley版中Consul多實例注冊的問題處理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08