Java實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題與讀者寫者問題詳解
1、生產(chǎn)者消費(fèi)者問題
生產(chǎn)者消費(fèi)者問題是研究多線程程序時(shí)繞不開的經(jīng)典問題之一,它描述是有一塊緩沖區(qū)作為倉(cāng)庫(kù),生產(chǎn)者可以將產(chǎn)品放入倉(cāng)庫(kù),消費(fèi)者則可以從倉(cāng)庫(kù)中取走產(chǎn)品。解決生產(chǎn)者/消費(fèi)者問題的方法可分為兩類:(1)采用某種機(jī)制保護(hù)生產(chǎn)者和消費(fèi)者之間的同步;(2)在生產(chǎn)者和消費(fèi)者之間建立一個(gè)管道。第一種方式有較高的效率,并且易于實(shí)現(xiàn),代碼的可控制性較好,屬于常用的模式。第二種管道緩沖區(qū)不易控制,被傳輸數(shù)據(jù)對(duì)象不易于封裝等,實(shí)用性不強(qiáng)。
同步問題核心在于:如何保證同一資源被多個(gè)線程并發(fā)訪問時(shí)的完整性。常用的同步方法是采用信號(hào)或加鎖機(jī)制,保證資源在任意時(shí)刻至多被一個(gè)線程訪問。Java語言在多線程編程上實(shí)現(xiàn)了完全對(duì)象化,提供了對(duì)同步機(jī)制的良好支持。在Java中一共有五種方法支持同步,其中前四個(gè)是同步方法,一個(gè)是管道方法。
wait() / notify()方法
await() / signal()方法
BlockingQueue阻塞隊(duì)列方法
Semaphore方法
PipedInputStream / PipedOutputStream
1.1 wait() / notify()方法
wait() / nofity()方法是基類Object的兩個(gè)方法,也就意味著所有Java類都會(huì)擁有這兩個(gè)方法,這樣,我們就可以為任何對(duì)象實(shí)現(xiàn)同步機(jī)制。
wait()方法:當(dāng)緩沖區(qū)已滿/空時(shí),生產(chǎn)者/消費(fèi)者線程停止自己的執(zhí)行,放棄鎖,使自己處于等等狀態(tài),讓其他線程執(zhí)行。
notify()方法:當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí),向其他等待的線程發(fā)出可執(zhí)行的通知,同時(shí)放棄鎖,使自己處于等待狀態(tài)。
各起了4個(gè)生產(chǎn)者,4個(gè)消費(fèi)者
package test; public class Hosee { private static Integer count = 0; private final Integer FULL = 10; private static String LOCK = "LOCK"; class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } synchronized (LOCK) { while (count == FULL) { try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有" + count); LOCK.notifyAll(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } synchronized (LOCK) { while (count == 0) { try { LOCK.wait(); } catch (Exception e) { TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有" + count); LOCK.notifyAll(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
(需要注意的是,用什么加鎖就用什么notify和wait,實(shí)例中使用的是LOCK)
部分打印結(jié)果:
由于生產(chǎn)者和消費(fèi)者說明一致,所以最多都是在2左右,當(dāng)減少一個(gè)消費(fèi)者時(shí),則會(huì)加到10。
1.2 await() / signal()方法
首先,我們先來看看await()/signal()與wait()/notify()的區(qū)別:
wait()和notify()必須在synchronized的代碼塊中使用 因?yàn)橹挥性讷@取當(dāng)前對(duì)象的鎖時(shí)才能進(jìn)行這兩個(gè)操作 否則會(huì)報(bào)異常 而await()和signal()一般與Lock()配合使用。
wait是Object的方法,而await只有部分類有,如Condition。
await()/signal()和新引入的鎖定機(jī)制Lock直接掛鉤,具有更大的靈活性。
那么為什么有了synchronized還要提出Lock呢?
1.2.1 對(duì)synchronized的改進(jìn)
synchronized并不完美,它有一些功能性的限制 —— 它無法中斷一個(gè)正在等候獲得鎖的線程,也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖。同步還要求鎖的釋放只能在與獲得鎖所在的堆棧幀相同的堆棧幀中進(jìn)行,多數(shù)情況下,這沒問題(而且與異常處理交互得很好),但是,確實(shí)存在一些非塊結(jié)構(gòu)的鎖定更合適的情況。
1.2.2 ReentrantLock 類
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個(gè)抽象,它允許把鎖定的實(shí)現(xiàn)作為 Java 類,而不是作為語言的特性來實(shí)現(xiàn)(更加面向?qū)ο?。這就為 Lock 的多種實(shí)現(xiàn)留下了空間,各種實(shí)現(xiàn)可能有不同的調(diào)度算法、性能特性或者鎖定語義。 ReentrantLock 類實(shí)現(xiàn)了 Lock ,它擁有與 synchronized 相同的并發(fā)性和內(nèi)存語義,但是添加了類似鎖投票、定時(shí)鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭(zhēng)用情況下更佳的性能。(換句話說,當(dāng)許多線程都想訪問共享資源時(shí),JVM 可以花更少的時(shí)候來調(diào)度線程,把更多時(shí)間用在執(zhí)行線程上。)
reentrant 鎖意味著什么呢?簡(jiǎn)單來說,它有一個(gè)與鎖相關(guān)的獲取計(jì)數(shù)器,如果擁有鎖的某個(gè)線程再次得到鎖,那么獲取計(jì)數(shù)器就加1,然后鎖需要被釋放兩次才能獲得真正釋放(重入鎖)。這模仿了 synchronized 的語義;如果線程進(jìn)入由線程已經(jīng)擁有的監(jiān)控器保護(hù)的 synchronized 塊,就允許線程繼續(xù)進(jìn)行,當(dāng)線程退出第二個(gè)(或者后續(xù)) synchronized 塊的時(shí)候,不釋放鎖,只有線程退出它進(jìn)入的監(jiān)控器保護(hù)的第一個(gè)synchronized 塊時(shí),才釋放鎖。
簡(jiǎn)單解釋下重入鎖:
public class Child extends Father implements Runnable{ final static Child child = new Child();//為了保證鎖唯一 public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start(); } } public synchronized void doSomething() { System.out.println("1child.doSomething()"); doAnotherThing(); // 調(diào)用自己類中其他的synchronized方法 } private synchronized void doAnotherThing() { super.doSomething(); // 調(diào)用父類的synchronized方法 System.out.println("3child.doAnotherThing()"); } @Override public void run() { child.doSomething(); } } class Father { public synchronized void doSomething() { System.out.println("2father.doSomething()"); } }
上述代碼的鎖都是child對(duì)象,當(dāng)執(zhí)行child.doSomething時(shí),該線程獲得child對(duì)象的鎖,在doSomething方法內(nèi)執(zhí)行doAnotherThing時(shí)再次請(qǐng)求child對(duì)象的鎖,因?yàn)閟ynchronized是重入鎖,所以可以得到該鎖,繼續(xù)在doAnotherThing里執(zhí)行父類的doSomething方法時(shí)第三次請(qǐng)求child對(duì)象的鎖,同理可得到,如果不是重入鎖的話,那這后面這兩次請(qǐng)求鎖將會(huì)被一直阻塞,從而導(dǎo)致死鎖。
在查看下面代碼示例時(shí),可以看到 Lock 和 synchronized 有一點(diǎn)明顯的區(qū)別 —— lock 必須在 finally 塊中釋放。否則,如果受保護(hù)的代碼將拋出異常,鎖就有可能永遠(yuǎn)得不到釋放!這一點(diǎn)區(qū)別看起來可能沒什么,但是實(shí)際上,它極為重要。忘記在 finally 塊中釋放鎖,可能會(huì)在程序中留下一個(gè)定時(shí)炸彈,當(dāng)有一天炸彈爆炸時(shí),您要花費(fèi)很大力氣才有找到源頭在哪。而使用同步,JVM 將確保鎖會(huì)獲得自動(dòng)釋放。
Lock lock = new ReentrantLock(); lock.lock(); try { // update object state } finally { lock.unlock(); }
除此之外,與目前的 synchronized 實(shí)現(xiàn)相比,爭(zhēng)用下的 ReentrantLock 實(shí)現(xiàn)更具可伸縮性。(在未來的 JVM 版本中,synchronized 的爭(zhēng)用性能很有可能會(huì)獲得提高。)這意味著當(dāng)許多線程都在爭(zhēng)用同一個(gè)鎖時(shí),使用 ReentrantLock 的總體開支通常要比 synchronized 少得多。
1.2.3 什么時(shí)候選擇用 ReentrantLock 代替 synchronized
在 Java1.5 中,synchronized 是性能低效的。因?yàn)檫@是一個(gè)重量級(jí)操作,需要調(diào)用操作接口,導(dǎo)致有可能加鎖消耗的系統(tǒng)時(shí)間比加鎖以外的操作還多。相比之下使用 Java 提供的 Lock 對(duì)象,性能更高一些。但是到了 Java1.6,發(fā)生了變化。synchronized 在語義上很清晰,可以進(jìn)行很多優(yōu)化,有適應(yīng)自旋,鎖消除,鎖粗化,輕量級(jí)鎖,偏向鎖等等。導(dǎo)致在 Java1.6 上 synchronized 的性能并不比 Lock 差。官方也表示,他們也更支持 synchronized,在未來的版本中還有優(yōu)化余地。
所以在確實(shí)需要一些 synchronized 所沒有的特性的時(shí)候,比如時(shí)間鎖等候、可中斷鎖等候、無塊結(jié)構(gòu)鎖、多個(gè)條件變量或者鎖投票使用ReentrantLock。ReentrantLock 還具有可伸縮性的好處,應(yīng)當(dāng)在高度爭(zhēng)用的情況下使用它,但是請(qǐng)記住,大多數(shù) synchronized 塊幾乎從來沒有出現(xiàn)過爭(zhēng)用,所以可以把高度爭(zhēng)用放在一邊。我建議用 synchronized 開發(fā),直到確實(shí)證明 synchronized 不合適,而不要僅僅是假設(shè)如果使用 ReentrantLock “性能會(huì)更好”。請(qǐng)記住,這些是供高級(jí)用戶使用的高級(jí)工具。(而且,真正的高級(jí)用戶喜歡選擇能夠找到的最簡(jiǎn)單工具,直到他們認(rèn)為簡(jiǎn)單的工具不適用為止。)。一如既往,首先要把事情做好,然后再考慮是不是有必要做得更快。
1.2.4 接下來我們使用ReentrantLock來實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題
package test; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Hosee { private static Integer count = 0; private final Integer FULL = 10; final Lock lock = new ReentrantLock(); final Condition NotFull = lock.newCondition(); final Condition NotEmpty = lock.newCondition(); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } lock.lock(); try { while (count == FULL) { try { NotFull.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有" + count); NotEmpty.signal(); } finally { lock.unlock(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } lock.lock(); try { while (count == 0) { try { NotEmpty.await(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有" + count); NotFull.signal(); } finally { lock.unlock(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
運(yùn)行結(jié)果與第一個(gè)類似。上述代碼用了兩個(gè)Condition,其實(shí)用一個(gè)也是可以的,只不過要signalall()。
1.3 BlockingQueue阻塞隊(duì)列方法
BlockingQueue是JDK5.0的新增內(nèi)容,它是一個(gè)已經(jīng)在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列,實(shí)現(xiàn)方式采用的是我們第2種await() / signal()方法。它可以在生成對(duì)象時(shí)指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:類似于我們上面的生產(chǎn)者線程,容量達(dá)到最大時(shí),自動(dòng)阻塞。
take()方法:類似于我們上面的消費(fèi)者線程,容量為0時(shí),自動(dòng)阻塞。
package test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Hosee { private static Integer count = 0; final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { bq.put(1); count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有" + count); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { bq.take(); count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有" + count); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
其實(shí)這個(gè)BlockingQueue比較難用代碼來演示,因?yàn)閜ut()與take()方法無法與輸出語句保證同步,當(dāng)然你可以自己去實(shí)現(xiàn) BlockingQueue(BlockingQueue是用await()/signal() 實(shí)現(xiàn)的)。所以在輸出結(jié)果上你會(huì)發(fā)現(xiàn)不匹配。
例如:當(dāng)緩沖區(qū)已滿,生產(chǎn)者在put()操作時(shí),put()內(nèi)部調(diào)用了await()方法,放棄了線程的執(zhí)行,然后消費(fèi)者線程執(zhí)行,調(diào)用take()方法,take()內(nèi)部調(diào)用了signal()方法,通知生產(chǎn)者線程可以執(zhí)行,致使在消費(fèi)者的println()還沒運(yùn)行的情況下生產(chǎn)者的println()先被執(zhí)行,所以有了輸出不匹配的情況。
對(duì)于BlockingQueue大家可以放心使用,這可不是它的問題,只是在它和別的對(duì)象之間的同步有問題。
1.4 Semaphore方法
Semaphore 信號(hào)量,就是一個(gè)允許實(shí)現(xiàn)設(shè)置好的令牌。也許有1個(gè),也許有10個(gè)或更多。
誰拿到令牌(acquire)就可以去執(zhí)行了,如果沒有令牌則需要等待。
執(zhí)行完畢,一定要?dú)w還(release)令牌,否則令牌會(huì)被很快用光,別的線程就無法獲得令牌而執(zhí)行下去了。
package test; import java.util.concurrent.Semaphore; public class Hosee { int count = 0; final Semaphore notFull = new Semaphore(10); final Semaphore notEmpty = new Semaphore(0); final Semaphore mutex = new Semaphore(1); class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } try { notFull.acquire();//順序不能顛倒,否則會(huì)造成死鎖。 mutex.acquire(); count++; System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn),目前總共有" + count); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notEmpty.release(); } } } } class Consumer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { Thread.sleep(3000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { notEmpty.acquire();//順序不能顛倒,否則會(huì)造成死鎖。 mutex.acquire(); count--; System.out.println(Thread.currentThread().getName() + "消費(fèi)者消費(fèi),目前總共有" + count); } catch (Exception e) { e.printStackTrace(); } finally { mutex.release(); notFull.release(); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
注意notFull.acquire()與mutex.acquire()的位置不能互換,如果先得到互斥鎖再發(fā)生等待,會(huì)造成死鎖。
1.5 PipedInputStream / PipedOutputStream
這個(gè)類位于java.io包中,是解決同步問題的最簡(jiǎn)單的辦法,一個(gè)線程將數(shù)據(jù)寫入管道,另一個(gè)線程從管道讀取數(shù)據(jù),這樣便構(gòu)成了一種生產(chǎn)者/消費(fèi)者的緩沖區(qū)編程模式。PipedInputStream/PipedOutputStream只能用于多線程模式,用于單線程下可能會(huì)引發(fā)死鎖。
package test; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; public class Hosee { final PipedInputStream pis = new PipedInputStream(); final PipedOutputStream pos = new PipedOutputStream(); { try { pis.connect(pos); } catch (IOException e) { e.printStackTrace(); } } class Producer implements Runnable { @Override public void run() { try{ while(true){ int b = (int) (Math.random() * 255); System.out.println("Producer: a byte, the value is " + b); pos.write(b); pos.flush(); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } } class Consumer implements Runnable { @Override public void run() { try{ while(true){ int b = pis.read(); System.out.println("Consumer: a byte, the value is " + String.valueOf(b)); } }catch(Exception e){ e.printStackTrace(); }finally{ try{ pos.close(); pis.close(); }catch(IOException e){ System.out.println(e); } } } } public static void main(String[] args) throws Exception { Hosee hosee = new Hosee(); new Thread(hosee.new Producer()).start(); new Thread(hosee.new Consumer()).start(); } }
與阻塞隊(duì)列一樣,由于read()/write()方法與輸出方法不一定同步,輸出結(jié)果方面會(huì)發(fā)生不匹配現(xiàn)象,為了使結(jié)果更加明顯,這里只有1個(gè)消費(fèi)者和1個(gè)生產(chǎn)者。
2、讀者寫者問題
讀者—寫者問題(Readers-Writers problem)也是一個(gè)經(jīng)典的并發(fā)程序設(shè)計(jì)問題,是經(jīng)常出現(xiàn)的一種同步問題。計(jì)算機(jī)系統(tǒng)中的數(shù)據(jù)(文件、記錄)常被多個(gè)進(jìn)程共享,但其中某些進(jìn)程可能只要求讀數(shù)據(jù)(稱為讀者Reader);另一些進(jìn)程則要求修改數(shù)據(jù)(稱為寫者Writer)。就共享數(shù)據(jù)而言,Reader和Writer是兩組并發(fā)進(jìn)程共享一組數(shù)據(jù)區(qū),要求:
(1)允許多個(gè)讀者同時(shí)執(zhí)行讀操作;
(2)不允許讀者、寫者同時(shí)操作;
(3)不允許多個(gè)寫者同時(shí)操作。
Reader和Writer的同步問題分為讀者優(yōu)先、弱寫者優(yōu)先(公平競(jìng)爭(zhēng))和強(qiáng)寫者優(yōu)先三種情況,它們的處理方式不同。
首先我們都只考慮公平競(jìng)爭(zhēng)的情況下,看看Java有哪些方法可以實(shí)現(xiàn)讀者寫者問題
2.1 讀寫鎖
ReentrantReadWriteLock會(huì)使用兩把鎖來解決問題,一個(gè)讀鎖,一個(gè)寫鎖
線程進(jìn)入讀鎖的前提條件:
沒有其他線程的寫鎖,
沒有寫請(qǐng)求或者有寫請(qǐng)求,但調(diào)用線程和持有鎖的線程是同一個(gè)
線程進(jìn)入寫鎖的前提條件:
沒有其他線程的讀鎖
沒有其他線程的寫鎖
到ReentrantReadWriteLock,首先要做的是與ReentrantLock劃清界限。它和后者都是單獨(dú)的實(shí)現(xiàn),彼此之間沒有繼承或?qū)崿F(xiàn)的關(guān)系。然后就是總結(jié)這個(gè)鎖機(jī)制的特性了:
重入(在上文ReentrantLock處已經(jīng)介紹了)方面其內(nèi)部的WriteLock可以獲取ReadLock,但是反過來ReadLock想要獲得WriteLock則永遠(yuǎn)都不要想。
WriteLock可以降級(jí)為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然后釋放WriteLock,這時(shí)候線程將保持Readlock的持有。反過來ReadLock想要升級(jí)為WriteLock則不可能,為什么?參看(1),呵呵.
ReadLock可以被多個(gè)線程持有并且在作用時(shí)排斥任何的WriteLock,而WriteLock則是完全的互斥。這一特性最為重要,因?yàn)閷?duì)于高讀取頻率而相對(duì)較低寫入的數(shù)據(jù)結(jié)構(gòu),使用此類鎖同步機(jī)制則可以提高并發(fā)量。
不管是ReadLock還是WriteLock都支持Interrupt,語義與ReentrantLock一致。
WriteLock支持Condition并且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。
看下ReentrantReadWriteLock這個(gè)類的兩個(gè)構(gòu)造函數(shù)
public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
fair這個(gè)參數(shù)表示是否是創(chuàng)建一個(gè)公平的讀寫鎖,還是非公平的讀寫鎖。也就是搶占式還是非搶占式。
公平和非公平:公平表示獲取的鎖的順序是按照線程加鎖的順序來分配獲取到鎖的線程時(shí)最先加鎖的線程,是按照FIFO的順序來分配鎖的;非公平表示獲取鎖的順序是無需的,后來加鎖的線程可能先獲得鎖,這種情況就導(dǎo)致某些線程可能一直沒獲取到鎖。
公平鎖為啥會(huì)影響性能,從code上來看看公平鎖僅僅是多了一項(xiàng)檢查是否在隊(duì)首會(huì)影響性能,如不是,那么又是在什么地方影響的?假如是闖入的線程,會(huì)排在隊(duì)尾并睡覺(parking)等待前任節(jié)點(diǎn)喚醒,這樣勢(shì)必會(huì)比非公平鎖添加很多paking和unparking的操作
一般的應(yīng)用場(chǎng)景是: 如果有多個(gè)讀線程,一個(gè)寫線程,而且寫線程在操作的時(shí)候需要阻塞讀線程,那么此時(shí)就需要使用公平鎖,要不然可能寫線程一直獲取不到鎖,導(dǎo)致線程餓死。
再簡(jiǎn)單說下鎖降級(jí)
重入還允許從寫入鎖降級(jí)為讀取鎖,其實(shí)現(xiàn)方式是:先獲取寫入鎖,然后獲取讀取鎖,最后釋放寫入鎖。但是,從讀取鎖升級(jí)到寫入鎖是不可能的。
rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); if (!cacheValid) { data = ... cacheValid = true; } rwl.readLock().lock(); rwl.writeLock().unlock(); // 降級(jí):先獲取讀鎖再釋放寫鎖 }
下面我們用讀寫鎖來實(shí)現(xiàn)讀者寫者問題
import java.util.Random; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.get(); } } }.start(); } for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3 { private Object data = null;// 共享數(shù)據(jù),只能有一個(gè)線程能寫該數(shù)據(jù),但可以有多個(gè)線程同時(shí)讀該數(shù)據(jù)。 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get() { rwl.readLock().lock();// 上讀鎖,其他線程只能讀不能寫 System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); rwl.readLock().unlock(); // 釋放讀鎖,最好放在finnaly里面 } public void put(Object data) { rwl.writeLock().lock();// 上寫鎖,不允許其他線程讀也不允許寫 System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); rwl.writeLock().unlock();// 釋放寫鎖 } }
運(yùn)行結(jié)果:
Thread-0 be ready to read data! Thread-1 be ready to read data! Thread-2 be ready to read data! Thread-0have read data :null Thread-2have read data :null Thread-1have read data :null Thread-5 be ready to write data! Thread-5 have write data: 6934 Thread-5 be ready to write data! Thread-5 have write data: 8987 Thread-5 be ready to write data! Thread-5 have write data: 8496
2.2 Semaphore信號(hào)量
在1.4中已經(jīng)介紹了用信號(hào)量來實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題,現(xiàn)在我們將用信號(hào)量來實(shí)現(xiàn)讀者寫者問題,信號(hào)量的相關(guān)知識(shí)不再重復(fù),直接看代碼
package test; import java.util.Random; import java.util.concurrent.Semaphore; public class ReadWrite { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } q3.get(); } } }.start(); } for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3 { private Object data = null;// 共享數(shù)據(jù),只能有一個(gè)線程能寫該數(shù)據(jù),但可以有多個(gè)線程同時(shí)讀該數(shù)據(jù)。 private Semaphore wmutex = new Semaphore(1); private Semaphore rmutex = new Semaphore(2); private int count = 0; public void get() { try { rmutex.acquire(); if (count == 0) wmutex.acquire();// 當(dāng)?shù)谝蛔x進(jìn)程欲讀數(shù)據(jù)庫(kù)時(shí),阻止寫進(jìn)程寫 count++; System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); count--; if (count == 0) wmutex.release(); rmutex.release(); } catch (Exception e) { e.printStackTrace(); } } public void put(Object data) { try { wmutex.acquire(); System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } catch (Exception e) { e.printStackTrace(); } finally { wmutex.release(); } } }
單純使用信號(hào)量不能解決讀者與寫者問題,必須引入計(jì)數(shù)器count(可以用CountDownLatch代替 )對(duì)讀進(jìn)程計(jì)數(shù); count與wmutex結(jié)合使用,使讀讀能同時(shí)進(jìn)行,讀寫排斥。count為0時(shí)表示讀進(jìn)程開始,此時(shí)寫進(jìn)程阻塞(wmutex被讀進(jìn)程獲?。?,當(dāng)count不為0時(shí),表示有多個(gè)讀進(jìn)程,就不用操作 wmutex了,因?yàn)榈谝粋€(gè)讀進(jìn)程已經(jīng)獲得了 wmutex。count表示有多少個(gè)讀進(jìn)程在讀,每次有一個(gè)就+1,讀完了-1,當(dāng)count==0時(shí),表示讀進(jìn)程都結(jié)束了。此時(shí) wmutex釋放,寫進(jìn)程才有機(jī)會(huì)獲得wmutex。為了使讀進(jìn)程不要一直占有 wmutex,最好讓讀進(jìn)程sleep一下,讓寫進(jìn)程有機(jī)會(huì)獲得wmutex,使效果更明顯。
總結(jié):
以上就是本文關(guān)于Java實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題與讀者寫者問題詳解的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站:java并發(fā)學(xué)習(xí)之BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者詳解、Java多線程之線程通信生產(chǎn)者消費(fèi)者模式及等待喚醒機(jī)制代碼詳解等,有什么問題可以隨時(shí)留言,小編會(huì)及時(shí)回復(fù)大家的。感謝朋友們對(duì)本站的支持!
- Java實(shí)現(xiàn)Kafka生產(chǎn)者消費(fèi)者代碼實(shí)例
- Java基于Lock的生產(chǎn)者消費(fèi)者模型示例
- kafka生產(chǎn)者和消費(fèi)者的javaAPI的示例代碼
- 基于Java 生產(chǎn)者消費(fèi)者模式(詳細(xì)分析)
- Java多線程之線程通信生產(chǎn)者消費(fèi)者模式及等待喚醒機(jī)制代碼詳解
- JAVA多線程實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的實(shí)例詳解
- Java 生產(chǎn)者/消費(fèi)者問題實(shí)例詳解
- Java多種方式實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
相關(guān)文章
@SpringBootTest 注解報(bào)紅問題及解決
這篇文章主要介紹了@SpringBootTest 注解報(bào)紅問題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11java中加密的實(shí)現(xiàn)方法(MD5,MD2,SHA)
這篇文章主要介紹了java中加密的實(shí)現(xiàn)方法(MD5,MD2,SHA)的相關(guān)資料,這里提供三種實(shí)現(xiàn)加密的方法,大家可以對(duì)比一下,需要的朋友可以參考下2017-08-08