Java多線程(單例模式,阻塞隊列,定時器,線程池)詳解
1. 單例模式(singleton pattern)
單例模式是通過代碼,保護一個類,使得類在整個進程(應(yīng)用)運行過程中有且只有一個。
常用在配置對象、控制類。
設(shè)計模式(design pattern):對一些解決通用問題的、經(jīng)常書寫得代碼片段的總結(jié)與歸納。
1.1 懶漢模式
一開始就初始化
public class StarvingMode {
// 是線程安全的
// 類加載的時候執(zhí)行
// JVM 保證了類加載的過程是線程安全的
private static StarvingMode instance = new StarvingMode();
public static StarvingMode getInstance() {
return instance;
}
// 將構(gòu)造方法私有化,防止其他線程new
private StarvingMode() {}
}
1.2 餓漢模式
等到用的時候在進行初始化
a. 餓漢模式-單線程版
類加載的時候不創(chuàng)建實例,第一次使用的時候才創(chuàng)建實例
public class LazyModeV1 {
private static LazyModeV1 instance = null;
public static LazyModeV1 getInstance(){
// 第一次調(diào)用這個方法時,說明我們應(yīng)該實例化對象了
// 原子性
if (instance == null) {
instance = new LazyModeV1(); // 只在第一次的時候執(zhí)行
}
return instance;
}
// 將構(gòu)造方法私有化,防止其他線程new
private LazyModeV1(){};
}
但是如果在多個線程中同時調(diào)用 getInstance 方法, 就可能導(dǎo)致創(chuàng) 建出多個實例,一旦實例已經(jīng)創(chuàng)建好了, 后面再多線程環(huán)境調(diào)用 getInstance 就不再有線程安全問題了(不再修改 instance 了)
b. 餓漢模式-多線程版
加 synchronized 鎖 使線程安全
public class LazyModeV2 {
private static LazyModeV2 instance = null;
// 加synchronized鎖,但是這樣性能太低,所以有了mode3
public synchronized static LazyModeV2 getInstance(){
// 第一次調(diào)用這個方法時,說明我們應(yīng)該實例化對象了
if (instance == null) {
instance = new LazyModeV2(); // 只在第一次的時候執(zhí)行
}
return instance;
}
private LazyModeV2(){};
}
但是顯而易見,如果簡單粗暴的加鎖,只在第一次初始化時為保證線程安全使用一次,在后續(xù)getInstance 時也要進行加鎖解鎖操作,降低性能。
c. 餓漢模式-多線程改進版
1.使用雙重 if 判定, 降低鎖競爭的頻率
2.給 instance 加上了 volatile
class LazyModeV3 {
// volatile
private volatile static LazyModeV3 instance = null;
public static LazyModeV3 getInstance(){
// 1. 第一次調(diào)用這個方法時,說明我們應(yīng)該實例化對象了
if (instance == null) {
// 在第一次instance 沒有初始化的時候
// 沒有鎖保護,有多個線程可以走到這里 a, b, c, d
// 2. **但是只有第一個線程a能加鎖,a 加鎖后并且實例化對象,
// **b, c, d 加鎖進去后發(fā)現(xiàn)instance != null, 就不會再創(chuàng)建了
synchronized (LazyModeV3.class) {
// 3. 加鎖之后才能執(zhí)行
// 第一個搶到鎖的線程看instance 是 null
// 其他第一個搶到鎖的線程看instance 是 null
// 保證instance 只實例化一次
if (instance == null) {
instance = new LazyModeV3(); // 只在第一次的時候執(zhí)行
// 4. 但是還可能出問題,出現(xiàn)重排序,變成 1 -> 3 -> 2 其他線程掉instance就出現(xiàn)問題,
// 所以定義時就加上volatile,防止重排序;
}
}
}
return instance;
}
private LazyModeV3(){};
}2 阻塞隊列(blocking queue)
2.1 阻塞隊列
阻塞隊列是一種特殊的隊列也遵守 "先進先出" 的原則
阻塞隊列能是一種線程安全的數(shù)據(jù)結(jié)構(gòu), 并且具有以下特性:
- 當(dāng)隊列滿的時候, 繼續(xù)入隊列就會阻塞, 直到有其他線程從隊列中取走元素
- 當(dāng)隊列空的時候, 繼續(xù)出隊列也會阻塞, 直到有其他線程往隊列中插入元素
阻塞隊列的一個典型應(yīng)用場景就是 "生產(chǎn)者消費者模型".
2.2 生產(chǎn)者消費者模型
生產(chǎn)者消費者模式就是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。
生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取
- 阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力
- 阻塞隊列也能使生產(chǎn)者和消費者之間 解耦
2.3 標(biāo)準庫中的阻塞隊列
在 Java 標(biāo)準庫,JUC包下的blocking queue,是Queue 的子接口

- BlockingQueue 是一個接口. 真正實現(xiàn)的類是 LinkedBlockingQueue(無上限)、ArrayBlockingQueue(有上限)
- put 方法用于阻塞式的入隊列, take 用于阻塞式的出隊列
- BlockingQueue 也有 offer, poll, peek 等方法, 但是這些方法不帶有阻塞特性
- 都會拋出lnterruptedException 異常,可以被中斷
public class Main0 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue b1 = new LinkedBlockingDeque();
BlockingQueue<Integer> b2 = new ArrayBlockingQueue<>(3);
b2.put(1);
b2.put(2);
b2.put(3);
b2.put(4); // 插入第四個時就會阻塞
}
}
2.4 實現(xiàn)阻塞隊列
通過 "循環(huán)隊列" 的方式來實現(xiàn).
使用 synchronized 進行加鎖控制.
put 插入元素的時候, 判定如果隊列滿了, 就進行 wait. (注意, 要在循環(huán)中進行 wait. 被喚醒時不一定 隊列就不滿了, 因為同時可能是喚醒了多個線程).
take 取出元素的時候, 判定如果隊列為空, 就進行 wait. (也是循環(huán) wait)
public class MyArrayBlockingQueue {
private long[] array;
private int frontIndex;
private int rearIndex;
private int size;
public MyArrayBlockingQueue (int capacity){
array = new long[capacity];
frontIndex = 0;
rearIndex = 0;
size = 0;
}
public synchronized void put (long val) throws InterruptedException {
// while 防止假喚醒
while(size == array.length){
this.wait();
}
// 預(yù)期:隊列一定不是滿的
array[rearIndex] = val;
rearIndex++;
if(rearIndex == array.length){
rearIndex = 0;
}
// notify();
// 在多生產(chǎn)者,多消費者時用notifyAll()
notifyAll();
}
public synchronized long take () throws InterruptedException {
while(size == 0){
wait();
}
long val = array[frontIndex];
frontIndex++;
if(frontIndex == array.length){
frontIndex = 0;
}
// notify();
// 在多生產(chǎn)者,多消費者時用notifyAll()
notifyAll();
return val;
}
}3. 定時器
定時器一種實際開發(fā)中非常常用的組件,類似于一個“鬧鐘”,達到特定時間執(zhí)行某個特定的代碼。
3.1 標(biāo)準庫中的定時器
標(biāo)準庫中提供了一個 Timer 類. Timer 類的核心方法為 schedule
schedule 包含兩個參數(shù). 第一個參數(shù)指定即將要執(zhí)行的任務(wù)代碼, 第二個參數(shù)指定多長時間之后執(zhí) 行 (單位為毫秒)
public class UserTimer {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
System.out.println("鬧鐘響了");
}
};
//timer.schedule(task, 5000); // 5秒后執(zhí)行任務(wù)
timer.schedule(task, 2000, 3000); // 2秒后執(zhí)行任務(wù),并且之后每三秒執(zhí)行一次
while (true){} // 主線程死循環(huán),所以之后的輸出都不是主線程打印的
}
}
3.2 實現(xiàn)定時器
- 一個帶優(yōu)先級的阻塞隊列
- 隊列中的每個元素是一個 Task 對象.
- Task 中帶有一個時間屬性, 隊首元素就是即將執(zhí)行的Task
- 同時有一個 worker 線程一直掃描隊首元素, 看隊首元素是否需要執(zhí)行
為啥要帶優(yōu)先級呢?
因為阻塞隊列中的任務(wù)都有各自的執(zhí)行時刻 (delay). 最先執(zhí)行的任務(wù)一定是 delay 最小的. 使用帶 優(yōu)先級的隊列就可以高效的把這個 delay 最小的任務(wù)找出來.
import java.util.concurrent.PriorityBlockingQueue;
// 定義一個工作抽象類
abstract class MyTimerTask implements Comparable<MyTimerTask> {
long runAt; // 這個任務(wù)應(yīng)該在何時運行(記錄為 ms 為單位的時間戳)
abstract public void run();
@Override
public int compareTo(MyTimerTask o) {
if (runAt < o.runAt) {
return -1;
} else if (runAt > o.runAt) {
return 1;
} else {
return 0;
}
}
}
// 定時器
public class MyTimer {
// 這里是普通屬性,不是靜態(tài)屬性
// 優(yōu)先級隊列,要求元素具備比較能力
private final PriorityBlockingQueue<MyTimerTask> queue = new PriorityBlockingQueue<>();
private final Object newTaskComing = new Object();
public MyTimer() {
Worker worker = new Worker();
worker.start();
}
// 不能使用靜態(tài)內(nèi)部類,否則看不到外部類的屬性
class Worker extends Thread {
@Override
public void run() {
while (true) {
MyTimerTask task = null;
try {
task = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
// task 應(yīng)該有個應(yīng)該執(zhí)行的時刻(不能記錄 delay)
long now = System.currentTimeMillis();
long delay = task.runAt - now;
if (delay <= 0) {
task.run();
} else {
try {
// Thread.sleep(delay); // 5s
// 應(yīng)該在兩種條件下醒來:
// 1. 有新的任務(wù)過來了(任務(wù)可能比當(dāng)前最小的任務(wù)更靠前)
// 2. 沒有新任務(wù)來,但到了該執(zhí)行該任務(wù)的時候了
synchronized (newTaskComing) {
newTaskComing.wait(delay); // 最多等待delay秒
}
// 如果當(dāng)前時間已經(jīng)在要執(zhí)行任務(wù)的時間之后了
// 說明任務(wù)的執(zhí)行時間已過,所以應(yīng)該去執(zhí)行任務(wù)了
// 否則,先把這個任務(wù)放回去(因為時間還沒到),再去取最小的任務(wù)
if (System.currentTimeMillis() >= task.runAt) {
task.run();
} else {
queue.put(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public void schedule(MyTimerTask task, long delay) {
// 該方法非工作線程(主線程)調(diào)用
task.runAt = System.currentTimeMillis() + delay;
queue.put(task);
synchronized (newTaskComing) {
newTaskComing.notify();
}
}
}4 線程池
因為創(chuàng)建線程 / 銷毀線程 的開銷較大,使用線程池就是減少每次啟動、銷毀線程的損耗

4.1 標(biāo)準庫中的線程池
Executor -> ExecutorService -> ThreadPoolExcutor() 實現(xiàn)類
- corePoolSize: 正式員工的名額上限
- maximumPoolSize: 正式+臨時的名額上限
- keepAliveTime + unit: 臨時工允許空閑時間的上限
- workQueue: 任務(wù)隊列
- handler: 拒絕(默認)、調(diào)用者允許、丟棄最老的、丟棄當(dāng)前
import java.util.Scanner;
import java.util.concurrent.*;
public class Demo {
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
ExecutorService service = new ThreadPoolExecutor(
3, // 正式員工 10
9, // 臨時員工 20
10, TimeUnit.SECONDS,
queue, // 阻塞隊列
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "飯店廚師");
return t;
}
}, // 線程工廠
new ThreadPoolExecutor.AbortPolicy() // 拒絕策略
);
// 定義任務(wù)
Runnable task = new Runnable() {
@Override
public void run() {
try {
TimeUnit.DAYS.sleep(365);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 把任務(wù)提交給線程池對象(公司)
Scanner s = new Scanner(System.in);
for (int i = 1; i < 100; i++) {
s.nextLine();
service.execute(task);
System.out.println(i);
}
}
}Executor 是接口
Executor 定義了一些固定策略的線程池
4.2 Executors 創(chuàng)建線程池的幾種方式
- newFixedThreadPool: 創(chuàng)建固定線程數(shù)的線程池(只有正式員工)
- newCachedThreadPool: 創(chuàng)建線程數(shù)目動態(tài)增長的線程池(只有臨時員工)
- newSingleThreadExecutor: 創(chuàng)建只包含單個線程的線程池(只有一個正式員工)
- newScheduledThreadPool: 設(shè)定 延遲時間后執(zhí)行命令,或者定期執(zhí)行命令. 是進階版的 Timer
public class Demo2 {
public static void main(String[] args) {
// 不太建議在實際生產(chǎn)項目下使用
ExecutorService service = Executors.newFixedThreadPool(10);
ExecutorService service1 = Executors.newSingleThreadExecutor();
ExecutorService service2 = Executors.newCachedThreadPool();
Runnable task = new Runnable() {
@Override
public void run() {
}
};
service.execute(task);
}
}
4.3 利用線程池 創(chuàng)建多線程計算fib 數(shù)
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo3 {
static class CalcFib implements Runnable {
private final int n;
CalcFib(int n) {
this.n = n;
}
@Override
public void run() {
long r = fib(n);
System.out.printf("fib(%d) = %d\n", n, r);
}
private long fib(int n) {
if (n == 0 || n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 2);
}
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
ExecutorService service = Executors.newFixedThreadPool(10);
while (true) {
System.out.print("提交數(shù)字: ");
int n = scanner.nextInt();
Runnable task = new CalcFib(n);
service.execute(task);
}
}
}4.4 實現(xiàn)線程池

總結(jié):
線程中線程是按需創(chuàng)建:
- 一開始一個線程都沒有︰隨著任務(wù)提交,創(chuàng)建core線程(當(dāng)前線程數(shù)<corePoolSize)
- 優(yōu)先提交隊列,直到隊列滿
- 創(chuàng)建臨時工去處理 > corePoolSize的線程,直到maximumPoolSize
- 執(zhí)行拒絕策略
import java.util.concurrent.*;
// 線程池類
public class MyThreadPoolExecutor implements Executor {
private int currentCoreSize; // 當(dāng)前正式員工的數(shù)量
private final int corePoolSize; // 正式員工的數(shù)量上限
private int currentTemporarySize; // 當(dāng)前臨時員工的數(shù)量
private final int temporaryPoolSize; // 臨時員工的數(shù)量上限
private final ThreadFactory threadFactory;// 創(chuàng)建線程的工廠對象
// 臨時工摸魚的時間上限
private final long keepAliveTime;
private final TimeUnit unit;
// 傳遞任務(wù)的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
public MyThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.temporaryPoolSize = maximumPoolSize - corePoolSize;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
this.keepAliveTime = keepAliveTime;
this.unit = unit;
}
// 向線程池中提交任務(wù)
@Override
public void execute(Runnable command) {
// 1. 如果正式員工的數(shù)量還低于正式員工的數(shù)量上限,則優(yōu)先創(chuàng)建正式員工處理任務(wù)
// 1.1 需要管理,當(dāng)前正式員工有多少,正式員工的數(shù)量上限有多少?
if (currentCoreSize < corePoolSize) {
// 優(yōu)先創(chuàng)建正式員工進行處理
// 創(chuàng)建一個線程,這個線程中的任務(wù)就是不斷地取任務(wù)-做任務(wù),但是不需要考慮退出的問題
CoreJob job = new CoreJob(workQueue, command);
// Thread thread = new Thread(job); // 不使用工廠創(chuàng)建的線程
Thread thread = threadFactory.newThread(job); // thread 代表的就是正式員工
String name = String.format("正式員工-%d", currentCoreSize);
thread.setName(name);
thread.start();
// 只是兩種不同的策略,沒有誰是正確的說法
// 1. 把 command 放到隊列中;command 的執(zhí)行次序是在隊列已有的任務(wù)之后
// 2. 創(chuàng)建正式員工的時候,就把 command 提交給正式員工,讓 command 優(yōu)先執(zhí)行
// 我們這里采用第二種方案,主要原因就是 java 官方的就是使用的第二種策略
currentCoreSize++;
return;
}
// 走到這里,說明正式員工的數(shù)量 == 正式員工的上限了
// 2. 優(yōu)先把任務(wù)放入隊列中,如果放入成功,execute 執(zhí)行結(jié)束,否則還需要繼續(xù)
// 2.1 需要一個阻塞隊列
// workQueue.put(command); // 帶阻塞的放入,是否滿足這里的需求?
// 我們這里希望的是立即得到結(jié)果
boolean success = workQueue.offer(command);
if (success == true) {
// 說明放入隊列成功
return;
}
// 隊列也已經(jīng)放滿了
// 3. 繼續(xù)判斷,臨時工的數(shù)量有沒有到上限,如果沒有到達,創(chuàng)建新的臨時工來處理
if (currentTemporarySize < temporaryPoolSize) {
// 創(chuàng)建臨時工進行處理
TemporaryJob job = new TemporaryJob(keepAliveTime, unit, workQueue, command);
//Thread thread = new Thread(job); // 不使用工廠創(chuàng)建的線程
Thread thread = threadFactory.newThread(job); // thread 代表的就是臨時員工
String name = String.format("臨時員工-%d", currentTemporarySize);
thread.setName(name);
thread.start();
currentTemporarySize++;
return;
}
// 4. 執(zhí)行拒絕策略
// 為了實現(xiàn)方便,暫時不考慮其他策略
throw new RejectedExecutionException();
}
// 一個正式員工線程要完成的工作
class CoreJob implements Runnable {
// 需要阻塞隊列
private final BlockingQueue<Runnable> workQueue;
private Runnable firstCommand;
CoreJob(BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
this.workQueue = workQueue;
this.firstCommand = firstCommand;
}
@Override
public void run() {
try {
firstCommand.run(); // 優(yōu)先先把剛提交的任務(wù)先做掉了
firstCommand = null; // 這里設(shè)置 null 的意思是,不影響 firstCommand 對象被 GC 時的回收
while (!Thread.interrupted()) {
Runnable command = workQueue.take();
command.run();
}
} catch (InterruptedException ignored) {}
}
}
// 一個臨時員工線程要完成的工作
class TemporaryJob implements Runnable {
// 需要阻塞隊列
private final BlockingQueue<Runnable> workQueue;
private final long keepAliveTime;
private final TimeUnit unit;
private Runnable firstCommand;
TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
this.keepAliveTime = keepAliveTime;
this.unit = unit;
this.workQueue = workQueue;
this.firstCommand = firstCommand;
}
@Override
public void run() {
try {
firstCommand.run(); // 優(yōu)先先把剛提交的任務(wù)先做掉了
firstCommand = null; // 這里設(shè)置 null 的意思是,不影響 firstCommand 對象被 GC 時的回收
// 一旦超過一定時間沒有任務(wù),臨時工是需要退出的
// 1. keepAliveTime + unit 記錄起來
// 2. 怎么就知道超過多久沒有任務(wù)了?如果一定時間內(nèi)都無法從隊列中取出來任務(wù),則認為摸魚時間夠了
while (!Thread.interrupted()) {
// Runnable command = workQueue.take();
Runnable command = workQueue.poll(keepAliveTime, unit);
if (command == null) {
// 說明,沒有取到任務(wù)
// 說明超時時間已到
// 說明該線程已經(jīng) keepAliveTime + unit 時間沒有工作了
// 所以,可以退出了
break;
}
command.run();
}
} catch (InterruptedException ignored) {}
}
}
}以上就是Java多線程(單例模式,阻塞隊列,定時器,線程池)詳解的詳細內(nèi)容,更多關(guān)于Java多線程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot工程Docker多環(huán)境中使用同一個Jar包解決方案
在Docker多環(huán)境部署中,SpringBoot工程可以通過環(huán)境變量來動態(tài)改變配置,無需重新打包,利用volume掛載或docker?cp命令,可以將配置文件直接傳入容器,提高部署效率,并保證安全性2024-09-09
Java的MyBatis框架中對數(shù)據(jù)庫進行動態(tài)SQL查詢的教程
這篇文章主要介紹了Java的MyBatis框架中對數(shù)據(jù)庫進行動態(tài)SQL查詢的教程,講解了MyBatis中一些控制查詢流程的常用語句,需要的朋友可以參考下2016-04-04
java實現(xiàn)小型局域網(wǎng)群聊功能(C/S模式)
這篇文章主要介紹了java利用TCP協(xié)議實現(xiàn)小型局域網(wǎng)群聊功能(C/S模式) ,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-08-08

