詳解Java中CountDownLatch異步轉(zhuǎn)同步工具類
使用場景
由于公司業(yè)務(wù)需求,需要對接socket、MQTT等消息隊列。
眾所周知 socket 是雙向通信,socket的回復(fù)是人為定義的,客戶端推送消息給服務(wù)端,服務(wù)端的回復(fù)是兩條線。無法像http請求有回復(fù)。
下發(fā)指令給硬件時,需要校驗此次數(shù)據(jù)下發(fā)是否成功。
用戶體驗而言,點擊按鈕就要知道此次的下發(fā)成功或失敗。
如上圖模型,
第一種方案使用Tread.sleep
優(yōu)點:占用資源小,放棄當(dāng)前cpu資源
缺點: 回復(fù)速度快,休眠時間過長,仍然需要等待休眠結(jié)束才能返回,響應(yīng)速度是固定的,無法及時響應(yīng)第二種方案使用CountDownLatch
package com.lzy.demo.delay; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CountDownLatchPool { //countDonw池 private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>(); //延遲隊列 private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>(); private volatile static boolean flag =false; //單線程池 private final static ExecutorService t = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1)); public static void addCountDownLatch(Integer messageId) { CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId,new CountDownLatch(1) ); if(countDownLatch == null){ countDownLatch = countDownLatchMap.get(messageId); } try { addDelayQueue(messageId); countDownLatch.await(3L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("阻塞等待結(jié)束~~~~~~"); } public static void removeCountDownLatch(Integer messageId){ CountDownLatch countDownLatch = countDownLatchMap.get(messageId); if(countDownLatch == null) return; countDownLatch.countDown(); countDownLatchMap.remove(messageId); System.out.println("清除Map數(shù)據(jù)"+countDownLatchMap); } private static void addDelayQueue(Integer messageId){ delayQueue.add(new MessageDelayQueueUtil(messageId)); clearMessageId(); } private static void clearMessageId(){ synchronized (CountDownLatchPool.class){ if(flag){ return; } flag = true; } t.execute(()->{ while (delayQueue.size() > 0){ System.out.println("進入線程并開始執(zhí)行"); try { MessageDelayQueueUtil take = delayQueue.take(); Integer messageId1 = take.getMessageId(); removeCountDownLatch(messageId1); System.out.println("清除隊列數(shù)據(jù)"+messageId1); } catch (InterruptedException e) { e.printStackTrace(); } } flag = false; System.out.println("結(jié)束end----"); }); } public static void main(String[] args) throws InterruptedException { /* 測試超時清空map new Thread(()->addCountDownLatch(1)).start(); new Thread(()->addCountDownLatch(2)).start(); new Thread(()->addCountDownLatch(3)).start(); */ //提前創(chuàng)建線程,清空countdown new Thread(()->{ try { Thread.sleep(500L); removeCountDownLatch(1); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); //開始阻塞 addCountDownLatch(1); //通過調(diào)整上面的sleep我們發(fā)現(xiàn)阻塞市場取決于countDownLatch.countDown()執(zhí)行時間 System.out.println("阻塞結(jié)束----"); } } class MessageDelayQueueUtil implements Delayed { private Integer messageId; private long avaibleTime; public Integer getMessageId() { return messageId; } public void setMessageId(Integer messageId) { this.messageId = messageId; } public long getAvaibleTime() { return avaibleTime; } public void setAvaibleTime(long avaibleTime) { this.avaibleTime = avaibleTime; } public MessageDelayQueueUtil(Integer messageId){ this.messageId = messageId; //avaibleTime = 當(dāng)前時間+ delayTime //重試3次,每次3秒+1秒的延遲 this.avaibleTime=3000*3+1000 + System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { long diffTime= avaibleTime- System.currentTimeMillis(); return unit.convert(diffTime,TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { //compareTo用在DelayedUser的排序 return (int)(this.avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime()); } }
由于socket并不確定每次都會有數(shù)據(jù)返回,所以map的數(shù)據(jù)會越來越大,最終導(dǎo)致內(nèi)存溢出
需定時清除map內(nèi)的無效數(shù)據(jù)。
可以使用DelayedQuene延遲隊列來處理,相當(dāng)于給對象添加一個過期時間
使用方法 addCountDownLatch 等待消息,異步回調(diào)消息清空removeCountDownLatch
到此這篇關(guān)于詳解Java中CountDownLatch異步轉(zhuǎn)同步工具類的文章就介紹到這了,更多相關(guān)CountDownLatch異步轉(zhuǎn)同步工具類內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實現(xiàn)浮點數(shù)轉(zhuǎn)人民幣的小例子
java實現(xiàn)浮點數(shù)轉(zhuǎn)人民幣的小例子,需要的朋友可以參考一下2013-03-03Sentinel Dashboard限流規(guī)則保存方式
這篇文章主要介紹了Sentinel Dashboard限流規(guī)則保存方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Java中DataInputStream和DataOutputStream的使用方法
這篇文章主要介紹了Java中DataInputStream和DataOutputStream的使用方法,通過創(chuàng)建對象展開具體的內(nèi)容介紹,需要的小伙伴可以參考一下2022-05-05java static塊和構(gòu)造函數(shù)的實例詳解
這篇文章主要介紹了java static塊和構(gòu)造函數(shù)的實例詳解的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握J(rèn)ava static關(guān)鍵字的函數(shù)方法,需要的朋友可以參考下2017-09-09