欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java如何處理延遲任務過程解析

 更新時間:2019年10月16日 08:55:58   作者:巡山小妖N  
這篇文章主要介紹了Java如何處理延遲任務過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

1、利用延遲隊列

延時隊列,第一他是個隊列,所以具有對列功能第二就是延時,這就是延時對列,功能也就是將任務放在該延時對列中,只有到了延時時刻才能從該延時對列中獲取任務否則獲取不到……

應用場景比較多,比如延時1分鐘發(fā)短信,延時1分鐘再次執(zhí)行等,下面先看看延時隊列demo之后再看延時隊列在項目中的使用:

簡單的延時隊列要有三部分:第一實現(xiàn)了Delayed接口的消息體、第二消費消息的消費者、第三存放消息的延時隊列,那下面就來看看延時隊列demo。

一、消息體

package com.delqueue; 
 
import java.util.concurrent.Delayed; 
import java.util.concurrent.TimeUnit; 
 
/** 
 * 消息體定義 實現(xiàn)Delayed接口就是實現(xiàn)兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期…… */ 
public class Message implements Delayed { 
  private int id; 
  private String body; // 消息內(nèi)容 
  private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。 
 
  public int getId() { 
    return id; 
  } 
 
  public String getBody() { 
    return body; 
  } 
 
  public long getExcuteTime() { 
    return excuteTime; 
  } 
 
  public Message(int id, String body, long delayTime) { 
    this.id = id; 
    this.body = body; 
    this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); 
  } 
 
  // 自定義實現(xiàn)比較方法返回 1 0 -1三個參數(shù) 
  @Override 
  public int compareTo(Delayed delayed) { 
    Message msg = (Message) delayed; 
    return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1 
        : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0); 
  } 
 
  // 延遲任務是否到時就是按照這個方法判斷如果返回的是負數(shù)則說明到期否則還沒到期 
  @Override 
  public long getDelay(TimeUnit unit) { 
    return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); 
  } 
}

二、消息消費者

package com.delqueue; 
 
import java.util.concurrent.DelayQueue; 
 
public class Consumer implements Runnable { 
  // 延時隊列 ,消費者從其中獲取消息進行消費 
  private DelayQueue<Message> queue; 
 
  public Consumer(DelayQueue<Message> queue) { 
    this.queue = queue; 
  } 
 
  @Override 
  public void run() { 
    while (true) { 
      try { 
        Message take = queue.take(); 
        System.out.println("消費消息id:" + take.getId() + " 消息體:" + take.getBody()); 
      } catch (InterruptedException e) { 
        e.printStackTrace(); 
      } 
    } 
  } 
}

三、延時隊列

package com.delqueue; 
 
import java.util.concurrent.DelayQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
 
public class DelayQueueTest { 
   public static void main(String[] args) {  
      // 創(chuàng)建延時隊列  
      DelayQueue<Message> queue = new DelayQueue<Message>();  
      // 添加延時消息,m1 延時3s  
      Message m1 = new Message(1, "world", 3000);  
      // 添加延時消息,m2 延時10s  
      Message m2 = new Message(2, "hello", 10000);  
      //將延時消息放到延時隊列中 
      queue.offer(m2);  
      queue.offer(m1);  
      // 啟動消費線程 消費添加到延時隊列中的消息,前提是任務到了延期時間  
      ExecutorService exec = Executors.newFixedThreadPool(1); 
      exec.execute(new Consumer(queue)); 
      exec.shutdown(); 
    }  
}

將消息體放入延遲隊列中,在啟動消費者線程去消費延遲隊列中的消息,如果延遲隊列中的消息到了延遲時間則可以從中取出消息否則無法取出消息也就無法消費。

這就是延遲隊列demo,下面我們來說說在真實環(huán)境下的使用。

使用場景描述:

在打車軟件中對訂單進行派單的流程,當有訂單的時候給該訂單篩選司機,然后給當訂單綁定司機,但是有時運氣沒那么好,訂單進來后第一次沒有篩選到合適的司機,但我們也不能就此結束派單,而是將該訂單的信息放到延時隊列中過個2秒鐘在進行一次,其實這個2秒鐘就是一個延遲,所以這里我們就可以使用延時隊列來實現(xiàn)……

下面看看簡單的流程圖:

下面來看看具體代碼實現(xiàn):

在項目中有如下幾個類:第一 、任務類 第二、按照任務類組裝的消息體類 第三、延遲隊列管理類

任務類即執(zhí)行篩選司機、綁單、push消息的任務類

package com.test.delayqueue; 
/** 
 * 具體執(zhí)行相關業(yè)務的業(yè)務類 
 * @author whd 
 * @date 2017年9月25日 上午12:49:32 
 */ 
public class DelayOrderWorker implements Runnable { 
 
  @Override 
  public void run() { 
    // TODO Auto-generated method stub 
    //相關業(yè)務邏輯處理 
    System.out.println(Thread.currentThread().getName()+" do something ……"); 
  } 
}

消息體類,在延時隊列中這個實現(xiàn)了Delayed接口的消息類是比不可少的,實現(xiàn)接口時有一個getDelay(TimeUnit unit)方法,這個方法就是判斷是否到期的

這里定義的是一個泛型類,所以可以將我們上面的任務類作為其中的task,這樣就將任務類分裝成了一個消息體

package com.test.delayqueue; 
 
import java.util.concurrent.Delayed; 
import java.util.concurrent.TimeUnit; 
 
/** 
 * 延時隊列中的消息體將任務封裝為消息體 
 * 
 * @author whd 
 * @date 2017年9月25日 上午12:48:30 
 * @param <T> 
 */ 
public class DelayOrderTask<T extends Runnable> implements Delayed { 
  private final long time; 
  private final T task; // 任務類,也就是之前定義的任務類 
 
  /** 
   * @param timeout 
   *      超時時間(秒) 
   * @param task 
   *      任務 
   */ 
  public DelayOrderTask(long timeout, T task) { 
    this.time = System.nanoTime() + timeout; 
    this.task = task; 
  } 
 
  @Override 
  public int compareTo(Delayed o) { 
    // TODO Auto-generated method stub 
    DelayOrderTask other = (DelayOrderTask) o; 
    long diff = time - other.time; 
    if (diff > 0) { 
      return 1; 
    } else if (diff < 0) { 
      return -1; 
    } else { 
      return 0; 
    } 
  } 
 
  @Override 
  public long getDelay(TimeUnit unit) { 
    // TODO Auto-generated method stub 
    return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); 
  } 
 
  @Override 
  public int hashCode() { 
    return task.hashCode(); 
  } 
 
  public T getTask() { 
    return task; 
  } 
}

延時隊列管理類,這個類主要就是將任務類封裝成消息并并添加到延時隊列中,以及輪詢延時隊列從中取出到時的消息體,在獲取任務類放到線程池中執(zhí)行任務

package com.test.delayqueue; 
 
import java.util.Map; 
import java.util.concurrent.DelayQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicLong; 
 
/** 
 * 延時隊列管理類,用來添加任務、執(zhí)行任務 
 * 
 * @author whd 
 * @date 2017年9月25日 上午12:44:59 
 */ 
public class DelayOrderQueueManager { 
  private final static int DEFAULT_THREAD_NUM = 5; 
  private static int thread_num = DEFAULT_THREAD_NUM; 
  // 固定大小線程池 
  private ExecutorService executor; 
  // 守護線程 
  private Thread daemonThread; 
  // 延時隊列 
  private DelayQueue<DelayOrderTask<?>> delayQueue; 
  private static final AtomicLong atomic = new AtomicLong(0); 
  private final long n = 1; 
  private static DelayOrderQueueManager instance = new DelayOrderQueueManager(); 
 
  private DelayOrderQueueManager() { 
    executor = Executors.newFixedThreadPool(thread_num); 
    delayQueue = new DelayQueue<>(); 
    init(); 
  } 
 
  public static DelayOrderQueueManager getInstance() { 
    return instance; 
  } 
 
  /** 
   * 初始化 
   */ 
  public void init() { 
    daemonThread = new Thread(() -> { 
      execute(); 
    }); 
    daemonThread.setName("DelayQueueMonitor"); 
    daemonThread.start(); 
  } 
 
  private void execute() { 
    while (true) { 
      Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces(); 
      System.out.println("當前存活線程數(shù)量:" + map.size()); 
      int taskNum = delayQueue.size(); 
      System.out.println("當前延時任務數(shù)量:" + taskNum); 
      try { 
        // 從延時隊列中獲取任務 
        DelayOrderTask<?> delayOrderTask = delayQueue.take(); 
        if (delayOrderTask != null) { 
          Runnable task = delayOrderTask.getTask(); 
          if (null == task) { 
            continue; 
          } 
          // 提交到線程池執(zhí)行task 
          executor.execute(task); 
        } 
      } catch (Exception e) { 
        e.printStackTrace(); 
      } 
    } 
  } 
 
  /** 
   * 添加任務 
   * 
   * @param task 
   * @param time 
   *      延時時間 
   * @param unit 
   *      時間單位 
   */ 
  public void put(Runnable task, long time, TimeUnit unit) { 
    // 獲取延時時間 
    long timeout = TimeUnit.NANOSECONDS.convert(time, unit); 
    // 將任務封裝成實現(xiàn)Delayed接口的消息體 
    DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task); 
    // 將消息體放到延時隊列中 
    delayQueue.put(delayOrder); 
  } 
 
  /** 
   * 刪除任務 
   * 
   * @param task 
   * @return 
   */ 
  public boolean removeTask(DelayOrderTask task) { 
 
    return delayQueue.remove(task); 
  } 
}

測試類

package com.delqueue; 
 
import java.util.concurrent.TimeUnit; 
 
import com.test.delayqueue.DelayOrderQueueManager; 
import com.test.delayqueue.DelayOrderWorker; 
 
public class Test { 
  public static void main(String[] args) { 
    DelayOrderWorker work1 = new DelayOrderWorker();// 任務1 
    DelayOrderWorker work2 = new DelayOrderWorker();// 任務2 
    DelayOrderWorker work3 = new DelayOrderWorker();// 任務3 
    // 延遲隊列管理類,將任務轉(zhuǎn)化消息體并將消息體放入延遲對列中等待執(zhí)行 
    DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance(); 
    manager.put(work1, 3000, TimeUnit.MILLISECONDS); 
    manager.put(work2, 6000, TimeUnit.MILLISECONDS); 
    manager.put(work3, 9000, TimeUnit.MILLISECONDS); 
  } 
 
}

OK 這就是項目中的具體使用情況,當然具體內(nèi)容被忽略,整體框架就是這樣,還有這里使用java的延時隊列但是這種方式是有問題的如果如果down機則會出現(xiàn)任務丟失,所以也可以考慮使用mq、redis來實現(xiàn)

2、mq實現(xiàn)延遲消息

在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現(xiàn)延遲隊列功能。同時插件依賴Erlang/OPT 18.0及以上。

插件源碼地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下載地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安裝:

進入插件安裝目錄

{rabbitmq-server}/plugins/(可以查看一下當前已存在的插件)

下載插件

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下載的文件名稱不規(guī)則就手動重命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

關閉插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性
x-delayed-message是插件提供的類型,并不是rabbitmq本身的,發(fā)送消息的時候通過在header添加”x-delay”參數(shù)來控制消息的延時時間

直接在maven工程的pom.xml文件中加入

接下來在 application.properties 文件中加入redis配置:

也很簡單,代碼如下:

實現(xiàn)消息發(fā)送

x-delay

在這里我設置的延遲時間是3秒。

消息消費者

直接在main方法里運行Spring Boot程序,Spring Boot會自動解析 MessageReceiver 類的。

接下來只需要用Junit運行一下發(fā)送消息的接口即可。

運行完后,可以看到如下信息:

消息發(fā)送時間:2018-05-03 12:44:53
3秒鐘后,Spring Boot控制臺會輸出:
消息接收時間:2018-05-03 12:44:56
接收到的消息:hello i am delay msg

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

相關文章

  • Spring boot集中異常處理方法實例

    Spring boot集中異常處理方法實例

    這篇文章主要介紹了Spring boot集中異常處理方法實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-04-04
  • JAVA spark創(chuàng)建DataFrame的方法

    JAVA spark創(chuàng)建DataFrame的方法

    這篇文章主要介紹了JAVA spark創(chuàng)建DataFrame的方法,幫助大家更好的理解和學習spark,感興趣的朋友可以了解下
    2020-08-08
  • SpringBoot thymeleaf eclipse熱部署方案操作步驟

    SpringBoot thymeleaf eclipse熱部署方案操作步驟

    今天小編就為大家分享一篇關于SpringBoot thymeleaf eclipse熱部署方案操作步驟,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • spring boot validation參數(shù)校驗實例分析

    spring boot validation參數(shù)校驗實例分析

    這篇文章主要介紹了spring boot validation參數(shù)校驗,結合實例形式分析了spring boot validation進行數(shù)據(jù)有效性驗證的相關操作技巧,需要的朋友可以參考下
    2019-11-11
  • Java使用Ajax異步上傳文件

    Java使用Ajax異步上傳文件

    使用Ajax上傳文件的應用場景頗多,比如上傳用戶頭像、博客文章中插入圖片、對認證用戶相關身份進行校驗等等很多很多。本文提供一個簡單的示例供大家參考
    2021-05-05
  • Java編碼輔助工具Mapstruct用法詳解

    Java編碼輔助工具Mapstruct用法詳解

    這篇文章主要介紹了Java編碼輔助工具Mapstruct用法詳解,手動編碼setter/getter各個對應屬性,會顯得臃腫繁瑣。通過Mapstruct框架可簡單方便地完成這一工作。,需要的朋友可以參考下
    2019-06-06
  • 基于java.lang.IllegalArgumentException異常報錯問題及解決

    基于java.lang.IllegalArgumentException異常報錯問題及解決

    這篇文章主要介紹了基于java.lang.IllegalArgumentException異常報錯問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • 在SpringBoot中實現(xiàn)一個訂單號生成系統(tǒng)的示例代碼

    在SpringBoot中實現(xiàn)一個訂單號生成系統(tǒng)的示例代碼

    在Spring Boot中設計一個訂單號生成系統(tǒng),主要考慮到生成的訂單號需要滿足的幾個要求:唯一性、可擴展性、以及可能的業(yè)務相關性,本文給大家介紹了幾種常見的解決方案及相應的示例代碼,需要的朋友可以參考下
    2024-02-02
  • java實現(xiàn)上傳文件類型檢測過程解析

    java實現(xiàn)上傳文件類型檢測過程解析

    這篇文章主要介紹了java實現(xiàn)上傳文件類型檢測過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-12-12
  • 詳解Guava Cache本地緩存在Spring Boot應用中的實踐

    詳解Guava Cache本地緩存在Spring Boot應用中的實踐

    Guava Cache是一個全內(nèi)存的本地緩存實現(xiàn),本文將講述如何將 Guava Cache緩存應用到 Spring Boot應用中。具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-01-01

最新評論