Java的Netty進(jìn)階之Future和Promise詳解
一、java.util.concurrent.Future源碼解析
java.util.concurrent.Future代表異步計(jì)算的結(jié)果,是JDK自帶接口;提供了檢查計(jì)算是否完成、等待計(jì)算完成以及檢索計(jì)算結(jié)果的方法,只有當(dāng)計(jì)算完成時(shí),才能使用get方法獲取結(jié)果,必要時(shí)進(jìn)行阻塞,直到它準(zhǔn)備好為止。通過cancel方法執(zhí)行取消,額外提供了其它方法來確定任務(wù)時(shí)正常完成還是被取消,一旦計(jì)算完成,就不能取消計(jì)算。如果為了可取消性而想使用Future,但不提供可用的結(jié)果,則可以聲明Future<?>形式的類型并且作為基礎(chǔ)任務(wù)的結(jié)果返回null。
示例用法(源碼提供):
interface ArchiveSearcher {
String search(String target);
}
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(String target) throws InterruptedException {
Callable<String> task = () -> searcher.search(target);
Future<String> future = executor.submit(task);
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) {
cleanup();
return;
}
}
}
FutureTask類是Future、Runnable接口的一種實(shí)現(xiàn),因此可以被Executor執(zhí)行,例如:上面submit提交方法可以用下面的代碼替換:
FutureTask<String> future = new FutureTask<>(task); executor.execute(future);
public interface Future<V> {
/**
* 嘗試關(guān)閉執(zhí)行中的任務(wù),如果任務(wù)已經(jīng)執(zhí)行完成,則嘗試將會失敗,
* @param mayInterruptIfRunning {@code true} 如果任務(wù)正在執(zhí)行,是否應(yīng)該中斷任務(wù)
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 如果此任務(wù)在正常完成之前被取消,則返回true
*/
boolean isCancelled();
/**
* 如果當(dāng)前任務(wù)完成,返回true
*/
boolean isDone();
/**
* 如果需要,等待計(jì)算完成,然后獲取其結(jié)果
* @return 計(jì)算結(jié)果
* @throws CancellationException 如果計(jì)算被取消
* @throws ExecutionException 如果計(jì)算拋出異常
* @throws InterruptedException 如果等待時(shí)當(dāng)前線程被打斷
*/
V get() throws InterruptedException, ExecutionException;
/**
* 如果需要,最多等待給定的時(shí)間以完成計(jì)算,然后獲取其結(jié)果。
* @param timeout 最大等待時(shí)間
* @param unit 時(shí)間單位
* @return 計(jì)算結(jié)果
* @throws CancellationException 如果計(jì)算被取消
* @throws ExecutionException 如果計(jì)算拋出異常
* @throws InterruptedException 如果在等待時(shí)當(dāng)前線程被打斷
* @throws TimeoutException 如果等待時(shí)間超時(shí)
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}二、io.netty.util.concurrent.Future源碼解析
異步操作結(jié)果
public interface Future<V> extends java.util.concurrent.Future<V> {
/**
* 當(dāng)且僅當(dāng)I/O操作完成時(shí),返回true
*/
boolean isSuccess();
/**
* 當(dāng)且僅當(dāng)可以通過cancel方法取消操作時(shí),返回true
*/
boolean isCancellable();
/**
* 如果I/O操作失敗,則返回I/O操作失敗的原因。
*/
Throwable cause();
/**
* 添加指定的監(jiān)聽器到Future,當(dāng)future異步計(jì)算完成會通知指定的監(jiān)聽器。
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* 添加指定的多個(gè)監(jiān)聽器到Future,當(dāng)future異步計(jì)算完成會通知指定的監(jiān)聽器。
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* 刪除future異步計(jì)算中第一次出現(xiàn)的監(jiān)聽器,被刪除的監(jiān)聽器在future異步計(jì)算完成后將不會被通知
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* 刪除future異步計(jì)算中前面出現(xiàn)的多個(gè)監(jiān)聽器,被刪除的監(jiān)聽器在future異步計(jì)算完成后將不會被通知
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* 等待future異步計(jì)算完成,如果future異步計(jì)算失敗,則拋出失敗原因
*/
Future<V> sync() throws InterruptedException;
/**
* 等待future異步計(jì)算完成,如果future異步計(jì)算失敗,則拋出失敗原因
*/
Future<V> syncUninterruptibly();
/**
* 等待future異步計(jì)算完成
*/
Future<V> await() throws InterruptedException;
/**
* 等待future異步計(jì)算順利完成,此方法如果捕獲InterruptedException異常將會默認(rèn)丟棄
*/
Future<V> awaitUninterruptibly();
/**
* 在指定的時(shí)間內(nèi)等待future異步計(jì)算完成
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 在指定的時(shí)間內(nèi)等待future異步計(jì)算完成
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* 在指定的時(shí)間內(nèi)等待future異步計(jì)算完成,如果發(fā)生InterruptedException異常將會默默丟棄掉
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* 在指定的時(shí)間內(nèi)等待future異步計(jì)算完成,如果發(fā)生InterruptedException異常將會默默丟棄掉
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* 無阻塞返回結(jié)果,如果future異步計(jì)算還未完成,則返回null
*/
V getNow();
/**
* {@inheritDoc}
*
* 如果取消任務(wù)成功,future異步計(jì)算將會拋出CancellationException異常
*/
@Override
boolean cancel(boolean mayInterruptIfRunning);
}三、io.netty.channel.ChannelFuture源碼解析
ChannelFuture是異步Channel I/O操作的結(jié)果。
Netty中的所有I/O操作都是異步的,這就意味著任何I/O操作都將立即返回,并且不能保證I/O操作在調(diào)用結(jié)束時(shí)已經(jīng)完成,將會返回一個(gè)代表I/O操作結(jié)果或狀態(tài)信息的ChannelFuture實(shí)例。
ChannelFuture代表完成或未完成的異步計(jì)算,當(dāng)一個(gè)I/O操作開始時(shí),將會創(chuàng)建一個(gè)future實(shí)例對象。這個(gè)新的future對象是未完成初始化的,它是處于即未完成、失敗,也沒有被關(guān)閉的狀態(tài),因?yàn)镮/O操作還未完成。如果I/O操作完成,并且成功、或者失敗、或者被關(guān)閉任務(wù),future異步計(jì)算將會被更具體的信息標(biāo)記,例如故障的原因。
請注意,即使失敗和取消也屬于已完成狀態(tài)。
+---------------------------+
| Completed successfully |
+---------------------------+
+----> isDone() = true |
+--------------------------+ | | isSuccess() = true |
| Uncompleted | | +===========================+
+--------------------------+ | | Completed with failure |
| isDone() = false | | +---------------------------+
| isSuccess() = false |----+----> isDone() = true |
| isCancelled() = false | | | cause() = non-null |
| cause() = null | | +===========================+
+--------------------------+ | | Completed by cancellation |
| +---------------------------+
+----> isDone() = true |
| isCancelled() = true |
+---------------------------+
ChannelFuture提供了各種方法,可以檢查I/O操作是否已完成,等待完成,并檢索I/O操作的結(jié)果。它還允許您添加ChannelFutureListener監(jiān)聽器,以便在I/O操作完成是受到通知。首選是addListener(GenericFutureListener) 而不是await()方法。
建議盡可能選擇addListener(GenericFutureListener)而不是await(),以便在I/O操作完成時(shí)得到通知并執(zhí)行任何后續(xù)任務(wù)。
addListener(GenericFutureListener) 是非阻塞的。它只需要將指定的ChannelFutureListener添加到ChannelFuture中,當(dāng)I/O操作關(guān)聯(lián)的future異步計(jì)算完成時(shí)將會通知監(jiān)聽器。ChannelFutureListener產(chǎn)生了最佳的性能和資源利用率,因?yàn)樗静蛔枞?。但是如果您不?xí)慣事件驅(qū)動的編程,那么實(shí)現(xiàn)順序邏輯可能會很棘手。
相比之下,await()是一個(gè)阻塞操作。一旦被調(diào)用,調(diào)用方線程就會阻塞,直到操作完成。使用await()更容易實(shí)現(xiàn)順序邏輯,但是調(diào)用方線程在I/O操作完成之前會產(chǎn)生線程不必要的阻塞,并且線程間通知的成本相對較高。此外在特定情況下可能會出現(xiàn)死鎖,如下所述。不要在ChannelHandler內(nèi)部調(diào)用await()。
ChannelHandler中的事件處理程序方法通常由I/O線程調(diào)用。如果await()被事件處理程序調(diào)用,也就是被I/O操作調(diào)用的事件處理程序,I/O操作可能永遠(yuǎn)不會完成,因?yàn)閍wait()可以阻塞它所調(diào)用的時(shí)間處理程序,這是一個(gè)死鎖。
// BAD - NEVER DO THIS
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.awaitUninterruptibly();
// Perform post-closure operation
// ...
}
// GOOD
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
}
});
}
盡管存在上述缺點(diǎn),但是在某些情況下調(diào)用await()更方便。在這種情況下,請確保不要在I/O線程中調(diào)用await()。否則,將引發(fā)BlockingOperationException以防止死鎖。
不要混淆I/O超時(shí)和await等待超時(shí)。
使用 await(long), await(long, TimeUnit), awaitUninterruptibly(long), 或 awaitUninterruptibly(long, TimeUnit) 指定的超時(shí)值與I/O超時(shí)完全無關(guān)。如果I/O操作超時(shí),則未來將標(biāo)記為“已完成但出現(xiàn)故障”,例如:應(yīng)該通過特定于傳輸?shù)倪x項(xiàng)配置連接超時(shí):
// BAD - NEVER DO THIS
Bootstrap b = ...;
ChannelFuture f = b.connect(...);
f.awaitUninterruptibly(10, TimeUnit.SECONDS);
if (f.isCancelled()) {
// Connection attempt cancelled by user
} else if (!f.isSuccess()) {
// You might get a NullPointerException here because the future
// might not be completed yet.
f.cause().printStackTrace();
} else {
// Connection established successfully
}
// GOOD
Bootstrap b = ...;
// Configure the connect timeout option.
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
ChannelFuture f = b.connect(...);
f.awaitUninterruptibly();
// Now we are sure the future is completed.
assert f.isDone();
if (f.isCancelled()) {
// Connection attempt cancelled by user
} else if (!f.isSuccess()) {
f.cause().printStackTrace();
} else {
// Connection established successfully
}
public interface ChannelFuture extends Future<Void> {
/**
* 返回一個(gè)Channel信道,在該通道中執(zhí)行與此future異步計(jì)算關(guān)聯(lián)的I/O操作
*/
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
/**
* Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
* following methods:
* <ul>
* <li>{@link #addListener(GenericFutureListener)}</li>
* <li>{@link #addListeners(GenericFutureListener[])}</li>
* <li>{@link #await()}</li>
* <li>{@link #await(long, TimeUnit)} ()}</li>
* <li>{@link #await(long)} ()}</li>
* <li>{@link #awaitUninterruptibly()}</li>
* <li>{@link #sync()}</li>
* <li>{@link #syncUninterruptibly()}</li>
* </ul>
*/
boolean isVoid();
}
四、io.netty.util.concurrent.Promise源碼解析
可寫的特殊Future異步計(jì)算
public interface Promise<V> extends Future<V> {
/**
* 將此Future標(biāo)記為成功,并通知所有的監(jiān)聽器
*
* 如果它已經(jīng)成功或失敗,它將拋出{@link IllegalStateException}.
*/
Promise<V> setSuccess(V result);
/**
* 將此Future標(biāo)記為成功,并通知所有的監(jiān)聽器
*
* @return {@code true} 當(dāng)且僅當(dāng)將當(dāng)前future標(biāo)記為了成功;
* {@code false} 因?yàn)楫?dāng)前future已經(jīng)被標(biāo)記為了成功或者失?。?
*/
boolean trySuccess(V result);
/**
* 標(biāo)記此future為失敗,并通知所有的監(jiān)聽器
*
* 如果它已經(jīng)成功或者失敗,它將拋出 {@link IllegalStateException}.
*/
Promise<V> setFailure(Throwable cause);
/**
* 標(biāo)記此future為失敗,并通知所有的監(jiān)聽器
*
* @return {@code true} 當(dāng)且僅當(dāng)成功的將這個(gè)future標(biāo)記為失??;
* {@code false} 因?yàn)檫@個(gè)future已經(jīng)被標(biāo)記為成功或者失敗
*/
boolean tryFailure(Throwable cause);
/**
* 標(biāo)記future異步計(jì)算無法取消任務(wù)
*
* @return {@code true} 當(dāng)且僅當(dāng)成功標(biāo)記future異步計(jì)算無法取消任務(wù)或者已經(jīng)被標(biāo)記為無法取消任務(wù);
* {@code false} 如果當(dāng)前future異步計(jì)算任務(wù)已經(jīng)被取消;
*/
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
五、io.netty.channel.ChannelPromise源碼解析
ChannelPromise接口擴(kuò)展了Promise和ChannelFuture,綁定了Channel,可以進(jìn)行異步I/O操作,也可以監(jiān)聽Channel的I/O操作。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
@Override
Channel channel();
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
ChannelPromise unvoid();
}
六、io.netty.util.concurrent.AbstractFuture源碼解析
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
//等待future異步計(jì)算完成
await();
//如果I/O操作已經(jīng)失敗,則返回I/O操作失敗的原因
Throwable cause = cause();
if (cause == null) {
//無阻塞返回執(zhí)行結(jié)果,如果future還未執(zhí)行完,則返回null
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
//等待future異步計(jì)算指定的時(shí)間計(jì)算完成
if (await(timeout, unit)) {
//如果I/O操作已經(jīng)失敗,則返回I/O操作失敗的原因
Throwable cause = cause();
if (cause == null) {
//無阻塞返回執(zhí)行結(jié)果,如果future還未執(zhí)行完,則返回null
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}
到此這篇關(guān)于Java的Netty進(jìn)階之Future和Promise詳解的文章就介紹到這了,更多相關(guān)Future和Promise詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 深入理解Netty?FastThreadLocal優(yōu)缺點(diǎn)及實(shí)現(xiàn)邏輯
- SpringBoot 整合 Netty 多端口監(jiān)聽的操作方法
- SpringBoot整合Netty的流程步驟
- Springboot+WebSocket+Netty實(shí)現(xiàn)在線聊天/群聊系統(tǒng)
- spring?cloud?gateway中netty線程池小優(yōu)化
- Spring?Cloud?Gateway中netty線程池優(yōu)化示例詳解
- Netty學(xué)習(xí)之理解selector原理示例
- springboot之springboot與netty整合方案
相關(guān)文章
Java之while與do-while循環(huán)的用法詳解
在上一篇文章中,給大家講解了循環(huán)的概念,并重點(diǎn)給大家講解了for循環(huán)的使用。但在Java中,除了for循環(huán)之外,還有while、do-while、foreach等循環(huán)形式。這篇文章給大家講解while循環(huán)的使用2023-05-05
使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解
這篇文章主要介紹了使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解,web結(jié)果由html+js+css組成,html結(jié)構(gòu)都有一定的規(guī)范,數(shù)據(jù)動態(tài)交互可以通過js實(shí)現(xiàn)。,需要的朋友可以參考下2019-06-06
spring基于注解配置實(shí)現(xiàn)事務(wù)控制操作
這篇文章主要介紹了spring基于注解配置實(shí)現(xiàn)事務(wù)控制操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
詳解Java CompletableFuture使用方法以及與FutureTask的區(qū)別
CompletableFuture實(shí)現(xiàn)了CompletionStage接口和Future接口,前者是對后者的一個(gè)擴(kuò)展,增加了異步回調(diào)、流式處理、多個(gè)Future組合處理的能力,使Java在處理多任務(wù)的協(xié)同工作時(shí)更加順暢便利2021-10-10
java 如何實(shí)現(xiàn)正確的刪除集合中的元素
這篇文章主要介紹了java 如何實(shí)現(xiàn)正確的刪除集合中的元素,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
詳解Spring Cloud Finchley版中Consul多實(shí)例注冊的問題處理
這篇文章主要介紹了詳解Spring Cloud Finchley版中Consul多實(shí)例注冊的問題處理,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08

