欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java的Netty進階之Future和Promise詳解

 更新時間:2023年11月16日 08:56:25   作者:立小研先森  
這篇文章主要介紹了Java的Netty進階之Future和Promise詳解,Netty 是基于 Java NIO 的異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,使用 Netty 可以快速開發(fā)網(wǎng)絡(luò)應(yīng)用,Netty 提供了高層次的抽象來簡化 TCP 和 UDP 服務(wù)器的編程,但是你仍然可以使用底層的 API,需要的朋友可以參考下

一、java.util.concurrent.Future源碼解析

java.util.concurrent.Future代表異步計算的結(jié)果,是JDK自帶接口;提供了檢查計算是否完成、等待計算完成以及檢索計算結(jié)果的方法,只有當(dāng)計算完成時,才能使用get方法獲取結(jié)果,必要時進行阻塞,直到它準備好為止。通過cancel方法執(zhí)行取消,額外提供了其它方法來確定任務(wù)時正常完成還是被取消,一旦計算完成,就不能取消計算。如果為了可取消性而想使用Future,但不提供可用的結(jié)果,則可以聲明Future<?>形式的類型并且作為基礎(chǔ)任務(wù)的結(jié)果返回null。

示例用法(源碼提供):

interface ArchiveSearcher {
    String search(String target);
}

class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...

    void showSearch(String target) throws InterruptedException {
        Callable<String> task = () -> searcher.search(target);
        Future<String> future = executor.submit(task);
        displayOtherThings(); // do other things while searching      
        try {
            displayText(future.get()); // use future      
        } catch (ExecutionException ex) {
            cleanup();
            return;
        }
    }
}

FutureTask類是Future、Runnable接口的一種實現(xiàn),因此可以被Executor執(zhí)行,例如:上面submit提交方法可以用下面的代碼替換:

 FutureTask<String> future = new FutureTask<>(task);  
 executor.execute(future);
public interface Future<V> {
    /**
     * 嘗試關(guān)閉執(zhí)行中的任務(wù),如果任務(wù)已經(jīng)執(zhí)行完成,則嘗試將會失敗,
     * @param mayInterruptIfRunning {@code true} 如果任務(wù)正在執(zhí)行,是否應(yīng)該中斷任務(wù)
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 如果此任務(wù)在正常完成之前被取消,則返回true
     */
    boolean isCancelled();
    /**
     * 如果當(dāng)前任務(wù)完成,返回true
     */
    boolean isDone();
    /**
     * 如果需要,等待計算完成,然后獲取其結(jié)果
     * @return 計算結(jié)果
     * @throws CancellationException 如果計算被取消
     * @throws ExecutionException 如果計算拋出異常
     * @throws InterruptedException 如果等待時當(dāng)前線程被打斷
     */
    V get() throws InterruptedException, ExecutionException;
    /**
     * 如果需要,最多等待給定的時間以完成計算,然后獲取其結(jié)果。
     * @param timeout 最大等待時間
     * @param unit 時間單位
     * @return 計算結(jié)果
     * @throws CancellationException 如果計算被取消
     * @throws ExecutionException 如果計算拋出異常
     * @throws InterruptedException 如果在等待時當(dāng)前線程被打斷
     * @throws TimeoutException 如果等待時間超時
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

二、io.netty.util.concurrent.Future源碼解析

異步操作結(jié)果

public interface Future<V> extends java.util.concurrent.Future<V> {
    /**
     * 當(dāng)且僅當(dāng)I/O操作完成時,返回true
     */
    boolean isSuccess();
    /**
     * 當(dāng)且僅當(dāng)可以通過cancel方法取消操作時,返回true
     */
    boolean isCancellable();
    /**
     * 如果I/O操作失敗,則返回I/O操作失敗的原因。
     */
    Throwable cause();
    /**
     * 添加指定的監(jiān)聽器到Future,當(dāng)future異步計算完成會通知指定的監(jiān)聽器。
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    /**
     * 添加指定的多個監(jiān)聽器到Future,當(dāng)future異步計算完成會通知指定的監(jiān)聽器。
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    /**
     * 刪除future異步計算中第一次出現(xiàn)的監(jiān)聽器,被刪除的監(jiān)聽器在future異步計算完成后將不會被通知
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    /**
     * 刪除future異步計算中前面出現(xiàn)的多個監(jiān)聽器,被刪除的監(jiān)聽器在future異步計算完成后將不會被通知
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    /**
     * 等待future異步計算完成,如果future異步計算失敗,則拋出失敗原因
     */
    Future<V> sync() throws InterruptedException;
    /**
     * 等待future異步計算完成,如果future異步計算失敗,則拋出失敗原因
     */
    Future<V> syncUninterruptibly();
    /**
     * 等待future異步計算完成
     */
    Future<V> await() throws InterruptedException;
    /**
     * 等待future異步計算順利完成,此方法如果捕獲InterruptedException異常將會默認丟棄
     */
    Future<V> awaitUninterruptibly();
    /**
     * 在指定的時間內(nèi)等待future異步計算完成
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    /**
     * 在指定的時間內(nèi)等待future異步計算完成
     */
    boolean await(long timeoutMillis) throws InterruptedException;
    /**
     * 在指定的時間內(nèi)等待future異步計算完成,如果發(fā)生InterruptedException異常將會默默丟棄掉
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    /**
     * 在指定的時間內(nèi)等待future異步計算完成,如果發(fā)生InterruptedException異常將會默默丟棄掉
     */
    boolean awaitUninterruptibly(long timeoutMillis);
    /**
     * 無阻塞返回結(jié)果,如果future異步計算還未完成,則返回null
     */
    V getNow();
    /**
     * {@inheritDoc}
     *
     * 如果取消任務(wù)成功,future異步計算將會拋出CancellationException異常
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

三、io.netty.channel.ChannelFuture源碼解析

ChannelFuture是異步Channel I/O操作的結(jié)果。

Netty中的所有I/O操作都是異步的,這就意味著任何I/O操作都將立即返回,并且不能保證I/O操作在調(diào)用結(jié)束時已經(jīng)完成,將會返回一個代表I/O操作結(jié)果或狀態(tài)信息的ChannelFuture實例。

ChannelFuture代表完成或未完成的異步計算,當(dāng)一個I/O操作開始時,將會創(chuàng)建一個future實例對象。這個新的future對象是未完成初始化的,它是處于即未完成、失敗,也沒有被關(guān)閉的狀態(tài),因為I/O操作還未完成。如果I/O操作完成,并且成功、或者失敗、或者被關(guān)閉任務(wù),future異步計算將會被更具體的信息標(biāo)記,例如故障的原因。

請注意,即使失敗和取消也屬于已完成狀態(tài)。

                          +---------------------------+
                                       | Completed successfully    |
                                       +---------------------------+
                                  +---->      isDone() = true      |
  +--------------------------+    |    |   isSuccess() = true      |
  |        Uncompleted       |    |    +===========================+
  +--------------------------+    |    | Completed with failure    |
  |      isDone() = false    |    |    +---------------------------+
  |   isSuccess() = false    |----+---->      isDone() = true      |
  | isCancelled() = false    |    |    |       cause() = non-null  |
  |       cause() = null     |    |    +===========================+
  +--------------------------+    |    | Completed by cancellation |
                                  |    +---------------------------+
                                  +---->      isDone() = true      |
                                       | isCancelled() = true      |
                                       +---------------------------+

ChannelFuture提供了各種方法,可以檢查I/O操作是否已完成,等待完成,并檢索I/O操作的結(jié)果。它還允許您添加ChannelFutureListener監(jiān)聽器,以便在I/O操作完成是受到通知。首選是addListener(GenericFutureListener) 而不是await()方法。

建議盡可能選擇addListener(GenericFutureListener)而不是await(),以便在I/O操作完成時得到通知并執(zhí)行任何后續(xù)任務(wù)。

addListener(GenericFutureListener) 是非阻塞的。它只需要將指定的ChannelFutureListener添加到ChannelFuture中,當(dāng)I/O操作關(guān)聯(lián)的future異步計算完成時將會通知監(jiān)聽器。ChannelFutureListener產(chǎn)生了最佳的性能和資源利用率,因為它根本不阻塞。但是如果您不習(xí)慣事件驅(qū)動的編程,那么實現(xiàn)順序邏輯可能會很棘手。

相比之下,await()是一個阻塞操作。一旦被調(diào)用,調(diào)用方線程就會阻塞,直到操作完成。使用await()更容易實現(xiàn)順序邏輯,但是調(diào)用方線程在I/O操作完成之前會產(chǎn)生線程不必要的阻塞,并且線程間通知的成本相對較高。此外在特定情況下可能會出現(xiàn)死鎖,如下所述。不要在ChannelHandler內(nèi)部調(diào)用await()。

ChannelHandler中的事件處理程序方法通常由I/O線程調(diào)用。如果await()被事件處理程序調(diào)用,也就是被I/O操作調(diào)用的事件處理程序,I/O操作可能永遠不會完成,因為await()可以阻塞它所調(diào)用的時間處理程序,這是一個死鎖。

  // BAD - NEVER DO THIS
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
      ChannelFuture future = ctx.channel().close();
      future.awaitUninterruptibly();
      // Perform post-closure operation
      // ...
  }
 
  // GOOD
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
      ChannelFuture future = ctx.channel().close();
      future.addListener(new ChannelFutureListener() {
          public void operationComplete(ChannelFuture future) {
              // Perform post-closure operation
              // ...
          }
      });
  }

盡管存在上述缺點,但是在某些情況下調(diào)用await()更方便。在這種情況下,請確保不要在I/O線程中調(diào)用await()。否則,將引發(fā)BlockingOperationException以防止死鎖。

不要混淆I/O超時和await等待超時。

使用 await(long), await(long, TimeUnit), awaitUninterruptibly(long), 或 awaitUninterruptibly(long, TimeUnit) 指定的超時值與I/O超時完全無關(guān)。如果I/O操作超時,則未來將標(biāo)記為“已完成但出現(xiàn)故障”,例如:應(yīng)該通過特定于傳輸?shù)倪x項配置連接超時:

// BAD - NEVER DO THIS
  Bootstrap b = ...;
  ChannelFuture f = b.connect(...);
  f.awaitUninterruptibly(10, TimeUnit.SECONDS);
  if (f.isCancelled()) {
      // Connection attempt cancelled by user
  } else if (!f.isSuccess()) {
      // You might get a NullPointerException here because the future
      // might not be completed yet.
      f.cause().printStackTrace();
  } else {
      // Connection established successfully
  }
 
  // GOOD
  Bootstrap b = ...;
  // Configure the connect timeout option.
  b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
  ChannelFuture f = b.connect(...);
  f.awaitUninterruptibly();
 
  // Now we are sure the future is completed.
  assert f.isDone();
 
  if (f.isCancelled()) {
      // Connection attempt cancelled by user
  } else if (!f.isSuccess()) {
      f.cause().printStackTrace();
  } else {
      // Connection established successfully
  }
public interface ChannelFuture extends Future<Void> {

    /**
     * 返回一個Channel信道,在該通道中執(zhí)行與此future異步計算關(guān)聯(lián)的I/O操作
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
     * following methods:
     * <ul>
     *     <li>{@link #addListener(GenericFutureListener)}</li>
     *     <li>{@link #addListeners(GenericFutureListener[])}</li>
     *     <li>{@link #await()}</li>
     *     <li>{@link #await(long, TimeUnit)} ()}</li>
     *     <li>{@link #await(long)} ()}</li>
     *     <li>{@link #awaitUninterruptibly()}</li>
     *     <li>{@link #sync()}</li>
     *     <li>{@link #syncUninterruptibly()}</li>
     * </ul>
     */
    boolean isVoid();
}

四、io.netty.util.concurrent.Promise源碼解析

可寫的特殊Future異步計算

public interface Promise<V> extends Future<V> {

    /**
     * 將此Future標(biāo)記為成功,并通知所有的監(jiān)聽器
     *
     * 如果它已經(jīng)成功或失敗,它將拋出{@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * 將此Future標(biāo)記為成功,并通知所有的監(jiān)聽器
     *
     * @return {@code true} 當(dāng)且僅當(dāng)將當(dāng)前future標(biāo)記為了成功;
     *          {@code false} 因為當(dāng)前future已經(jīng)被標(biāo)記為了成功或者失??;
     */
    boolean trySuccess(V result);

    /**
     * 標(biāo)記此future為失敗,并通知所有的監(jiān)聽器
     *
     * 如果它已經(jīng)成功或者失敗,它將拋出 {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * 標(biāo)記此future為失敗,并通知所有的監(jiān)聽器
     *
     * @return {@code true} 當(dāng)且僅當(dāng)成功的將這個future標(biāo)記為失敗;
     *         {@code false} 因為這個future已經(jīng)被標(biāo)記為成功或者失敗
     */
    boolean tryFailure(Throwable cause);

    /**
     * 標(biāo)記future異步計算無法取消任務(wù) 
     *
     * @return {@code true} 當(dāng)且僅當(dāng)成功標(biāo)記future異步計算無法取消任務(wù)或者已經(jīng)被標(biāo)記為無法取消任務(wù);
     *         {@code false} 如果當(dāng)前future異步計算任務(wù)已經(jīng)被取消;
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

五、io.netty.channel.ChannelPromise源碼解析

ChannelPromise接口擴展了Promise和ChannelFuture,綁定了Channel,可以進行異步I/O操作,也可以監(jiān)聽Channel的I/O操作。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     */
    ChannelPromise unvoid();
}

六、io.netty.util.concurrent.AbstractFuture源碼解析

public abstract class AbstractFuture<V> implements Future<V> {

    @Override
    public V get() throws InterruptedException, ExecutionException {
        //等待future異步計算完成
        await();
				//如果I/O操作已經(jīng)失敗,則返回I/O操作失敗的原因
        Throwable cause = cause();
        if (cause == null) {
            //無阻塞返回執(zhí)行結(jié)果,如果future還未執(zhí)行完,則返回null
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
      //等待future異步計算指定的時間計算完成
        if (await(timeout, unit)) {
          //如果I/O操作已經(jīng)失敗,則返回I/O操作失敗的原因
            Throwable cause = cause();
            if (cause == null) {
              //無阻塞返回執(zhí)行結(jié)果,如果future還未執(zhí)行完,則返回null
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }
        throw new TimeoutException();
    }
}

到此這篇關(guān)于Java的Netty進階之Future和Promise詳解的文章就介紹到這了,更多相關(guān)Future和Promise詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • jmeter調(diào)試錯誤全集(入門必備)

    jmeter調(diào)試錯誤全集(入門必備)

    在使用jmeter做接口測試的過程中大家是不是經(jīng)常會遇到很多問題,本文就介紹了jmeter調(diào)試錯誤全集,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • Java之while與do-while循環(huán)的用法詳解

    Java之while與do-while循環(huán)的用法詳解

    在上一篇文章中,給大家講解了循環(huán)的概念,并重點給大家講解了for循環(huán)的使用。但在Java中,除了for循環(huán)之外,還有while、do-while、foreach等循環(huán)形式。這篇文章給大家講解while循環(huán)的使用
    2023-05-05
  • Struts2 漏洞分析及如何提前預(yù)防

    Struts2 漏洞分析及如何提前預(yù)防

    2016年4月26日,Struts2發(fā)布一份安全公告,CVE編號 CVE-2016-3081。這是自2012年Struts2命令執(zhí)行漏洞大規(guī)模爆發(fā)之后,該服務(wù)時隔四年再次爆發(fā)大規(guī)模漏洞。該漏洞也是今年目前爆出的最嚴重安全漏洞。本文分析了漏洞的原理危害影響防護等內(nèi)容。
    2016-05-05
  • Spark使用IDEA編寫wordcount的示例演示

    Spark使用IDEA編寫wordcount的示例演示

    這篇文章主要介紹了Spark使用IDEA編寫wordcount的示例演示,本文通過示例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-07-07
  • Java技術(shù)匯總

    Java技術(shù)匯總

    本篇文章主要對Java基本知識點和技術(shù)點的一些看法和介紹,具有很好的參考價值。下面跟著小編一起來看下吧
    2017-03-03
  • 使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解

    使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解

    這篇文章主要介紹了使用java技術(shù)抓取網(wǎng)站上彩票雙色球信息詳解,web結(jié)果由html+js+css組成,html結(jié)構(gòu)都有一定的規(guī)范,數(shù)據(jù)動態(tài)交互可以通過js實現(xiàn)。,需要的朋友可以參考下
    2019-06-06
  • spring基于注解配置實現(xiàn)事務(wù)控制操作

    spring基于注解配置實現(xiàn)事務(wù)控制操作

    這篇文章主要介紹了spring基于注解配置實現(xiàn)事務(wù)控制操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • 詳解Java CompletableFuture使用方法以及與FutureTask的區(qū)別

    詳解Java CompletableFuture使用方法以及與FutureTask的區(qū)別

    CompletableFuture實現(xiàn)了CompletionStage接口和Future接口,前者是對后者的一個擴展,增加了異步回調(diào)、流式處理、多個Future組合處理的能力,使Java在處理多任務(wù)的協(xié)同工作時更加順暢便利
    2021-10-10
  • java 如何實現(xiàn)正確的刪除集合中的元素

    java 如何實現(xiàn)正確的刪除集合中的元素

    這篇文章主要介紹了java 如何實現(xiàn)正確的刪除集合中的元素,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • 詳解Spring Cloud Finchley版中Consul多實例注冊的問題處理

    詳解Spring Cloud Finchley版中Consul多實例注冊的問題處理

    這篇文章主要介紹了詳解Spring Cloud Finchley版中Consul多實例注冊的問題處理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-08-08

最新評論