Java中常見的并發(fā)控制手段淺析
前言
單實(shí)例的并發(fā)控制,主要是針對JVM內(nèi),我們常規(guī)的手段即可滿足需求,常見的手段大概有下面這些
- 同步代碼塊
- CAS自旋
- 鎖
- 阻塞隊(duì)列,令牌桶等
1.1 同步代碼塊
通過同步代碼塊,來確保同一時(shí)刻只會(huì)有一個(gè)線程執(zhí)行對應(yīng)的業(yè)務(wù)邏輯,常見的使用姿勢如下
public synchronized doProcess() { // 同步代碼塊,只會(huì)有一個(gè)線程執(zhí)行 }
一般推薦使用最小區(qū)間使用原則,盡量不要直接在方法上加synchronized,比如經(jīng)典的雙重判定單例模式
public class Single { private static volatile Single instance; private Single() {} public static Single getInstance() { if (instance == null) { synchronized(Single.class) { if (instance == null) instance = new Single(); } } return instance; } }
1.2 CAS自旋方式
比如AtomicXXX原子類中的很多實(shí)現(xiàn),就是借助unsafe的CAS來實(shí)現(xiàn)的,如下
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } // unsafe 實(shí)現(xiàn) // cas + 自選,不斷的嘗試更新設(shè)置,直到成功為止 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
1.3 鎖
jdk本身提供了不少的鎖,為了實(shí)現(xiàn)單實(shí)例的并發(fā)控制,我們需要選擇寫鎖;如果支持多讀,單實(shí)例寫,則可以考慮讀寫鎖;一般使用姿勢也比較簡單
private void doSome(ReentrantReadWriteLock.WriteLock writeLock) { try { writeLock.lock(); System.out.println("持有鎖成功 " + Thread.currentThread().getName()); Thread.sleep(1000); System.out.println("執(zhí)行完畢! " + Thread.currentThread().getName()); writeLock.unlock(); } catch (Exception e) { e.printStackTrace(); } } @Test public void lock() throws InterruptedException { ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); Thread.sleep(20000); }
1.4 阻塞隊(duì)列
借助同步阻塞隊(duì)列,也可以實(shí)現(xiàn)并發(fā)控制的效果,比如隊(duì)列中初始化n個(gè)元素,每次消費(fèi)從隊(duì)列中獲取一個(gè)元素,如果拿不到則阻塞;執(zhí)行完畢之后,重新塞入一個(gè)元素,這樣就可以實(shí)現(xiàn)一個(gè)簡單版的并發(fā)控制
demo版演示,下面指定隊(duì)列長度為2,表示最大并發(fā)數(shù)控制為2;設(shè)置為1時(shí),可以實(shí)現(xiàn)單線程的訪問控制
AtomicInteger cnt = new AtomicInteger(); private void consumer(LinkedBlockingQueue<Integer> queue) { try { // 同步阻塞拿去數(shù)據(jù) int val = queue.take(); Thread.sleep(2000); System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 添加數(shù)據(jù) System.out.println("結(jié)束 " + Thread.currentThread()); queue.offer(cnt.getAndAdd(1)); } } @Test public void blockQueue() throws InterruptedException { LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2); queue.add(cnt.getAndAdd(1)); queue.add(cnt.getAndAdd(1)); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); Thread.sleep(10000); }
1.5 信號(hào)量Semaphore
上面隊(duì)列的實(shí)現(xiàn)方式,可以使用信號(hào)量Semaphore來完成,通過設(shè)置信號(hào)量,來控制并發(fā)數(shù)
private void semConsumer(Semaphore semaphore) { try { //同步阻塞,嘗試獲取信號(hào) semaphore.acquire(1); System.out.println("成功拿到信號(hào),執(zhí)行: " + Thread.currentThread()); Thread.sleep(2000); System.out.println("執(zhí)行完畢,釋放信號(hào): " + Thread.currentThread()); semaphore.release(1); } catch (Exception e) { e.printStackTrace(); } } @Test public void semaphore() throws InterruptedException { Semaphore semaphore = new Semaphore(2); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); Thread.sleep(20_000); }
1.6 計(jì)數(shù)器CountDownLatch
計(jì)數(shù),應(yīng)用場景更偏向于多線程的協(xié)同,比如多個(gè)線程執(zhí)行完畢之后,再處理某些事情;不同于上面的并發(fā)數(shù)的控制,它和柵欄一樣,更多的是行為結(jié)果的統(tǒng)一
這種場景下的使用姿勢一般如下
重點(diǎn):countDownLatch 計(jì)數(shù)為0時(shí)放行
@Test public void countDown() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(() -> { try { System.out.println("do something in " + Thread.currentThread()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); new Thread(() -> { try { System.out.println("do something in t2: " + Thread.currentThread()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); countDownLatch.await(); System.out.printf("結(jié)束"); }
1.7 柵欄 CyclicBarrier
CyclicBarrier的作用與上面的CountDownLatch相似,區(qū)別在于正向計(jì)數(shù)+1, 只有達(dá)到條件才放行; 且支持通過調(diào)用reset()重置計(jì)數(shù),而CountDownLatch則不行
一個(gè)簡單的demo
private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) { // 等待達(dá)到條件才放行 try { System.out.println("準(zhǔn)備執(zhí)行: " + Thread.currentThread() + " at: " + LocalDateTime.now()); Thread.sleep(sleep); int index = barrier.await(); System.out.println("開始執(zhí)行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now()); } catch (Exception e) { e.printStackTrace(); } } @Test public void testCyclicBarrier() throws InterruptedException { // 到達(dá)兩個(gè)工作線程才能繼續(xù)往后面執(zhí)行 CyclicBarrier barrier = new CyclicBarrier(2); // 三秒之后,下面兩個(gè)線程的才會(huì)輸出 開始執(zhí)行 new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start(); new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start(); Thread.sleep(4000); // 重置,可以再次使用 barrier.reset(); new Thread(() -> cyclicBarrierLogic(barrier, 1)).start(); new Thread(() -> cyclicBarrierLogic(barrier, 1)).start(); Thread.sleep(10000); }
1.8 guava令牌桶
guava封裝了非常簡單的并發(fā)控制工具類RateLimiter,作為單機(jī)的并發(fā)控制首選
一個(gè)控制qps為2的簡單demo如下:
private void guavaProcess(RateLimiter rateLimiter) { try { // 同步阻塞方式獲取 System.out.println("準(zhǔn)備執(zhí)行: " + Thread.currentThread() + " > " + LocalDateTime.now()); rateLimiter.acquire(); System.out.println("執(zhí)行中: " + Thread.currentThread() + " > " + LocalDateTime.now()); } catch (Exception e) { e.printStackTrace(); } } @Test public void testGuavaRate() throws InterruptedException { // 1s 中放行兩個(gè)請求 RateLimiter rateLimiter = RateLimiter.create(2.0d); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); Thread.sleep(20_000); }
輸出:
準(zhǔn)備執(zhí)行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
準(zhǔn)備執(zhí)行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
準(zhǔn)備執(zhí)行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
準(zhǔn)備執(zhí)行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
準(zhǔn)備執(zhí)行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
準(zhǔn)備執(zhí)行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
準(zhǔn)備執(zhí)行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
執(zhí)行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
執(zhí)行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
執(zhí)行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
執(zhí)行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
執(zhí)行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
執(zhí)行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
執(zhí)行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219
1.9 滑動(dòng)窗口TimeWindow
沒有找到通用的滑動(dòng)窗口jar包,一般來講滑動(dòng)窗口更適用于平滑的限流,解決瞬時(shí)高峰問題
一個(gè)供參考的實(shí)現(xiàn)方式:
固定大小隊(duì)列,隊(duì)列中每個(gè)數(shù)據(jù)代表一個(gè)時(shí)間段的計(jì)數(shù),
訪問 -》 隊(duì)列頭拿數(shù)據(jù)(注意不出隊(duì))-》判斷是否跨時(shí)間段 -》 同一時(shí)間段,計(jì)數(shù)+1 -》跨時(shí)間段,新增數(shù)據(jù)入隊(duì),若
扔不進(jìn)去,表示時(shí)間窗滿,隊(duì)尾數(shù)據(jù)出隊(duì)
問題:當(dāng)流量稀疏時(shí),導(dǎo)致不會(huì)自動(dòng)釋放過期的數(shù)據(jù)
解決方案:根據(jù)時(shí)間段設(shè)置定時(shí)任務(wù),模擬訪問操作,只是將計(jì)數(shù)改為 + 0
1.10 小結(jié)
本文給出了幾種單機(jī)版的并發(fā)控制的技術(shù)手段,主要目的是介紹了一些可選的方案,技術(shù)細(xì)節(jié)待后續(xù)補(bǔ)全完善,當(dāng)然如果有其他的建議,歡迎評論交流
到此這篇關(guān)于Java中常見的并發(fā)控制手段的文章就介紹到這了,更多相關(guān)Java并發(fā)控制手段內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java并發(fā)編程之代碼實(shí)現(xiàn)兩玩家交換裝備
- Java并發(fā)編程之阻塞隊(duì)列(BlockingQueue)詳解
- java實(shí)戰(zhàn)案例之用戶注冊并發(fā)送郵件激活/發(fā)送郵件驗(yàn)證碼
- JAVA并發(fā)圖解
- java并發(fā)編程JUC CountDownLatch線程同步
- Java并發(fā)之Condition案例詳解
- java并發(fā)編程之ThreadLocal詳解
- Java 處理高并發(fā)負(fù)載類優(yōu)化方法案例詳解
- 淺談Java高并發(fā)解決方案以及高負(fù)載優(yōu)化方法
- Java httpClient連接池支持多線程高并發(fā)的實(shí)現(xiàn)
- Java之Rsync并發(fā)遷移數(shù)據(jù)并校驗(yàn)詳解
- Java面試題沖刺第二十四天--并發(fā)編程
- Java 模擬真正的并發(fā)請求詳情
相關(guān)文章
基于SpringBoot bootstrap.yml配置未生效的解決
這篇文章主要介紹了基于SpringBoot bootstrap.yml配置未生效的解決方式,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10java兩個(gè)數(shù)組合并為一個(gè)數(shù)組的幾種方法
這篇文章主要給大家介紹了關(guān)于java兩個(gè)數(shù)組合并為一個(gè)數(shù)組的幾種方法,最近在寫代碼時(shí)遇到了需要合并兩個(gè)數(shù)組的需求,文中將每種方法都介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07Springboot實(shí)現(xiàn)多線程及線程池監(jiān)控
線程池的監(jiān)控很重要,本文就來介紹一下Springboot實(shí)現(xiàn)多線程及線程池監(jiān)控,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-01-01Java Base64算法實(shí)際應(yīng)用之郵件發(fā)送實(shí)例分析
這篇文章主要介紹了Java Base64算法實(shí)際應(yīng)用之郵件發(fā)送,結(jié)合實(shí)例形式分析了java字符編碼與郵件發(fā)送相關(guān)操作技巧,需要的朋友可以參考下2019-09-09