如何使用JCTools實現(xiàn)Java并發(fā)程序
概述
在本文中,我們將介紹JCTools(Java并發(fā)工具)庫。
簡單地說,這提供了許多適用于多線程環(huán)境的實用數(shù)據(jù)結(jié)構(gòu)。
非阻塞算法
傳統(tǒng)上,在可變共享狀態(tài)下工作的多線程代碼使用鎖來確保數(shù)據(jù)一致性和發(fā)布(一個線程所做的更改對另一個線程可見)。
這種方法有許多缺點:
- 線程在試圖獲取鎖時可能會被阻塞,在另一個線程的操作完成之前不會取得任何進(jìn)展—這有效地防止了并行性
- 鎖爭用越重,JVM處理調(diào)度線程、管理爭用和等待線程隊列的時間就越多,實際工作就越少
- 如果涉及多個鎖,并且它們以錯誤的順序獲取/釋放,則可能出現(xiàn)死鎖
- 優(yōu)先級反轉(zhuǎn)的危險是可能的——高優(yōu)先級線程被鎖定,試圖獲得由低優(yōu)先級線程持有的鎖
- 大多數(shù)情況下,使用粗粒度鎖會嚴(yán)重?fù)p害并行性—細(xì)粒度鎖需要更仔細(xì)的設(shè)計,增加鎖開銷,并且更容易出錯
另一種方法是使用非阻塞算法,即任何線程的故障或掛起都不會導(dǎo)致另一個線程的故障或掛起的算法。
如果所涉及的線程中至少有一個能夠在任意時間段內(nèi)取得進(jìn)展,即在處理過程中不會出現(xiàn)死鎖,則非阻塞算法是無鎖的。
此外,如果保證每個線程的進(jìn)程,這些算法是無等待的。
下面是一個非阻塞堆棧示例,它定義了基本狀態(tài):
public class ConcurrentStack<E> { AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); private static class Node <E> { public E item; public Node<E> next; // standard constructor } }
還有一些API方法:
public void push(E item){ Node<E> newHead = new Node<E>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }
我們可以看到,該算法使用細(xì)粒度比較和交換(CAS)指令,并且是無鎖的(即使多個線程調(diào)用top.compareAndSet()同時,它們中的一個保證會成功)但不能無等待,因為不能保證CAS最終會對任何特定線程成功。
依賴
首先,讓我們將JCTools依賴項添加到pom.xml文件:
<dependency> <groupId>org.jctools</groupId> <artifactId>jctools-core</artifactId> <version>2.1.2</version> </dependency>
請注意,Maven Central上提供了最新的可用版本。
JCTools隊列
該庫提供了許多隊列以在多線程環(huán)境中使用,即一個或多個線程以線程安全的無鎖方式寫入隊列,一個或多個線程以線程安全的無鎖方式從隊列中讀取。
所有隊列實現(xiàn)的通用接口是org.jctools.queues.MessagePassingQueue。
隊列類型
所有隊列都可以根據(jù)其生產(chǎn)者/消費者策略進(jìn)行分類:
- 單個生產(chǎn)者,單個消費者–此類類使用前綴Spsc命名,例如SpscArrayQueue
- 單個生產(chǎn)者,多個消費者–使用Spmc前綴,例如SpmcArrayQueue
- 多個生產(chǎn)者,單個消費者-使用Mpsc前綴,例如MpscArrayQueue
- 多個生產(chǎn)者、多個消費者—使用Mpmc前綴,例如MpmcArrayQueue
需要注意的是,在內(nèi)部沒有策略檢查,也就是說,如果使用不正確,隊列可能會無聲地發(fā)生故障。
例如,下面的測試從兩個線程填充單個生產(chǎn)者隊列并通過,即使不能保證使用者看到來自不同生產(chǎn)者的數(shù)據(jù):
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set<Integer> fromQueue = new HashSet<>(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);
隊列實現(xiàn)
總結(jié)以上分類,以下是JCTools隊列列表:
- SpscArrayQueue–單個生產(chǎn)者,單個消費者,在內(nèi)部使用一個數(shù)組,限制容量
- SpscLinkedQueue–單個生產(chǎn)者,單個消費者,內(nèi)部使用鏈表,未綁定容量
- SpscChunkedArrayQueue–單生產(chǎn)商、單消費者,從初始容量開始,一直增長到最大容量
- SpscGrowableArrayQueue–單生產(chǎn)者、單消費者,從初始容量開始,一直增長到最大容量。這與SpscChunkedArrayQueue是相同的契約,唯一的區(qū)別是內(nèi)部塊管理。建議使用SpscChunkedArrayQueue,因為它有一個簡化的實現(xiàn)
- SpscUnboundedArrayQueue–單個生產(chǎn)者,單個消費者,在內(nèi)部使用數(shù)組,未綁定容量
- SpmcArrayQueue–單個生產(chǎn)者、多個使用者,在內(nèi)部使用一個陣列,限制容量
- MpscArrayQueue—多個生產(chǎn)者、單個消費者在內(nèi)部使用一個陣列,限制容量
- MpscLinkedQueue–多個生產(chǎn)者,單個消費者,在內(nèi)部使用鏈表,未綁定容量
- MpmcArrayQueue—多個生產(chǎn)者、多個消費者在內(nèi)部使用一個陣列,限制容量
原子隊列
前面提到的所有隊列都使用sun.misc.Unsafe. 然而,隨著java9和JEP-260的出現(xiàn),這個API在默認(rèn)情況下變得不可訪問。
因此,有其他隊列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能較差)而不是sun.misc.Unsafe.
它們是從上面的隊列生成的,它們的名稱中間插入了單詞Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。
如果可能,建議使用“常規(guī)”隊列,并且僅在sun.misc.Unsafe像Hot Java9+和JRockit一樣被禁止/無效。
容量
所有JCTools隊列也可能具有最大容量或未綁定。當(dāng)隊列已滿且受容量限制時,它將停止接受新元素。
在以下示例中,我們:
- 填滿隊列
- 確保在此之后停止接受新元素
- 從中排出,并確保之后可以添加更多元素
請注意,為了可讀性,刪除了幾個代碼語句。
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set<Integer> fromQueue = new HashSet<>(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));
其他數(shù)據(jù)結(jié)構(gòu)工具
JCTools還提供了一些非隊列數(shù)據(jù)結(jié)構(gòu)。
它們都列在下面:
- NonBlockingHashMap–一個無鎖的ConcurrentHashMap替代方案,具有更好的伸縮性和通常更低的突變成本。它是實現(xiàn)sun.misc.Unsafe,因此,不建議在Java9+或JRockit環(huán)境中使用此類
- NonBlockingHashMapLong–與NonBlockingHashMap類似,但使用基本長鍵
- NonBlockingHashSet–一個簡單的包裝器,圍繞著像JDK的java.util.Collections.newSetFromMap()一樣的NonBlockingHashMap
- NonBlockingIdentityHashMap–與NonBlockingHashMap類似,但按標(biāo)識比較鍵。
- NonBlockingSetInt–一個多線程位向量集,實現(xiàn)為一個原始long數(shù)組。在無聲自動裝箱的情況下工作無效
性能測試
讓我們使用JMH來比較JDK的ArrayBlockingQueue和JCTools隊列的性能。JMH是Sun/Oracle JVM gurus提供的一個開源微基準(zhǔn)框架,它保護(hù)我們不受編譯器/JVM優(yōu)化算法的不確定性的影響。
請注意,為了提高可讀性,下面的代碼段遺漏了幾個語句。
public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue<Long> queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }
結(jié)果:
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op
我們可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了兩倍。
使用JCTools的缺點
使用JCTools有一個重要的缺點——不可能強制正確使用庫類。例如,考慮在我們的大型成熟項目中開始使用MpscArrayQueue的情況(注意,必須有一個使用者)。
不幸的是,由于項目很大,有可能有人出現(xiàn)編程或配置錯誤,現(xiàn)在從多個線程讀取隊列。這個系統(tǒng)看起來像以前一樣工作,但現(xiàn)在有可能消費者錯過了一些信息。這是一個真正的問題,可能會有很大的影響,是很難調(diào)試。
理想情況下,應(yīng)該可以運行具有特定系統(tǒng)屬性的系統(tǒng),該屬性強制JCTools確保線程訪問策略。例如,本地/測試/暫存環(huán)境(而不是生產(chǎn)環(huán)境)可能已啟用它。遺憾的是,JCTools沒有提供這樣的屬性。
另一個需要考慮的問題是,盡管我們確保JCTools比JDK的對應(yīng)工具快得多,但這并不意味著我們的應(yīng)用程序獲得了與我們開始使用自定義隊列實現(xiàn)時相同的速度。大多數(shù)應(yīng)用程序不會在線程之間交換很多對象,而且大多是I/O綁定的。
結(jié)論
現(xiàn)在,我們對JCTools提供的實用程序類有了基本的了解,并了解了它們在重載下與JDK的對應(yīng)類相比的性能。
總之,只有當(dāng)我們在線程之間交換大量對象時,才有必要使用該庫,即使這樣,也有必要非常小心地保留線程訪問策略。
以上示例的完整源代碼地址:https://github.com/eugenp/tutorials/tree/master/libraries-5
JCTools git地址:https://github.com/JCTools/JCTools
以上就是如何使用JCTools實現(xiàn)Java并發(fā)程序的詳細(xì)內(nèi)容,更多關(guān)于使用JCTools實現(xiàn)Java并發(fā)程序的資料請關(guān)注腳本之家其它相關(guān)文章!
- Java 實現(xiàn)并發(fā)的幾種方式小結(jié)
- Java并發(fā)編程之ConcurrentLinkedQueue源碼詳解
- JAVA并發(fā)中VOLATILE關(guān)鍵字的神奇之處詳解
- Java并發(fā)編程之CountDownLatch源碼解析
- Java并發(fā)編程之線程之間的共享和協(xié)作
- Java并發(fā)編程之Exchanger方法詳解
- java關(guān)于并發(fā)模型中的兩種鎖知識點詳解
- Java并發(fā)編程之ReadWriteLock讀寫鎖的操作方法
- Java并發(fā)(Runnable+Thread)實現(xiàn)硬盤文件搜索功能
- Java高并發(fā)BlockingQueue重要的實現(xiàn)類詳解
- Java基礎(chǔ)之并發(fā)相關(guān)知識總結(jié)
相關(guān)文章
IntelliJ IDEA : .java文件左下角顯示"J"圖標(biāo)的問題
IntelliJ IDEA : .java文件 左下角顯示“J”,并且不能執(zhí)行代碼,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2020-10-10Java guava monitor監(jiān)視器線程的使用詳解
工作中的場景中是否存在類似這樣的場景,需要提交的線程在某個觸發(fā)條件下執(zhí)行。本文主要就是使用guava中的monitor來優(yōu)雅的實現(xiàn)帶監(jiān)視器的線程2021-11-11SpringBoot4.5.2 整合HikariCP 數(shù)據(jù)庫連接池操作
這篇文章主要介紹了SpringBoot4.5.2 整合HikariCP 數(shù)據(jù)庫連接池操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09SpringBoot集成Redis使用Cache緩存的實現(xiàn)方法
SpringBoot通過配置RedisConfig類和使用Cache注解可以輕松集成Redis實現(xiàn)緩存,主要包括@EnableCaching開啟緩存,自定義key生成器,改變序列化規(guī)則,以及配置RedisCacheManager,本文為使用SpringBoot與Redis處理緩存提供了詳實的指導(dǎo)和示例,感興趣的朋友一起看看吧2024-10-10基于kafka實現(xiàn)Spring Cloud Bus消息總線
消息總線是一種通信工具,可以在機(jī)器之間互相傳輸消息、文件等,這篇文章主要介紹了如何利用kafka實現(xiàn)SpringCloud Bus消息總線,感興趣的可以學(xué)習(xí)一下2022-04-04MybatisPlus?BaseMapper?實現(xiàn)對數(shù)據(jù)庫增刪改查源碼
MybatisPlus?是一款在?Mybatis?基礎(chǔ)上進(jìn)行的增強?orm?框架,可以實現(xiàn)不寫?sql?就完成數(shù)據(jù)庫相關(guān)的操作,這篇文章主要介紹了MybatisPlus?BaseMapper?實現(xiàn)對數(shù)據(jù)庫增刪改查源碼解析,需要的朋友可以參考下2023-01-01