Java線程間共享與協(xié)作詳細(xì)介紹
線程的共享
synchronized內(nèi)置鎖
Java 支持多個(gè)線程同時(shí)訪問一個(gè)對象或者對象的成員變量,關(guān)鍵字synchronized 可以修飾方法或者以同步塊的形式來進(jìn)行使用,它主要確保多個(gè)線程在同一個(gè)時(shí)刻,只能有一個(gè)線程處于方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性,又稱為內(nèi)置鎖機(jī)制。 對象鎖和類鎖: 對象鎖是用于對象實(shí)例方法,或者一個(gè)對象實(shí)例上的,類鎖是用于類的靜態(tài)方法或者一個(gè)類的 class 對象上的。我們知道,類的對象實(shí)例可以有很多個(gè),但是每個(gè)類只有一個(gè) class 對象,所以不同對象實(shí)例的對象鎖是互不干擾的,但是每個(gè)類只有一個(gè)類鎖。 但是有一點(diǎn)必須注意的是,其實(shí)類鎖只是一個(gè)概念上的東西,并不是真實(shí)存在的,類鎖其實(shí)鎖的是每個(gè)類的對應(yīng)的 class 對象。類鎖和對象鎖之間也是互不干擾的。
代碼示例:
*類說明:synchronized關(guān)鍵字的使用方法 */ public class SynTest { private long count =0; private Object obj = new Object();//作為一個(gè)鎖 public long getCount() { return count; } public void setCount(long count) { this.count = count; } /*用在同步塊上*/ public void incCount(){ synchronized (obj){ count++; } } /*用在方法上*/ public synchronized void incCount2(){ count++; } /*用在同步塊上,但是鎖的是當(dāng)前類的對象實(shí)例*/ public void incCount3(){ synchronized (this){ count++; } } //線程 private static class Count extends Thread{ private SynTest simplOper; public Count(SynTest simplOper) { this.simplOper = simplOper; } @Override public void run() { for(int i=0;i<10000;i++){ simplOper.incCount();//count = count+10000 } } } public static void main(String[] args) throws InterruptedException { SynTest simplOper = new SynTest(); //啟動(dòng)兩個(gè)線程 Count count1 = new Count(simplOper); Count count2 = new Count(simplOper); count1.start(); count2.start(); Thread.sleep(50); System.out.println(simplOper.count);//20000 } }
/** *類說明:鎖的實(shí)例不一樣,也是可以并行的 */ public class DiffInstance { private static class InstanceSyn implements Runnable{ private DiffInstance diffInstance; public InstanceSyn(DiffInstance diffInstance) { this.diffInstance = diffInstance; } @Override public void run() { System.out.println("TestInstance is running..."+ diffInstance); diffInstance.instance(); } } private static class Instance2Syn implements Runnable{ private DiffInstance diffInstance; public Instance2Syn(DiffInstance diffInstance) { this.diffInstance = diffInstance; } @Override public void run() { System.out.println("TestInstance2 is running..."+ diffInstance); diffInstance.instance2(); } } private synchronized void instance(){ SleepTools.second(3); System.out.println("synInstance is going..."+this.toString()); SleepTools.second(3); System.out.println("synInstance ended "+this.toString()); } private synchronized void instance2(){ SleepTools.second(3); System.out.println("synInstance2 is going..."+this.toString()); SleepTools.second(3); System.out.println("synInstance2 ended "+this.toString()); } public static void main(String[] args) { DiffInstance instance1 = new DiffInstance(); Thread t3 = new Thread(new Instance2Syn(instance1)); DiffInstance instance2 = new DiffInstance(); Thread t4 = new Thread(new InstanceSyn(instance1)); //先執(zhí)行完一個(gè)才會(huì)執(zhí)行另外一個(gè) t3.start(); t4.start(); SleepTools.second(1); } }
/** *類說明:演示實(shí)例鎖和類鎖是不同的,兩者可以并行 */ public class InstanceAndClass { private static class SynClass extends Thread{ @Override public void run() { System.out.println("TestClass is running..."); synClass(); } } private static class InstanceSyn implements Runnable{ private InstanceAndClass SynClassAndInstance; public InstanceSyn(InstanceAndClass SynClassAndInstance) { this.SynClassAndInstance = SynClassAndInstance; } @Override public void run() { System.out.println("TestInstance is running..."+SynClassAndInstance); SynClassAndInstance.instance(); } } private synchronized void instance(){ SleepTools.second(1); System.out.println("synInstance is going..."+this.toString()); SleepTools.second(1); System.out.println("synInstance ended "+this.toString()); } private static synchronized void synClass(){ SleepTools.second(1); System.out.println("synClass going..."); SleepTools.second(1); System.out.println("synClass end"); } public static void main(String[] args) { InstanceAndClass synClassAndInstance = new InstanceAndClass(); Thread t1 = new SynClass(); Thread t2 = new Thread(new InstanceSyn(synClassAndInstance)); t2.start(); SleepTools.second(1); t1.start(); } }
/** *類說明:類鎖和鎖static變量也是不同的 可以并行 */ public class StaticAndClass { private static class SynClass extends Thread{ @Override public void run() { System.out.println(currentThread().getName() +":SynClass is running..."); synClass(); } } private static class SynStatic extends Thread{ @Override public void run() { System.out.println(currentThread().getName() +"SynStatic is running..."); synStatic(); } } private static synchronized void synClass(){ System.out.println(Thread.currentThread().getName() +"synClass going..."); SleepTools.second(1); System.out.println(Thread.currentThread().getName() +"synClass end"); } private static Object obj = new Object(); private static void synStatic(){ synchronized (obj){ System.out.println(Thread.currentThread().getName() +"synStatic going..."); SleepTools.second(1); System.out.println(Thread.currentThread().getName() +"synStatic end"); } } public static void main(String[] args) { StaticAndClass synClassAndInstance = new StaticAndClass(); Thread t1 = new SynClass(); //Thread t2 = new SynStatic(); Thread t2 = new SynClass(); t2.start(); SleepTools.second(1); t1.start(); } }
錯(cuò)誤的加鎖和原因分析
原因:雖然我們對 i 進(jìn)行了加鎖,但是
但是當(dāng)我們反編譯這個(gè)類的 class 文件后,可以看到 i++實(shí)際是,
本質(zhì)上是返回了一個(gè)新的 Integer 對象。也就是每個(gè)線程實(shí)際加鎖的是不同的 Integer 對象。
volatile,最輕量的同步機(jī)制
volatile 保證了不同線程對這個(gè)變量進(jìn)行操作時(shí)的可見性,即一個(gè)線程修改了某個(gè)變量的值,這新值對其他線程來說是立即可見的。
不加 volatile 時(shí),子線程無法感知主線程修改了 ready 的值,從而不會(huì)退出循環(huán),而加了 volatile 后,子線程可以感知主線程修改了 ready 的值,迅速退出循環(huán)。
/** * 類說明:演示Volatile的提供的可見性 */ public class VolatileCase { private volatile static boolean ready; private static int number; // private static class PrintThread extends Thread{ @Override public void run() { System.out.println("PrintThread is running......."); while(!ready);//無限循環(huán) System.out.println("number = "+number); } } public static void main(String[] args) { new PrintThread().start(); SleepTools.second(1); number = 51;//如果沒有加volatile關(guān)鍵字則主線程都結(jié)束了也沒有打印number的值,加了關(guān)鍵值后打印出來的值就是主線程修改的值 ready = true; SleepTools.second(5); System.out.println("main is ended!"); } }
但是 volatile 不能保證數(shù)據(jù)在多個(gè)線程下同時(shí)寫時(shí)的線程安全。
/** * 類說明: */ public class NotSafe { private volatile long count =0; public long getCount() { return count; } public void setCount(long count) { this.count = count; } //count進(jìn)行累加 public void incCount(){ count++; } //線程 private static class Count extends Thread{ private NotSafe simplOper; public Count(NotSafe simplOper) { this.simplOper = simplOper; } @Override public void run() { for(int i=0;i<10000;i++){ simplOper.incCount(); } } } public static void main(String[] args) throws InterruptedException { NotSafe simplOper = new NotSafe(); //啟動(dòng)兩個(gè)線程 Count count1 = new Count(simplOper); Count count2 = new Count(simplOper); count1.start(); count2.start(); Thread.sleep(50); System.out.println(simplOper.count);//20000? } }
volatile 最適用的場景:一個(gè)線程寫,多個(gè)線程讀。
ThreadLocal
與 Synchonized的比較
ThreadLocal 和 Synchonized 都用于解決多線程并發(fā)訪問??墒?ThreadLocal與 synchronized 有本質(zhì)的差別。synchronized 是利用鎖的機(jī)制,使變量或代碼塊在某一時(shí)該僅僅能被一個(gè)線程訪問。而 ThreadLocal 為每個(gè)線程都提供了變量的副本 ,使得每個(gè)線程在某一時(shí)間訪問到的并非同一個(gè)對象,這樣就隔離了多個(gè)線程對數(shù)據(jù)的數(shù)據(jù)共享。 Spring 的事務(wù)就借助了 ThreadLocal 類。Spring 會(huì)從數(shù)據(jù)庫連接池中獲得一個(gè)connection,然會(huì)把connection 放進(jìn) ThreadLocal 中,也就和線程綁定了,事務(wù)需要提交或者回滾,只要從 ThreadLocal 中拿到 connection 進(jìn)行操作。為何 Spring的事務(wù)要借助 ThreadLocal 類?
以 JDBC 為例,正常的事務(wù)代碼可能如下:
dbc = new DataBaseConnection();//第 1 行 Connection con = dbc.getConnection();//第 2 行 con.setAutoCommit(false);// //第 3 行 con.executeUpdate(...);//第 4 行 con.executeUpdate(...);//第 5 行 con.executeUpdate(...);//第 6 行 con.commit();////第 7 行
上述代碼,可以分成三個(gè)部分: 事務(wù)準(zhǔn)備階段:第 1~3 行 業(yè)務(wù)處理階段:第 4~6 行 事務(wù)提交階段:第 7 行 可以很明顯的看到,不管我們開啟事務(wù)還是執(zhí)行具體的 sql 都需要一個(gè)具體的數(shù)據(jù)庫連接?,F(xiàn)在我們開發(fā)應(yīng)用一般都采用三層結(jié)構(gòu),如果我們控制事務(wù)的代碼都放在DAO(DataAccessObject)對象中,在 DAO 對象的每個(gè)方法當(dāng)中去打開事務(wù)和關(guān)閉事務(wù),當(dāng) Service 對象在調(diào)用 DAO 時(shí),如果只調(diào)用一個(gè) DAO,那我們這樣實(shí)現(xiàn)則效果不錯(cuò),但往往我們的 Service 會(huì)調(diào)用一系列的 DAO 對數(shù)據(jù)庫進(jìn)行多次操作,那么,這個(gè)時(shí)候我們就無法控制事務(wù)的邊界了,因?yàn)閷?shí)際應(yīng)用當(dāng)中,我們的 Service調(diào)用的 DAO 的個(gè)數(shù)是不確定的,可根據(jù)需求而變化,而且還可能出現(xiàn) Service 調(diào)用 Service 的情況。
如果不使用 ThreadLocal,代碼大概就會(huì)是這個(gè)樣子:
但是需要注意一個(gè)問題,如何讓三個(gè) DAO 使用同一個(gè)數(shù)據(jù)源連接呢?我們就必須為每個(gè) DAO 傳遞同一個(gè)數(shù)據(jù)庫連接,要么就是在 DAO 實(shí)例化的時(shí)候作為構(gòu)造方法的參數(shù)傳遞,要么在每個(gè) DAO 的實(shí)例方法中作為方法的參數(shù)傳遞。這兩種方式無疑對我們的 Spring 框架或者開發(fā)人員來說都不合適。為了讓這個(gè)數(shù)據(jù)庫連接可以跨階段傳遞,又不顯示的進(jìn)行參數(shù)傳遞,就必須使用別的辦法。 Web 容器中,每個(gè)完整的請求周期會(huì)由一個(gè)線程來處理。因此,如果我們能將一些參數(shù)綁定到線程的話,就可以實(shí)現(xiàn)在軟件架構(gòu)中跨層次的參數(shù)共享(是隱式的共享)。而 JAVA 中恰好提供了綁定的方法--使用 ThreadLocal。 結(jié)合使用 Spring 里的 IOC 和 AOP,就可以很好的解決這一點(diǎn)。 只要將一個(gè)數(shù)據(jù)庫連接放入 ThreadLocal 中,當(dāng)前線程執(zhí)行時(shí)只要有使用數(shù)據(jù)庫連接的地方就從 ThreadLocal 獲得就行了。
ThreadLocal的使用
ThreadLocal 類接口很簡單,只有 4 個(gè)方法,我們先來了解一下: • void set(Object value) 設(shè)置當(dāng)前線程的線程局部變量的值。 • public Object get() 該方法返回當(dāng)前線程所對應(yīng)的線程局部變量。 • public void remove() 將當(dāng)前線程局部變量的值刪除,目的是為了減少內(nèi)存的占用,該方法是 JDK5.0 新增的方法。需要指出的是,當(dāng)線程結(jié)束后,對應(yīng)該線程的局部變量將自動(dòng)被垃圾回收,所以顯式調(diào)用該方法清除線程的局部變量并不是必須的操作,但它可以加快內(nèi)存回收的速度。 • protected Object initialValue() 返回該線程局部變量的初始值,該方法是一個(gè) protected 的方法,顯然是為了讓子類覆蓋而設(shè)計(jì)的。這個(gè)方法是一個(gè)延遲調(diào)用方法,在線程第 1 次調(diào)用 get()或 set(Object)時(shí)才執(zhí)行,并且僅執(zhí)行 1 次。ThreadLocal 中的缺省實(shí)現(xiàn)直接返回一個(gè) null。
public final static ThreadLocal RESOURCE = new ThreadLocal();RESOURCE 代表一個(gè)能夠存放 String 類型的 ThreadLocal 對象。此時(shí)不論什么一個(gè)線程能夠并發(fā)訪問這個(gè)變量,對它進(jìn)行寫入、讀取操作,都是線程安全的。
代碼示例:
/** *類說明:演示ThreadLocal的使用 */ public class UseThreadLocal { private static ThreadLocal<Integer> intLocal = new ThreadLocal<Integer>(){ @Override protected Integer initialValue() { return 1; } }; private static ThreadLocal<String> stringThreadLocal; /** * 運(yùn)行3個(gè)線程 */ public void StartThreadArray(){ Thread[] runs = new Thread[3]; for(int i=0;i<runs.length;i++){ runs[i]=new Thread(new TestThread(i)); } for(int i=0;i<runs.length;i++){ runs[i].start(); } } /** *類說明:測試線程,線程的工作是將ThreadLocal變量的值變化,并寫回,看看線程之間是否會(huì)互相影響 */ public static class TestThread implements Runnable{ int id; public TestThread(int id){ this.id = id; } public void run() { System.out.println(Thread.currentThread().getName()+":start"); Integer s = intLocal.get(); s = s+id; intLocal.set(s); System.out.println(Thread.currentThread().getName() +":"+ intLocal.get()); //intLocal.remove(); } } public static void main(String[] args){ UseThreadLocal test = new UseThreadLocal(); test.StartThreadArray(); } }
/** * 類說明: */ public class NoThreadLocal { static Integer count = new Integer(1); /** * 運(yùn)行3個(gè)線程 */ public void StartThreadArray(){ Thread[] runs = new Thread[3]; for(int i=0;i<runs.length;i++){ runs[i]=new Thread(new TestTask(i)); } for(int i=0;i<runs.length;i++){ runs[i].start(); } } /** *類說明: */ public static class TestTask implements Runnable{ int id; public TestTask(int id){ this.id = id; } public void run() { System.out.println(Thread.currentThread().getName()+":start"); count = count+id; System.out.println(Thread.currentThread().getName()+":" +count); } } public static void main(String[] args){ NoThreadLocal test = new NoThreadLocal(); test.StartThreadArray(); } }
實(shí)現(xiàn)解析
上面先取到當(dāng)前線程,然后調(diào)用 getMap 方法獲取對應(yīng)的 ThreadLocalMap,ThreadLocalMap 是 ThreadLocal 的靜態(tài)內(nèi)部類,然后 Thread 類中有一個(gè)這樣類型成員,所以 getMap 是直接返回 Thread 的成員。
看下 ThreadLocal 的內(nèi)部類 ThreadLocalMap 源碼:
可以看到有個(gè) Entry 內(nèi)部靜態(tài)類,它繼承了 WeakReference,總之它記錄了兩個(gè)信息,一個(gè)是 ThreadLocal<?>類型,一個(gè)是 Object 類型的值。getEntry 方法則是獲取某個(gè) ThreadLocal 對應(yīng)的值,set 方法就是更新或賦值相應(yīng)的 ThreadLocal對應(yīng)的值。
回顧我們的 get 方法,其實(shí)就是拿到每個(gè)線程獨(dú)有的 ThreadLocalMap,然后再用 ThreadLocal 的當(dāng)前實(shí)例,拿到 Map 中的相應(yīng)的 Entry,然后就可以拿到相應(yīng)的值返回出去。當(dāng)然,如果 Map 為空,還會(huì)先進(jìn)行 map 的創(chuàng)建,初始化等工作。
引發(fā)的內(nèi)存泄漏分析
引用 Object o = new Object(); 這個(gè) o,我們可以稱之為對象引用,而 new Object()我們可以稱之為在內(nèi)存中產(chǎn)生了一個(gè)對象實(shí)例。
當(dāng)寫下 o=null 時(shí),只是表示 o 不再指向堆中 object 的對象實(shí)例,不代表這個(gè)對象實(shí)例不存在了。
強(qiáng)引用就是指在程序代碼之中普遍存在的,類似“Object obj=new Object()”這類的引用,只要強(qiáng)引用還存在,垃圾收集器永遠(yuǎn)不會(huì)回收掉被引用的對象實(shí)例。
軟引用是用來描述一些還有用但并非必需的對象。對于軟引用關(guān)聯(lián)著的對象,在系統(tǒng)將要發(fā)生內(nèi)存溢出異常之前,將會(huì)把這些對象實(shí)例列進(jìn)回收范圍之中進(jìn)行第二次回收。如果這次回收還沒有足夠的內(nèi)存,才會(huì)拋出內(nèi)存溢出異常。在 JDK1.2 之后,提供了 SoftReference 類來實(shí)現(xiàn)軟引用。
弱引用也是用來描述非必需對象的,但是它的強(qiáng)度比軟引用更弱一些,被弱引用關(guān)聯(lián)的對象實(shí)例只能生存到下一次垃圾收集發(fā)生之前。當(dāng)垃圾收集器工作時(shí),無論當(dāng)前內(nèi)存是否足夠,都會(huì)回收掉只被弱引用關(guān)聯(lián)的對象實(shí)例。在 JDK 1.2 之后,提供了WeakReference 類來實(shí)現(xiàn)弱引用。
虛引用也稱為幽靈引用或者幻影引用,它是最弱的一種引用關(guān)系。一個(gè)對象實(shí)例是否有虛引用的存在,完全不會(huì)對其生存時(shí)間構(gòu)成影響,也無法通過虛引用來取得一個(gè)對象實(shí)例。為一個(gè)對象設(shè)置虛引用關(guān)聯(lián)的唯一目的就是能在這個(gè)對象實(shí)例被收集器回收時(shí)收到一個(gè)系統(tǒng)通知。在 JDK 1.2 之后,提供了PhantomReference 類來實(shí)現(xiàn)虛引用。
內(nèi)存泄漏的現(xiàn)象
將堆內(nèi)存大小設(shè)置為-Xmx256m 我們啟用一個(gè)線程池,大小固定為 5 個(gè)線程
final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
場景 1,首先任務(wù)中不執(zhí)行任何有意義的代碼,當(dāng)所有的任務(wù)提交執(zhí)行完成后,可以看見,我們這個(gè)應(yīng)用的內(nèi)存占用基本上為 25M 左右
場景 2,然后我們只簡單的在每個(gè)任務(wù)中 new 出一個(gè)數(shù)組,執(zhí)行完成后我們可以看見,內(nèi)存占用基本和場景 1 同
場景 3,當(dāng)我們啟用了 ThreadLocal 以后:
執(zhí)行完成后我們可以看見,內(nèi)存占用變?yōu)榱?100M 左右場景 4,于是,我們加入一行代碼,再執(zhí)行,看看內(nèi)存情況:
可以看見,內(nèi)存占用基本和場景 1 同。 這就充分說明,場景 3,當(dāng)我們啟用了 ThreadLocal 以后確實(shí)發(fā)生了內(nèi)存泄漏。
分析
根據(jù)我們前面對 ThreadLocal 的分析,我們可以知道每個(gè) Thread 維護(hù)一個(gè)ThreadLocalMap,這個(gè)映射表的 key 是 ThreadLocal 實(shí)例本身,value 是真正需要存儲(chǔ)的 Object,也就是說 ThreadLocal 本身并不存儲(chǔ)值,它只是作為一個(gè) key來讓線程從ThreadLocalMap 獲取 value。仔細(xì)觀察 ThreadLocalMap,這個(gè) map是使用 ThreadLocal 的弱引用作為 Key 的,弱引用的對象在 GC 時(shí)會(huì)被回收。
因此使用了 ThreadLocal 后,引用鏈如圖所示 :
圖中的虛線表示弱引用。 這樣,當(dāng)把 threadlocal 變量置為 null 以后,沒有任何強(qiáng)引用指向 threadlocal實(shí)例,所以 threadlocal 將會(huì)被 gc 回收。這樣一來,ThreadLocalMap 中就會(huì)出現(xiàn)key 為 null 的 Entry,就沒有辦法訪問這些 key 為 null 的 Entry 的 value,如果當(dāng)前線程再遲遲不結(jié)束的話,這些 key 為 null 的 Entry 的 value 就會(huì)一直存在一條強(qiáng)引用鏈:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value,而這塊 value 永遠(yuǎn)不會(huì)被訪問到了,所以存在著內(nèi)存泄露。 只有當(dāng)前 thread 結(jié)束以后,current thread 就不會(huì)存在棧中,強(qiáng)引用斷開,Current Thread、Map value 將全部被 GC 回收。最好的做法是不在需要使用ThreadLocal 變量后,都調(diào)用它的 remove()方法,清除數(shù)據(jù)。 所以回到我們前面的實(shí)驗(yàn)場景,場景 3 中,雖然線程池里面的任務(wù)執(zhí)行完畢了,但是線程池里面的 5 個(gè)線程會(huì)一直存在直到 JVM 退出,我們 set 了線程的localVariable 變量后沒有調(diào)用 localVariable.remove()方法,導(dǎo)致線程池里面的 5 個(gè)線程的 threadLocals 變量里面的 new LocalVariable()實(shí)例沒有被釋放。 其實(shí)考察 ThreadLocal 的實(shí)現(xiàn),我們可以看見,無論是 get()、set()在某些時(shí)候,調(diào)用了 expungeStaleEntry 方法用來清除 Entry 中 Key 為 null 的 Value,但是這是不及時(shí)的,也不是每次都會(huì)執(zhí)行的,所以一些情況下還是會(huì)發(fā)生內(nèi)存泄露。只有 remove()方法中顯式調(diào)用了 expungeStaleEntry 方法。 從表面上看內(nèi)存泄漏的根源在于使用了弱引用,但是另一個(gè)問題也同樣值得
思考:為什么使用弱引用而不是強(qiáng)引用?
下面我們分兩種情況討論:
key 使用強(qiáng)引用:引用 ThreadLocal 的對象被回收了,但是 ThreadLocalMap還持有 ThreadLocal 的強(qiáng)引用,如果沒有手動(dòng)刪除,ThreadLocal 的對象實(shí)例不會(huì)被回收,導(dǎo)致 Entry 內(nèi)存泄漏。
key 使用弱引用:引用的 ThreadLocal 的對象被回收了,由于 ThreadLocalMap持有 ThreadLocal 的弱引用,即使沒有手動(dòng)刪除,ThreadLocal 的對象實(shí)例也會(huì)被回收。value 在下一次 ThreadLocalMap 調(diào)用 set,get,remove 都有機(jī)會(huì)被回收。
比較兩種情況,我們可以發(fā)現(xiàn):由于 ThreadLocalMap 的生命周期跟 Thread一樣長,如果都沒有手動(dòng)刪除對應(yīng) key,都會(huì)導(dǎo)致內(nèi)存泄漏,但是使用弱引用可以多一層保障。 因此,ThreadLocal 內(nèi)存泄漏的根源是:由于 ThreadLocalMap 的生命周期跟Thread 一樣長,如果沒有手動(dòng)刪除對應(yīng) key 就會(huì)導(dǎo)致內(nèi)存泄漏,而不是因?yàn)槿跻谩?/p>
總結(jié) JVM 利用設(shè)置 ThreadLocalMap 的 Key 為弱引用,來避免內(nèi)存泄露。 JVM 利用調(diào)用 remove、get、set 方法的時(shí)候,回收弱引用。 當(dāng) ThreadLocal 存儲(chǔ)很多 Key 為 null 的 Entry 的時(shí)候,而不再去調(diào)用 remove、get、set 方法,那么將導(dǎo)致內(nèi)存泄漏。 使用線程池+ ThreadLocal 時(shí)要小心,因?yàn)檫@種情況下,線程是一直在不斷的重復(fù)運(yùn)行的,從而也就造成了 value 可能造成累積的情況。
/** * 類說明:ThreadLocal造成的內(nèi)存泄漏演示 */ public class ThreadLocalOOM { private static final int TASK_LOOP_SIZE = 500; final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); static class LocalVariable { private byte[] a = new byte[1024*1024*5];/*5M大小的數(shù)組*/ } final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<>(); public static void main(String[] args) throws InterruptedException { Object o = new Object(); /*5*5=25*/ for (int i = 0; i < TASK_LOOP_SIZE; ++i) { poolExecutor.execute(new Runnable() { public void run() { //localVariable.set(new LocalVariable()); new LocalVariable(); System.out.println("use local varaible"); //localVariable.remove(); } }); Thread.sleep(100); } System.out.println("pool execute over"); } }
錯(cuò)誤使用ThreadLocal導(dǎo)致線程不安全
/** * 類說明:ThreadLocal的線程不安全演示 */ public class ThreadLocalUnsafe implements Runnable { public static Number number = new Number(0); public void run() { //每個(gè)線程計(jì)數(shù)加一 number.setNum(number.getNum()+1); //將其存儲(chǔ)到ThreadLocal中 value.set(number); SleepTools.ms(2); //輸出num值 System.out.println(Thread.currentThread().getName()+"="+value.get().getNum()); } public static ThreadLocal<Number> value = new ThreadLocal<Number>() { }; public static void main(String[] args) { for (int i = 0; i < 5; i++) { new Thread(new ThreadLocalUnsafe()).start(); } } private static class Number { public Number(int num) { this.num = num; } private int num; public int getNum() { return num; } public void setNum(int num) { this.num = num; } @Override public String toString() { return "Number [num=" + num + "]"; } } }
為什么每個(gè)線程都輸出 5?難道他們沒有獨(dú)自保存自己的 Number 副本嗎?為什么其他線程還是能夠修改這個(gè)值?仔細(xì)考察 ThreadLocal 和 Thead 的代碼,我們發(fā)現(xiàn) ThreadLocalMap 中保存的其實(shí)是對象的一個(gè)引用,這樣的話,當(dāng)有其他線程對這個(gè)引用指向的對象實(shí)例做修改時(shí),其實(shí)也同時(shí)影響了所有的線程持有的對象引用所指向的同一個(gè)對象實(shí)例。這也就是為什么上面的程序?yàn)槭裁磿?huì)輸出一樣的結(jié)果:5 個(gè)線程中保存的是同一 Number 對象的引用,在線程睡眠的時(shí)候,其他線程將 num 變量進(jìn)行了修改,而修改的對象 Number 的實(shí)例是同一份,因此它們最終輸出的結(jié)果是相同的。 而上面的程序要正常的工作,應(yīng)該的用法是讓每個(gè)線程中的 ThreadLocal 都應(yīng)該持有一個(gè)新的 Number 對象。
線程間的協(xié)作
線程之間相互配合,完成某項(xiàng)工作,比如:一個(gè)線程修改了一個(gè)對象的值,而另一個(gè)線程感知到了變化,然后進(jìn)行相應(yīng)的操作,整個(gè)過程開始于一個(gè)線程,而最終執(zhí)行又是另一個(gè)線程。前者是生產(chǎn)者,后者就是消費(fèi)者,這種模式隔離了“做什么”(what)和“怎么做”(How),簡單的辦法是讓消費(fèi)者線程不斷地循環(huán)檢查變量是否符合預(yù)期在 while 循環(huán)中設(shè)置不滿足的條件,如果條件滿足則退出 while 循環(huán),從而完成消費(fèi)者的工作。卻存在如下問題: 1) 難以確保及時(shí)性。 2)難以降低開銷。如果降低睡眠的時(shí)間,比如休眠 1 毫秒,這樣消費(fèi)者能更加迅速地發(fā)現(xiàn)條件變化,但是卻可能消耗更多的處理器資源,造成了無端的浪費(fèi)。
等待/通知機(jī)制
是指一個(gè)線程 A 調(diào)用了對象 O 的 wait()方法進(jìn)入等待狀態(tài),而另一個(gè)線程 B調(diào)用了對象 O 的 notify()或者 notifyAll()方法,線程 A 收到通知后從對象 O 的 wait()方法返回,進(jìn)而執(zhí)行后續(xù)操作。上述兩個(gè)線程通過對象 O 來完成交互,而對象上的 wait()和notify/notifyAll()的關(guān)系就如同開關(guān)信號一樣,用來完成等待方和通知方之間的交互工作。
notify(): 通知一個(gè)在對象上等待的線程,使其從 wait 方法返回,而返回的前提是該線程獲取到了對象的鎖,沒有獲得鎖的線程重新進(jìn)入 WAITING 狀態(tài)。
notifyAll(): 通知所有等待在該對象上的線程
wait() 調(diào)用該方法的線程進(jìn)入 WAITING 狀態(tài),只有等待另外線程的通知或被中斷才會(huì)返回.需要注意,調(diào)用 wait()方法后,會(huì)釋放對象的鎖
wait(long) 超時(shí)等待一段時(shí)間,這里的參數(shù)時(shí)間是毫秒,也就是等待長達(dá) n 毫秒,如果沒有通知就超時(shí)返回
wait (long,int) 對于超時(shí)時(shí)間更細(xì)粒度的控制,可以達(dá)到納秒
等待和通知的標(biāo)準(zhǔn)范式
等待方遵循如下原則:
- 1)獲取對象的鎖。
- 2)如果條件不滿足,那么調(diào)用對象的 wait()方法,被通知后仍要檢查條件。
- 3)條件滿足則執(zhí)行對應(yīng)的邏輯。
通知方遵循如下原則:
- 1)獲得對象的鎖。
- 2)改變條件。
- 3)通知所有等待在對象上的線程。
在調(diào)用 wait()、notify()系列方法之前,線程必須要獲得該對象的對象級別鎖,即只能在同步方法或同步塊中調(diào)用 wait()方法、notify()系列方法,進(jìn)入 wait()方法后,當(dāng)前線程釋放鎖,在從 wait()返回前,線程與其他線程競爭重新獲得鎖,執(zhí)行 notify()系列方法的線程退出調(diào)用了 notifyAll 的 synchronized代碼塊的時(shí)候后,他們就會(huì)去競爭。如果其中一個(gè)線程獲得了該對象鎖,它就會(huì)繼續(xù)往下執(zhí)行,在它退出 synchronized 代碼塊,釋放鎖后,其他的已經(jīng)被喚醒的線程將會(huì)繼續(xù)競爭獲取該鎖,一直進(jìn)行下去,直到所有被喚醒的線程都執(zhí)行完畢。
notify 和 notifyAll 應(yīng)該用誰
盡可能用 notifyall(),謹(jǐn)慎使用 notify(),因?yàn)?notify()只會(huì)喚醒一個(gè)線程,我們無法確保被喚醒的這個(gè)線程一定就是我們需要喚醒的線程
代碼示例:
/** *類說明:快遞實(shí)體類 */ public class Express { public final static String CITY = "ShangHai"; private int km;/*快遞運(yùn)輸里程數(shù)*/ private String site;/*快遞到達(dá)地點(diǎn)*/ public Express() { } public Express(int km, String site) { this.km = km; this.site = site; } /* 變化公里數(shù),然后通知處于wait狀態(tài)并需要處理公里數(shù)的線程進(jìn)行業(yè)務(wù)處理*/ public synchronized void changeKm(){ this.km = 101; notify(); } /* 變化地點(diǎn),然后通知處于wait狀態(tài)并需要處理地點(diǎn)的線程進(jìn)行業(yè)務(wù)處理*/ public synchronized void changeSite(){ this.site = "BeiJing"; notifyAll(); } /*線程等待公里的變化*/ public synchronized void waitKm(){ while(this.km<100){ try { wait(); System.out.println("Check Site thread[" +Thread.currentThread().getId() +"] is be notified"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the Km is "+this.km+",I will change db"); } /*線程等待目的地的變化*/ public synchronized void waitSite(){ while(this.site.equals(CITY)){//快遞到達(dá)目的地 try { wait(); System.out.println("Check Site thread["+Thread.currentThread().getId() +"] is be notified"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the site is "+this.site+",I will call user"); } }
/** *類說明:測試wait/notify/notifyAll */ public class TestWN { private static Express express = new Express(0,Express.CITY); /*檢查里程數(shù)變化的線程,不滿足條件,線程一直等待*/ private static class CheckKm extends Thread{ @Override public void run() { express.waitKm(); } } /*檢查地點(diǎn)變化的線程,不滿足條件,線程一直等待*/ private static class CheckSite extends Thread{ @Override public void run() { express.waitSite(); } } public static void main(String[] args) throws InterruptedException { for(int i=0;i<3;i++){ new CheckSite().start(); } for(int i=0;i<3;i++){ new CheckKm().start(); } Thread.sleep(1000); express.changeKm();//快遞地點(diǎn)變化 } }
等待超時(shí)模式實(shí)現(xiàn)一個(gè)連接池
調(diào)用場景:調(diào)用一個(gè)方法時(shí)等待一段時(shí)間(一般來說是給定一個(gè)時(shí)間段),如果該方法能夠在給定的時(shí)間段之內(nèi)得到結(jié)果,那么將結(jié)果立刻返回,反之,超時(shí)返回默認(rèn)結(jié)果 假設(shè)等待時(shí)間段是 T,那么可以推斷出在當(dāng)前時(shí)間 now+T 之后就會(huì)超時(shí) 等待持續(xù)時(shí)間:REMAINING=T。 超時(shí)時(shí)間:FUTURE=now+T。
/** *類說明:連接池的實(shí)現(xiàn) */ public class DBPool { /*容器,存放連接*/ private static LinkedList<Connection> pool = new LinkedList<Connection>(); /*限制了池的大小=20*/ public DBPool(int initialSize) { if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } } /*釋放連接,通知其他的等待連接的線程*/ public void releaseConnection(Connection connection) { if (connection != null) { synchronized (pool){ pool.addLast(connection); //通知其他等待連接的線程 pool.notifyAll(); } } } /*獲取*/ // 在mills內(nèi)無法獲取到連接,將會(huì)返回null 1S public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool){ //永不超時(shí) if(mills<=0){ while(pool.isEmpty()){ pool.wait(); } return pool.removeFirst(); }else{ /*超時(shí)時(shí)刻*/ long future = System.currentTimeMillis()+mills; /*等待時(shí)長*/ long remaining = mills; while(pool.isEmpty()&&remaining>0){ pool.wait(remaining); /*喚醒一次,重新計(jì)算等待時(shí)長*/ remaining = future-System.currentTimeMillis(); } Connection connection = null; if(!pool.isEmpty()){ connection = pool.removeFirst(); } return connection; } } } }
/** *類說明: */ public class DBPoolTest { static DBPool pool = new DBPool(10); // 控制器:控制main線程將會(huì)等待所有Woker結(jié)束后才能繼續(xù)執(zhí)行 static CountDownLatch end; public static void main(String[] args) throws Exception { // 線程數(shù)量 int threadCount = 50; end = new CountDownLatch(threadCount); int count = 20;//每個(gè)線程的操作次數(shù) AtomicInteger got = new AtomicInteger();//計(jì)數(shù)器:統(tǒng)計(jì)可以拿到連接的線程 AtomicInteger notGot = new AtomicInteger();//計(jì)數(shù)器:統(tǒng)計(jì)沒有拿到連接的線程 for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new Worker(count, got, notGot), "worker_"+i); thread.start(); } end.await();// main線程在此處等待 System.out.println("總共嘗試了: " + (threadCount * count)); System.out.println("拿到連接的次數(shù): " + got); System.out.println("沒能連接的次數(shù): " + notGot); } static class Worker implements Runnable { int count; AtomicInteger got; AtomicInteger notGot; public Worker(int count, AtomicInteger got, AtomicInteger notGot) { this.count = count; this.got = got; this.notGot = notGot; } public void run() { while (count > 0) { try { // 從線程池中獲取連接,如果1000ms內(nèi)無法獲取到,將會(huì)返回null // 分別統(tǒng)計(jì)連接獲取的數(shù)量got和未獲取到的數(shù)量notGot Connection connection = pool.fetchConnection(1000); if (connection != null) { try { connection.createStatement(); // PreparedStatement preparedStatement // = connection.prepareStatement(""); // preparedStatement.execute(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } } else { notGot.incrementAndGet(); System.out.println(Thread.currentThread().getName() +"等待超時(shí)!"); } } catch (Exception ex) { } finally { count--; } } end.countDown(); } } }
/** *類說明: */ public class SqlConnectImpl implements Connection{ /*拿一個(gè)數(shù)據(jù)庫連接*/ public static final Connection fetchConnection(){ return new SqlConnectImpl(); } .........
客戶端獲取連接的過程被設(shè)定為等待超時(shí)的模式,也就是在 1000 毫秒內(nèi)如果無法獲取到可用連接,將會(huì)返回給客戶端一個(gè) null。設(shè)定連接池的大小為 10個(gè),然后通過調(diào)節(jié)客戶端的線程數(shù)來模擬無法獲取連接的場景。 它通過構(gòu)造函數(shù)初始化連接的最大上限,通過一個(gè)雙向隊(duì)列來維護(hù)連接,調(diào)用方需要先調(diào)用 fetchConnection(long)方法來指定在多少毫秒內(nèi)超時(shí)獲取連接,當(dāng)連接使用完成后,需要調(diào)用 releaseConnection(Connection)方法將連接放回線程池
面試題
調(diào)用 yield() 、sleep()、wait()、notify()等方法對鎖有何影響?
yield() 、sleep()被調(diào)用后,都不會(huì)釋放當(dāng)前線程所持有的鎖。
調(diào)用 wait()方法后,會(huì)釋放當(dāng)前線程持有的鎖,而且當(dāng)前被喚醒后,會(huì)重新去競爭鎖,鎖競爭到后才會(huì)執(zhí)行 wait 方法后面的代碼。 調(diào)用 notify()系列方法后,對鎖無影響,線程只有在 syn 同步代碼執(zhí)行完后才會(huì)自然而然的釋放鎖,所以 notify()系列方法一般都是 syn 同步代碼的最后一行。
到此這篇關(guān)于Java線程間共享與協(xié)作詳細(xì)介紹的文章就介紹到這了,更多相關(guān)Java共享與協(xié)作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決@Api注解不展示controller內(nèi)容的問題
這篇文章主要介紹了解決@Api注解不展示controller內(nèi)容的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01Java通過正則表達(dá)式獲取字符串中數(shù)字的方法示例
最近工作中遇到了一個(gè)需求,需要利用java獲取字符串中的數(shù)字,嘗試幾種方法后發(fā)現(xiàn)利用正則表達(dá)式實(shí)現(xiàn)最為方法,下面這篇文章就主要介紹了Java通過正則表達(dá)式獲取字符串中數(shù)字的方法,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考下。2017-03-03Spring Boot定時(shí)+多線程執(zhí)行過程解析
這篇文章主要介紹了Spring Boot定時(shí)+多線程執(zhí)行過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01解決springboot整合cxf-jaxrs中json轉(zhuǎn)換的問題
這篇文章主要介紹了解決springboot整合cxf-jaxrs中json轉(zhuǎn)換的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Java下SpringBoot創(chuàng)建定時(shí)任務(wù)詳解
這篇文章主要介紹了Java下SpringBoot創(chuàng)建定時(shí)任務(wù)詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07