詳解Java使用雙異步后如何保證數(shù)據(jù)一致性
一、前情提要
在上一篇文章中,我們通過雙異步的方式導(dǎo)入了10萬行的Excel,有個小伙伴在評論區(qū)問我,如何保證插入后數(shù)據(jù)的一致性呢?
很簡單,通過對比Excel文件行數(shù)和入庫數(shù)量是否相等即可。
那么,如何獲取異步線程的返回值呢?
二、通過Future獲取異步返回值
我們可以通過給異步方法添加Future返回值的方式獲取結(jié)果。
FutureTask 除了實現(xiàn) Future 接口外,還實現(xiàn)了 Runnable 接口。因此,F(xiàn)utureTask 可以交給 Executor 執(zhí)行,也可以由調(diào)用線程直接執(zhí)行FutureTask.run()。
1、FutureTask 是基于 AbstractQueuedSynchronizer實現(xiàn)的
AbstractQueuedSynchronizer簡稱AQS,它是一個同步框架,它提供通用機制來原子性管理同步狀態(tài)、阻塞和喚醒線程,以及 維護被阻塞線程的隊列。 基于 AQS 實現(xiàn)的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。
基于 AQS實現(xiàn)的同步器包含兩種操作:
- acquire,阻塞調(diào)用線程,直到AQS的狀態(tài)允許這個線程繼續(xù)執(zhí)行,在FutureTask中,get()就是這個方法;
- release,改變AQS的狀態(tài),使state變?yōu)榉亲枞麪顟B(tài),在FutureTask中,可以通過run()和cancel()實現(xiàn)。
2、FutureTask執(zhí)行流程
執(zhí)行@Async異步方法;
建立新線程async-executor-X,執(zhí)行Runnable的run()方法,(FutureTask實現(xiàn)RunnableFuture,RunnableFuture實現(xiàn)Runnable);
判斷狀態(tài)state;
- 如果未新建或者不處于AQS,直接返回;
- 否則進入COMPLETING狀態(tài),執(zhí)行異步線程代碼;
如果執(zhí)行cancel()方法改變AQS的狀態(tài)時,會喚醒AQS等待隊列中的第一個線程線程async-executor-1;
線程async-executor-1被喚醒后
- 將自己從AQS隊列中移除;
- 然后喚醒next線程async-executor-2;
- 改變線程async-executor-1的state;
- 等待get()線程取值。
next等待線程被喚醒后,循環(huán)線程async-executor-1的步驟
- 被喚醒
- 從AQS隊列中移除
- 喚醒next線程
- 改變異步線程狀態(tài)
新建線程async-executor-N,監(jiān)聽異步方法的state
- 如果處于EXCEPTIONAL以上狀態(tài),拋出異常;
- 如果處于COMPLETING狀態(tài),加入AQS隊列等待;
- 如果處于NORMAL狀態(tài),返回結(jié)果;
3、get()方法執(zhí)行流程
get()方法通過判斷狀態(tài)state觀測異步線程是否已結(jié)束,如果結(jié)束直接將結(jié)果返回,否則會將等待節(jié)點扔進等待隊列自旋,阻塞住線程。
自旋直至異步線程執(zhí)行完畢,獲取另一邊的線程計算出結(jié)果或取消后,將等待隊列里的所有節(jié)點依次喚醒并移除隊列。
- 如果state小于等于COMPLETING,表示任務(wù)還在執(zhí)行中;
- 如果線程被中斷,從等待隊列中移除等待節(jié)點WaitNode,拋出中斷異常;
- 如果state大于COMPLETING;
- 如果已有等待節(jié)點WaitNode,將線程置空;
- 返回當(dāng)前狀態(tài);
- 如果任務(wù)正在執(zhí)行,讓出時間片;
- 如果還未構(gòu)造等待節(jié)點,則new一個新的等待節(jié)點;
- 如果未入隊列,CAS嘗試入隊;
- 如果有超時時間參數(shù);
- 計算超時時間;
- 如果超時,則從等待隊列中移除等待節(jié)點WaitNode,返回當(dāng)前狀態(tài)state;
- 阻塞隊列nanos毫秒。
- 否則阻塞隊列;
- 如果state大于COMPLETING;
- 如果執(zhí)行完畢,返回結(jié)果;
- 如果大于等于取消狀態(tài),則拋出異常。
很多小朋友對讀源碼,嗤之以鼻,工作3年、5年,還是沒認真讀過任何源碼,覺得讀了也沒啥用,或者讀了也看不懂~
其實,只要把源碼的執(zhí)行流程通過畫圖的形式呈現(xiàn)出來,你就會幡然醒悟,原來是這樣的~
簡而言之:
1. 如果異步線程還沒執(zhí)行完,則進入CAS自旋; 2. 其它線程獲取結(jié)果或取消后,重新喚醒CAS隊列中等待的線程; 3. 再通過get()判斷狀態(tài)state; 4. 直至返回結(jié)果或(取消、超時、異常)為止。
三、FutureTask源碼具體分析
1、FutureTask源碼
通過定義整形狀態(tài)值,判斷state大小,這個思想很有意思,值得學(xué)習(xí)。
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
public class FutureTask<V> implements RunnableFuture<V> { // 最初始的狀態(tài)是new 新建狀態(tài) private volatile int state; private static final int NEW = 0; // 新建狀態(tài) private static final int COMPLETING = 1; // 完成中 private static final int NORMAL = 2; // 正常執(zhí)行完 private static final int EXCEPTIONAL = 3; // 異常 private static final int CANCELLED = 4; // 取消 private static final int INTERRUPTING = 5; // 正在中斷 private static final int INTERRUPTED = 6; // 已中斷 public V get() throws InterruptedException, ExecutionException { int s = state; // 任務(wù)還在執(zhí)行中 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 線程被中斷,從等待隊列中移除等待節(jié)點WaitNode,拋出中斷異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 任務(wù)已執(zhí)行完畢或取消 if (s > COMPLETING) { // 如果已有等待節(jié)點WaitNode,將線程置空 if (q != null) q.thread = null; return s; } // 任務(wù)正在執(zhí)行,讓出時間片 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 還未構(gòu)造等待節(jié)點,則new一個新的等待節(jié)點 else if (q == null) q = new WaitNode(); // 未入隊列,CAS嘗試入隊 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有超時時間參數(shù) else if (timed) { // 計算超時時間 nanos = deadline - System.nanoTime(); // 如果超時,則從等待隊列中移除等待節(jié)點WaitNode,返回當(dāng)前狀態(tài)state if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞隊列nanos毫秒 LockSupport.parkNanos(this, nanos); } else // 阻塞隊列 LockSupport.park(this); } } private V report(int s) throws ExecutionException { // 獲取outcome中記錄的返回結(jié)果 Object x = outcome; // 如果執(zhí)行完畢,返回結(jié)果 if (s == NORMAL) return (V)x; // 如果大于等于取消狀態(tài),則拋出異常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } }
2、將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中;
@Async("async-executor") public void readXls(String filePath, String filename) { try { // 此代碼為簡化關(guān)鍵性代碼 List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync(); futureList.add(sumFuture); } }catch (Exception e){ logger.error("readXlsCacheAsync---插入數(shù)據(jù)異常:",e); } }
@Async("async-executor") public Future<Integer> readXlsCacheAsync() { try { // 此代碼為簡化關(guān)鍵性代碼 return new AsyncResult<>(sum); }catch (Exception e){ return new AsyncResult<>(0); } }
3、通過Future<Integer>.get()獲取返回值:
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow){ int[] futureSumArr = new int[futureList.size()]; for (int i = 0;i<futureList.size();i++) { try { Future<Integer> future = futureList.get(i); while (true) { if (future.isDone() && !future.isCancelled()) { Integer futureSum = future.get(); logger.info("獲取Future返回值成功"+"----Future:" + future + ",Result:" + futureSum); futureSumArr[i] += futureSum; break; } else { logger.info("Future正在執(zhí)行---獲取Future返回值中---等待3秒"); Thread.sleep(3000); } } } catch (Exception e) { logger.error("獲取Future返回值異常: ", e); } } boolean insertFlag = getInsertSum(futureSumArr, excelRow); logger.info("獲取所有異步線程Future的返回值成功,Excel插入結(jié)果="+insertFlag); return insertFlag; }
4、這里也可以通過新線程+Future獲取Future返回值
不過感覺多此一舉了,就當(dāng)練習(xí)Future異步取返回值了~
public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) { ExecutorService service = Executors.newSingleThreadExecutor(); final boolean[] insertFlag = {false}; service.execute(new Runnable() { public void run() { try { insertFlag[0] = getFutureResult(futureList, excelRow); } catch (Exception e) { logger.error("新線程+Future獲取Future返回值異常: ", e); insertFlag[0] = false; } } }); service.shutdown(); return new AsyncResult<>(insertFlag[0]); }
獲取異步線程結(jié)果后,我們可以通過添加事務(wù)的方式,實現(xiàn)Excel入庫操作的數(shù)據(jù)一致性。
但Future會造成主線程的阻塞,這個就很不友好了,有沒有更優(yōu)解呢?
以上就是詳解Java使用雙異步后如何保證數(shù)據(jù)一致性的詳細內(nèi)容,更多關(guān)于Java雙異步如何保證數(shù)據(jù)一致性的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在 Spring Boot 3 中接入生成式 AI的操作方法
本文介紹了如何在SpringBoot3中集成生成式AI,以O(shè)penAI的GPT模型為例,通過代碼示例展示了如何實現(xiàn),SpringBoot3的優(yōu)勢和OpenAI的生成式AI技術(shù)結(jié)合,為開發(fā)者提供了高效集成生成式AI的方法,感興趣的朋友跟隨小編一起看看吧2025-01-01Spring @Retryable注解輕松搞定循環(huán)重試功能
spring系列的spring-retry是另一個實用程序模塊,可以幫助我們以標準方式處理任何特定操作的重試。在spring-retry中,所有配置都是基于簡單注釋的。本文主要介紹了Spring@Retryable注解如何輕松搞定循環(huán)重試功能,有需要的朋友可以參考一下2023-04-04基于Spring Boot使用JpaRepository刪除數(shù)據(jù)時的注意事項
這篇文章主要介紹了Spring Boot使用JpaRepository刪除數(shù)據(jù)時的注意事項,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06手把手教你在eclipse創(chuàng)建第一個java?web項目并運行
Eclipse是用來做開發(fā)的自由集成開發(fā)環(huán)境,這也是很多java程序員會使用的開發(fā)環(huán)境,所以可以使用eclipse創(chuàng)建javaweb項目,下面這篇文章主要給大家介紹了關(guān)于如何在eclipse創(chuàng)建第一個java?web項目并運行的相關(guān)資料,需要的朋友可以參考下2023-02-02