欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程之異步Future機制的原理和實現(xiàn)

 更新時間:2016年08月24日 15:31:23   投稿:lijiao  
這篇文章主要為大家詳細介紹了Java多線程之異步Future機制的原理和實現(xiàn),感興趣的小伙伴們可以參考一下

項目中經(jīng)常有些任務需要異步(提交到線程池中)去執(zhí)行,而主線程往往需要知道異步執(zhí)行產(chǎn)生的結果,這時我們要怎么做呢?用runnable是無法實現(xiàn)的,我們需要用callable看下面的代碼:

 import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AddTask implements Callable<Integer> {

 private int a,b;
 
 public AddTask(int a, int b) {
 this.a = a;
 this.b = b;
 }
 
 @Override
 public Integer call throws Exception {
 Integer result = a + b;
 return result;
 }
 
 public static void main(String[] args) throws InterruptedException, ExecutionException {
 ExecutorService executor = Executors.newSingleThreadExecutor;
 //JDK目前為止返回的都是FutureTask的實例
 Future<Integer> future = executor.submit(new AddTask(1, 2));
 Integer result = future.get;// 只有當future的狀態(tài)是已完成時(future.isDone = true),get方法才會返回
 }
} 

雖然可以實現(xiàn)獲取異步執(zhí)行結果的需求,但是我們發(fā)現(xiàn)這個Future其實很不好用,因為它沒有提供通知的機制,也就是說我們不知道future什么時候完成(如果我們需要輪詢isDone()來判斷的話感覺就沒有用這個的必要了)??聪耲ava.util.concurrent.future.Future 的接口方法:

 public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled;
  boolean isDone;
  V get throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
} 

由此可見JDK的Future機制其實并不好用,如果能給這個future加個監(jiān)聽器,讓它在完成時通知監(jiān)聽器的話就比較好用了,就像下面這個IFuture:

 package future;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * The result of an asynchronous operation.
 * 
 * @author lixiaohui
 * @param <V> 執(zhí)行結果的類型參數(shù)
 */
public interface IFuture<V> extends Future<V> { 
 boolean isSuccess; // 是否成功 
 V getNow; //立即返回結果(不管Future是否處于完成狀態(tài))
 Throwable cause; //若執(zhí)行失敗時的原因
    boolean isCancellable; //是否可以取消
 IFuture<V> await throws InterruptedException; //等待future的完成
 boolean await(long timeoutMillis) throws InterruptedException; // 超時等待future的完成
 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
    IFuture<V> awaitUninterruptibly; //等待future的完成,不響應中斷
    boolean awaitUninterruptibly(long timeoutMillis);//超時等待future的完成,不響應中斷
 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
 IFuture<V> addListener(IFutureListener<V> l); //當future完成時,會通知這些加進來的監(jiān)聽器
 IFuture<V> removeListener(IFutureListener<V> l);
 
} 

接下來就一起來實現(xiàn)這個IFuture,在這之前要說明下Object.wait,Object.notifyAll方法,因為整個Future實現(xiàn)的原���的核心就是這兩個方法.看看JDK里面的解釋:

 public class Object {
  /**
   * Causes the current thread to wait until another thread invokes the
   * {@link java.lang.Object#notify} method or the
   * {@link java.lang.Object#notifyAll} method for this object.
   * In other words, this method behaves exactly as if it simply
   * performs the call {@code wait(0)}.
   * 調用該方法后,當前線程會釋放對象監(jiān)視器鎖,并讓出CPU使用權。直到別的線程調用notify/notifyAll
   */
  public final void wait throws InterruptedException {
    wait(0);
  }

  /**
   * Wakes up all threads that are waiting on this object's monitor. A
   * thread waits on an object's monitor by calling one of the
   * {@code wait} methods.
   * <p>
   * The awakened threads will not be able to proceed until the current
   * thread relinquishes the lock on this object. The awakened threads
   * will compete in the usual manner with any other threads that might
   * be actively competing to synchronize on this object; for example,
   * the awakened threads enjoy no reliable privilege or disadvantage in
   * being the next thread to lock this object.
   */
  public final native void notifyAll;
} 

知道這個后,我們要自己實現(xiàn)Future也就有了思路,當線程調用了IFuture.await等一系列的方法時,如果Future還未完成,那么就調用future.wait 方法使線程進入WAITING狀態(tài)。而當別的線程設置Future為完成狀態(tài)(注意這里的完成狀態(tài)包括正常結束和異常結束)時,就需要調用future.notifyAll方法來喚醒之前因為調用過wait方法而處于WAITING狀態(tài)的那些線程。完整的實現(xiàn)如下(代碼應該沒有很難理解的地方,我是參考netty的Future機制的。有興趣的可以去看看netty的源碼):

 package future;

import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * <pre>
 * 正常結束時, 若執(zhí)行的結果不為null, 則result為執(zhí)行結果; 若執(zhí)行結果為null, 則result = {@link AbstractFuture#SUCCESS_SIGNAL}
 * 異常結束時, result為 {@link CauseHolder} 的實例;若是被取消而導致的異常結束, 則result為 {@link CancellationException} 的實例, 否則為其它異常的實例
 * 以下情況會使異步操作由未完成狀態(tài)轉至已完成狀態(tài), 也就是在以下情況發(fā)生時調用notifyAll方法:
 * <ul>
 * <li>異步操作被取消時(cancel方法)</li>
 * <li>異步操作正常結束時(setSuccess方法)</li>
 * <li>異步操作異常結束時(setFailure方法)</li>
 * </ul>
 * </pre>
 * 
 * @author lixiaohui
 *
 * @param <V>
 * 異步執(zhí)行結果的類型
 */
public class AbstractFuture<V> implements IFuture<V> {

 protected volatile Object result; // 需要保證其可見性
    /**
     * 監(jiān)聽器集
     */
 protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>;

 /**
 * 當任務正常執(zhí)行結果為null時, 即客戶端調用{@link AbstractFuture#setSuccess(null)}時, 
 * result引用該對象
 */
 private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal;

 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
 if (isDone) { // 已完成了不能取消
  return false;
 }

 synchronized (this) {
  if (isDone) { // double check
  return false;
  }
  result = new CauseHolder(new CancellationException);
  notifyAll; // isDone = true, 通知等待在該對象的wait的線程
 }
 notifyListeners; // 通知監(jiān)聽器該異步操作已完成
 return true;
 }
 
 @Override
 public boolean isCancellable {
 return result == null;
 }
 
 @Override
 public boolean isCancelled {
 return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
 }

 @Override
 public boolean isDone {
 return result != null;
 }

 @Override
 public V get throws InterruptedException, ExecutionException {
 await; // 等待執(zhí)行結果

 Throwable cause = cause;
 if (cause == null) { // 沒有發(fā)生異常,異步操作正常結束
  return getNow;
 }
 if (cause instanceof CancellationException) { // 異步操作被取消了
  throw (CancellationException) cause;
 }
 throw new ExecutionException(cause); // 其他異常
 }

 @Override
 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 if (await(timeout, unit)) {// 超時等待執(zhí)行結果
  Throwable cause = cause;
  if (cause == null) {// 沒有發(fā)生異常,異步操作正常結束
  return getNow;
  }
  if (cause instanceof CancellationException) {// 異步操作被取消了
  throw (CancellationException) cause;
  }
  throw new ExecutionException(cause);// 其他異常
 }
 // 時間到了異步操作還沒有結束, 拋出超時異常
 throw new TimeoutException;
 }

 @Override
 public boolean isSuccess {
 return result == null ? false : !(result instanceof CauseHolder);
 }

 @SuppressWarnings("unchecked")
 @Override
 public V getNow {
 return (V) (result == SUCCESS_SIGNAL ? null : result);
 }

 @Override
 public Throwable cause {
 if (result != null && result instanceof CauseHolder) {
  return ((CauseHolder) result).cause;
 }
 return null;
 }

 @Override
 public IFuture<V> addListener(IFutureListener<V> listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }
 if (isDone) { // 若已完成直接通知該監(jiān)聽器
  notifyListener(listener);
  return this;
 }
 synchronized (this) {
  if (!isDone) {
  listeners.add(listener);
  return this;
  }
 }
 notifyListener(listener);
 return this;
 }

 @Override
 public IFuture<V> removeListener(IFutureListener<V> listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }

 if (!isDone) {
  listeners.remove(listener);
 }

 return this;
 }

 @Override
 public IFuture<V> await throws InterruptedException {
 return await0(true);
 }

 
 private IFuture<V> await0(boolean interruptable) throws InterruptedException {
 if (!isDone) { // 若已完成就直接返回了
  // 若允許終端且被中斷了則拋出中斷異常
  if (interruptable && Thread.interrupted) {
  throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted.");
  }

  boolean interrupted = false;
  synchronized (this) {
  while (!isDone) {
   try {
   wait; // 釋放鎖進入waiting狀態(tài),等待其它線程調用本對象的notify/notifyAll方法
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }
  }
  }
  if (interrupted) {
  // 為什么這里要設中斷標志位?因為從wait方法返回后, 中斷標志是被clear了的, 
  // 這里重新設置以便讓其它代碼知道這里被中斷了。
  Thread.currentThread.interrupt;
  }
 }
 return this;
 }
 
 @Override
 public boolean await(long timeoutMillis) throws InterruptedException {
 return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
 }
 
 @Override
 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
 return await0(unit.toNanos(timeout), true);
 }

 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
 if (isDone) {
  return true;
 }

 if (timeoutNanos <= 0) {
  return isDone;
 }

 if (interruptable && Thread.interrupted) {
  throw new InterruptedException(toString);
 }

 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime;
 long waitTime = timeoutNanos;
 boolean interrupted = false;

 try {
  synchronized (this) {
  if (isDone) {
   return true;
  }

  if (waitTime <= 0) {
   return isDone;
  }

  for (;;) {
   try {
   wait(waitTime / 1000000, (int) (waitTime % 1000000));
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }

   if (isDone) {
   return true;
   } else {
   waitTime = timeoutNanos - (System.nanoTime - startTime);
   if (waitTime <= 0) {
    return isDone;
   }
   }
  }
  }
 } finally {
  if (interrupted) {
  Thread.currentThread.interrupt;
  }
 }
 }

 @Override
 public IFuture<V> awaitUninterruptibly {
 try {
  return await0(false);
 } catch (InterruptedException e) { // 這里若拋異常了就無法處理了
  throw new java.lang.InternalError;
 }
 }
 
 @Override
 public boolean awaitUninterruptibly(long timeoutMillis) {
 try {
  return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }

 @Override
 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
 try {
  return await0(unit.toNanos(timeout), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }

 protected IFuture<V> setFailure(Throwable cause) {
 if (setFailure0(cause)) {
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }

 private boolean setFailure0(Throwable cause) {
 if (isDone) {
  return false;
 }

 synchronized (this) {
  if (isDone) {
  return false;
  }
  result = new CauseHolder(cause);
  notifyAll;
 }

 return true;
 }

 protected IFuture<V> setSuccess(Object result) {
 if (setSuccess0(result)) { // 設置成功后通知監(jiān)聽器
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }

 private boolean setSuccess0(Object result) {
 if (isDone) {
  return false;
 }

 synchronized (this) {
  if (isDone) {
  return false;
  }
  if (result == null) { // 異步操作正常執(zhí)行完畢的結果是null
  this.result = SUCCESS_SIGNAL;
  } else {
  this.result = result;
  }
  notifyAll;
 }
 return true;
 }

 private void notifyListeners {
 for (IFutureListener<V> l : listeners) {
  notifyListener(l);
 }
 }

 private void notifyListener(IFutureListener<V> l) {
 try {
  l.operationCompleted(this);
 } catch (Exception e) {
  e.printStackTrace;
 }
 }

 private static class SuccessSignal {

 }

 private static final class CauseHolder {
 final Throwable cause;

 CauseHolder(Throwable cause) {
  this.cause = cause;
 }
 }
} 

那么要怎么使用這個呢,有了上面的骨架實現(xiàn),我們就可以定制各種各樣的異步結果了。下面模擬一下一個延時的任務:

 package future.test;

import future.IFuture;
import future.IFutureListener;

/**
 * 延時加法
 * @author lixiaohui
 *
 */
public class DelayAdder {
 
 public static void main(String[] args) {
 new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> {
  
  @Override
  public void operationCompleted(IFuture<Integer> future) throws Exception {
  System.out.println(future.getNow);
  }
  
 });
 }
 /**
 * 延遲加
 * @param delay 延時時長 milliseconds
 * @param a 加數(shù)
 * @param b 加數(shù)
 * @return 異步結果
 */
 public DelayAdditionFuture add(long delay, int a, int b) {
 DelayAdditionFuture future = new DelayAdditionFuture; 
 new Thread(new DelayAdditionTask(delay, a, b, future)).start;
 return future;
 }
 
 private class DelayAdditionTask implements Runnable {

 private long delay;
 
 private int a, b;
 
 private DelayAdditionFuture future;
 
 public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
  super;
  this.delay = delay;
  this.a = a;
  this.b = b;
  this.future = future;
 }

 @Override
 public void run {
  try {
  Thread.sleep(delay);
  Integer i = a + b;
  // TODO 這里設置future為完成狀態(tài)(正常執(zhí)行完畢)
  future.setSuccess(i);
  } catch (InterruptedException e) {
  // TODO 這里設置future為完成狀態(tài)(異常執(zhí)行完畢)
  future.setFailure(e.getCause);
  }
 }
 
 }
} package future.test;

import future.AbstractFuture;
import future.IFuture;
//只是把兩個方法對外暴露
public class DelayAdditionFuture extends AbstractFuture<Integer> {
 
 @Override
 public IFuture<Integer> setSuccess(Object result) {
 return super.setSuccess(result);
 }
 
 @Override
 public IFuture<Integer> setFailure(Throwable cause) {
 return super.setFailure(cause);
 }
 
} 

可以看到客戶端不用主動去詢問future是否完成,而是future完成時自動回調operationcompleted方法,客戶端只需在回調里實現(xiàn)邏輯即可。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

  • arthas?jprofiler做復雜鏈路的調用分析

    arthas?jprofiler做復雜鏈路的調用分析

    這篇文章主要為大家介紹了arthas?jprofiler做復雜鏈路的調用分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-06-06
  • Java中HashMap的常見用法詳解

    Java中HashMap的常見用法詳解

    這篇文章主要介紹了Java中HashMap的常見用法詳解,HashMap是Java中的一個常用子類,它是java.util.HashMap<k,v>集合,實現(xiàn)了Map<k,v>接口, HashMap可以存儲鍵值對,通過鍵來快速訪問值,在HashMap中,鍵是唯一的,而值可以重復,需要的朋友可以參考下
    2023-09-09
  • Java continue break制作簡單聊天室程序

    Java continue break制作簡單聊天室程序

    這篇文章主要為大家詳細介紹了Java continue break制作簡單聊天室程序,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • Java反射機制,反射相關API,反射API使用方式(反射獲取實體類字段名和注解值)

    Java反射機制,反射相關API,反射API使用方式(反射獲取實體類字段名和注解值)

    這篇文章主要介紹了Java反射機制,反射相關API,反射API使用方式(反射獲取實體類字段名和注解值),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • Spring Boot詳解各類請求和響應的處理方法

    Spring Boot詳解各類請求和響應的處理方法

    平時只是在用SpringBoot框架,但并沒有詳細研究過請求和響應執(zhí)行的一個具體過程,所以本文主要來梳理一下SpringBoot請求和響應的處理過程
    2022-07-07
  • Netty分布式server啟動流程Nio創(chuàng)建源碼分析

    Netty分布式server啟動流程Nio創(chuàng)建源碼分析

    這篇文章主要介紹了Netty分布式server啟動流程Nio創(chuàng)建源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-03-03
  • 詳解Spring Boot 屬性配置和使用

    詳解Spring Boot 屬性配置和使用

    本篇文章主要介紹了詳解Spring Boot 屬性配置和使用,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06
  • Java生成短8位UUID的實現(xiàn)方案

    Java生成短8位UUID的實現(xiàn)方案

    在Java中,UUID通常用于生成全局唯一的標識符,標準的UUID是128位的,由32個十六進制數(shù)字組成,并通過特定的算法保證其在全球范圍內的唯一性,本文給大家介紹了一個簡單的Java方法,用于生成一個較短的8位UUID,需要的朋友可以參考下
    2025-01-01
  • java Collections 排序--多條件排序實例

    java Collections 排序--多條件排序實例

    這篇文章主要介紹了java Collections 排序--多條件排序實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-09-09
  • Java中compareTo()和compare()方法使用及區(qū)別詳解

    Java中compareTo()和compare()方法使用及區(qū)別詳解

    這篇文章主要介紹了Java中compareTo()和compare()方法使用及區(qū)別的相關資料,compareTo()方法用于定義類的自然排序,適用于具有單一、固定排序方式的場景,compare()方法提供自定義排序的靈活性,適用于需要根據(jù)不同規(guī)則對對象進行排序的場景,需要的朋友可以參考下
    2025-01-01

最新評論