Java常見面試題之多線程和高并發(fā)詳解
volatile
對 volatile的理解
volatile 是一種輕量級的同步機制。
- 保證數(shù)據(jù)可見性
- 不保證原子性
- 禁止指令重排序
JMM
JMM(Java 內(nèi)存模型)是一種抽象的概念,描述了一組規(guī)則或規(guī)范,定義了程序中各個變量的訪問方式。
JVM運行程序的實體是線程,每個線程創(chuàng)建時 JVM 都會為其創(chuàng)建一個工作內(nèi)存,是線程的私有數(shù)據(jù)區(qū)域。JMM中規(guī)定所有變量都存儲在主內(nèi)存,主內(nèi)存是共享內(nèi)存。線程對變量的操作在工作內(nèi)存中進行,首先將變量從主內(nèi)存拷貝到工作內(nèi)存,操作完成后寫會主內(nèi)存。不同線程間無法訪問對方的工作內(nèi)存,線程通信(傳值)通過主內(nèi)存來完成。
JMM 對于同步的規(guī)定:
- 線程解鎖前,必須把共享變量的值刷新回主內(nèi)存
- 線程加鎖前,必須讀取主內(nèi)存的最新值到自己的工作內(nèi)存
- 加鎖解鎖是同一把鎖
JMM 的三大特性
- 可見性
- 原子性
- 順序性
原子性是不可分割,某個線程正在做某個具體業(yè)務(wù)時,中間不可以被分割,要么全部成功,要么全部失敗。
重排序:計算機在執(zhí)行程序時,為了提高性能,編譯器和處理器常常對指令做重排序,源代碼經(jīng)過編譯器優(yōu)化重排序、指令并行重排序、內(nèi)存系統(tǒng)的重排序之后得到最終執(zhí)行的指令。
在單線程中保證程序最終執(zhí)行結(jié)果和代碼執(zhí)行順序執(zhí)行結(jié)果一致。
多線程中線程交替執(zhí)行,由于重排序,兩個線程中使用的變量能否保證一致性無法確定,結(jié)果無法確定。
處理器在處理重排序時需要考慮數(shù)據(jù)的依賴性。
volatile 實現(xiàn)禁止指令重排序,避免多線程環(huán)境下程序亂序執(zhí)行。是通過內(nèi)存屏障指令來執(zhí)行的,通過插入內(nèi)存屏障禁止在內(nèi)存屏障后的指令執(zhí)行重排序優(yōu)化,并強制刷出緩存數(shù)據(jù),保證線程能讀取到這些數(shù)據(jù)的最新版本。
實例1:volatile 保證可見性
class MyData { //volatile int number = 0;//case2 //int number=0; //case1 public void change() { number = 60; } } public class VolatileDemo { public static void main(String[] args) { MyData data=new MyData(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t come in"); try{ TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();} data.change(); System.out.println(Thread.currentThread().getName()+"\t updated number value:"+data.number); },"A").start(); while(data.number==0){} System.out.println(Thread.currentThread().getName()+"\t over, get number:"+data.number); } }
當我們使用case1的時候,也就是number沒有volatile修飾的時候,運行結(jié)果:
A come in
A updated number value:60
并且程序沒有執(zhí)行結(jié)束,說明在main線程中由于不能保證可見性,一直在死循環(huán)。
當執(zhí)行case2的時候:
A come in
A updated number value:60
main over, get number:60
保證了可見性,因此main成功結(jié)束。
實例2: volatile 不保證原子性
class MyData { volatile int number = 0; public void change() { number = 60; } public void addOne() { number++; } } public class VolatileDemo { public static void main(String[] args) { case2(); } //驗證原子性 public static void case2() { MyData myData = new MyData(); for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { myData.addOne(); } }, String.valueOf(i)).start(); } while(Thread.activeCount()>2){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+"\t number value:"+myData.number); } }
最終輸出結(jié)果可以發(fā)現(xiàn)并不是 20000,且多次輸出結(jié)果并不一致,因此說明 volatile 不能保證原子性。
如何保證原子性
- 加鎖:使用 synchronized 加鎖
- 使用 AtomicInteger
實例3:volatile 和 單例模式
DCL模式的單例模式
public class Singleton { private static Singleton instance=null; private Singleton(){ System.out.println(Thread.currentThread().getName()+" constructor"); } //DCL 雙端檢鎖機制 public static Singleton getInstance(){ if(instance==null){ synchronized (Singleton.class){ if(instance==null) instance=new Singleton(); } } return instance; } }
DCL 機制不能完全保證線程安全,因為有指令重排序的存在。
原因在于instance = new Singleton(); 可以分為三步:
1. memory=allocate();//分配內(nèi)存空間
2. instance(memory);//初始化對象
3. instance=memory;//設(shè)置instance指向分配的內(nèi)存地址,分配成功后,instance!=null
由于步驟2和步驟3不存在數(shù)據(jù)依賴關(guān)系,且無論重排序與否執(zhí)行結(jié)果在單線程中沒有改變,因此這兩個步驟的重排序是允許的。也就是說指令重排序只會保證單線程串行語義的一致性(as-if-serial),但是不會關(guān)心多線程間的語義一致性。
因此,重排序之后,先執(zhí)行3會導(dǎo)致instance!=null,但是對象還未被初始化。此時,別的線程在調(diào)用時,獲取了一個未初始化的對象。
因此,在聲明 instance 時,使用 volatile 進行修飾,禁止指令重排序。
private static volatile Singleton instance = null;
CAS
CAS 的全程是 CompareAndSwap,是一條 CPU 并發(fā)原語。它的功能是判斷內(nèi)存某個位置的值是否為預(yù)期值,如果是則更新為新的值,這個過程是原子的。
CAS 的作用是比較當前工作內(nèi)存中的值和主內(nèi)存中的值,如果相同則執(zhí)行操作,否則繼續(xù)比較直到主內(nèi)存和工作內(nèi)存中的值一致為止。主內(nèi)存值為V,工作內(nèi)存中的預(yù)期值為A,要修改的更新值為B,當且僅當A和V相同,將V修改為B,否則什么都不做。
CAS 底層原理:
在原子類中,CAS 操作都是通過 Unsafe 類來完成的。
//AtomicInteger i++ public final int getAndIncrement(){ return unsafe.getAndAddInt(this,valueoffset,1); }
其中 this 是當前對象, valueoffset 是一個 long ,代表地址的偏移量。
//AtomicInteger.java private static final Unsafe unsfae=Unsafe.getUnsafe();//unsafe對象 private static final long valueOffset;//地址偏移量 static{ try{ valueoffset=unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"); }catch(Excepthion ex){throw new Error(ex);} } private volatile int value;//存儲的數(shù)值
- Unsafe
Unsafe 類是 rt.jar 下的 sun.misc 包下的一個類,基于該類可以直接操作特定內(nèi)存的數(shù)據(jù)。
Java方法無法直接訪問底層系統(tǒng),需要使用 native 方法訪問,Unsafe 類的內(nèi)部方法都是 native 方法,其中的方法可以像C的指針一樣直接操作內(nèi)存,Java 中的 CAS 操作的執(zhí)行都依賴于 Unsafe 類的方法。
- valueOffset
該變量表示變量值在內(nèi)存中的偏移地址, Unsafe 就是根據(jù)內(nèi)存偏移地址獲取數(shù)據(jù)的。
Unsafe類
CAS 并發(fā)源于體現(xiàn)在 Java 中就是 Unsafe 類的各個方法。調(diào)用該類中的 CAS 方法,JVM會幫我們實現(xiàn)出 CAS 匯編指令,這是一種完全依賴于硬件的功能。
原語是由若干條指令組成的,用于完成某個功能的過程。原語的執(zhí)行必須是連續(xù)的,執(zhí)行過程不允許被中斷。所以 CAS 是一條 CPU 的原子指令,不會造成數(shù)據(jù)不一致問題。
下邊是 AtomicInteger 中實現(xiàn) i++ 功能所調(diào)用的 Unsafe 類的函數(shù)。
//unsafe.getAndAddInt public final int getAndAddInt(Object var1,long var2,int var4){ int var5; do{ //獲取當前的值的地址 var5=this.getIntVolatile(var1,var2); //var1代表對象,var2和var5分別代表當前對象的真實值和期望值,如果二者相等,更新為var5+var4 }while(!this.compareAndSwapInt(var1,var2,var5,var5+var4); return var5; }
在 getAndAddInt 函數(shù)中,var1 代表了 AtomicInteger 對象, var2 代表了該對象在內(nèi)存中的地址, var4 代表了期望增加的數(shù)值。
首先通過 var1 和 var2 獲取到當前的主內(nèi)存中真實的 int 值,也就是 var5。
然后通過循環(huán)來進行數(shù)據(jù)更改,當比較到真實值和對象的當前值相等,則更新,退出循環(huán);否則再次獲取當前的真實值,繼續(xù)嘗試,直到成功。
在 CAS 中通過自旋而不是加鎖來保證一致性,同時和加鎖相比,提高了并發(fā)性。
具體情境來說:線程A和線程B并發(fā)執(zhí)行 AtomicInteger 的自增操作:
- AtomicInteger 中的 value 原始值為 3。主內(nèi)存中 value 為 3, 線程A和線程B的工作內(nèi)存中有 value 為 3 的副本;
- 線程 A 通過 getIntVolatile() 獲取到 value 的值為3,并被掛起。
- 線程 B 也獲取到 value 的值為3,然后執(zhí)行 compareAndSwapInt 方法,比較到內(nèi)存真實值也是 3,因此成功修改內(nèi)存值為4.
- 此時線程 A 繼續(xù)執(zhí)行比較,發(fā)現(xiàn)對象中的 value 3 和主內(nèi)存中的 value 4 不一致,說明已經(jīng)被修改,A 重新進入循環(huán)。
- 線程 A 重新獲取 value,由于 value 被 volatile 修飾,所以線程 A 此時 value 為4,和主內(nèi)存中 value 相等,修改成功。
CAS的缺點
- 如果CAS失敗,會一直嘗試。如果CAS長時間不成功,會給CPU帶來很大的開銷。
- CAS 只能用來保證單個共享變量的原子操作,對于多個共享變量操作,CAS無法保證,需要使用鎖。
- 存在 ABA 問題。
ABA問題
CAS 實現(xiàn)一個重要前提需要取出內(nèi)存中某個時刻的數(shù)據(jù)并在當下時刻比較并替換,這個時間差會導(dǎo)致數(shù)據(jù)的變化。
線程1從內(nèi)存位置V中取出A,線程2也從V中取出A,然后線程2通過一些操作將A變成B,然后又把V位置的數(shù)據(jù)變成A,此時線程1進行CAS操作發(fā)現(xiàn)V中仍然是A,操作成功。盡管線程1的CAS操作成功,但是不代表這個過程沒有問題。
這個問題類似于幻讀問題,通過新增版本號的機制來解決。在這里可以使用 AtomicStampedReference 來解決。
AtomicStampedReference
通過 AtomicStampedReference 來解決這個問題。
public class SolveABADemo { static AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(100,1); new Thread(()->{ int stamp=atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t 版本號:"+stamp); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName()+"\t 版本號:"+atomicStampedReference.getStamp()); atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName()+"\t 版本號:"+atomicStampedReference.getStamp()); },"t1").start(); new Thread(()->{ int stamp=atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t 版本號:"+stamp); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } boolean ret=atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1); System.out.println(Thread.currentThread().getName()+"\t"+ret +" stamp:"+atomicStampedReference.getStamp() +" value:"+atomicStampedReference.getReference()); },"t2").start(); } }
t1 版本號:1
t2 版本號:1
t1 版本號:2
t1 版本號:3
t2 false stamp:3 value:100
集合類的線程安全問題
ConcurrentModificationException
這個異常也就是并發(fā)修改異常,java.util.ConcurrentModificationException。
導(dǎo)致這個異常的原因,是集合類本身是線程不安全的。
解決方案:
- 使用 Vector, Hashtable 等同步容器
- 使用 Collections.synchronizedxxx(new XX) 創(chuàng)建線程安全的容器
- 使用 CopyOnWriteList, CopyOnWriteArraySet, ConcurrentHashMap 等 j.u.c 包下的并發(fā)容器。
CopyOnWriteArrayList
底層使用了private transient volatile Object[] array;
CopyOnWriteArrayList 采用了寫時復(fù)制、讀寫分離的思想。
public boolean add(E e){ final ReentrantLock lock=this.lock; try{ //舊數(shù)組 Object[] elements = getArray(); int len = elements.length; //復(fù)制新數(shù)組 Object[] newElements = Arrays.copyOf(elements, len+1); //修改新數(shù)組 newElements[len] = e; //更改舊數(shù)組引用指向新數(shù)組 setArray(newElements); return true; }finally{ lock.unlock(); } }
添加元素時,不是直接添加到當前容器數(shù)組,而是復(fù)制到新的容器數(shù)組,向新的數(shù)組中添加元素,添加完之后將原容器引用指向新的容器。
這樣做的好處是可以對該容器進行并發(fā)的讀,而不需要加鎖,因為讀時容器不會添加任何元素。
CopyOnWriteArraySet 本身就是使用 CopyOnWriteArrayList 來實現(xiàn)的。
Java鎖
公平鎖和非公平鎖
ReentrantLock 可以指定構(gòu)造函數(shù)的 boolean 類型得到公平或非公平鎖,默認是非公平鎖,synchronized也是非公平鎖。
公平鎖是多個線程按照申請鎖的順序獲取鎖,是 FIFO 的。并發(fā)環(huán)境中,每個線程在獲取鎖時先查看鎖維護的等待隊列,為空則戰(zhàn)友,否則加入隊列。
非公平鎖是指多個線程不是按照申請鎖的順序,有可能后申請的線程比先申請的線程優(yōu)先獲取鎖。高并發(fā)情況下可能導(dǎo)致優(yōu)先級反轉(zhuǎn)或者饑餓現(xiàn)象。并發(fā)環(huán)境中,上來嘗試占有鎖,嘗試失敗,再加入等待隊列。
可重入鎖(遞歸鎖)
可沖入鎖指的是同一線程外層函數(shù)獲取鎖之后,內(nèi)層遞歸函數(shù)自動獲取鎖。也就是線程能進入任何一個它已經(jīng)擁有的鎖所同步著的代碼塊。
ReentrantLock 和 synchronized 都是可重入鎖。
可重入鎖最大的作用用來避免死鎖。
自旋鎖
自旋鎖是指嘗試獲取鎖的線程不會立即阻塞,而是采用循環(huán)的方式嘗試獲取鎖。好處是減少線程上下文切換的消耗,缺點是循環(huán)時會消耗CPU資源。
實現(xiàn)自旋鎖:
public class SpinLockDemo { //使用AtomicReference<Thread>來更新當前占用的 Thread AtomicReference<Thread> threadAtomicReference=new AtomicReference<>(); public static void main(String[] args) { SpinLockDemo demo=new SpinLockDemo(); new Thread(()->{ demo.myLock(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } demo.myUnlock(); },"t1").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ demo.myLock(); demo.myUnlock(); },"t2").start(); } public void myLock(){ Thread thread=Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"\t come in"); //如果當前占用的線程為null,則嘗試獲取更新 while(!threadAtomicReference.compareAndSet(null,thread)){ } } public void myUnlock(){ Thread thread=Thread.currentThread(); //釋放鎖,將占用的線程設(shè)置為null threadAtomicReference.compareAndSet(thread,null); System.out.println(Thread.currentThread().getName()+"\t unlocked"); } }
讀寫鎖
獨占鎖:該鎖一次只能被一個線程持有,如 ReentrantLock 和 synchronized。
共享鎖:該鎖可以被多個線程持有。
ReentrantReadWriteLock 中,讀鎖是共享鎖,寫鎖時獨占鎖。讀讀共享保證并發(fā)性,讀寫互斥。
并發(fā)工具類
CountDownLatch
CountDownLatch 的作用是讓一些線程阻塞直到另外一些線程完成一系列操作后才被喚醒。
CountDownLatch 在初始時設(shè)置一個數(shù)值,當一個或者多個線程使用 await() 方法時,這些線程會被阻塞。其余線程調(diào)用 countDown() 方法,將計數(shù)器減去1,當計數(shù)器為0時,調(diào)用 await() 方法被阻塞的線程會被喚醒,繼續(xù)執(zhí)行。
可以理解為,等大家都走了,保安鎖門。
CyclicBarrier
CyclicBarrier 是指可以循環(huán)使用的屏障,讓一組線程到達一個屏障時被阻塞,直到最后一個線程到達屏障,屏障才會開門,被屏障攔截的線程才會繼續(xù)工作,線程進入屏障通過 await() 方法。
可以理解為,大家都到齊了,才能開會。
Semaphore
信號量用于:
- 多個共享資源的互斥使用
- 并發(fā)線程數(shù)的控制
可以理解為,多個車搶停車場的多個車位。當進入車位時,調(diào)用 acquire() 方法占用資源。當離開時,調(diào)用 release() 方法釋放資源。
阻塞隊列
阻塞隊列首先是一個隊列,所起的作用如下:
- 當阻塞隊列為空,從隊列中獲取元素的操作將會被阻塞
- 當阻塞隊列為滿,向隊列中添加元素的操作將會被阻塞
試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他線程向空的隊列中插入新的元素。同樣的,試圖向已滿的阻塞隊列中添加新元素的線程同樣會被阻塞,直到其他線程從隊列中移除元素使得隊列重新變得空閑起來并后序新增。
阻塞:阻塞是指在某些情況下會掛起線程,即阻塞,一旦條件滿足,被掛起的線程又會自動被喚醒。
優(yōu)點:BlockingQueue 能幫助我們進行線程的阻塞和喚醒,而無需關(guān)心何時需要阻塞線程,何時需要喚醒線程。同時兼顧了效率和線程安全。
阻塞隊列的架構(gòu)
BlokcingQueue 接口實現(xiàn)了 Queue 接口,該接口有如下的實現(xiàn)類:
- ArrayBlockingQueue: 由數(shù)組組成的有界阻塞隊列
- LinkedBlockingQueue: 由鏈表組成的有界阻塞隊列(默認大小為 Integer.MAX_VALUE)
- PriorityBlockingQueue:支持優(yōu)先級排序的無界阻塞隊列
- DelayQueue:使用優(yōu)先級隊列實現(xiàn)的延遲無界阻塞隊列
- SynchronousQueue: 不存儲元素的阻塞隊列,單個元素的隊列,同步提交隊列
- LinkedTransferQueue:鏈表組成的無界阻塞隊列
- LinkedBlockingDeque:鏈表組成的雙向阻塞隊列
阻塞隊列的方法
方法類型 | 拋出異常 | 特殊值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek() | 無 | 無 |
- 拋出異常:當隊列滿,add(e)會拋出異常IllegalStateException: Queue full;當隊列空,remove()和element()會拋出異常NoSuchElementException
- 特殊值:offer(e)會返回 true/false。peek()會返回隊列元素或者null。
- 阻塞:隊列滿,put(e)會阻塞直到成功或中斷;隊列空take()會阻塞直到成功。
- 超時:阻塞直到超時后退出,返回值和特殊值中的情況一樣。
生產(chǎn)者消費者模式
方式1. 使用Lock
class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception { lock.lock(); try { //判斷 while (number != 0) { condition.await(); } //干活 number++; System.out.println(Thread.currentThread().getName() + " produce\t" + number); //通知喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement()throws Exception{ lock.lock(); try { //判斷 while (number == 0) { condition.await(); } //干活 number--; System.out.println(Thread.currentThread().getName() + " consume\t" + number); //通知喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } /** * 一個初始值為0的變量,兩個線程交替操作,一個加1一個減1,重復(fù)5次 * 1. 線程 操作 資源類 * 2. 判斷 干活 通知 * 3. 防止虛假喚醒機制:判斷的時候要用while而不是用if */ public class ProduceConsumeTraditionalDemo { public static void main(String[] args) { ShareData data=new ShareData(); new Thread(()->{ for (int i = 0; i < 5 ; i++) { try { data.increment(); } catch (Exception e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 5 ; i++) { try { data.decrement(); } catch (Exception e) { e.printStackTrace(); } } },"B").start(); } }
打印結(jié)果
A produce 1
B consume 0
A produce 1
B consume 0
A produce 1
B consume 0
A produce 1
B consume 0
A produce 1
B consume 0
方法2:使用阻塞隊列
public class ProduceConsumeBlockingQueueDemo { public static void main(String[] args) { SharedData data=new SharedData(new ArrayBlockingQueue<>(10)); new Thread(()-> { System.out.println(Thread.currentThread().getName() + "\t生產(chǎn)線程啟動"); try { data.produce(); } catch (InterruptedException e) { e.printStackTrace(); } },"Producer").start(); new Thread(()-> { System.out.println(Thread.currentThread().getName() + "\t消費線程啟動"); try { data.consume(); } catch (InterruptedException e) { e.printStackTrace(); } },"Consumer").start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } data.stop(); System.out.println("停止"); } } class SharedData{ private volatile boolean FLAG=true; private AtomicInteger atomicInteger=new AtomicInteger(); BlockingQueue<String> blockingQueue=null; public SharedData(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void produce() throws InterruptedException { String data=null; boolean ret; while(FLAG){ data=""+atomicInteger.incrementAndGet(); ret=blockingQueue.offer(data,2L,TimeUnit.SECONDS); if(ret){ System.out.println(Thread.currentThread().getName()+"\t插入"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"\t插入"+data+"失敗"); } TimeUnit.SECONDS.sleep(1); } System.out.println("生產(chǎn)結(jié)束,F(xiàn)LAG=false"); } public void consume() throws InterruptedException { String ret=null; while(FLAG){ ret=blockingQueue.poll(2L,TimeUnit.SECONDS); if(null==ret||ret.equalsIgnoreCase("")){ System.out.println(FLAG=false); System.out.println(Thread.currentThread().getName()+"\t消費等待超時退出"); return; } System.out.println(Thread.currentThread().getName() + "\t消費" + ret + "成功"); } } public void stop(){ FLAG=false; } }
使用阻塞隊列+原子類+volatile變量的方式。
打印結(jié)果如下:
java.util.concurrent.ArrayBlockingQueue
Producer 生產(chǎn)線程啟動
Consumer 消費線程啟動
Producer 插入1成功
Consumer 消費1成功
Producer 插入2成功
Consumer 消費2成功
Producer 插入3成功
Consumer 消費3成功
停止
生產(chǎn)結(jié)束,F(xiàn)LAG=false
false
Consumer 消費等待超時退出
Synchronized 和 Lock 的區(qū)別
- 原始構(gòu)成
- Synchronized 是關(guān)鍵字,屬于JVM層面,底層是通過 monitorenter 和 monitorexit 完成,依賴于 monitor 對象來完成。由于 wait/notify 方法也依賴于 monitor 對象,因此只有在同步塊或方法中才能調(diào)用這些方法。
- Lock 是 java.util.concurrent.locks.lock 包下的,是 api層面的鎖。
- 使用方法
- Synchronized 不需要用戶手動釋放鎖,代碼完成之后系統(tǒng)自動讓線程釋放鎖
- ReentrantLock 需要用戶手動釋放鎖,沒有手動釋放可能導(dǎo)致死鎖。
- 等待是否可以中斷
- Synchronized 不可中斷,除非拋出異?;蛘哒_\行完成
- ReentrantLock 可以中斷。一種是通過 tryLock(long timeout, TimeUnit unit),另一種是lockInterruptibly()放代碼塊中,調(diào)用interrupt()方法進行中斷。
- 加鎖是否公平
- synchronized 是非公平鎖
- ReentrantLock 默認非公平鎖,可以在構(gòu)造方法傳入 boolean 值,true 代表公平鎖,false 代表非公平鎖。
- 鎖綁定多個 Condition
- Synchronized 只有一個阻塞隊列,只能隨機喚醒一個線程或者喚醒全部線程。
- ReentrantLock 用來實現(xiàn)分組喚醒,可以精確喚醒。
案例:三個線程循環(huán)打印
class ShareData{ private int number=1; private Lock lock=new ReentrantLock(); public void printA(){ lock.lock(); Condition conditionA=lock.newCondition(); try{ while(number!=1){ conditionA.await(); } for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=2; conditionA.signal(); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public void printB(){ lock.lock(); Condition conditionB=lock.newCondition(); try{ while(number!=2){ conditionB.await(); } for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=3; conditionB.signal(); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public void printC(){ lock.lock(); Condition conditionC=lock.newCondition(); try{ //判斷 while(number!=3){ conditionC.await(); } //干活 for (int i = 0; i < 15; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=1; //通知 conditionC.signal(); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) { ShareData data=new ShareData(); new Thread(() -> data.printA(),"A").start(); new Thread(() -> data.printB(),"B").start(); new Thread(() -> data.printC(),"C").start(); } }
線程池
創(chuàng)建線程
- 實現(xiàn) Runnable 接口
- 實現(xiàn) Callable 接口
- 繼承 Thread 類
- 使用線程池
Thread的構(gòu)造函數(shù)中并沒有傳入 Callable 的方式,但是可以傳入 Runnable 接口:
Thread thread=new Thread(Runnable runnable, String name);。為了使用 Callable 接口,我們需要使用到 FutureTask 類。 FutureTask 類實現(xiàn)了 RunnableFuture 這一接口,而 RunnableFutre 又是 Future 的子接口,因此 FutureTask 可以作為參數(shù)使用上述的 Thread 構(gòu)造函數(shù)。同時, FutureTask 本身構(gòu)造函數(shù)可以傳入 Callable 。
class MyThread implements Callable<Integer>{ @Override public Integer call() { System.out.println("come in callable"); return 2019; } } class Main{ public static void main(String [] args){ FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2()); Thread t1=new Thread(futureTask,"A"); } }
線程池架構(gòu)
除此之外,還有 Executors 工具類。
ThreadPoolExecutor
線程池有七大參數(shù):
public ThreadPoolExecutor( int corePoolSize,//線程池常駐核心線程數(shù) int maximumPoolSize,//線程池能容納同時執(zhí)行最大線程數(shù) long keepAliveTime,//多余的空閑線程的存活時間,當前線程池線程數(shù)量超過core,空閑時間達到keepAliveTime,多余空閑線程會被銷毀直到只剩下core個 TimeUnit unit, BlockingQueue<Runnable> workQueue,//被提交尚未被執(zhí)行的任務(wù)隊列 ThreadFactory threadFactory,//創(chuàng)建線程的線程工廠 RejectedExecutionHandler handler//拒絕策略 ) {...}
處理流程如下:
- 創(chuàng)建線程池,等待提交過來的任務(wù)請求。
- 添加請求任務(wù)
- 如果運行線程數(shù)小于 corePoolSize,創(chuàng)建線程運行該任務(wù)
- 如果運行線程數(shù)大于等于 corePoolSize,將任務(wù)放入隊列
- 隊列滿,且運行線程數(shù)量小于 maximumPoolSize,創(chuàng)建非核心線程運行任務(wù)
- 隊列滿,且運行線程數(shù)量大于等于 maximumPoolSize,線程池會啟動飽和拒絕策略執(zhí)行。
- 線程完成任務(wù),會從隊列中取下一個任務(wù)來執(zhí)行
- 一個線程無事可做超過 keepAliveTime 時:
- 如果當前運行線程數(shù)大于 corePoolSize,該線程被停掉
- 線程池的所有任務(wù)完成后最終會收縮到 corePoolSize 的大小。
拒絕策略
在 JDK 中有四種內(nèi)置的拒絕策略,均實現(xiàn)了 RejectedExecutionHandler 接口。
- AbortPolicy: 直接拋出 RejectedExecutionException 異常,是默認的拒絕策略。
- DiscardPolicy: 直接丟棄任務(wù),不予處理也不拋出異常。如果允許任務(wù)丟失,是最好的處理策略。
- DiscardOldestPolicy: 拋棄隊列中等待最久的任務(wù),然后把當前任務(wù)加入隊列嘗試再次提交。
- CallerRunsPolicy: 調(diào)用者運行。該策略既不會拋棄任務(wù),也不會拋出異常,而是將某些任務(wù)回退到調(diào)用者。
三種常用線程池
1、Executors.newFixedThreadPool(int)
創(chuàng)建固定容量的線程池,控制最大并發(fā)數(shù),超出的線程在隊列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
其中 corePoolSize 和 maximumPoolSize 值是相等的,并且使用的是 LinkedBlockingQueue。
適用于執(zhí)行長期的任務(wù),性能比較高。
2、Executors.newSingleThreadExecutor()
創(chuàng)建了一個單線程的線程池,只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照順序執(zhí)行。
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
其中 corePoolSize 和 maximumPoolSize 都設(shè)置為1,使用的也是 LinkedBlockingQueue。
適用于一個任務(wù)一個任務(wù)執(zhí)行的場景。
3、Executors.newCachedThreadPool()
創(chuàng)建了一個可緩存的線程池,如果線程池長度超過處理需要,可以靈活回收空閑線程,沒有可以回收的,則新建線程。
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
設(shè)置 corePoolSize 為0, maximumPoolSize 設(shè)置為 Integer.MAX_VALUE,使用的是 SynchronousQueue。來了任務(wù)就創(chuàng)建線程執(zhí)行,線程空閑超過60秒后銷毀。
適用于執(zhí)行很多短期異步的小程序或者負載比較輕的服務(wù)器。
工作中使用什么樣的線程池
在阿里巴巴Java開發(fā)手冊中有如下規(guī)定:
- 線程資源必須通過線程池提供,不允許在應(yīng)用中自行顯示創(chuàng)建線程。
- 說明:使用線程池的好處是減少在創(chuàng)建和銷毀線程上消耗的時間和系統(tǒng)資源的開銷,解決資源不足的問題。如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程導(dǎo)致消耗完內(nèi)存或者過度切換。
- 線程池不允許使用 Executors 去創(chuàng)建,也就是不能使用上述的三種線程池,而是要通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源韓進的風(fēng)險。
- FixedThreadPool 和 SingleThreadPool 都采用了 LinkedBlockingQueue,其允許的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,導(dǎo)致OOM。
- CachedThreadPool 和 ScheduledThreadPool 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE,可能創(chuàng)建大量的線程,導(dǎo)致OOM。
如何設(shè)置線程池的線程數(shù)目
Runtime.getRuntime().availableProcessors()獲取當前設(shè)備的CPU個數(shù)。
- CPU密集型任務(wù)
- CPU 密集的含義是任務(wù)需要大量的運算,而沒有阻塞,CPU一致全速運行
- CPU 密集任務(wù)只有在真正的多核 CPU 上才能得到加速(通過多線程),而在單核 CPU 上,無論開幾個模擬的多線程都不能得到加速
- CPU 密集型任務(wù)配置盡可能少的線程數(shù)量,一般設(shè)置為 CPU 核心數(shù) + 1
- IO 密集型
- IO 密集型,是指該任務(wù)需要大量的IO,大量的阻塞
- 單線程上運行 IO 密集型的任務(wù)會導(dǎo)致浪費大量的 CPU 運算能力浪費在等待上
- IO 密集型任務(wù)使用多線程可以大大加速程序運行,利用了被浪費掉的阻塞時間
- IO 密集型時,大部分線程都阻塞,需要多配置線程數(shù),可以采用CPU核心數(shù) * 2,或者采用 CPU 核心數(shù) / (1 - 阻塞系數(shù)),阻塞系數(shù)在0.8 ~ 0.9之間
死鎖
產(chǎn)生死鎖的原因
死鎖是指兩個或兩個以上的進程在執(zhí)行過程中,因為爭奪資源造成的互相等待的現(xiàn)象。
死鎖需要滿族的四大條件如下:
- 互斥
- 循環(huán)等待
- 不可搶占
- 占有并等待
產(chǎn)生死鎖的主要原因有:
- 系統(tǒng)資源不足
- 進程運行推進順序不當
- 資源分配不當
死鎖實例
class HoldLockThread implements Runnable{ private String lock1; private String lock2; public HoldLockThread(String lock1, String lock2) { this.lock1 = lock1; this.lock2 = lock2; } @Override public void run() { synchronized (lock1){ System.out.println(Thread.currentThread().getName()+"\t持有"+lock1+"\t嘗試獲取"+lock2); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock2){ System.out.println(Thread.currentThread().getName()+"\t持有"+lock1+"\t嘗試獲取"+lock2); } } } } public class DeadLockDemo { public static void main(String[] args) { String lockA="lockA"; String lockB="lockB"; new Thread(new HoldLockThread(lockA,lockB),"Thread1").start(); new Thread(new HoldLockThread(lockB,lockA),"Thread2").start(); } }
輸出如下結(jié)果,程序并沒有終止。
Thread2 持有l(wèi)ockB 嘗試獲取lockA
Thread1 持有l(wèi)ockA 嘗試獲取lockB
死鎖定位分析
使用 jps ,類似于 linux 中的 ps 命令。
在上述 java 文件中,使用 IDEA 中的 open In Terminal,或者在該文件目錄下使用 cmd 命令行工具。
首先使用 jps -l命令,類似于ls -l命令,輸出當前運行的 java 線程,從中能得知 DeadLockDemo 線程的線程號。
然后,使用jstack threadId來查看棧信息。輸出如下:
Java stack information for the threads listed above:
===================================================
"Thread2":
at interview.jvm.deadlock.HoldLockThread.run(DeadLockDemo.java:22)
- waiting to lock <0x00000000d6240328> (a java.lang.String)
- locked <0x00000000d6240360> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
"Thread1":
at interview.jvm.deadlock.HoldLockThread.run(DeadLockDemo.java:22)
- waiting to lock <0x00000000d6240360> (a java.lang.String)
- locked <0x00000000d6240328> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。
相關(guān)文章
springboot druid數(shù)據(jù)庫連接池連接失敗后一直重連的解決方法
本文主要介紹了springboot druid數(shù)據(jù)庫連接池連接失敗后一直重連的解決方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04網(wǎng)關(guān)Spring Cloud Gateway HTTP超時配置問題
這篇文章主要介紹了網(wǎng)關(guān)Spring Cloud Gateway HTTP超時配置問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式
這篇文章主要介紹了SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式,分別就轉(zhuǎn)發(fā)和重定向做了概念解說,結(jié)合示例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-11-11玩轉(zhuǎn)spring boot MVC應(yīng)用(2)
玩轉(zhuǎn)spring boot,如何快速搭建一個MCV程序?這篇文章為大家詳細主要介紹了一個MCV程序的快速搭建過程,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-01-01