Java CompletableFuture如何實(shí)現(xiàn)超時(shí)功能
由于網(wǎng)絡(luò)波動(dòng)或者連接節(jié)點(diǎn)下線等種種問題,對(duì)于大多數(shù)網(wǎng)絡(luò)異步任務(wù)的執(zhí)行通常會(huì)進(jìn)行超時(shí)限制,在異步編程中是一個(gè)常見的問題。本文主要討論實(shí)現(xiàn)超時(shí)功能的基本思路以及CompletableFuture(之后簡(jiǎn)稱CF)是如何通過代碼實(shí)現(xiàn)超時(shí)功能的。
基本思路
- 兩個(gè)任務(wù),兩個(gè)線程:原有任務(wù),超時(shí)任務(wù)
- 原有的任務(wù)正常執(zhí)行,寫入正常結(jié)果,原有任務(wù)執(zhí)行成功取消超時(shí)任務(wù)
- 超時(shí)時(shí)取消原有任務(wù),寫入結(jié)果為超時(shí)異?;蛘吣J(rèn)值
- 競(jìng)態(tài)條件下保證結(jié)果寫入的原子性和只寫一次
CompletableFuture 的實(shí)現(xiàn)
1. 基本實(shí)現(xiàn)流程
// JDK9新增的超時(shí)方法 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit))); return this; } // CF的內(nèi)部類 static final class Timeout implements Runnable { final CompletableFuture<?> f; Timeout(CompletableFuture<?> f) { this.f = f; } public void run() { if (f != null && !f.isDone()) f.completeExceptionally(new TimeoutException()); } }
分析代碼得知,whenComplete方法添加了正常結(jié)束的回調(diào),取消超時(shí)任務(wù)。
超時(shí)任務(wù)通過Delayer.delay創(chuàng)建,超時(shí)時(shí)執(zhí)行Timeout::run方法,即寫入結(jié)果為TimeoutException。
下面來看下Dalayer的具體實(shí)現(xiàn):
/** * Singleton delay scheduler, used only for starting and * cancelling tasks. */ static final class Delayer { static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) { return delayer.schedule(command, delay, unit); } static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); // 守護(hù)線程,當(dāng)主線程關(guān)閉時(shí),自身也關(guān)閉 t.setDaemon(true); t.setName("CompletableFutureDelayScheduler"); return t; } } static final ScheduledThreadPoolExecutor delayer; static { (delayer = new ScheduledThreadPoolExecutor( 1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true); } }
Delayer是一個(gè)單例對(duì)象,專門用于執(zhí)行延遲任務(wù),減少了內(nèi)存占用。ScheduledThreadPoolExecutor 的配置為單線程,設(shè)置了removeOnCancelPolicy
,表示取消延遲任務(wù)時(shí),任務(wù)從延遲隊(duì)列刪除。這里的延遲隊(duì)列為默認(rèn)的執(zhí)行器實(shí)現(xiàn):
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); }
ScheduledThreadPoolExecutor 底層使用延遲隊(duì)列DelayedWorkQueue
,延遲隊(duì)列底層依賴于索引優(yōu)先隊(duì)列,刪除操作的時(shí)間復(fù)雜度為o(logn)。
下面來看下Canceller的具體實(shí)現(xiàn):
static final class Canceller implements BiConsumer<Object, Throwable> { final Future<?> f; Canceller(Future<?> f) { this.f = f; } public void accept(Object ignore, Throwable ex) { if (f != null && !f.isDone()) f.cancel(false); } }
canceller實(shí)際上是一個(gè)回調(diào)函數(shù),原有任務(wù)完成后觸發(fā),會(huì)取消相關(guān)超時(shí)任務(wù)。
2. 靜態(tài)條件分析
下面是寫入CF的實(shí)現(xiàn)代碼片段:
// 超時(shí)結(jié)束 if (f != null && !f.isDone()) f.completeExceptionally(new TimeoutException()); // 取消任務(wù) if (f != null && !f.isDone()) f.cancel(false); // CF 原有任務(wù)的寫入不由orTimeout方法控制,以下為一個(gè)示例 Thread.sleep(1000); f.complete(u);
對(duì)于CF的檢查實(shí)際上不能保證原子性,因?yàn)檫@種檢查-再計(jì)算的模式需要同步塊的保護(hù),而CF底層并沒有這種實(shí)現(xiàn)。所以,if語句檢查任務(wù)未完成,之后執(zhí)行代碼時(shí),任務(wù)可能已經(jīng)完成了。不過這種檢查也有一定的好處,因?yàn)镃F保證了結(jié)果寫入后,isDone方法必然為true,從而避免執(zhí)行不必要的代碼。
completeExceptionally
方法和 complete
方法可能同時(shí)執(zhí)行,CF 通過CAS操作保證了結(jié)果寫入的原子性。
// 異常結(jié)果實(shí)現(xiàn) final boolean internalComplete(Object r) { // CAS from null to r return RESULT.compareAndSet(this, null, r); } // 正常結(jié)果實(shí)現(xiàn) final boolean completeValue(T t) { return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); } public boolean isDone() { return result != null; }
3. 內(nèi)存泄露 bug
在 JDK21之前的CF實(shí)現(xiàn)中,存在內(nèi)存泄露的bug,具體描述詳見 https://bugs.openjdk.org/browse/JDK-8303742 ,目前筆者僅在JDK21 中發(fā)現(xiàn)代碼已修復(fù)(不考慮非LTS版本)。作為bug,后續(xù)發(fā)布的 JDK 子版本可能會(huì)修復(fù)這個(gè)問題。
這個(gè)bug在如下代碼中:
// 取消任務(wù),JDK21之前的實(shí)現(xiàn)會(huì)檢查異常結(jié)果 if (ex == null && f != null && !f.isDone()) f.cancel(false);
當(dāng)正常任務(wù)異常結(jié)束時(shí),不會(huì)取消延遲隊(duì)列中的任務(wù),最終會(huì)導(dǎo)致內(nèi)存泄露。若項(xiàng)目中存在多個(gè)長(zhǎng)時(shí)間超時(shí)CF任務(wù),內(nèi)存泄露的情況會(huì)更明顯。
public class LeakDemo { public static void main(String[] args) { while (true) { new CompletableFuture<>().orTimeout(1, TimeUnit.HOURS).completeExceptionally(new Exception()); } } }
執(zhí)行以上代碼會(huì)報(bào)OOM錯(cuò)誤,你可以在自己的編程環(huán)境中進(jìn)行測(cè)試。
4. JDK8如何實(shí)現(xiàn)超時(shí)任務(wù)
JDK8中CompletableFuture并不支持超時(shí)任務(wù),筆者推薦使用CFFU類庫,其是CF的增強(qiáng)類庫,支持在JDK8環(huán)境中使用高版本的功能。另一種方案使用 Guava 提供的 ListenableFuture。當(dāng)然你也可以參照J(rèn)DK21中的代碼自己實(shí)現(xiàn)。
到此這篇關(guān)于Java CompletableFuture如何實(shí)現(xiàn)超時(shí)功能的文章就介紹到這了,更多相關(guān)Java CompletableFuture超時(shí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA中的yml文件與properties互相轉(zhuǎn)換
這篇文章主要介紹了IDEA中的yml文件與properties互相轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10IDEA中Javaweb項(xiàng)目圖片加載不出來解決方案
在IDEA中能夠正常的預(yù)覽到圖片,但是在生成項(xiàng)目的war包時(shí),項(xiàng)目的目錄結(jié)構(gòu)卻會(huì)發(fā)生變化,所以無法訪問圖片,本文主要介紹了IDEA中Javaweb項(xiàng)目圖片加載不出來解決方案,感興趣的可以了解一下2023-10-10Spring中任務(wù)調(diào)度之解讀@Scheduled和@Schedules注解的使用
這篇文章主要介紹了Spring中任務(wù)調(diào)度之解讀@Scheduled和@Schedules注解的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04SpringBoot中的application.properties無法加載問題定位技巧
這篇文章主要介紹了SpringBoot中的application.properties無法加載問題定位技巧,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Netty搭建WebSocket服務(wù)器實(shí)戰(zhàn)教程
這篇文章主要介紹了Netty搭建WebSocket服務(wù)器實(shí)戰(zhàn),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-03-03Java經(jīng)典面試題匯總:網(wǎng)絡(luò)編程
本篇總結(jié)的是Java 網(wǎng)絡(luò)編程相關(guān)的面試題,后續(xù)會(huì)持續(xù)更新,希望我的分享可以幫助到正在備戰(zhàn)面試的實(shí)習(xí)生或者已經(jīng)工作的同行,如果發(fā)現(xiàn)錯(cuò)誤還望大家多多包涵,不吝賜教,謝謝2021-07-07JSON 與對(duì)象、集合之間的轉(zhuǎn)換的示例
在開發(fā)過程中,經(jīng)常需要和別的系統(tǒng)交換數(shù)據(jù),數(shù)據(jù)交換的格式有XML、JSON等,JSON作為一個(gè)輕量級(jí)的數(shù)據(jù)格式比xml效率要高,本篇文章主要介紹了JSON 與 對(duì)象 、集合 之間的轉(zhuǎn)換,有興趣的可以了解一下。2017-01-01