Java常見面試題之多線程和高并發(fā)詳解
volatile
對(duì) volatile的理解
volatile 是一種輕量級(jí)的同步機(jī)制。
- 保證數(shù)據(jù)可見性
- 不保證原子性
- 禁止指令重排序
JMM
JMM(Java 內(nèi)存模型)是一種抽象的概念,描述了一組規(guī)則或規(guī)范,定義了程序中各個(gè)變量的訪問(wèn)方式。
JVM運(yùn)行程序的實(shí)體是線程,每個(gè)線程創(chuàng)建時(shí) JVM 都會(huì)為其創(chuàng)建一個(gè)工作內(nèi)存,是線程的私有數(shù)據(jù)區(qū)域。JMM中規(guī)定所有變量都存儲(chǔ)在主內(nèi)存,主內(nèi)存是共享內(nèi)存。線程對(duì)變量的操作在工作內(nèi)存中進(jìn)行,首先將變量從主內(nèi)存拷貝到工作內(nèi)存,操作完成后寫會(huì)主內(nèi)存。不同線程間無(wú)法訪問(wèn)對(duì)方的工作內(nèi)存,線程通信(傳值)通過(guò)主內(nèi)存來(lái)完成。
JMM 對(duì)于同步的規(guī)定:
- 線程解鎖前,必須把共享變量的值刷新回主內(nèi)存
- 線程加鎖前,必須讀取主內(nèi)存的最新值到自己的工作內(nèi)存
- 加鎖解鎖是同一把鎖
JMM 的三大特性
- 可見性
- 原子性
- 順序性
原子性是不可分割,某個(gè)線程正在做某個(gè)具體業(yè)務(wù)時(shí),中間不可以被分割,要么全部成功,要么全部失敗。
重排序:計(jì)算機(jī)在執(zhí)行程序時(shí),為了提高性能,編譯器和處理器常常對(duì)指令做重排序,源代碼經(jīng)過(guò)編譯器優(yōu)化重排序、指令并行重排序、內(nèi)存系統(tǒng)的重排序之后得到最終執(zhí)行的指令。
在單線程中保證程序最終執(zhí)行結(jié)果和代碼執(zhí)行順序執(zhí)行結(jié)果一致。
多線程中線程交替執(zhí)行,由于重排序,兩個(gè)線程中使用的變量能否保證一致性無(wú)法確定,結(jié)果無(wú)法確定。
處理器在處理重排序時(shí)需要考慮數(shù)據(jù)的依賴性。
volatile 實(shí)現(xiàn)禁止指令重排序,避免多線程環(huán)境下程序亂序執(zhí)行。是通過(guò)內(nèi)存屏障指令來(lái)執(zhí)行的,通過(guò)插入內(nèi)存屏障禁止在內(nèi)存屏障后的指令執(zhí)行重排序優(yōu)化,并強(qiáng)制刷出緩存數(shù)據(jù),保證線程能讀取到這些數(shù)據(jù)的最新版本。
實(shí)例1:volatile 保證可見性
class MyData { //volatile int number = 0;//case2 //int number=0; //case1 public void change() { number = 60; } } public class VolatileDemo { public static void main(String[] args) { MyData data=new MyData(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t come in"); try{ TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();} data.change(); System.out.println(Thread.currentThread().getName()+"\t updated number value:"+data.number); },"A").start(); while(data.number==0){} System.out.println(Thread.currentThread().getName()+"\t over, get number:"+data.number); } }
當(dāng)我們使用case1的時(shí)候,也就是number沒(méi)有volatile修飾的時(shí)候,運(yùn)行結(jié)果:
A come in
A updated number value:60
并且程序沒(méi)有執(zhí)行結(jié)束,說(shuō)明在main線程中由于不能保證可見性,一直在死循環(huán)。
當(dāng)執(zhí)行case2的時(shí)候:
A come in
A updated number value:60
main over, get number:60
保證了可見性,因此main成功結(jié)束。
實(shí)例2: volatile 不保證原子性
class MyData { volatile int number = 0; public void change() { number = 60; } public void addOne() { number++; } } public class VolatileDemo { public static void main(String[] args) { case2(); } //驗(yàn)證原子性 public static void case2() { MyData myData = new MyData(); for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { myData.addOne(); } }, String.valueOf(i)).start(); } while(Thread.activeCount()>2){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+"\t number value:"+myData.number); } }
最終輸出結(jié)果可以發(fā)現(xiàn)并不是 20000,且多次輸出結(jié)果并不一致,因此說(shuō)明 volatile 不能保證原子性。
如何保證原子性
- 加鎖:使用 synchronized 加鎖
- 使用 AtomicInteger
實(shí)例3:volatile 和 單例模式
DCL模式的單例模式
public class Singleton { private static Singleton instance=null; private Singleton(){ System.out.println(Thread.currentThread().getName()+" constructor"); } //DCL 雙端檢鎖機(jī)制 public static Singleton getInstance(){ if(instance==null){ synchronized (Singleton.class){ if(instance==null) instance=new Singleton(); } } return instance; } }
DCL 機(jī)制不能完全保證線程安全,因?yàn)橛兄噶钪嘏判虻拇嬖凇?br />
原因在于instance = new Singleton(); 可以分為三步:
1. memory=allocate();//分配內(nèi)存空間
2. instance(memory);//初始化對(duì)象
3. instance=memory;//設(shè)置instance指向分配的內(nèi)存地址,分配成功后,instance!=null
由于步驟2和步驟3不存在數(shù)據(jù)依賴關(guān)系,且無(wú)論重排序與否執(zhí)行結(jié)果在單線程中沒(méi)有改變,因此這兩個(gè)步驟的重排序是允許的。也就是說(shuō)指令重排序只會(huì)保證單線程串行語(yǔ)義的一致性(as-if-serial),但是不會(huì)關(guān)心多線程間的語(yǔ)義一致性。
因此,重排序之后,先執(zhí)行3會(huì)導(dǎo)致instance!=null,但是對(duì)象還未被初始化。此時(shí),別的線程在調(diào)用時(shí),獲取了一個(gè)未初始化的對(duì)象。
因此,在聲明 instance 時(shí),使用 volatile 進(jìn)行修飾,禁止指令重排序。
private static volatile Singleton instance = null;
CAS
CAS 的全程是 CompareAndSwap,是一條 CPU 并發(fā)原語(yǔ)。它的功能是判斷內(nèi)存某個(gè)位置的值是否為預(yù)期值,如果是則更新為新的值,這個(gè)過(guò)程是原子的。
CAS 的作用是比較當(dāng)前工作內(nèi)存中的值和主內(nèi)存中的值,如果相同則執(zhí)行操作,否則繼續(xù)比較直到主內(nèi)存和工作內(nèi)存中的值一致為止。主內(nèi)存值為V,工作內(nèi)存中的預(yù)期值為A,要修改的更新值為B,當(dāng)且僅當(dāng)A和V相同,將V修改為B,否則什么都不做。
CAS 底層原理:
在原子類中,CAS 操作都是通過(guò) Unsafe 類來(lái)完成的。
//AtomicInteger i++ public final int getAndIncrement(){ return unsafe.getAndAddInt(this,valueoffset,1); }
其中 this 是當(dāng)前對(duì)象, valueoffset 是一個(gè) long ,代表地址的偏移量。
//AtomicInteger.java private static final Unsafe unsfae=Unsafe.getUnsafe();//unsafe對(duì)象 private static final long valueOffset;//地址偏移量 static{ try{ valueoffset=unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"); }catch(Excepthion ex){throw new Error(ex);} } private volatile int value;//存儲(chǔ)的數(shù)值
- Unsafe
Unsafe 類是 rt.jar 下的 sun.misc 包下的一個(gè)類,基于該類可以直接操作特定內(nèi)存的數(shù)據(jù)。
Java方法無(wú)法直接訪問(wèn)底層系統(tǒng),需要使用 native 方法訪問(wèn),Unsafe 類的內(nèi)部方法都是 native 方法,其中的方法可以像C的指針一樣直接操作內(nèi)存,Java 中的 CAS 操作的執(zhí)行都依賴于 Unsafe 類的方法。
- valueOffset
該變量表示變量值在內(nèi)存中的偏移地址, Unsafe 就是根據(jù)內(nèi)存偏移地址獲取數(shù)據(jù)的。
Unsafe類
CAS 并發(fā)源于體現(xiàn)在 Java 中就是 Unsafe 類的各個(gè)方法。調(diào)用該類中的 CAS 方法,JVM會(huì)幫我們實(shí)現(xiàn)出 CAS 匯編指令,這是一種完全依賴于硬件的功能。
原語(yǔ)是由若干條指令組成的,用于完成某個(gè)功能的過(guò)程。原語(yǔ)的執(zhí)行必須是連續(xù)的,執(zhí)行過(guò)程不允許被中斷。所以 CAS 是一條 CPU 的原子指令,不會(huì)造成數(shù)據(jù)不一致問(wèn)題。
下邊是 AtomicInteger 中實(shí)現(xiàn) i++ 功能所調(diào)用的 Unsafe 類的函數(shù)。
//unsafe.getAndAddInt public final int getAndAddInt(Object var1,long var2,int var4){ int var5; do{ //獲取當(dāng)前的值的地址 var5=this.getIntVolatile(var1,var2); //var1代表對(duì)象,var2和var5分別代表當(dāng)前對(duì)象的真實(shí)值和期望值,如果二者相等,更新為var5+var4 }while(!this.compareAndSwapInt(var1,var2,var5,var5+var4); return var5; }
在 getAndAddInt 函數(shù)中,var1 代表了 AtomicInteger 對(duì)象, var2 代表了該對(duì)象在內(nèi)存中的地址, var4 代表了期望增加的數(shù)值。
首先通過(guò) var1 和 var2 獲取到當(dāng)前的主內(nèi)存中真實(shí)的 int 值,也就是 var5。
然后通過(guò)循環(huán)來(lái)進(jìn)行數(shù)據(jù)更改,當(dāng)比較到真實(shí)值和對(duì)象的當(dāng)前值相等,則更新,退出循環(huán);否則再次獲取當(dāng)前的真實(shí)值,繼續(xù)嘗試,直到成功。
在 CAS 中通過(guò)自旋而不是加鎖來(lái)保證一致性,同時(shí)和加鎖相比,提高了并發(fā)性。
具體情境來(lái)說(shuō):線程A和線程B并發(fā)執(zhí)行 AtomicInteger 的自增操作:
- AtomicInteger 中的 value 原始值為 3。主內(nèi)存中 value 為 3, 線程A和線程B的工作內(nèi)存中有 value 為 3 的副本;
- 線程 A 通過(guò) getIntVolatile() 獲取到 value 的值為3,并被掛起。
- 線程 B 也獲取到 value 的值為3,然后執(zhí)行 compareAndSwapInt 方法,比較到內(nèi)存真實(shí)值也是 3,因此成功修改內(nèi)存值為4.
- 此時(shí)線程 A 繼續(xù)執(zhí)行比較,發(fā)現(xiàn)對(duì)象中的 value 3 和主內(nèi)存中的 value 4 不一致,說(shuō)明已經(jīng)被修改,A 重新進(jìn)入循環(huán)。
- 線程 A 重新獲取 value,由于 value 被 volatile 修飾,所以線程 A 此時(shí) value 為4,和主內(nèi)存中 value 相等,修改成功。
CAS的缺點(diǎn)
- 如果CAS失敗,會(huì)一直嘗試。如果CAS長(zhǎng)時(shí)間不成功,會(huì)給CPU帶來(lái)很大的開銷。
- CAS 只能用來(lái)保證單個(gè)共享變量的原子操作,對(duì)于多個(gè)共享變量操作,CAS無(wú)法保證,需要使用鎖。
- 存在 ABA 問(wèn)題。
ABA問(wèn)題
CAS 實(shí)現(xiàn)一個(gè)重要前提需要取出內(nèi)存中某個(gè)時(shí)刻的數(shù)據(jù)并在當(dāng)下時(shí)刻比較并替換,這個(gè)時(shí)間差會(huì)導(dǎo)致數(shù)據(jù)的變化。
線程1從內(nèi)存位置V中取出A,線程2也從V中取出A,然后線程2通過(guò)一些操作將A變成B,然后又把V位置的數(shù)據(jù)變成A,此時(shí)線程1進(jìn)行CAS操作發(fā)現(xiàn)V中仍然是A,操作成功。盡管線程1的CAS操作成功,但是不代表這個(gè)過(guò)程沒(méi)有問(wèn)題。
這個(gè)問(wèn)題類似于幻讀問(wèn)題,通過(guò)新增版本號(hào)的機(jī)制來(lái)解決。在這里可以使用 AtomicStampedReference 來(lái)解決。
AtomicStampedReference
通過(guò) AtomicStampedReference 來(lái)解決這個(gè)問(wèn)題。
public class SolveABADemo { static AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(100,1); new Thread(()->{ int stamp=atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t 版本號(hào):"+stamp); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName()+"\t 版本號(hào):"+atomicStampedReference.getStamp()); atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1); System.out.println(Thread.currentThread().getName()+"\t 版本號(hào):"+atomicStampedReference.getStamp()); },"t1").start(); new Thread(()->{ int stamp=atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t 版本號(hào):"+stamp); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } boolean ret=atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1); System.out.println(Thread.currentThread().getName()+"\t"+ret +" stamp:"+atomicStampedReference.getStamp() +" value:"+atomicStampedReference.getReference()); },"t2").start(); } }
t1 版本號(hào):1
t2 版本號(hào):1
t1 版本號(hào):2
t1 版本號(hào):3
t2 false stamp:3 value:100
集合類的線程安全問(wèn)題
ConcurrentModificationException
這個(gè)異常也就是并發(fā)修改異常,java.util.ConcurrentModificationException。
導(dǎo)致這個(gè)異常的原因,是集合類本身是線程不安全的。
解決方案:
- 使用 Vector, Hashtable 等同步容器
- 使用 Collections.synchronizedxxx(new XX) 創(chuàng)建線程安全的容器
- 使用 CopyOnWriteList, CopyOnWriteArraySet, ConcurrentHashMap 等 j.u.c 包下的并發(fā)容器。
CopyOnWriteArrayList
底層使用了private transient volatile Object[] array;
CopyOnWriteArrayList 采用了寫時(shí)復(fù)制、讀寫分離的思想。
public boolean add(E e){ final ReentrantLock lock=this.lock; try{ //舊數(shù)組 Object[] elements = getArray(); int len = elements.length; //復(fù)制新數(shù)組 Object[] newElements = Arrays.copyOf(elements, len+1); //修改新數(shù)組 newElements[len] = e; //更改舊數(shù)組引用指向新數(shù)組 setArray(newElements); return true; }finally{ lock.unlock(); } }
添加元素時(shí),不是直接添加到當(dāng)前容器數(shù)組,而是復(fù)制到新的容器數(shù)組,向新的數(shù)組中添加元素,添加完之后將原容器引用指向新的容器。
這樣做的好處是可以對(duì)該容器進(jìn)行并發(fā)的讀,而不需要加鎖,因?yàn)樽x時(shí)容器不會(huì)添加任何元素。
CopyOnWriteArraySet 本身就是使用 CopyOnWriteArrayList 來(lái)實(shí)現(xiàn)的。
Java鎖
公平鎖和非公平鎖
ReentrantLock 可以指定構(gòu)造函數(shù)的 boolean 類型得到公平或非公平鎖,默認(rèn)是非公平鎖,synchronized也是非公平鎖。
公平鎖是多個(gè)線程按照申請(qǐng)鎖的順序獲取鎖,是 FIFO 的。并發(fā)環(huán)境中,每個(gè)線程在獲取鎖時(shí)先查看鎖維護(hù)的等待隊(duì)列,為空則戰(zhàn)友,否則加入隊(duì)列。
非公平鎖是指多個(gè)線程不是按照申請(qǐng)鎖的順序,有可能后申請(qǐng)的線程比先申請(qǐng)的線程優(yōu)先獲取鎖。高并發(fā)情況下可能導(dǎo)致優(yōu)先級(jí)反轉(zhuǎn)或者饑餓現(xiàn)象。并發(fā)環(huán)境中,上來(lái)嘗試占有鎖,嘗試失敗,再加入等待隊(duì)列。
可重入鎖(遞歸鎖)
可沖入鎖指的是同一線程外層函數(shù)獲取鎖之后,內(nèi)層遞歸函數(shù)自動(dòng)獲取鎖。也就是線程能進(jìn)入任何一個(gè)它已經(jīng)擁有的鎖所同步著的代碼塊。
ReentrantLock 和 synchronized 都是可重入鎖。
可重入鎖最大的作用用來(lái)避免死鎖。
自旋鎖
自旋鎖是指嘗試獲取鎖的線程不會(huì)立即阻塞,而是采用循環(huán)的方式嘗試獲取鎖。好處是減少線程上下文切換的消耗,缺點(diǎn)是循環(huán)時(shí)會(huì)消耗CPU資源。
實(shí)現(xiàn)自旋鎖:
public class SpinLockDemo { //使用AtomicReference<Thread>來(lái)更新當(dāng)前占用的 Thread AtomicReference<Thread> threadAtomicReference=new AtomicReference<>(); public static void main(String[] args) { SpinLockDemo demo=new SpinLockDemo(); new Thread(()->{ demo.myLock(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } demo.myUnlock(); },"t1").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ demo.myLock(); demo.myUnlock(); },"t2").start(); } public void myLock(){ Thread thread=Thread.currentThread(); System.out.println(Thread.currentThread().getName()+"\t come in"); //如果當(dāng)前占用的線程為null,則嘗試獲取更新 while(!threadAtomicReference.compareAndSet(null,thread)){ } } public void myUnlock(){ Thread thread=Thread.currentThread(); //釋放鎖,將占用的線程設(shè)置為null threadAtomicReference.compareAndSet(thread,null); System.out.println(Thread.currentThread().getName()+"\t unlocked"); } }
讀寫鎖
獨(dú)占鎖:該鎖一次只能被一個(gè)線程持有,如 ReentrantLock 和 synchronized。
共享鎖:該鎖可以被多個(gè)線程持有。
ReentrantReadWriteLock 中,讀鎖是共享鎖,寫鎖時(shí)獨(dú)占鎖。讀讀共享保證并發(fā)性,讀寫互斥。
并發(fā)工具類
CountDownLatch
CountDownLatch 的作用是讓一些線程阻塞直到另外一些線程完成一系列操作后才被喚醒。
CountDownLatch 在初始時(shí)設(shè)置一個(gè)數(shù)值,當(dāng)一個(gè)或者多個(gè)線程使用 await() 方法時(shí),這些線程會(huì)被阻塞。其余線程調(diào)用 countDown() 方法,將計(jì)數(shù)器減去1,當(dāng)計(jì)數(shù)器為0時(shí),調(diào)用 await() 方法被阻塞的線程會(huì)被喚醒,繼續(xù)執(zhí)行。
可以理解為,等大家都走了,保安鎖門。
CyclicBarrier
CyclicBarrier 是指可以循環(huán)使用的屏障,讓一組線程到達(dá)一個(gè)屏障時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障,屏障才會(huì)開門,被屏障攔截的線程才會(huì)繼續(xù)工作,線程進(jìn)入屏障通過(guò) await() 方法。
可以理解為,大家都到齊了,才能開會(huì)。
Semaphore
信號(hào)量用于:
- 多個(gè)共享資源的互斥使用
- 并發(fā)線程數(shù)的控制
可以理解為,多個(gè)車搶停車場(chǎng)的多個(gè)車位。當(dāng)進(jìn)入車位時(shí),調(diào)用 acquire() 方法占用資源。當(dāng)離開時(shí),調(diào)用 release() 方法釋放資源。
阻塞隊(duì)列
阻塞隊(duì)列首先是一個(gè)隊(duì)列,所起的作用如下:
- 當(dāng)阻塞隊(duì)列為空,從隊(duì)列中獲取元素的操作將會(huì)被阻塞
- 當(dāng)阻塞隊(duì)列為滿,向隊(duì)列中添加元素的操作將會(huì)被阻塞
試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞,直到其他線程向空的隊(duì)列中插入新的元素。同樣的,試圖向已滿的阻塞隊(duì)列中添加新元素的線程同樣會(huì)被阻塞,直到其他線程從隊(duì)列中移除元素使得隊(duì)列重新變得空閑起來(lái)并后序新增。
阻塞:阻塞是指在某些情況下會(huì)掛起線程,即阻塞,一旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒。
優(yōu)點(diǎn):BlockingQueue 能幫助我們進(jìn)行線程的阻塞和喚醒,而無(wú)需關(guān)心何時(shí)需要阻塞線程,何時(shí)需要喚醒線程。同時(shí)兼顧了效率和線程安全。
阻塞隊(duì)列的架構(gòu)
BlokcingQueue 接口實(shí)現(xiàn)了 Queue 接口,該接口有如下的實(shí)現(xiàn)類:
- ArrayBlockingQueue: 由數(shù)組組成的有界阻塞隊(duì)列
- LinkedBlockingQueue: 由鏈表組成的有界阻塞隊(duì)列(默認(rèn)大小為 Integer.MAX_VALUE)
- PriorityBlockingQueue:支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列
- DelayQueue:使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的延遲無(wú)界阻塞隊(duì)列
- SynchronousQueue: 不存儲(chǔ)元素的阻塞隊(duì)列,單個(gè)元素的隊(duì)列,同步提交隊(duì)列
- LinkedTransferQueue:鏈表組成的無(wú)界阻塞隊(duì)列
- LinkedBlockingDeque:鏈表組成的雙向阻塞隊(duì)列
阻塞隊(duì)列的方法
方法類型 | 拋出異常 | 特殊值 | 阻塞 | 超時(shí) |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek() | 無(wú) | 無(wú) |
- 拋出異常:當(dāng)隊(duì)列滿,add(e)會(huì)拋出異常IllegalStateException: Queue full;當(dāng)隊(duì)列空,remove()和element()會(huì)拋出異常NoSuchElementException
- 特殊值:offer(e)會(huì)返回 true/false。peek()會(huì)返回隊(duì)列元素或者null。
- 阻塞:隊(duì)列滿,put(e)會(huì)阻塞直到成功或中斷;隊(duì)列空take()會(huì)阻塞直到成功。
- 超時(shí):阻塞直到超時(shí)后退出,返回值和特殊值中的情況一樣。
生產(chǎn)者消費(fèi)者模式
方式1. 使用Lock
class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception { lock.lock(); try { //判斷 while (number != 0) { condition.await(); } //干活 number++; System.out.println(Thread.currentThread().getName() + " produce\t" + number); //通知喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement()throws Exception{ lock.lock(); try { //判斷 while (number == 0) { condition.await(); } //干活 number--; System.out.println(Thread.currentThread().getName() + " consume\t" + number); //通知喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } /** * 一個(gè)初始值為0的變量,兩個(gè)線程交替操作,一個(gè)加1一個(gè)減1,重復(fù)5次 * 1. 線程 操作 資源類 * 2. 判斷 干活 通知 * 3. 防止虛假喚醒機(jī)制:判斷的時(shí)候要用while而不是用if */ public class ProduceConsumeTraditionalDemo { public static void main(String[] args) { ShareData data=new ShareData(); new Thread(()->{ for (int i = 0; i < 5 ; i++) { try { data.increment(); } catch (Exception e) { e.printStackTrace(); } } },"A").start(); new Thread(()->{ for (int i = 0; i < 5 ; i++) { try { data.decrement(); } catch (Exception e) { e.printStackTrace(); } } },"B").start(); } }
打印結(jié)果
A produce 1
B consume 0
A produce 1
B consume 0
A produce 1
B consume 0
A produce 1
B consume 0
A produce 1
B consume 0
方法2:使用阻塞隊(duì)列
public class ProduceConsumeBlockingQueueDemo { public static void main(String[] args) { SharedData data=new SharedData(new ArrayBlockingQueue<>(10)); new Thread(()-> { System.out.println(Thread.currentThread().getName() + "\t生產(chǎn)線程啟動(dòng)"); try { data.produce(); } catch (InterruptedException e) { e.printStackTrace(); } },"Producer").start(); new Thread(()-> { System.out.println(Thread.currentThread().getName() + "\t消費(fèi)線程啟動(dòng)"); try { data.consume(); } catch (InterruptedException e) { e.printStackTrace(); } },"Consumer").start(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } data.stop(); System.out.println("停止"); } } class SharedData{ private volatile boolean FLAG=true; private AtomicInteger atomicInteger=new AtomicInteger(); BlockingQueue<String> blockingQueue=null; public SharedData(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void produce() throws InterruptedException { String data=null; boolean ret; while(FLAG){ data=""+atomicInteger.incrementAndGet(); ret=blockingQueue.offer(data,2L,TimeUnit.SECONDS); if(ret){ System.out.println(Thread.currentThread().getName()+"\t插入"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"\t插入"+data+"失敗"); } TimeUnit.SECONDS.sleep(1); } System.out.println("生產(chǎn)結(jié)束,F(xiàn)LAG=false"); } public void consume() throws InterruptedException { String ret=null; while(FLAG){ ret=blockingQueue.poll(2L,TimeUnit.SECONDS); if(null==ret||ret.equalsIgnoreCase("")){ System.out.println(FLAG=false); System.out.println(Thread.currentThread().getName()+"\t消費(fèi)等待超時(shí)退出"); return; } System.out.println(Thread.currentThread().getName() + "\t消費(fèi)" + ret + "成功"); } } public void stop(){ FLAG=false; } }
使用阻塞隊(duì)列+原子類+volatile變量的方式。
打印結(jié)果如下:
java.util.concurrent.ArrayBlockingQueue
Producer 生產(chǎn)線程啟動(dòng)
Consumer 消費(fèi)線程啟動(dòng)
Producer 插入1成功
Consumer 消費(fèi)1成功
Producer 插入2成功
Consumer 消費(fèi)2成功
Producer 插入3成功
Consumer 消費(fèi)3成功
停止
生產(chǎn)結(jié)束,F(xiàn)LAG=false
false
Consumer 消費(fèi)等待超時(shí)退出
Synchronized 和 Lock 的區(qū)別
- 原始構(gòu)成
- Synchronized 是關(guān)鍵字,屬于JVM層面,底層是通過(guò) monitorenter 和 monitorexit 完成,依賴于 monitor 對(duì)象來(lái)完成。由于 wait/notify 方法也依賴于 monitor 對(duì)象,因此只有在同步塊或方法中才能調(diào)用這些方法。
- Lock 是 java.util.concurrent.locks.lock 包下的,是 api層面的鎖。
- 使用方法
- Synchronized 不需要用戶手動(dòng)釋放鎖,代碼完成之后系統(tǒng)自動(dòng)讓線程釋放鎖
- ReentrantLock 需要用戶手動(dòng)釋放鎖,沒(méi)有手動(dòng)釋放可能導(dǎo)致死鎖。
- 等待是否可以中斷
- Synchronized 不可中斷,除非拋出異?;蛘哒_\(yùn)行完成
- ReentrantLock 可以中斷。一種是通過(guò) tryLock(long timeout, TimeUnit unit),另一種是lockInterruptibly()放代碼塊中,調(diào)用interrupt()方法進(jìn)行中斷。
- 加鎖是否公平
- synchronized 是非公平鎖
- ReentrantLock 默認(rèn)非公平鎖,可以在構(gòu)造方法傳入 boolean 值,true 代表公平鎖,false 代表非公平鎖。
- 鎖綁定多個(gè) Condition
- Synchronized 只有一個(gè)阻塞隊(duì)列,只能隨機(jī)喚醒一個(gè)線程或者喚醒全部線程。
- ReentrantLock 用來(lái)實(shí)現(xiàn)分組喚醒,可以精確喚醒。
案例:三個(gè)線程循環(huán)打印
class ShareData{ private int number=1; private Lock lock=new ReentrantLock(); public void printA(){ lock.lock(); Condition conditionA=lock.newCondition(); try{ while(number!=1){ conditionA.await(); } for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=2; conditionA.signal(); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public void printB(){ lock.lock(); Condition conditionB=lock.newCondition(); try{ while(number!=2){ conditionB.await(); } for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=3; conditionB.signal(); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public void printC(){ lock.lock(); Condition conditionC=lock.newCondition(); try{ //判斷 while(number!=3){ conditionC.await(); } //干活 for (int i = 0; i < 15; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number=1; //通知 conditionC.signal(); }catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) { ShareData data=new ShareData(); new Thread(() -> data.printA(),"A").start(); new Thread(() -> data.printB(),"B").start(); new Thread(() -> data.printC(),"C").start(); } }
線程池
創(chuàng)建線程
- 實(shí)現(xiàn) Runnable 接口
- 實(shí)現(xiàn) Callable 接口
- 繼承 Thread 類
- 使用線程池
Thread的構(gòu)造函數(shù)中并沒(méi)有傳入 Callable 的方式,但是可以傳入 Runnable 接口:
Thread thread=new Thread(Runnable runnable, String name);。為了使用 Callable 接口,我們需要使用到 FutureTask 類。 FutureTask 類實(shí)現(xiàn)了 RunnableFuture 這一接口,而 RunnableFutre 又是 Future 的子接口,因此 FutureTask 可以作為參數(shù)使用上述的 Thread 構(gòu)造函數(shù)。同時(shí), FutureTask 本身構(gòu)造函數(shù)可以傳入 Callable 。
class MyThread implements Callable<Integer>{ @Override public Integer call() { System.out.println("come in callable"); return 2019; } } class Main{ public static void main(String [] args){ FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2()); Thread t1=new Thread(futureTask,"A"); } }
線程池架構(gòu)
除此之外,還有 Executors 工具類。
ThreadPoolExecutor
線程池有七大參數(shù):
public ThreadPoolExecutor( int corePoolSize,//線程池常駐核心線程數(shù) int maximumPoolSize,//線程池能容納同時(shí)執(zhí)行最大線程數(shù) long keepAliveTime,//多余的空閑線程的存活時(shí)間,當(dāng)前線程池線程數(shù)量超過(guò)core,空閑時(shí)間達(dá)到keepAliveTime,多余空閑線程會(huì)被銷毀直到只剩下core個(gè) TimeUnit unit, BlockingQueue<Runnable> workQueue,//被提交尚未被執(zhí)行的任務(wù)隊(duì)列 ThreadFactory threadFactory,//創(chuàng)建線程的線程工廠 RejectedExecutionHandler handler//拒絕策略 ) {...}
處理流程如下:
- 創(chuàng)建線程池,等待提交過(guò)來(lái)的任務(wù)請(qǐng)求。
- 添加請(qǐng)求任務(wù)
- 如果運(yùn)行線程數(shù)小于 corePoolSize,創(chuàng)建線程運(yùn)行該任務(wù)
- 如果運(yùn)行線程數(shù)大于等于 corePoolSize,將任務(wù)放入隊(duì)列
- 隊(duì)列滿,且運(yùn)行線程數(shù)量小于 maximumPoolSize,創(chuàng)建非核心線程運(yùn)行任務(wù)
- 隊(duì)列滿,且運(yùn)行線程數(shù)量大于等于 maximumPoolSize,線程池會(huì)啟動(dòng)飽和拒絕策略執(zhí)行。
- 線程完成任務(wù),會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行
- 一個(gè)線程無(wú)事可做超過(guò) keepAliveTime 時(shí):
- 如果當(dāng)前運(yùn)行線程數(shù)大于 corePoolSize,該線程被停掉
- 線程池的所有任務(wù)完成后最終會(huì)收縮到 corePoolSize 的大小。
拒絕策略
在 JDK 中有四種內(nèi)置的拒絕策略,均實(shí)現(xiàn)了 RejectedExecutionHandler 接口。
- AbortPolicy: 直接拋出 RejectedExecutionException 異常,是默認(rèn)的拒絕策略。
- DiscardPolicy: 直接丟棄任務(wù),不予處理也不拋出異常。如果允許任務(wù)丟失,是最好的處理策略。
- DiscardOldestPolicy: 拋棄隊(duì)列中等待最久的任務(wù),然后把當(dāng)前任務(wù)加入隊(duì)列嘗試再次提交。
- CallerRunsPolicy: 調(diào)用者運(yùn)行。該策略既不會(huì)拋棄任務(wù),也不會(huì)拋出異常,而是將某些任務(wù)回退到調(diào)用者。
三種常用線程池
1、Executors.newFixedThreadPool(int)
創(chuàng)建固定容量的線程池,控制最大并發(fā)數(shù),超出的線程在隊(duì)列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
其中 corePoolSize 和 maximumPoolSize 值是相等的,并且使用的是 LinkedBlockingQueue。
適用于執(zhí)行長(zhǎng)期的任務(wù),性能比較高。
2、Executors.newSingleThreadExecutor()
創(chuàng)建了一個(gè)單線程的線程池,只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照順序執(zhí)行。
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
其中 corePoolSize 和 maximumPoolSize 都設(shè)置為1,使用的也是 LinkedBlockingQueue。
適用于一個(gè)任務(wù)一個(gè)任務(wù)執(zhí)行的場(chǎng)景。
3、Executors.newCachedThreadPool()
創(chuàng)建了一個(gè)可緩存的線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可以靈活回收空閑線程,沒(méi)有可以回收的,則新建線程。
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
設(shè)置 corePoolSize 為0, maximumPoolSize 設(shè)置為 Integer.MAX_VALUE,使用的是 SynchronousQueue。來(lái)了任務(wù)就創(chuàng)建線程執(zhí)行,線程空閑超過(guò)60秒后銷毀。
適用于執(zhí)行很多短期異步的小程序或者負(fù)載比較輕的服務(wù)器。
工作中使用什么樣的線程池
在阿里巴巴Java開發(fā)手冊(cè)中有如下規(guī)定:
- 線程資源必須通過(guò)線程池提供,不允許在應(yīng)用中自行顯示創(chuàng)建線程。
- 說(shuō)明:使用線程池的好處是減少在創(chuàng)建和銷毀線程上消耗的時(shí)間和系統(tǒng)資源的開銷,解決資源不足的問(wèn)題。如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程導(dǎo)致消耗完內(nèi)存或者過(guò)度切換。
- 線程池不允許使用 Executors 去創(chuàng)建,也就是不能使用上述的三種線程池,而是要通過(guò) ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源韓進(jìn)的風(fēng)險(xiǎn)。
- FixedThreadPool 和 SingleThreadPool 都采用了 LinkedBlockingQueue,其允許的隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能堆積大量的請(qǐng)求,導(dǎo)致OOM。
- CachedThreadPool 和 ScheduledThreadPool 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE,可能創(chuàng)建大量的線程,導(dǎo)致OOM。
如何設(shè)置線程池的線程數(shù)目
Runtime.getRuntime().availableProcessors()獲取當(dāng)前設(shè)備的CPU個(gè)數(shù)。
- CPU密集型任務(wù)
- CPU 密集的含義是任務(wù)需要大量的運(yùn)算,而沒(méi)有阻塞,CPU一致全速運(yùn)行
- CPU 密集任務(wù)只有在真正的多核 CPU 上才能得到加速(通過(guò)多線程),而在單核 CPU 上,無(wú)論開幾個(gè)模擬的多線程都不能得到加速
- CPU 密集型任務(wù)配置盡可能少的線程數(shù)量,一般設(shè)置為 CPU 核心數(shù) + 1
- IO 密集型
- IO 密集型,是指該任務(wù)需要大量的IO,大量的阻塞
- 單線程上運(yùn)行 IO 密集型的任務(wù)會(huì)導(dǎo)致浪費(fèi)大量的 CPU 運(yùn)算能力浪費(fèi)在等待上
- IO 密集型任務(wù)使用多線程可以大大加速程序運(yùn)行,利用了被浪費(fèi)掉的阻塞時(shí)間
- IO 密集型時(shí),大部分線程都阻塞,需要多配置線程數(shù),可以采用CPU核心數(shù) * 2,或者采用 CPU 核心數(shù) / (1 - 阻塞系數(shù)),阻塞系數(shù)在0.8 ~ 0.9之間
死鎖
產(chǎn)生死鎖的原因
死鎖是指兩個(gè)或兩個(gè)以上的進(jìn)程在執(zhí)行過(guò)程中,因?yàn)闋?zhēng)奪資源造成的互相等待的現(xiàn)象。
死鎖需要滿族的四大條件如下:
- 互斥
- 循環(huán)等待
- 不可搶占
- 占有并等待
產(chǎn)生死鎖的主要原因有:
- 系統(tǒng)資源不足
- 進(jìn)程運(yùn)行推進(jìn)順序不當(dāng)
- 資源分配不當(dāng)
死鎖實(shí)例
class HoldLockThread implements Runnable{ private String lock1; private String lock2; public HoldLockThread(String lock1, String lock2) { this.lock1 = lock1; this.lock2 = lock2; } @Override public void run() { synchronized (lock1){ System.out.println(Thread.currentThread().getName()+"\t持有"+lock1+"\t嘗試獲取"+lock2); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock2){ System.out.println(Thread.currentThread().getName()+"\t持有"+lock1+"\t嘗試獲取"+lock2); } } } } public class DeadLockDemo { public static void main(String[] args) { String lockA="lockA"; String lockB="lockB"; new Thread(new HoldLockThread(lockA,lockB),"Thread1").start(); new Thread(new HoldLockThread(lockB,lockA),"Thread2").start(); } }
輸出如下結(jié)果,程序并沒(méi)有終止。
Thread2 持有l(wèi)ockB 嘗試獲取lockA
Thread1 持有l(wèi)ockA 嘗試獲取lockB
死鎖定位分析
使用 jps ,類似于 linux 中的 ps 命令。
在上述 java 文件中,使用 IDEA 中的 open In Terminal,或者在該文件目錄下使用 cmd 命令行工具。
首先使用 jps -l命令,類似于ls -l命令,輸出當(dāng)前運(yùn)行的 java 線程,從中能得知 DeadLockDemo 線程的線程號(hào)。
然后,使用jstack threadId來(lái)查看棧信息。輸出如下:
Java stack information for the threads listed above:
===================================================
"Thread2":
at interview.jvm.deadlock.HoldLockThread.run(DeadLockDemo.java:22)
- waiting to lock <0x00000000d6240328> (a java.lang.String)
- locked <0x00000000d6240360> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)
"Thread1":
at interview.jvm.deadlock.HoldLockThread.run(DeadLockDemo.java:22)
- waiting to lock <0x00000000d6240360> (a java.lang.String)
- locked <0x00000000d6240328> (a java.lang.String)
at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
springboot druid數(shù)據(jù)庫(kù)連接池連接失敗后一直重連的解決方法
本文主要介紹了springboot druid數(shù)據(jù)庫(kù)連接池連接失敗后一直重連的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04Java運(yùn)用SWT插件編寫桌面記事本應(yīng)用程序
這篇文章主要為大家介紹了一個(gè)Java項(xiàng)目實(shí)戰(zhàn),一步步教你實(shí)現(xiàn)記事本,步驟很詳細(xì),運(yùn)用SWT插件手把手編寫記事本,感興趣的小伙伴們可以參考一下2016-01-01JavaMail實(shí)現(xiàn)簡(jiǎn)單郵件發(fā)送
這篇文章主要為大家詳細(xì)介紹了JavaMail實(shí)現(xiàn)簡(jiǎn)單郵件發(fā)送,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08SpringBoot整合BootStrap實(shí)戰(zhàn)
這篇文章主要介紹了SpringBoot整合BootStrap實(shí)戰(zhàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09網(wǎng)關(guān)Spring Cloud Gateway HTTP超時(shí)配置問(wèn)題
這篇文章主要介紹了網(wǎng)關(guān)Spring Cloud Gateway HTTP超時(shí)配置問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01java Collection 之Set使用說(shuō)明
本篇文章小編為大家介紹,java Collection 之Set使用說(shuō)明。需要的朋友參考下2013-04-04SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式
這篇文章主要介紹了SpringBoot中處理的轉(zhuǎn)發(fā)與重定向方式,分別就轉(zhuǎn)發(fā)和重定向做了概念解說(shuō),結(jié)合示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-11-11玩轉(zhuǎn)spring boot MVC應(yīng)用(2)
玩轉(zhuǎn)spring boot,如何快速搭建一個(gè)MCV程序?這篇文章為大家詳細(xì)主要介紹了一個(gè)MCV程序的快速搭建過(guò)程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-01-01