Java CompletableFuture如何實(shí)現(xiàn)超時功能
由于網(wǎng)絡(luò)波動或者連接節(jié)點(diǎn)下線等種種問題,對于大多數(shù)網(wǎng)絡(luò)異步任務(wù)的執(zhí)行通常會進(jìn)行超時限制,在異步編程中是一個常見的問題。本文主要討論實(shí)現(xiàn)超時功能的基本思路以及CompletableFuture(之后簡稱CF)是如何通過代碼實(shí)現(xiàn)超時功能的。
基本思路
- 兩個任務(wù),兩個線程:原有任務(wù),超時任務(wù)
- 原有的任務(wù)正常執(zhí)行,寫入正常結(jié)果,原有任務(wù)執(zhí)行成功取消超時任務(wù)
- 超時時取消原有任務(wù),寫入結(jié)果為超時異?;蛘吣J(rèn)值
- 競態(tài)條件下保證結(jié)果寫入的原子性和只寫一次
CompletableFuture 的實(shí)現(xiàn)
1. 基本實(shí)現(xiàn)流程
// JDK9新增的超時方法 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit))); return this; } // CF的內(nèi)部類 static final class Timeout implements Runnable { final CompletableFuture<?> f; Timeout(CompletableFuture<?> f) { this.f = f; } public void run() { if (f != null && !f.isDone()) f.completeExceptionally(new TimeoutException()); } }
分析代碼得知,whenComplete方法添加了正常結(jié)束的回調(diào),取消超時任務(wù)。
超時任務(wù)通過Delayer.delay創(chuàng)建,超時時執(zhí)行Timeout::run方法,即寫入結(jié)果為TimeoutException。
下面來看下Dalayer的具體實(shí)現(xiàn):
/** * Singleton delay scheduler, used only for starting and * cancelling tasks. */ static final class Delayer { static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) { return delayer.schedule(command, delay, unit); } static final class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread t = new Thread(r); // 守護(hù)線程,當(dāng)主線程關(guān)閉時,自身也關(guān)閉 t.setDaemon(true); t.setName("CompletableFutureDelayScheduler"); return t; } } static final ScheduledThreadPoolExecutor delayer; static { (delayer = new ScheduledThreadPoolExecutor( 1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true); } }
Delayer是一個單例對象,專門用于執(zhí)行延遲任務(wù),減少了內(nèi)存占用。ScheduledThreadPoolExecutor 的配置為單線程,設(shè)置了removeOnCancelPolicy
,表示取消延遲任務(wù)時,任務(wù)從延遲隊(duì)列刪除。這里的延遲隊(duì)列為默認(rèn)的執(zhí)行器實(shí)現(xiàn):
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); }
ScheduledThreadPoolExecutor 底層使用延遲隊(duì)列DelayedWorkQueue
,延遲隊(duì)列底層依賴于索引優(yōu)先隊(duì)列,刪除操作的時間復(fù)雜度為o(logn)。
下面來看下Canceller的具體實(shí)現(xiàn):
static final class Canceller implements BiConsumer<Object, Throwable> { final Future<?> f; Canceller(Future<?> f) { this.f = f; } public void accept(Object ignore, Throwable ex) { if (f != null && !f.isDone()) f.cancel(false); } }
canceller實(shí)際上是一個回調(diào)函數(shù),原有任務(wù)完成后觸發(fā),會取消相關(guān)超時任務(wù)。
2. 靜態(tài)條件分析
下面是寫入CF的實(shí)現(xiàn)代碼片段:
// 超時結(jié)束 if (f != null && !f.isDone()) f.completeExceptionally(new TimeoutException()); // 取消任務(wù) if (f != null && !f.isDone()) f.cancel(false); // CF 原有任務(wù)的寫入不由orTimeout方法控制,以下為一個示例 Thread.sleep(1000); f.complete(u);
對于CF的檢查實(shí)際上不能保證原子性,因?yàn)檫@種檢查-再計算的模式需要同步塊的保護(hù),而CF底層并沒有這種實(shí)現(xiàn)。所以,if語句檢查任務(wù)未完成,之后執(zhí)行代碼時,任務(wù)可能已經(jīng)完成了。不過這種檢查也有一定的好處,因?yàn)镃F保證了結(jié)果寫入后,isDone方法必然為true,從而避免執(zhí)行不必要的代碼。
completeExceptionally
方法和 complete
方法可能同時執(zhí)行,CF 通過CAS操作保證了結(jié)果寫入的原子性。
// 異常結(jié)果實(shí)現(xiàn) final boolean internalComplete(Object r) { // CAS from null to r return RESULT.compareAndSet(this, null, r); } // 正常結(jié)果實(shí)現(xiàn) final boolean completeValue(T t) { return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); } public boolean isDone() { return result != null; }
3. 內(nèi)存泄露 bug
在 JDK21之前的CF實(shí)現(xiàn)中,存在內(nèi)存泄露的bug,具體描述詳見 https://bugs.openjdk.org/browse/JDK-8303742 ,目前筆者僅在JDK21 中發(fā)現(xiàn)代碼已修復(fù)(不考慮非LTS版本)。作為bug,后續(xù)發(fā)布的 JDK 子版本可能會修復(fù)這個問題。
這個bug在如下代碼中:
// 取消任務(wù),JDK21之前的實(shí)現(xiàn)會檢查異常結(jié)果 if (ex == null && f != null && !f.isDone()) f.cancel(false);
當(dāng)正常任務(wù)異常結(jié)束時,不會取消延遲隊(duì)列中的任務(wù),最終會導(dǎo)致內(nèi)存泄露。若項(xiàng)目中存在多個長時間超時CF任務(wù),內(nèi)存泄露的情況會更明顯。
public class LeakDemo { public static void main(String[] args) { while (true) { new CompletableFuture<>().orTimeout(1, TimeUnit.HOURS).completeExceptionally(new Exception()); } } }
執(zhí)行以上代碼會報OOM錯誤,你可以在自己的編程環(huán)境中進(jìn)行測試。
4. JDK8如何實(shí)現(xiàn)超時任務(wù)
JDK8中CompletableFuture并不支持超時任務(wù),筆者推薦使用CFFU類庫,其是CF的增強(qiáng)類庫,支持在JDK8環(huán)境中使用高版本的功能。另一種方案使用 Guava 提供的 ListenableFuture。當(dāng)然你也可以參照J(rèn)DK21中的代碼自己實(shí)現(xiàn)。
到此這篇關(guān)于Java CompletableFuture如何實(shí)現(xiàn)超時功能的文章就介紹到這了,更多相關(guān)Java CompletableFuture超時內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA中的yml文件與properties互相轉(zhuǎn)換
這篇文章主要介紹了IDEA中的yml文件與properties互相轉(zhuǎn)換方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10IDEA中Javaweb項(xiàng)目圖片加載不出來解決方案
在IDEA中能夠正常的預(yù)覽到圖片,但是在生成項(xiàng)目的war包時,項(xiàng)目的目錄結(jié)構(gòu)卻會發(fā)生變化,所以無法訪問圖片,本文主要介紹了IDEA中Javaweb項(xiàng)目圖片加載不出來解決方案,感興趣的可以了解一下2023-10-10Spring中任務(wù)調(diào)度之解讀@Scheduled和@Schedules注解的使用
這篇文章主要介紹了Spring中任務(wù)調(diào)度之解讀@Scheduled和@Schedules注解的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-04-04SpringBoot中的application.properties無法加載問題定位技巧
這篇文章主要介紹了SpringBoot中的application.properties無法加載問題定位技巧,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05Netty搭建WebSocket服務(wù)器實(shí)戰(zhàn)教程
這篇文章主要介紹了Netty搭建WebSocket服務(wù)器實(shí)戰(zhàn),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-03-03Java經(jīng)典面試題匯總:網(wǎng)絡(luò)編程
本篇總結(jié)的是Java 網(wǎng)絡(luò)編程相關(guān)的面試題,后續(xù)會持續(xù)更新,希望我的分享可以幫助到正在備戰(zhàn)面試的實(shí)習(xí)生或者已經(jīng)工作的同行,如果發(fā)現(xiàn)錯誤還望大家多多包涵,不吝賜教,謝謝2021-07-07