java并發(fā)中的同步器使用方式
同步器
Java 并發(fā)包中的同步器是一些用于協(xié)調(diào)多個(gè)線程執(zhí)行的工具,用于實(shí)現(xiàn)線程之間的同步和互斥操作。這些同步器提供了不同的機(jī)制來控制線程的訪問和執(zhí)行順序,以實(shí)現(xiàn)線程安全和并發(fā)控制。
1、Semaphore(信號量)
- Semaphore 是 Java 并發(fā)包中的同步器之一,用于控制對臨界區(qū)資源的訪問數(shù)量。它允許多個(gè)線程同時(shí)訪問臨界區(qū)資源,但限制了同一時(shí)間內(nèi)可以訪問資源的線程數(shù)量。
- Semaphore 維護(hù)一個(gè)許可證計(jì)數(shù),線程可以獲取和釋放這些許可證。當(dāng)許可證數(shù)量為零時(shí),線程需要等待,直到其他線程釋放許可證。
Semaphore 基本用法
import java.util.concurrent.Semaphore; public class SemaphoreExample { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); // 初始化信號量,允許同時(shí)訪問的線程數(shù)量為3 // 創(chuàng)建多個(gè)線程來模擬訪問臨界區(qū)資源 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { try { semaphore.acquire(); // 獲取許可證,如果沒有許可證則阻塞 System.out.println("Thread " + threadId + " acquired a permit and is accessing the resource."); Thread.sleep(2000); // 模擬訪問臨界區(qū)資源的耗時(shí)操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 釋放許可證 System.out.println("Thread " + threadId + " released the permit."); } }); thread.start(); } } }
運(yùn)行結(jié)果:
Thread 1 acquired a permit and is accessing the resource.
Thread 3 acquired a permit and is accessing the resource.
Thread 2 acquired a permit and is accessing the resource.
Thread 2 released the permit.
Thread 4 acquired a permit and is accessing the resource.
Thread 5 acquired a permit and is accessing the resource.
Thread 3 released the permit.
Thread 1 released the permit.
Thread 4 released the permit.
Thread 5 released the permit.
在上述示例中,我們創(chuàng)建了一個(gè) Semaphore 實(shí)例,并初始化許可證數(shù)量為 3。然后創(chuàng)建了多個(gè)線程,每個(gè)線程在獲取許可證后訪問臨界區(qū)資源,模擬耗時(shí)操作后釋放許可證。由于許可證數(shù)量有限,只有一部分線程能夠同時(shí)訪問資源,其他線程需要等待。
Semaphore 適用場景
- 有限資源的并發(fā)訪問,如數(shù)據(jù)庫連接池、線程池等。
- 控制對某個(gè)資源的同時(shí)訪問數(shù)量,以避免資源競爭和過度消耗。
2、CountDownLatch
CountDownLatch 是 Java 并發(fā)包中的同步器之一,用于實(shí)現(xiàn)一種等待機(jī)制,允許一個(gè)或多個(gè)線程等待其他線程完成一組操作后再繼續(xù)執(zhí)行。
它通過維護(hù)一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)等待和通知的機(jī)制。
在創(chuàng)建 CountDownLatch 時(shí),需要指定初始計(jì)數(shù)值,每次調(diào)用 countDown() 方法會減少計(jì)數(shù)值,當(dāng)計(jì)數(shù)值達(dá)到零時(shí),等待的線程會被喚醒繼續(xù)執(zhí)行。
CountDownLatch 基本用法
import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) { int numberOfTasks = 3; CountDownLatch latch = new CountDownLatch(numberOfTasks); // 創(chuàng)建多個(gè)線程來模擬完成任務(wù) for (int i = 1; i <= numberOfTasks; i++) { int taskId = i; Thread thread = new Thread(() -> { try { System.out.println("Task " + taskId + " is executing..."); Thread.sleep(2000); // 模擬任務(wù)執(zhí)行耗時(shí) System.out.println("Task " + taskId + " is completed."); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 完成任務(wù)后減少計(jì)數(shù) } }); thread.start(); } try { System.out.println("Main thread is waiting for tasks to complete..."); latch.await(); // 等待所有任務(wù)完成 System.out.println("All tasks are completed. Main thread continues."); } catch (InterruptedException e) { e.printStackTrace(); } } }
運(yùn)行結(jié)果:
Task 1 is executing...
Main thread is waiting for tasks to complete...
Task 2 is executing...
Task 3 is executing...
Task 1 is completed.
Task 2 is completed.
Task 3 is completed.
All tasks are completed. Main thread continues.
在上述示例中,我們創(chuàng)建了一個(gè) CountDownLatch 實(shí)例,并初始化計(jì)數(shù)值為 3。然后創(chuàng)建了多個(gè)線程來模擬完成任務(wù),每個(gè)線程執(zhí)行完任務(wù)后調(diào)用 countDown() 方法減少計(jì)數(shù)。主線程在執(zhí)行 latch.await() 時(shí)等待計(jì)數(shù)值為零,等待所有任務(wù)完成后繼續(xù)執(zhí)行。
使用 CountDownLatch 可以實(shí)現(xiàn)多個(gè)線程之間的協(xié)調(diào),確保某些操作在其他操作完成后再繼續(xù)執(zhí)行。
CountDownLatch 適用場景
- 主線程等待多個(gè)子線程完成任務(wù)后再繼續(xù)執(zhí)行。
- 等待多個(gè)線程完成初始化工作后再開始并行操作。
3、CyclicBarrier
CyclicBarrier 是 Java 并發(fā)包中的同步器之一,用于實(shí)現(xiàn)一組線程在達(dá)到一個(gè)共同點(diǎn)之前等待彼此,并在達(dá)到共同點(diǎn)后繼續(xù)執(zhí)行。它可以被重置并重新使用,適用于需要多個(gè)線程協(xié)同工作的場景。
CyclicBarrier 維護(hù)一個(gè)計(jì)數(shù)器和一個(gè)柵欄動(dòng)作(barrier action)。當(dāng)線程調(diào)用 await() 方法時(shí),計(jì)數(shù)器減少,當(dāng)計(jì)數(shù)器達(dá)到零時(shí),所有等待的線程會被喚醒并繼續(xù)執(zhí)行,同時(shí)會執(zhí)行柵欄動(dòng)作。計(jì)數(shù)器可以被重置,并且可以設(shè)置柵欄動(dòng)作,在達(dá)到共同點(diǎn)后執(zhí)行。
CyclicBarrier 基本用法
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 3; Runnable barrierAction = () -> System.out.println("All threads reached the barrier!"); CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction); // 創(chuàng)建多個(gè)線程來模擬并行執(zhí)行任務(wù) for (int i = 1; i <= numberOfThreads; i++) { int threadId = i; Thread thread = new Thread(() -> { try { System.out.println("Thread " + threadId + " is performing its task."); Thread.sleep(2000); // 模擬任務(wù)執(zhí)行耗時(shí) System.out.println("Thread " + threadId + " has reached the barrier."); barrier.await(); // 等待其他線程達(dá)到柵欄點(diǎn) System.out.println("Thread " + threadId + " continues after the barrier."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } } }
運(yùn)行結(jié)果:
Thread 1 is performing its task.
Thread 3 is performing its task.
Thread 2 is performing its task.
Thread 2 has reached the barrier.
Thread 3 has reached the barrier.
Thread 1 has reached the barrier.
All threads reached the barrier!
Thread 1 continues after the barrier.
Thread 2 continues after the barrier.
Thread 3 continues after the barrier.
在上述示例中,我們創(chuàng)建了一個(gè) CyclicBarrier 實(shí)例,初始化等待的線程數(shù)量為 3,并設(shè)置了柵欄動(dòng)作。
然后創(chuàng)建多個(gè)線程,每個(gè)線程模擬執(zhí)行任務(wù)后等待其他線程達(dá)到柵欄點(diǎn),當(dāng)所有線程都達(dá)到柵欄點(diǎn)時(shí),柵欄動(dòng)作會被執(zhí)行。
使用 CyclicBarrier 可以實(shí)現(xiàn)多線程協(xié)同工作的場景,確保所有線程在某個(gè)共同點(diǎn)之前等待彼此,并在達(dá)到共同點(diǎn)后繼續(xù)執(zhí)行。
CyclicBarrier 計(jì)數(shù)器重置用法
package com.lf.java.basic.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class MultipleCyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 3; int numberOfRounds = 3; Runnable barrierAction = () -> System.out.println("All threads reached the barrier!"); for (int round = 1; round <= numberOfRounds; round++) { CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction); System.out.println("Round " + round + ": Starting tasks"); // 創(chuàng)建多個(gè)線程來模擬并行執(zhí)行任務(wù) for (int i = 1; i <= numberOfThreads; i++) { int threadId = i; int finalRound = round; Thread thread = new Thread(() -> { try { System.out.println("Thread " + threadId + " is performing its task for Round " + finalRound); Thread.sleep(2000); // 模擬任務(wù)執(zhí)行耗時(shí) System.out.println("Thread " + threadId + " has reached the barrier for Round " + finalRound); barrier.await(); // 等待其他線程達(dá)到柵欄點(diǎn) System.out.println("Thread " + threadId + " continues after the barrier for Round " + finalRound); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } // 等待所有線程完成當(dāng)前輪次的任務(wù) try { Thread.sleep(3000); // 等待一段時(shí)間以觀察效果 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Round " + round + ": All tasks completed\n"); // 讓當(dāng)前輪次的所有線程都離開柵欄點(diǎn),以便重新使用 barrier.reset(); } } }
運(yùn)行結(jié)果:
Round 1: Starting tasks
Thread 1 is performing its task for Round 1
Thread 2 is performing its task for Round 1
Thread 3 is performing its task for Round 1
Thread 3 has reached the barrier for Round 1
Thread 2 has reached the barrier for Round 1
Thread 1 has reached the barrier for Round 1
All threads reached the barrier!
Thread 2 continues after the barrier for Round 1
Thread 1 continues after the barrier for Round 1
Thread 3 continues after the barrier for Round 1
Round 1: All tasks completedRound 2: Starting tasks
Thread 1 is performing its task for Round 2
Thread 2 is performing its task for Round 2
Thread 3 is performing its task for Round 2
Thread 3 has reached the barrier for Round 2
Thread 2 has reached the barrier for Round 2
Thread 1 has reached the barrier for Round 2
All threads reached the barrier!
Thread 1 continues after the barrier for Round 2
Thread 3 continues after the barrier for Round 2
Thread 2 continues after the barrier for Round 2
Round 2: All tasks completedRound 3: Starting tasks
Thread 1 is performing its task for Round 3
Thread 2 is performing its task for Round 3
Thread 3 is performing its task for Round 3
Thread 1 has reached the barrier for Round 3
Thread 2 has reached the barrier for Round 3
Thread 3 has reached the barrier for Round 3
All threads reached the barrier!
Thread 3 continues after the barrier for Round 3
Thread 1 continues after the barrier for Round 3
Thread 2 continues after the barrier for Round 3
Round 3: All tasks completed
在上述示例中,我們模擬了多輪任務(wù)協(xié)同。每一輪都創(chuàng)建一個(gè)新的 CyclicBarrier 實(shí)例,用于協(xié)調(diào)線程的等待和通知。在每一輪的任務(wù)完成后,我們使用 barrier.reset() 來重置計(jì)數(shù)器,以便進(jìn)行下一輪的任務(wù)協(xié)同。
運(yùn)行這個(gè)示例可以看到多輪任務(wù)協(xié)同的效果,每一輪的任務(wù)都會等待所有線程完成后再繼續(xù),然后重置計(jì)數(shù)器以準(zhǔn)備下一輪。
CyclicBarrier 適用場景
- 將多個(gè)線程分成階段進(jìn)行,每個(gè)階段需要等待其他線程完成后再繼續(xù)。
- 并行計(jì)算中的分治操作,等待所有線程完成分治任務(wù)后進(jìn)行合并計(jì)算。
4、Phaser
Phaser 是 Java 并發(fā)包中的同步器之一,它提供了更靈活的多階段線程協(xié)調(diào)機(jī)制,適用于需要分階段進(jìn)行多個(gè)任務(wù)的并行執(zhí)行和協(xié)調(diào)的場景。Phaser 可以用于更復(fù)雜的同步需求,例如循環(huán)的多階段任務(wù)協(xié)同。
Phaser 維護(hù)了一個(gè)計(jì)數(shù)器和多個(gè)階段(phase)。在每個(gè)階段,線程可以注冊、等待和注銷,以及在某個(gè)階段到達(dá)時(shí)執(zhí)行特定的操作。
Phaser 基本用法
import java.util.concurrent.Phaser; public class PhaserExample { public static void main(String[] args) { int numberOfThreads = 3; int numberOfPhases = 3; Phaser phaser = new Phaser(numberOfThreads) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Phase " + phase + " completed."); return phase == numberOfPhases - 1 || registeredParties == 0; } }; // 創(chuàng)建多個(gè)線程來模擬并行執(zhí)行任務(wù) for (int i = 0; i < numberOfThreads; i++) { int threadId = i; Thread thread = new Thread(() -> { for (int phase = 0; phase < numberOfPhases; phase++) { System.out.println("Thread " + threadId + " is in Phase " + phase); phaser.arriveAndAwaitAdvance(); // 等待其他線程到達(dá)當(dāng)前階段 } }); thread.start(); } } }
運(yùn)行結(jié)果:
Thread 0 is in Phase 0
Thread 1 is in Phase 0
Thread 2 is in Phase 0
Phase 0 completed.
Thread 2 is in Phase 1
Thread 1 is in Phase 1
Thread 0 is in Phase 1
Phase 1 completed.
Thread 1 is in Phase 2
Thread 2 is in Phase 2
Thread 0 is in Phase 2
Phase 2 completed.
在上述示例中,我們創(chuàng)建了一個(gè) Phaser 實(shí)例,設(shè)置初始注冊線程數(shù)量為 3。然后,我們創(chuàng)建多個(gè)線程來模擬并行執(zhí)行任務(wù),每個(gè)線程都會在每個(gè)階段調(diào)用 phaser.arriveAndAwaitAdvance() 等待其他線程到達(dá)當(dāng)前階段。當(dāng)所有線程都到達(dá)后,onAdvance() 方法會被調(diào)用,用于執(zhí)行階段結(jié)束后的操作。
Phaser 提供了更靈活的多階段協(xié)同機(jī)制,適用于需要多個(gè)階段的任務(wù)協(xié)同和并行執(zhí)行的場景。它還支持動(dòng)態(tài)添加或刪除等待線程,使其更適用于動(dòng)態(tài)變化的并發(fā)需求。
Phaser 適用場景
- 需要分階段執(zhí)行的任務(wù),每個(gè)階段可以有不同的線程數(shù)。
- 需要?jiǎng)討B(tài)添加或刪除等待線程的場景。
5、ReentrantLock
ReentrantLock 是 Java 并發(fā)包中的同步器之一,它是一個(gè)可重入的互斥鎖,提供了與 synchronized 關(guān)鍵字相似的功能,但更為靈活。
與 synchronized 不同,ReentrantLock 具有更多的控制選項(xiàng)和功能,例如公平性、可中斷性、超時(shí)等待等。
ReentrantLock 基本用法
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); // 創(chuàng)建多個(gè)線程來模擬使用鎖 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { try { lock.lock(); // 獲取鎖 System.out.println("Thread " + threadId + " acquired the lock."); Thread.sleep(2000); // 模擬臨界區(qū)操作耗時(shí) } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖 System.out.println("Thread " + threadId + " released the lock."); } }); thread.start(); } } }
運(yùn)行結(jié)果:
Thread 1 acquired the lock.
Thread 1 released the lock.
Thread 2 acquired the lock.
Thread 2 released the lock.
Thread 3 acquired the lock.
Thread 3 released the lock.
Thread 4 acquired the lock.
Thread 4 released the lock.
Thread 5 acquired the lock.
Thread 5 released the lock.
在上述示例中,我們創(chuàng)建了一個(gè) ReentrantLock 實(shí)例,并在多個(gè)線程中使用它來模擬對共享資源的訪問。每個(gè)線程在訪問資源前調(diào)用 lock.lock() 來獲取鎖,訪問資源后調(diào)用 lock.unlock() 來釋放鎖。
需要注意的是,為了避免死鎖,應(yīng)該在 finally 塊中釋放鎖,以確保無論是否發(fā)生異常,鎖都會被釋放。
ReentrantLock 還提供了其他方法,如 tryLock()(嘗試獲取鎖,如果鎖可用則獲取,否則返回 false)、lockInterruptibly()(可中斷的獲取鎖,可響應(yīng)線程中斷)等,使其更加靈活和強(qiáng)大。
ReentrantLock 中斷獲取鎖用法
package com.lf.java.basic.concurrent; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class InterruptibleLockExample { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); // 創(chuàng)建線程嘗試獲取鎖 Thread thread = new Thread(() -> { try { lock.lockInterruptibly(); // 可中斷獲取鎖 System.out.println("Thread acquired the lock."); Thread.sleep(5000); // 模擬臨界區(qū)操作耗時(shí) } catch (InterruptedException e) { //中斷喚醒線程 System.out.println("Thread interrupted while waiting for the lock."); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); // 釋放鎖 System.out.println("Thread released the lock."); } } }); // 啟動(dòng)線程 thread.start(); // 主線程等待一段時(shí)間后嘗試中斷線程 try { Thread.sleep(2000); System.out.println("Thread interrupt before"); thread.interrupt(); // 中斷線程的等待 System.out.println("Thread interrupt after"); } catch (InterruptedException e) { System.out.println("InterruptedException catch"); e.printStackTrace(); } } }
運(yùn)行結(jié)果:
Thread acquired the lock.
Thread interrupt before
Thread interrupt after
Thread interrupted while waiting for the lock.
Thread released the lock.
在上述示例中,創(chuàng)建了一個(gè)線程嘗試獲取鎖,但是主線程在啟動(dòng)線程后等待了一段時(shí)間后中斷了該線程的等待。
由于我們使用了 lock.lockInterruptibly() 來獲取鎖,線程在等待鎖的過程中可以響應(yīng)中斷,一旦被中斷,它會拋出 InterruptedException,從而可以捕獲中斷事件并做出相應(yīng)處理。
ReentrantLock 適用場景:
- 需要更精細(xì)的同步控制,例如在某些情況下需要手動(dòng)釋放鎖。
- 需要可中斷或超時(shí)等待的線程。
6、ReadWriteLock
ReadWriteLock 是 Java 并發(fā)包中的同步器之一,用于實(shí)現(xiàn)讀寫分離的鎖機(jī)制,提供了更高效的并發(fā)訪問控制。
它允許多個(gè)線程同時(shí)讀取共享資源,但在寫入資源時(shí)只允許一個(gè)線程進(jìn)行,從而提高了并發(fā)性能。
ReadWriteLock 包含兩種鎖:讀鎖和寫鎖。
- 讀鎖(ReadLock):多個(gè)線程可以同時(shí)獲取讀鎖,只要沒有線程持有寫鎖。在沒有寫鎖的情況下,多個(gè)線程可以并發(fā)讀取共享資源,從而提高并發(fā)性能。
- 寫鎖(Write Lock):寫鎖是獨(dú)占的,只有一個(gè)線程可以持有寫鎖。在一個(gè)線程持有寫鎖時(shí),其他線程無法獲取讀鎖或?qū)戞i,從而確保對共享資源的寫操作是互斥的。
ReadWriteLock 基本用法
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockExample { public static void main(String[] args) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 創(chuàng)建多個(gè)讀線程 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { readWriteLock.readLock().lock(); // 獲取讀鎖 try { System.out.println("Thread " + threadId + " is reading."); Thread.sleep(2000); // 模擬讀取操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); // 釋放讀鎖 } }); thread.start(); } // 創(chuàng)建一個(gè)寫線程 Thread writeThread = new Thread(() -> { readWriteLock.writeLock().lock(); // 獲取寫鎖 try { System.out.println("Write thread is writing."); Thread.sleep(2000); // 模擬寫入操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); // 釋放寫鎖 } }); writeThread.start(); } }
運(yùn)行結(jié)果:
Thread 1 is reading.
Thread 2 is reading.
Thread 4 is reading.
Thread 3 is reading.
Thread 5 is reading.
Write thread is writing
在上述示例中,我們創(chuàng)建了一個(gè) ReadWriteLock 實(shí)例,然后創(chuàng)建多個(gè)讀線程和一個(gè)寫線程來模擬讀寫操作。
讀線程在執(zhí)行時(shí)調(diào)用 readWriteLock.readLock().lock() 來獲取讀鎖,寫線程在執(zhí)行時(shí)調(diào)用 readWriteLock.writeLock().lock() 來獲取寫鎖。
使用 ReadWriteLock 可以提高對共享資源的并發(fā)訪問性能,適用于讀操作頻繁,寫操作較少的場景。
ReadWriteLock 適用場景
- 讀操作頻繁,寫操作較少的情況,以提高并發(fā)性能。
- 允許多個(gè)線程同時(shí)讀取資源,但在寫入資源時(shí)確保互斥。
7、 Condition
Condition 是 Java 并發(fā)包中的同步器之一,它提供了更靈活的線程等待和通知機(jī)制,用于在多線程環(huán)境下實(shí)現(xiàn)精細(xì)的線程協(xié)調(diào)。
Condition 是與 Lock 結(jié)合使用的,它可以替代傳統(tǒng)的 wait() 和 notify() 方法,提供更多的控制選項(xiàng)和功能。
通過 Condition,我們可以實(shí)現(xiàn)更精確的等待和通知,以及更靈活的線程喚醒機(jī)制。
Condition 基本用法
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // 創(chuàng)建一個(gè)等待線程 Thread waitingThread = new Thread(() -> { lock.lock(); try { System.out.println("Waiting thread is waiting..."); condition.await(); // 等待條件滿足 System.out.println("Waiting thread is awake."); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); // 創(chuàng)建一個(gè)喚醒線程 Thread signalingThread = new Thread(() -> { lock.lock(); try { Thread.sleep(2000); // 模擬等待一段時(shí)間 System.out.println("Signaling thread is awake."); condition.signal(); // 喚醒等待線程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); // 啟動(dòng)線程 waitingThread.start(); signalingThread.start(); } }
運(yùn)行結(jié)果:
Waiting thread is waiting...
Signaling thread is awake.
Waiting thread is awake.
在上述示例中,我們創(chuàng)建了一個(gè) ReentrantLock 實(shí)例和一個(gè)與之關(guān)聯(lián)的 Condition,然后創(chuàng)建了一個(gè)等待線程和一個(gè)喚醒線程。
等待線程在調(diào)用 condition.await() 后進(jìn)入等待狀態(tài),直到喚醒線程調(diào)用 condition.signal() 來喚醒它。
通過使用 Condition,我們可以更加精確地控制線程的等待和通知,使線程協(xié)調(diào)更加靈活。
Condition 實(shí)現(xiàn)阻塞隊(duì)列
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BlockingQueueWithCondition<T> { private final Queue<T> queue = new LinkedList<>(); private final int capacity; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public BlockingQueueWithCondition(int capacity) { this.capacity = capacity; } public void put(T item) throws InterruptedException { lock.lock(); try { while (queue.size() == capacity) { notFull.await(); } queue.offer(item); notEmpty.signal(); } finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { notEmpty.await(); } T item = queue.poll(); notFull.signal(); return item; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public static void main(String[] args) { BlockingQueueWithCondition<Integer> queue = new BlockingQueueWithCondition<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
運(yùn)行結(jié)果:
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我們使用 Condition 來實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列,其中 put() 方法用于向隊(duì)列中放入元素,take() 方法用于從隊(duì)列中取出元素。
當(dāng)隊(duì)列滿時(shí),生產(chǎn)者線程會等待 notFull 條件,當(dāng)隊(duì)列為空時(shí),消費(fèi)者線程會等待 notEmpty 條件。
這個(gè)示例展示了如何使用 Condition 來實(shí)現(xiàn)線程之間的協(xié)調(diào),以及如何實(shí)現(xiàn)一個(gè)簡單的阻塞隊(duì)列。注意,這個(gè)示例并沒有處理所有的邊界情況和異常情況,實(shí)際使用時(shí)需要考慮更多細(xì)節(jié)。
8、BlockingQueue
BlockingQueue 是 Java 并發(fā)包中的一個(gè)接口,它提供了一種線程安全的隊(duì)列實(shí)現(xiàn),用于在多線程環(huán)境下進(jìn)行數(shù)據(jù)的生產(chǎn)和消費(fèi)。
BlockingQueue 支持阻塞操作,當(dāng)隊(duì)列滿或空時(shí),線程會被阻塞,直到條件滿足。
BlockingQueue 提供了多種實(shí)現(xiàn),包括:
- ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列。
- LinkedBlockingQueue:基于鏈表的可選有界阻塞隊(duì)列。
- PriorityBlockingQueue:基于優(yōu)先級的無界阻塞隊(duì)列。
- DelayQueue:基于延遲時(shí)間的無界阻塞隊(duì)列。
- SynchronousQueue:不存儲元素的阻塞隊(duì)列,用于直接傳遞數(shù)據(jù)。
- LinkedTransferQueue:基于鏈表的無界阻塞隊(duì)列,結(jié)合了 LinkedBlockingQueue 和SynchronousQueue 特性。
- LinkedBlockingDeque:基于鏈表的雙端阻塞隊(duì)列。
BlockingQueue 基本用法
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
運(yùn)行結(jié)果:
Consumed: 1
Produced: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我們使用了 ArrayBlockingQueue 來實(shí)現(xiàn)阻塞隊(duì)列,其中生產(chǎn)者線程使用 put() 方法向隊(duì)列中放入元素,消費(fèi)者線程使用 take() 方法從隊(duì)列中取出元素。
當(dāng)隊(duì)列滿或空時(shí),線程會被阻塞,直到條件滿足。
BlockingQueue 是實(shí)現(xiàn)線程安全的生產(chǎn)者-消費(fèi)者模式的常用工具,它簡化了線程之間的協(xié)調(diào)和通信。
9、BlockingDeque
BlockingDeque(阻塞雙端隊(duì)列)是 Java 并發(fā)包中的一個(gè)接口,它是 BlockingQueue 接口的擴(kuò)展,提供了雙端隊(duì)列的功能,并支持阻塞操作。BlockingDeque 可以在隊(duì)列的兩端插入和刪除元素,同時(shí)支持阻塞操作,使得在多線程環(huán)境下更容易實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)。
BlockingDeque 接口的實(shí)現(xiàn)類包括:
- LinkedBlockingDeque:基于鏈表的阻塞雙端隊(duì)列,可選有界或無界。
- LinkedBlockingDeque:基于鏈表的雙端阻塞隊(duì)列,無界。
BlockingDeque基本用法
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; public class BlockingDequeExample { public static void main(String[] args) { BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { deque.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = deque.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
運(yùn)行結(jié)果:
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我們使用了 LinkedBlockingDeque 來實(shí)現(xiàn)阻塞雙端隊(duì)列,生產(chǎn)者線程使用 put() 方法向隊(duì)列中放入元素,消費(fèi)者線程使用 take() 方法從隊(duì)列中取出元素。
與 BlockingQueue 類似,當(dāng)隊(duì)列滿或空時(shí),線程會被阻塞,直到條件滿足。
BlockingDeque 可以更靈活地實(shí)現(xiàn)在隊(duì)列兩端插入和刪除元素,適用于更多種類的場景,例如雙向數(shù)據(jù)傳輸和窗口滑動(dòng)等。
以下是一些常用的在隊(duì)列兩端插入和刪除元素的方法:
- 在隊(duì)列頭部插入元素:
void addFirst(E e): 將元素添加到隊(duì)列的頭部,如果隊(duì)列已滿,則拋出異常。
boolean offerFirst(E e): 將元素添加到隊(duì)列的頭部,如果隊(duì)列已滿,則返回 false。
void putFirst(E e): 將元素添加到隊(duì)列的頭部,如果隊(duì)列已滿,則阻塞等待直到有空間。
- 在隊(duì)列尾部插入元素:
void addLast(E e):將元素添加到隊(duì)列的尾部,如果隊(duì)列已滿,則拋出異常。
boolean offerLast(E e):將元素添加到隊(duì)列的尾部,如果隊(duì)列已滿,則返回 false。
void putLast(E e):將元素添加到隊(duì)列的尾部,如果隊(duì)列已滿,則阻塞等待直到有空間。
- 從隊(duì)列頭部刪除元素:
E removeFirst(): 移除并返回隊(duì)列頭部的元素,如果隊(duì)列為空,則拋出異常。
E pollFirst(): 移除并返回隊(duì)列頭部的元素,如果隊(duì)列為空,則返回 null。
E takeFirst(): 移除并返回隊(duì)列頭部的元素,如果隊(duì)列為空,則阻塞等待直到有元素。
- 從隊(duì)列尾部刪除元素:
E removeLast():移除并返回隊(duì)列尾部的元素,如果隊(duì)列為空,則拋出異常。
E pollLast(): 移除并返回隊(duì)列尾部的元素,如果隊(duì)列為空,則返回 null。
E takeLast(): 移除并返回隊(duì)列尾部的元素,如果隊(duì)列為空,則阻塞等待直到有元素。
這些方法使得你可以在雙端隊(duì)列的頭部和尾部執(zhí)行插入和刪除操作,根據(jù)具體的需求選擇合適的方法來實(shí)現(xiàn)線程安全的雙端隊(duì)列操作。
10、LockSupport
LockSupport 是 Java 并發(fā)包中提供的工具類,用于線程的阻塞和喚醒操作。
它提供了一種基于許可(permit)的方式來控制線程的阻塞和喚醒,相對于傳統(tǒng)的 wait() 和 notify() 方法,LockSupport 更加靈活和可靠。
主要的方法包括:
- void park():阻塞當(dāng)前線程,直到獲得許可。
- void park(Object blocker):阻塞當(dāng)前線程,并將 blocker 關(guān)聯(lián)到當(dāng)前線程,用于監(jiān)控和診斷工具。
- void parkNanos(long nanos):阻塞當(dāng)前線程,最多等待指定的納秒數(shù),直到獲得許可。
- void parkNanos(Object blocker, long nanos):阻塞當(dāng)前線程,并將 blocker關(guān)聯(lián)到當(dāng)前線程,最多等待指定的納秒數(shù)。
- void parkUntil(long deadline):阻塞當(dāng)前線程,直到指定的時(shí)間戳,直到獲得許可。
- void parkUntil(Object blocker, long deadline):阻塞當(dāng)前線程,并將 blocker關(guān)聯(lián)到當(dāng)前線程,直到指定的時(shí)間戳。
- void unpark(Thread thread):喚醒指定的線程,如果線程被阻塞,則解除阻塞狀態(tài)。
LockSupport基本用法
import java.util.concurrent.locks.LockSupport; public class LockSupportExample { public static void main(String[] args) { Thread thread = new Thread(() -> { System.out.println("Thread is going to be parked."); LockSupport.park(); // 阻塞當(dāng)前線程 System.out.println("Thread is unparked."); }); thread.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Main thread is unparking the parked thread."); LockSupport.unpark(thread); // 喚醒被阻塞的線程 } }
運(yùn)行結(jié)果:
Thread is going to be parked.
Main thread is unparking the parked thread.
Thread is unparked.
在上述示例中,我們創(chuàng)建了一個(gè)新線程,調(diào)用了 LockSupport.park() 來阻塞該線程。
然后,主線程等待 2 秒后,調(diào)用了 LockSupport.unpark(thread) 來喚醒被阻塞的線程。與傳統(tǒng)的 wait() 和 notify() 方法不同,LockSupport 是基于許可的,不需要獲取某個(gè)特定對象的鎖來進(jìn)行阻塞和喚醒操作。
LockSupport 提供了一種更直接、靈活和可控的線程阻塞和喚醒機(jī)制,適用于各種多線程協(xié)調(diào)的場景。
11、Exchanger
Exchanger 是 Java 并發(fā)包中的同步器之一,用于實(shí)現(xiàn)兩個(gè)線程之間交換數(shù)據(jù)。
它提供了一個(gè)同步點(diǎn),當(dāng)兩個(gè)線程都到達(dá)這個(gè)同步點(diǎn)時(shí),它們可以交換數(shù)據(jù)。Exchanger 可以用于實(shí)現(xiàn)線程間的數(shù)據(jù)傳遞和協(xié)作
Exchanger 提供了兩個(gè)線程之間交換數(shù)據(jù)的功能,但僅限于兩個(gè)線程。當(dāng)兩個(gè)線程都到達(dá) Exchanger 同步點(diǎn)時(shí),它們可以通過 exchange() 方法交換數(shù)據(jù),然后各自繼續(xù)執(zhí)行。
Exchanger基本用法
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); // 創(chuàng)建一個(gè)線程來發(fā)送數(shù)據(jù) Thread senderThread = new Thread(() -> { try { String dataToSend = "Hello from Sender"; System.out.println("Sender is sending: " + dataToSend); exchanger.exchange(dataToSend); // 發(fā)送數(shù)據(jù)并等待接收數(shù)據(jù) } catch (InterruptedException e) { e.printStackTrace(); } }); // 創(chuàng)建一個(gè)線程來接收數(shù)據(jù) Thread receiverThread = new Thread(() -> { try { String receivedData = exchanger.exchange(null); // 等待接收數(shù)據(jù)并發(fā)送數(shù)據(jù) System.out.println("Receiver received: " + receivedData); } catch (InterruptedException e) { e.printStackTrace(); } }); // 啟動(dòng)線程 senderThread.start(); receiverThread.start(); } }
運(yùn)行結(jié)果
Sender is sending: Hello from Sender
Receiver received: Hello from Sender
在上述示例中,我們創(chuàng)建了一個(gè) Exchanger 實(shí)例,然后創(chuàng)建了一個(gè)發(fā)送數(shù)據(jù)的線程和一個(gè)接收數(shù)據(jù)的線程。
當(dāng)發(fā)送數(shù)據(jù)的線程調(diào)用 exchange() 方法時(shí),它會發(fā)送數(shù)據(jù)并等待接收數(shù)據(jù);而接收數(shù)據(jù)的線程調(diào)用 exchange() 方法時(shí),它會等待接收數(shù)據(jù)并發(fā)送數(shù)據(jù)。當(dāng)兩個(gè)線程都到達(dá) Exchanger 同步點(diǎn)時(shí),它們會交換數(shù)據(jù),并繼續(xù)執(zhí)行。
需要注意的是,Exchanger 只適用于兩個(gè)線程之間的數(shù)據(jù)交換。如果需要更多線程之間的數(shù)據(jù)交換,可能需要組合使用多個(gè) Exchanger。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java 中用split分割字符串,最后的空格等不被拆分的方法
下面小編就為大家?guī)硪黄猨ava 中用split分割字符串,最后的空格等不被拆分的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-02-02使用springboot logback動(dòng)態(tài)獲取application的配置項(xiàng)
這篇文章主要介紹了使用springboot logback動(dòng)態(tài)獲取application的配置項(xiàng),具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java中Object toString方法簡介_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
Object類在Java里面是一個(gè)比較特殊的類,JAVA為了組織這個(gè)類組織得比較方便,它提供了一個(gè)最根上的類,相當(dāng)于所有的類都是從這個(gè)類繼承,這個(gè)類就叫Object。接下來通過本文給大家介紹Object toString方法,需要的的朋友參考下吧2017-05-05Spring?AOP操作的相關(guān)術(shù)語及環(huán)境準(zhǔn)備
這篇文章主要為大家介紹了Spring?AOP操作的相關(guān)術(shù)語及環(huán)境準(zhǔn)備學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05win10下定時(shí)運(yùn)行與開機(jī)自啟動(dòng)jar包的方法記錄
這篇文章主要給大家介紹了關(guān)于win10下定時(shí)運(yùn)行與開機(jī)自啟動(dòng)jar包的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11阿里云主機(jī)上安裝jdk 某庫出現(xiàn)問題的解決方法
今天安裝jdk到阿里云服務(wù)上,首先看下阿里云是32位還是64位的,如果是32位下載32位的包,如果是64位的下載64位的包,下面與大家分享下安裝過程中遇到問題的解決方法2013-06-06淺談HttpClient、okhttp和RestTemplate的區(qū)別
這篇文章主要介紹了HttpClient、okhttp和RestTemplate的區(qū)別,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06