欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

利用Redis實(shí)現(xiàn)延時處理的方法實(shí)例

 更新時間:2019年03月10日 15:45:57   作者:我一定會有貓的  
這篇文章主要給大家介紹了關(guān)于利用Redis實(shí)現(xiàn)延時處理的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧

背景

在開發(fā)中,往往會遇到一些關(guān)于延時任務(wù)的需求。例如

•生成訂單30分鐘未支付,則自動取消

•生成訂單60秒后,給用戶發(fā)短信

對上述的任務(wù),我們給一個專業(yè)的名字來形容,那就是延時任務(wù)。

最近需要做一個延時處理的功能,主要是從kafka中消費(fèi)消息后根據(jù)消息中的某個延時字段來進(jìn)行延時處理,在實(shí)際的實(shí)現(xiàn)過程中有一些需要注意的地方,記錄如下。

實(shí)現(xiàn)過程

說到j(luò)ava中的定時功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下幾點(diǎn):

  • Timer使用的是絕對時間,系統(tǒng)時間的改變會對Timer產(chǎn)生一定的影響;而ScheduledThreadPoolExecutor使用的是相對時間,所以不會有這個問題。
  • Timer使用單線程來處理任務(wù),長時間運(yùn)行的任務(wù)會導(dǎo)致其他任務(wù)的延時處理,而ScheduledThreadPoolExecutor可以自定義線程數(shù)量。
  • Timer沒有對運(yùn)行時異常進(jìn)行處理,一旦某個任務(wù)觸發(fā)運(yùn)行時異常,會導(dǎo)致整個Timer崩潰,而ScheduledThreadPoolExecutor對運(yùn)行時異常做了捕獲(可以在 afterExecute() 回調(diào)方法中進(jìn)行處理),所以更加安全。

1、ScheduledThreadPoolExecutor決定了用ScheduledThreadPoolExecutor來進(jìn)行實(shí)現(xiàn),接下來就是代碼編寫啦(大體流程代碼)。

主要的延時實(shí)現(xiàn)如下:

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new 
ThreadPoolExecutor.AbortPolicy());
//從消息中取出延遲時間及相關(guān)信息的代碼略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
  @Override
  public void run() {
   //具體操作邏輯
  }},0,delayTime, TimeUnit.SECONDS);

其中NamedThreadFactory是我自定義的一個線程工廠,主要給線程池定義名稱及相關(guān)日志打印便于后續(xù)的問題分析,這里就不多做介紹了。拒絕策略也是采用默認(rèn)的拒絕策略。

然后測試了一下,滿足目標(biāo)需求的功能,可以做到延遲指定時間后執(zhí)行,至此似乎功能就被完成了。

大家可能疑問,這也太簡單了有什么好說的,但是這種方式實(shí)現(xiàn)簡單是簡單但是存在一個潛在的問題,問題在哪呢,讓我們看一下ScheduledThreadPoolExecutor的源碼:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
 super(corePoolSize, Integer.MAX_VALUE, 0, 
 TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

ScheduledThreadPoolExecutor由于它自身的延時和周期的特性,默認(rèn)使用了DelayWorkQueue,而并不像我們平時使用的SingleThreadExecutor等構(gòu)造是可以使用自己定義的LinkedBlockingQueue并且設(shè)置隊列大小,問題就出在這里。

DelayWrokQueue是一個無界隊列,而我們的目標(biāo)數(shù)據(jù)源是kafka,也就是一個高并發(fā)高吞吐的消息隊列,很大可能在某一時間段有大量的消息過來從而導(dǎo)致OOM,在使用多線程時我們是肯定要考慮到OOM的可能性的,因?yàn)镺OM帶來的后果往往比較嚴(yán)重,系統(tǒng)OOM臨時的解決辦法一般只能是重啟,可能會導(dǎo)致用戶數(shù)據(jù)丟失等不可能挽回的問題,所以從編碼設(shè)計階段要采用盡可能穩(wěn)妥的手段來避免這些問題。

2、采用redis和線程結(jié)合

這一次換了思路,采用redis來幫助我們做緩沖,從而避免消息過多OOM的問題。

相關(guān)redis zset api:

//添加元素
ZADD key score member [[score member] [score member] …]
//根據(jù)分值及限制數(shù)量查詢
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//從zset中刪除指定成員
ZREM key member [member …]

我們采用redis基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)的zset結(jié)構(gòu),采用score來存儲我們目標(biāo)發(fā)送時間的數(shù)值,整體處理流程如下:

  • 第一步數(shù)據(jù)存儲:9:10分從kafka接收了一條a的訂單消息,要求30分鐘后進(jìn)行發(fā)貨通知,那我們就將當(dāng)前時間加上30分鐘然后轉(zhuǎn)為時間戳作為a的score,key為a的訂單號存入redis中。代碼如下:
public void onMessage(String topic, String message) {
  String orderId;
		int delayTime = 0;
  try {
   Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
   }.getType());
   if (msgMap.isEmpty()) {
    return;
   }
   LOGGER.info("onMessage kafka content:{}", msgMap.toString());
	 orderId = msgMap.get("orderId");
   if(StringUtils.isNotEmpty(orderId)){
    delayTime = Integer.parseInt(msgMap.get("delayTime"));
    Calendar calendar = Calendar.getInstance();
    //計算出預(yù)計發(fā)送時間
    calendar.add(Calendar.MINUTE, delayTime);
    long sendTime = calendar.getTimeInMillis();
    RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
    LOGGER.info("orderId:{}---放入redis中等待發(fā)送---sendTime:{}", ---orderId:{}, sendTime);
   }
  } catch (Exception e) {
   LOGGER.info("onMessage 延時發(fā)送異常:{}", e);
  }
 }
  • 第二步數(shù)據(jù)處理:另起一個線程具體調(diào)度時間根據(jù)業(yè)務(wù)需求來定,我這里3分鐘執(zhí)行一次,內(nèi)部邏輯:從redis中取出一定量的zset數(shù)據(jù),如何取呢,使用zset的zrangeByScore方法,根據(jù)數(shù)據(jù)的score進(jìn)行排序,當(dāng)然可以帶上時間段,這里從0到現(xiàn)在,來進(jìn)行消費(fèi),需要注意的一點(diǎn)是,在取出數(shù)據(jù)后我們需要用zrem方法將取出的數(shù)據(jù)從zset中刪除,防止其他線程重復(fù)消費(fèi)數(shù)據(jù)。在此之后進(jìn)行接下來的發(fā)貨通知等相關(guān)邏輯。代碼如下:
public void run(){
  //獲取批量大小
  int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
  try {
   //批量獲取離發(fā)送時間最近的orderNum條數(shù)據(jù)
	 Calendar calendar = Calendar.getInstance();
	 long now = calendar.getTimeInMillis();
	 //獲取無限早到現(xiàn)在的事件key(防止上次批量數(shù)量小于放入數(shù)量,存在歷史數(shù)據(jù)未消費(fèi)情況)
	 Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
	 LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
   if (CollectionUtils.isNotEmpty(orders)){
    //刪除key 防止重復(fù)發(fā)送
    for (String orderId : orderIds) {
     RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
    }
	  //接下來執(zhí)行發(fā)送等業(yè)務(wù)邏輯     
   }
  } catch (Exception e) {
   LOGGER.warn("task.run exception:{}", e);
  }
 }

至此完成了依賴redis和線程完成了延時發(fā)送的功能。

結(jié)語

那么對上面兩種不同的實(shí)現(xiàn)方式進(jìn)行一下優(yōu)缺點(diǎn)比較:

  • 第一種方式實(shí)現(xiàn)簡單,不依賴外部組件,能夠快速的實(shí)現(xiàn)目標(biāo)功能,但缺點(diǎn)也很明顯,需要在特定的場景下使用,如果是我這種消息量大的情況下使用很可能是有問題,當(dāng)然在數(shù)據(jù)源消息不多的情況下不失為好的選擇。
  • 第二種方式實(shí)現(xiàn)稍微復(fù)雜一點(diǎn),但是能夠適應(yīng)消息量大的場景,采用redis的zset作為了“中間件”的效果,并且?guī)椭覀冞M(jìn)行延時的功能實(shí)現(xiàn)能夠較好的適應(yīng)高并發(fā)場景,缺點(diǎn)在于在編寫的過程中需要考慮實(shí)際的因素較多,例如線程的執(zhí)行周期時間,發(fā)送可能會有一定時間的延遲,批量數(shù)據(jù)大小的設(shè)置等等。

綜上是本人這次延時功能的實(shí)現(xiàn)過程的兩種實(shí)現(xiàn)方式的總結(jié),具體采用哪種方式還需大家根據(jù)實(shí)際情況選擇,希望能給大家?guī)韼椭?。ps:由于本人的技術(shù)能力有限,文章中可能出現(xiàn)技術(shù)描述不準(zhǔn)確或者錯誤的情況懇請各位大佬指出,我立馬進(jìn)行改正,避免誤導(dǎo)大家,謝謝!

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。

相關(guān)文章

  • java.mail實(shí)現(xiàn)發(fā)送郵件

    java.mail實(shí)現(xiàn)發(fā)送郵件

    這篇文章主要為大家詳細(xì)介紹了java.mail實(shí)現(xiàn)發(fā)送郵件,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-02-02
  • javaWeb使用驗(yàn)證碼實(shí)現(xiàn)簡單登錄

    javaWeb使用驗(yàn)證碼實(shí)現(xiàn)簡單登錄

    這篇文章主要為大家詳細(xì)介紹了javaWeb使用驗(yàn)證碼實(shí)現(xiàn)簡單登錄,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-08-08
  • 前端與RabbitMQ實(shí)時消息推送未讀消息小紅點(diǎn)實(shí)現(xiàn)示例

    前端與RabbitMQ實(shí)時消息推送未讀消息小紅點(diǎn)實(shí)現(xiàn)示例

    這篇文章主要為大家介紹了前端與RabbitMQ實(shí)時消息推送未讀消息小紅點(diǎn)實(shí)現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • spring?boot項目中如何使用nacos作為配置中心

    spring?boot項目中如何使用nacos作為配置中心

    這篇文章主要介紹了spring?boot項目中如何使用nacos作為配置中心問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • Java如何判斷一個整數(shù)有多少位

    Java如何判斷一個整數(shù)有多少位

    這篇文章主要介紹了Java如何判斷一個整數(shù)有多少位問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Java matches類,Pattern類及matcher類用法示例

    Java matches類,Pattern類及matcher類用法示例

    這篇文章主要介紹了Java matches類,Pattern類及matcher類用法,結(jié)合實(shí)例形式分析了java matches類,Pattern類及matcher類針對字符串常見操作技巧與相關(guān)注意事項,需要的朋友可以參考下
    2019-03-03
  • Java類的加載連接和初始化實(shí)例分析

    Java類的加載連接和初始化實(shí)例分析

    這篇文章主要介紹了Java類的加載連接和初始化,結(jié)合具體實(shí)例形式分析了java類的加載、連接、初始化相關(guān)原理與實(shí)現(xiàn)技巧,需要的朋友可以參考下
    2019-07-07
  • Java開發(fā)學(xué)習(xí)之Bean的作用域和生命周期詳解

    Java開發(fā)學(xué)習(xí)之Bean的作用域和生命周期詳解

    這篇文章主要介紹了淺談Spring中Bean的作用域,生命周期和注解,從創(chuàng)建到消亡的完整過程,例如人從出生到死亡的整個過程就是一個生命周期。本文將通過示例為大家詳細(xì)講講,感興趣的可以學(xué)習(xí)一下
    2022-06-06
  • java中this的用法示例(關(guān)鍵字this)

    java中this的用法示例(關(guān)鍵字this)

    這篇文章主要介紹了java中this的用法示例(關(guān)鍵字this),需要的朋友可以參考下
    2014-03-03
  • Java 中的字符串常量池詳解

    Java 中的字符串常量池詳解

    本文主要介紹Java中的字符串常量池的知識,這里整理了相關(guān)資料及簡單示例代碼幫助大家學(xué)習(xí)理解此部分的知識,有需要的小伙伴可以參考下
    2016-09-09

最新評論