java并發(fā)中的同步器使用方式
同步器
Java 并發(fā)包中的同步器是一些用于協(xié)調多個線程執(zhí)行的工具,用于實現(xiàn)線程之間的同步和互斥操作。這些同步器提供了不同的機制來控制線程的訪問和執(zhí)行順序,以實現(xiàn)線程安全和并發(fā)控制。
1、Semaphore(信號量)
- Semaphore 是 Java 并發(fā)包中的同步器之一,用于控制對臨界區(qū)資源的訪問數(shù)量。它允許多個線程同時訪問臨界區(qū)資源,但限制了同一時間內可以訪問資源的線程數(shù)量。
- Semaphore 維護一個許可證計數(shù),線程可以獲取和釋放這些許可證。當許可證數(shù)量為零時,線程需要等待,直到其他線程釋放許可證。
Semaphore 基本用法
import java.util.concurrent.Semaphore; public class SemaphoreExample { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); // 初始化信號量,允許同時訪問的線程數(shù)量為3 // 創(chuàng)建多個線程來模擬訪問臨界區(qū)資源 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { try { semaphore.acquire(); // 獲取許可證,如果沒有許可證則阻塞 System.out.println("Thread " + threadId + " acquired a permit and is accessing the resource."); Thread.sleep(2000); // 模擬訪問臨界區(qū)資源的耗時操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 釋放許可證 System.out.println("Thread " + threadId + " released the permit."); } }); thread.start(); } } }
運行結果:
Thread 1 acquired a permit and is accessing the resource.
Thread 3 acquired a permit and is accessing the resource.
Thread 2 acquired a permit and is accessing the resource.
Thread 2 released the permit.
Thread 4 acquired a permit and is accessing the resource.
Thread 5 acquired a permit and is accessing the resource.
Thread 3 released the permit.
Thread 1 released the permit.
Thread 4 released the permit.
Thread 5 released the permit.
在上述示例中,我們創(chuàng)建了一個 Semaphore 實例,并初始化許可證數(shù)量為 3。然后創(chuàng)建了多個線程,每個線程在獲取許可證后訪問臨界區(qū)資源,模擬耗時操作后釋放許可證。由于許可證數(shù)量有限,只有一部分線程能夠同時訪問資源,其他線程需要等待。
Semaphore 適用場景
- 有限資源的并發(fā)訪問,如數(shù)據庫連接池、線程池等。
- 控制對某個資源的同時訪問數(shù)量,以避免資源競爭和過度消耗。
2、CountDownLatch
CountDownLatch 是 Java 并發(fā)包中的同步器之一,用于實現(xiàn)一種等待機制,允許一個或多個線程等待其他線程完成一組操作后再繼續(xù)執(zhí)行。
它通過維護一個計數(shù)器來實現(xiàn)等待和通知的機制。
在創(chuàng)建 CountDownLatch 時,需要指定初始計數(shù)值,每次調用 countDown() 方法會減少計數(shù)值,當計數(shù)值達到零時,等待的線程會被喚醒繼續(xù)執(zhí)行。
CountDownLatch 基本用法
import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) { int numberOfTasks = 3; CountDownLatch latch = new CountDownLatch(numberOfTasks); // 創(chuàng)建多個線程來模擬完成任務 for (int i = 1; i <= numberOfTasks; i++) { int taskId = i; Thread thread = new Thread(() -> { try { System.out.println("Task " + taskId + " is executing..."); Thread.sleep(2000); // 模擬任務執(zhí)行耗時 System.out.println("Task " + taskId + " is completed."); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 完成任務后減少計數(shù) } }); thread.start(); } try { System.out.println("Main thread is waiting for tasks to complete..."); latch.await(); // 等待所有任務完成 System.out.println("All tasks are completed. Main thread continues."); } catch (InterruptedException e) { e.printStackTrace(); } } }
運行結果:
Task 1 is executing...
Main thread is waiting for tasks to complete...
Task 2 is executing...
Task 3 is executing...
Task 1 is completed.
Task 2 is completed.
Task 3 is completed.
All tasks are completed. Main thread continues.
在上述示例中,我們創(chuàng)建了一個 CountDownLatch 實例,并初始化計數(shù)值為 3。然后創(chuàng)建了多個線程來模擬完成任務,每個線程執(zhí)行完任務后調用 countDown() 方法減少計數(shù)。主線程在執(zhí)行 latch.await() 時等待計數(shù)值為零,等待所有任務完成后繼續(xù)執(zhí)行。
使用 CountDownLatch 可以實現(xiàn)多個線程之間的協(xié)調,確保某些操作在其他操作完成后再繼續(xù)執(zhí)行。
CountDownLatch 適用場景
- 主線程等待多個子線程完成任務后再繼續(xù)執(zhí)行。
- 等待多個線程完成初始化工作后再開始并行操作。
3、CyclicBarrier
CyclicBarrier 是 Java 并發(fā)包中的同步器之一,用于實現(xiàn)一組線程在達到一個共同點之前等待彼此,并在達到共同點后繼續(xù)執(zhí)行。它可以被重置并重新使用,適用于需要多個線程協(xié)同工作的場景。
CyclicBarrier 維護一個計數(shù)器和一個柵欄動作(barrier action)。當線程調用 await() 方法時,計數(shù)器減少,當計數(shù)器達到零時,所有等待的線程會被喚醒并繼續(xù)執(zhí)行,同時會執(zhí)行柵欄動作。計數(shù)器可以被重置,并且可以設置柵欄動作,在達到共同點后執(zhí)行。
CyclicBarrier 基本用法
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 3; Runnable barrierAction = () -> System.out.println("All threads reached the barrier!"); CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction); // 創(chuàng)建多個線程來模擬并行執(zhí)行任務 for (int i = 1; i <= numberOfThreads; i++) { int threadId = i; Thread thread = new Thread(() -> { try { System.out.println("Thread " + threadId + " is performing its task."); Thread.sleep(2000); // 模擬任務執(zhí)行耗時 System.out.println("Thread " + threadId + " has reached the barrier."); barrier.await(); // 等待其他線程達到柵欄點 System.out.println("Thread " + threadId + " continues after the barrier."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } } }
運行結果:
Thread 1 is performing its task.
Thread 3 is performing its task.
Thread 2 is performing its task.
Thread 2 has reached the barrier.
Thread 3 has reached the barrier.
Thread 1 has reached the barrier.
All threads reached the barrier!
Thread 1 continues after the barrier.
Thread 2 continues after the barrier.
Thread 3 continues after the barrier.
在上述示例中,我們創(chuàng)建了一個 CyclicBarrier 實例,初始化等待的線程數(shù)量為 3,并設置了柵欄動作。
然后創(chuàng)建多個線程,每個線程模擬執(zhí)行任務后等待其他線程達到柵欄點,當所有線程都達到柵欄點時,柵欄動作會被執(zhí)行。
使用 CyclicBarrier 可以實現(xiàn)多線程協(xié)同工作的場景,確保所有線程在某個共同點之前等待彼此,并在達到共同點后繼續(xù)執(zhí)行。
CyclicBarrier 計數(shù)器重置用法
package com.lf.java.basic.concurrent; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class MultipleCyclicBarrierExample { public static void main(String[] args) { int numberOfThreads = 3; int numberOfRounds = 3; Runnable barrierAction = () -> System.out.println("All threads reached the barrier!"); for (int round = 1; round <= numberOfRounds; round++) { CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, barrierAction); System.out.println("Round " + round + ": Starting tasks"); // 創(chuàng)建多個線程來模擬并行執(zhí)行任務 for (int i = 1; i <= numberOfThreads; i++) { int threadId = i; int finalRound = round; Thread thread = new Thread(() -> { try { System.out.println("Thread " + threadId + " is performing its task for Round " + finalRound); Thread.sleep(2000); // 模擬任務執(zhí)行耗時 System.out.println("Thread " + threadId + " has reached the barrier for Round " + finalRound); barrier.await(); // 等待其他線程達到柵欄點 System.out.println("Thread " + threadId + " continues after the barrier for Round " + finalRound); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); thread.start(); } // 等待所有線程完成當前輪次的任務 try { Thread.sleep(3000); // 等待一段時間以觀察效果 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Round " + round + ": All tasks completed\n"); // 讓當前輪次的所有線程都離開柵欄點,以便重新使用 barrier.reset(); } } }
運行結果:
Round 1: Starting tasks
Thread 1 is performing its task for Round 1
Thread 2 is performing its task for Round 1
Thread 3 is performing its task for Round 1
Thread 3 has reached the barrier for Round 1
Thread 2 has reached the barrier for Round 1
Thread 1 has reached the barrier for Round 1
All threads reached the barrier!
Thread 2 continues after the barrier for Round 1
Thread 1 continues after the barrier for Round 1
Thread 3 continues after the barrier for Round 1
Round 1: All tasks completedRound 2: Starting tasks
Thread 1 is performing its task for Round 2
Thread 2 is performing its task for Round 2
Thread 3 is performing its task for Round 2
Thread 3 has reached the barrier for Round 2
Thread 2 has reached the barrier for Round 2
Thread 1 has reached the barrier for Round 2
All threads reached the barrier!
Thread 1 continues after the barrier for Round 2
Thread 3 continues after the barrier for Round 2
Thread 2 continues after the barrier for Round 2
Round 2: All tasks completedRound 3: Starting tasks
Thread 1 is performing its task for Round 3
Thread 2 is performing its task for Round 3
Thread 3 is performing its task for Round 3
Thread 1 has reached the barrier for Round 3
Thread 2 has reached the barrier for Round 3
Thread 3 has reached the barrier for Round 3
All threads reached the barrier!
Thread 3 continues after the barrier for Round 3
Thread 1 continues after the barrier for Round 3
Thread 2 continues after the barrier for Round 3
Round 3: All tasks completed
在上述示例中,我們模擬了多輪任務協(xié)同。每一輪都創(chuàng)建一個新的 CyclicBarrier 實例,用于協(xié)調線程的等待和通知。在每一輪的任務完成后,我們使用 barrier.reset() 來重置計數(shù)器,以便進行下一輪的任務協(xié)同。
運行這個示例可以看到多輪任務協(xié)同的效果,每一輪的任務都會等待所有線程完成后再繼續(xù),然后重置計數(shù)器以準備下一輪。
CyclicBarrier 適用場景
- 將多個線程分成階段進行,每個階段需要等待其他線程完成后再繼續(xù)。
- 并行計算中的分治操作,等待所有線程完成分治任務后進行合并計算。
4、Phaser
Phaser 是 Java 并發(fā)包中的同步器之一,它提供了更靈活的多階段線程協(xié)調機制,適用于需要分階段進行多個任務的并行執(zhí)行和協(xié)調的場景。Phaser 可以用于更復雜的同步需求,例如循環(huán)的多階段任務協(xié)同。
Phaser 維護了一個計數(shù)器和多個階段(phase)。在每個階段,線程可以注冊、等待和注銷,以及在某個階段到達時執(zhí)行特定的操作。
Phaser 基本用法
import java.util.concurrent.Phaser; public class PhaserExample { public static void main(String[] args) { int numberOfThreads = 3; int numberOfPhases = 3; Phaser phaser = new Phaser(numberOfThreads) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("Phase " + phase + " completed."); return phase == numberOfPhases - 1 || registeredParties == 0; } }; // 創(chuàng)建多個線程來模擬并行執(zhí)行任務 for (int i = 0; i < numberOfThreads; i++) { int threadId = i; Thread thread = new Thread(() -> { for (int phase = 0; phase < numberOfPhases; phase++) { System.out.println("Thread " + threadId + " is in Phase " + phase); phaser.arriveAndAwaitAdvance(); // 等待其他線程到達當前階段 } }); thread.start(); } } }
運行結果:
Thread 0 is in Phase 0
Thread 1 is in Phase 0
Thread 2 is in Phase 0
Phase 0 completed.
Thread 2 is in Phase 1
Thread 1 is in Phase 1
Thread 0 is in Phase 1
Phase 1 completed.
Thread 1 is in Phase 2
Thread 2 is in Phase 2
Thread 0 is in Phase 2
Phase 2 completed.
在上述示例中,我們創(chuàng)建了一個 Phaser 實例,設置初始注冊線程數(shù)量為 3。然后,我們創(chuàng)建多個線程來模擬并行執(zhí)行任務,每個線程都會在每個階段調用 phaser.arriveAndAwaitAdvance() 等待其他線程到達當前階段。當所有線程都到達后,onAdvance() 方法會被調用,用于執(zhí)行階段結束后的操作。
Phaser 提供了更靈活的多階段協(xié)同機制,適用于需要多個階段的任務協(xié)同和并行執(zhí)行的場景。它還支持動態(tài)添加或刪除等待線程,使其更適用于動態(tài)變化的并發(fā)需求。
Phaser 適用場景
- 需要分階段執(zhí)行的任務,每個階段可以有不同的線程數(shù)。
- 需要動態(tài)添加或刪除等待線程的場景。
5、ReentrantLock
ReentrantLock 是 Java 并發(fā)包中的同步器之一,它是一個可重入的互斥鎖,提供了與 synchronized 關鍵字相似的功能,但更為靈活。
與 synchronized 不同,ReentrantLock 具有更多的控制選項和功能,例如公平性、可中斷性、超時等待等。
ReentrantLock 基本用法
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); // 創(chuàng)建多個線程來模擬使用鎖 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { try { lock.lock(); // 獲取鎖 System.out.println("Thread " + threadId + " acquired the lock."); Thread.sleep(2000); // 模擬臨界區(qū)操作耗時 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖 System.out.println("Thread " + threadId + " released the lock."); } }); thread.start(); } } }
運行結果:
Thread 1 acquired the lock.
Thread 1 released the lock.
Thread 2 acquired the lock.
Thread 2 released the lock.
Thread 3 acquired the lock.
Thread 3 released the lock.
Thread 4 acquired the lock.
Thread 4 released the lock.
Thread 5 acquired the lock.
Thread 5 released the lock.
在上述示例中,我們創(chuàng)建了一個 ReentrantLock 實例,并在多個線程中使用它來模擬對共享資源的訪問。每個線程在訪問資源前調用 lock.lock() 來獲取鎖,訪問資源后調用 lock.unlock() 來釋放鎖。
需要注意的是,為了避免死鎖,應該在 finally 塊中釋放鎖,以確保無論是否發(fā)生異常,鎖都會被釋放。
ReentrantLock 還提供了其他方法,如 tryLock()(嘗試獲取鎖,如果鎖可用則獲取,否則返回 false)、lockInterruptibly()(可中斷的獲取鎖,可響應線程中斷)等,使其更加靈活和強大。
ReentrantLock 中斷獲取鎖用法
package com.lf.java.basic.concurrent; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class InterruptibleLockExample { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); // 創(chuàng)建線程嘗試獲取鎖 Thread thread = new Thread(() -> { try { lock.lockInterruptibly(); // 可中斷獲取鎖 System.out.println("Thread acquired the lock."); Thread.sleep(5000); // 模擬臨界區(qū)操作耗時 } catch (InterruptedException e) { //中斷喚醒線程 System.out.println("Thread interrupted while waiting for the lock."); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); // 釋放鎖 System.out.println("Thread released the lock."); } } }); // 啟動線程 thread.start(); // 主線程等待一段時間后嘗試中斷線程 try { Thread.sleep(2000); System.out.println("Thread interrupt before"); thread.interrupt(); // 中斷線程的等待 System.out.println("Thread interrupt after"); } catch (InterruptedException e) { System.out.println("InterruptedException catch"); e.printStackTrace(); } } }
運行結果:
Thread acquired the lock.
Thread interrupt before
Thread interrupt after
Thread interrupted while waiting for the lock.
Thread released the lock.
在上述示例中,創(chuàng)建了一個線程嘗試獲取鎖,但是主線程在啟動線程后等待了一段時間后中斷了該線程的等待。
由于我們使用了 lock.lockInterruptibly() 來獲取鎖,線程在等待鎖的過程中可以響應中斷,一旦被中斷,它會拋出 InterruptedException,從而可以捕獲中斷事件并做出相應處理。
ReentrantLock 適用場景:
- 需要更精細的同步控制,例如在某些情況下需要手動釋放鎖。
- 需要可中斷或超時等待的線程。
6、ReadWriteLock
ReadWriteLock 是 Java 并發(fā)包中的同步器之一,用于實現(xiàn)讀寫分離的鎖機制,提供了更高效的并發(fā)訪問控制。
它允許多個線程同時讀取共享資源,但在寫入資源時只允許一個線程進行,從而提高了并發(fā)性能。
ReadWriteLock 包含兩種鎖:讀鎖和寫鎖。
- 讀鎖(ReadLock):多個線程可以同時獲取讀鎖,只要沒有線程持有寫鎖。在沒有寫鎖的情況下,多個線程可以并發(fā)讀取共享資源,從而提高并發(fā)性能。
- 寫鎖(Write Lock):寫鎖是獨占的,只有一個線程可以持有寫鎖。在一個線程持有寫鎖時,其他線程無法獲取讀鎖或寫鎖,從而確保對共享資源的寫操作是互斥的。
ReadWriteLock 基本用法
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockExample { public static void main(String[] args) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 創(chuàng)建多個讀線程 for (int i = 1; i <= 5; i++) { int threadId = i; Thread thread = new Thread(() -> { readWriteLock.readLock().lock(); // 獲取讀鎖 try { System.out.println("Thread " + threadId + " is reading."); Thread.sleep(2000); // 模擬讀取操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); // 釋放讀鎖 } }); thread.start(); } // 創(chuàng)建一個寫線程 Thread writeThread = new Thread(() -> { readWriteLock.writeLock().lock(); // 獲取寫鎖 try { System.out.println("Write thread is writing."); Thread.sleep(2000); // 模擬寫入操作 } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); // 釋放寫鎖 } }); writeThread.start(); } }
運行結果:
Thread 1 is reading.
Thread 2 is reading.
Thread 4 is reading.
Thread 3 is reading.
Thread 5 is reading.
Write thread is writing
在上述示例中,我們創(chuàng)建了一個 ReadWriteLock 實例,然后創(chuàng)建多個讀線程和一個寫線程來模擬讀寫操作。
讀線程在執(zhí)行時調用 readWriteLock.readLock().lock() 來獲取讀鎖,寫線程在執(zhí)行時調用 readWriteLock.writeLock().lock() 來獲取寫鎖。
使用 ReadWriteLock 可以提高對共享資源的并發(fā)訪問性能,適用于讀操作頻繁,寫操作較少的場景。
ReadWriteLock 適用場景
- 讀操作頻繁,寫操作較少的情況,以提高并發(fā)性能。
- 允許多個線程同時讀取資源,但在寫入資源時確保互斥。
7、 Condition
Condition 是 Java 并發(fā)包中的同步器之一,它提供了更靈活的線程等待和通知機制,用于在多線程環(huán)境下實現(xiàn)精細的線程協(xié)調。
Condition 是與 Lock 結合使用的,它可以替代傳統(tǒng)的 wait() 和 notify() 方法,提供更多的控制選項和功能。
通過 Condition,我們可以實現(xiàn)更精確的等待和通知,以及更靈活的線程喚醒機制。
Condition 基本用法
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionExample { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // 創(chuàng)建一個等待線程 Thread waitingThread = new Thread(() -> { lock.lock(); try { System.out.println("Waiting thread is waiting..."); condition.await(); // 等待條件滿足 System.out.println("Waiting thread is awake."); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); // 創(chuàng)建一個喚醒線程 Thread signalingThread = new Thread(() -> { lock.lock(); try { Thread.sleep(2000); // 模擬等待一段時間 System.out.println("Signaling thread is awake."); condition.signal(); // 喚醒等待線程 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); // 啟動線程 waitingThread.start(); signalingThread.start(); } }
運行結果:
Waiting thread is waiting...
Signaling thread is awake.
Waiting thread is awake.
在上述示例中,我們創(chuàng)建了一個 ReentrantLock 實例和一個與之關聯(lián)的 Condition,然后創(chuàng)建了一個等待線程和一個喚醒線程。
等待線程在調用 condition.await() 后進入等待狀態(tài),直到喚醒線程調用 condition.signal() 來喚醒它。
通過使用 Condition,我們可以更加精確地控制線程的等待和通知,使線程協(xié)調更加靈活。
Condition 實現(xiàn)阻塞隊列
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BlockingQueueWithCondition<T> { private final Queue<T> queue = new LinkedList<>(); private final int capacity; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public BlockingQueueWithCondition(int capacity) { this.capacity = capacity; } public void put(T item) throws InterruptedException { lock.lock(); try { while (queue.size() == capacity) { notFull.await(); } queue.offer(item); notEmpty.signal(); } finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { notEmpty.await(); } T item = queue.poll(); notFull.signal(); return item; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public static void main(String[] args) { BlockingQueueWithCondition<Integer> queue = new BlockingQueueWithCondition<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
運行結果:
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我們使用 Condition 來實現(xiàn)了一個阻塞隊列,其中 put() 方法用于向隊列中放入元素,take() 方法用于從隊列中取出元素。
當隊列滿時,生產者線程會等待 notFull 條件,當隊列為空時,消費者線程會等待 notEmpty 條件。
這個示例展示了如何使用 Condition 來實現(xiàn)線程之間的協(xié)調,以及如何實現(xiàn)一個簡單的阻塞隊列。注意,這個示例并沒有處理所有的邊界情況和異常情況,實際使用時需要考慮更多細節(jié)。
8、BlockingQueue
BlockingQueue 是 Java 并發(fā)包中的一個接口,它提供了一種線程安全的隊列實現(xiàn),用于在多線程環(huán)境下進行數(shù)據的生產和消費。
BlockingQueue 支持阻塞操作,當隊列滿或空時,線程會被阻塞,直到條件滿足。
BlockingQueue 提供了多種實現(xiàn),包括:
- ArrayBlockingQueue:基于數(shù)組的有界阻塞隊列。
- LinkedBlockingQueue:基于鏈表的可選有界阻塞隊列。
- PriorityBlockingQueue:基于優(yōu)先級的無界阻塞隊列。
- DelayQueue:基于延遲時間的無界阻塞隊列。
- SynchronousQueue:不存儲元素的阻塞隊列,用于直接傳遞數(shù)據。
- LinkedTransferQueue:基于鏈表的無界阻塞隊列,結合了 LinkedBlockingQueue 和SynchronousQueue 特性。
- LinkedBlockingDeque:基于鏈表的雙端阻塞隊列。
BlockingQueue 基本用法
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = queue.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
運行結果:
Consumed: 1
Produced: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我們使用了 ArrayBlockingQueue 來實現(xiàn)阻塞隊列,其中生產者線程使用 put() 方法向隊列中放入元素,消費者線程使用 take() 方法從隊列中取出元素。
當隊列滿或空時,線程會被阻塞,直到條件滿足。
BlockingQueue 是實現(xiàn)線程安全的生產者-消費者模式的常用工具,它簡化了線程之間的協(xié)調和通信。
9、BlockingDeque
BlockingDeque(阻塞雙端隊列)是 Java 并發(fā)包中的一個接口,它是 BlockingQueue 接口的擴展,提供了雙端隊列的功能,并支持阻塞操作。BlockingDeque 可以在隊列的兩端插入和刪除元素,同時支持阻塞操作,使得在多線程環(huán)境下更容易實現(xiàn)數(shù)據的生產和消費。
BlockingDeque 接口的實現(xiàn)類包括:
- LinkedBlockingDeque:基于鏈表的阻塞雙端隊列,可選有界或無界。
- LinkedBlockingDeque:基于鏈表的雙端阻塞隊列,無界。
BlockingDeque基本用法
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; public class BlockingDequeExample { public static void main(String[] args) { BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(5); Thread producerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { deque.put(i); System.out.println("Produced: " + i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { for (int i = 1; i <= 10; i++) { int item = deque.take(); System.out.println("Consumed: " + item); Thread.sleep(1500); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); } }
運行結果:
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Produced: 10
Consumed: 8
Consumed: 9
Consumed: 10
在上述示例中,我們使用了 LinkedBlockingDeque 來實現(xiàn)阻塞雙端隊列,生產者線程使用 put() 方法向隊列中放入元素,消費者線程使用 take() 方法從隊列中取出元素。
與 BlockingQueue 類似,當隊列滿或空時,線程會被阻塞,直到條件滿足。
BlockingDeque 可以更靈活地實現(xiàn)在隊列兩端插入和刪除元素,適用于更多種類的場景,例如雙向數(shù)據傳輸和窗口滑動等。
以下是一些常用的在隊列兩端插入和刪除元素的方法:
- 在隊列頭部插入元素:
void addFirst(E e): 將元素添加到隊列的頭部,如果隊列已滿,則拋出異常。
boolean offerFirst(E e): 將元素添加到隊列的頭部,如果隊列已滿,則返回 false。
void putFirst(E e): 將元素添加到隊列的頭部,如果隊列已滿,則阻塞等待直到有空間。
- 在隊列尾部插入元素:
void addLast(E e):將元素添加到隊列的尾部,如果隊列已滿,則拋出異常。
boolean offerLast(E e):將元素添加到隊列的尾部,如果隊列已滿,則返回 false。
void putLast(E e):將元素添加到隊列的尾部,如果隊列已滿,則阻塞等待直到有空間。
- 從隊列頭部刪除元素:
E removeFirst(): 移除并返回隊列頭部的元素,如果隊列為空,則拋出異常。
E pollFirst(): 移除并返回隊列頭部的元素,如果隊列為空,則返回 null。
E takeFirst(): 移除并返回隊列頭部的元素,如果隊列為空,則阻塞等待直到有元素。
- 從隊列尾部刪除元素:
E removeLast():移除并返回隊列尾部的元素,如果隊列為空,則拋出異常。
E pollLast(): 移除并返回隊列尾部的元素,如果隊列為空,則返回 null。
E takeLast(): 移除并返回隊列尾部的元素,如果隊列為空,則阻塞等待直到有元素。
這些方法使得你可以在雙端隊列的頭部和尾部執(zhí)行插入和刪除操作,根據具體的需求選擇合適的方法來實現(xiàn)線程安全的雙端隊列操作。
10、LockSupport
LockSupport 是 Java 并發(fā)包中提供的工具類,用于線程的阻塞和喚醒操作。
它提供了一種基于許可(permit)的方式來控制線程的阻塞和喚醒,相對于傳統(tǒng)的 wait() 和 notify() 方法,LockSupport 更加靈活和可靠。
主要的方法包括:
- void park():阻塞當前線程,直到獲得許可。
- void park(Object blocker):阻塞當前線程,并將 blocker 關聯(lián)到當前線程,用于監(jiān)控和診斷工具。
- void parkNanos(long nanos):阻塞當前線程,最多等待指定的納秒數(shù),直到獲得許可。
- void parkNanos(Object blocker, long nanos):阻塞當前線程,并將 blocker關聯(lián)到當前線程,最多等待指定的納秒數(shù)。
- void parkUntil(long deadline):阻塞當前線程,直到指定的時間戳,直到獲得許可。
- void parkUntil(Object blocker, long deadline):阻塞當前線程,并將 blocker關聯(lián)到當前線程,直到指定的時間戳。
- void unpark(Thread thread):喚醒指定的線程,如果線程被阻塞,則解除阻塞狀態(tài)。
LockSupport基本用法
import java.util.concurrent.locks.LockSupport; public class LockSupportExample { public static void main(String[] args) { Thread thread = new Thread(() -> { System.out.println("Thread is going to be parked."); LockSupport.park(); // 阻塞當前線程 System.out.println("Thread is unparked."); }); thread.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Main thread is unparking the parked thread."); LockSupport.unpark(thread); // 喚醒被阻塞的線程 } }
運行結果:
Thread is going to be parked.
Main thread is unparking the parked thread.
Thread is unparked.
在上述示例中,我們創(chuàng)建了一個新線程,調用了 LockSupport.park() 來阻塞該線程。
然后,主線程等待 2 秒后,調用了 LockSupport.unpark(thread) 來喚醒被阻塞的線程。與傳統(tǒng)的 wait() 和 notify() 方法不同,LockSupport 是基于許可的,不需要獲取某個特定對象的鎖來進行阻塞和喚醒操作。
LockSupport 提供了一種更直接、靈活和可控的線程阻塞和喚醒機制,適用于各種多線程協(xié)調的場景。
11、Exchanger
Exchanger 是 Java 并發(fā)包中的同步器之一,用于實現(xiàn)兩個線程之間交換數(shù)據。
它提供了一個同步點,當兩個線程都到達這個同步點時,它們可以交換數(shù)據。Exchanger 可以用于實現(xiàn)線程間的數(shù)據傳遞和協(xié)作
Exchanger 提供了兩個線程之間交換數(shù)據的功能,但僅限于兩個線程。當兩個線程都到達 Exchanger 同步點時,它們可以通過 exchange() 方法交換數(shù)據,然后各自繼續(xù)執(zhí)行。
Exchanger基本用法
import java.util.concurrent.Exchanger; public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); // 創(chuàng)建一個線程來發(fā)送數(shù)據 Thread senderThread = new Thread(() -> { try { String dataToSend = "Hello from Sender"; System.out.println("Sender is sending: " + dataToSend); exchanger.exchange(dataToSend); // 發(fā)送數(shù)據并等待接收數(shù)據 } catch (InterruptedException e) { e.printStackTrace(); } }); // 創(chuàng)建一個線程來接收數(shù)據 Thread receiverThread = new Thread(() -> { try { String receivedData = exchanger.exchange(null); // 等待接收數(shù)據并發(fā)送數(shù)據 System.out.println("Receiver received: " + receivedData); } catch (InterruptedException e) { e.printStackTrace(); } }); // 啟動線程 senderThread.start(); receiverThread.start(); } }
運行結果
Sender is sending: Hello from Sender
Receiver received: Hello from Sender
在上述示例中,我們創(chuàng)建了一個 Exchanger 實例,然后創(chuàng)建了一個發(fā)送數(shù)據的線程和一個接收數(shù)據的線程。
當發(fā)送數(shù)據的線程調用 exchange() 方法時,它會發(fā)送數(shù)據并等待接收數(shù)據;而接收數(shù)據的線程調用 exchange() 方法時,它會等待接收數(shù)據并發(fā)送數(shù)據。當兩個線程都到達 Exchanger 同步點時,它們會交換數(shù)據,并繼續(xù)執(zhí)行。
需要注意的是,Exchanger 只適用于兩個線程之間的數(shù)據交換。如果需要更多線程之間的數(shù)據交換,可能需要組合使用多個 Exchanger。
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
java 中用split分割字符串,最后的空格等不被拆分的方法
下面小編就為大家?guī)硪黄猨ava 中用split分割字符串,最后的空格等不被拆分的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02使用springboot logback動態(tài)獲取application的配置項
這篇文章主要介紹了使用springboot logback動態(tài)獲取application的配置項,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08Java中Object toString方法簡介_動力節(jié)點Java學院整理
Object類在Java里面是一個比較特殊的類,JAVA為了組織這個類組織得比較方便,它提供了一個最根上的類,相當于所有的類都是從這個類繼承,這個類就叫Object。接下來通過本文給大家介紹Object toString方法,需要的的朋友參考下吧2017-05-05淺談HttpClient、okhttp和RestTemplate的區(qū)別
這篇文章主要介紹了HttpClient、okhttp和RestTemplate的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06