Java從同步容器到并發(fā)容器的操作過程
引言
容器是Java基礎(chǔ)類庫中使用頻率最高的一部分,Java集合包中提供了大量的容器類來幫組我們簡化開發(fā),我前面的文章中對Java集合包中的關(guān)鍵容器進(jìn)行過一個(gè)系列的分析,但這些集合類都是非線程安全的,即在多線程的環(huán)境下,都需要其他額外的手段來保證數(shù)據(jù)的正確性,最簡單的就是通過synchronized關(guān)鍵字將所有使用到非線程安全的容器代碼全部同步執(zhí)行。這種方式雖然可以達(dá)到線程安全的目的,但存在幾個(gè)明顯的問題:首先編碼上存在一定的復(fù)雜性,相關(guān)的代碼段都需要添加鎖。其次這種一刀切的做法在高并發(fā)情況下性能并不理想,基本相當(dāng)于串行執(zhí)行。JDK1.5中為我們提供了一系列的并發(fā)容器,集中在java.util.concurrent包下,用來解決這兩個(gè)問題,先從同步容器說起。
同步容器Vector和HashTable
為了簡化代碼開發(fā)的過程,早期的JDK在java.util包中提供了Vector和HashTable兩個(gè)同步容器,這兩個(gè)容器的實(shí)現(xiàn)和早期的ArrayList和HashMap代碼實(shí)現(xiàn)基本一樣,不同在于Vector和HashTable在每個(gè)方法上都添加了synchronized關(guān)鍵字來保證同一個(gè)實(shí)例同時(shí)只有一個(gè)線程能訪問,部分源碼如下:
//Vector
public synchronized int size() {};
public synchronized E get(int index) {};
//HashTable
public synchronized V put(K key, V value) {};
public synchronized V remove(Object key) {};
通過對每個(gè)方法添加synchronized,保證了多次操作的串行。這種方式雖然使用起來方便了,但并沒有解決高并發(fā)下的性能問題,與手動(dòng)鎖住ArrayList和HashMap并沒有什么區(qū)別,不論讀還是寫都會鎖住整個(gè)容器。其次這種方式存在另一個(gè)問題:當(dāng)多個(gè)線程進(jìn)行復(fù)合操作時(shí),是線程不安全的。可以通過下面的代碼來說明這個(gè)問題:
public static void deleteVector(){
int index = vectors.size() - 1;
vectors.remove(index);
}
代碼中對Vector進(jìn)行了兩步操作,首先獲取size,然后移除最后一個(gè)元素,多線程情況下如果兩個(gè)線程交叉執(zhí)行,A線程調(diào)用size后,B線程移除最后一個(gè)元素,這時(shí)A線程繼續(xù)remove將會拋出索引超出的錯(cuò)誤。
那么怎么解決這個(gè)問題呢?最直接的修改方案就是對代碼塊加鎖來防止多線程同時(shí)執(zhí)行:
public static void deleteVector(){
synchronized (vectors) {
int index = vectors.size() - 1;
vectors.remove(index);
}
}
如果上面的問題通過加鎖來解決沒有太直觀的影響,那么來看看對vectors進(jìn)行迭代的情況:
public static void foreachVector(){
synchronized (vectors) {
for (int i = 0; i < vectors.size(); i++) {
System.out.println(vectors.get(i).toString());
}
}
}
為了避免多線程情況下在迭代的過程中其他線程對vectors進(jìn)行了修改,就不得不對整個(gè)迭代過程加鎖,想象這么一個(gè)場景,如果迭代操作非常頻繁,或者vectors元素很大,那么所有的修改和讀取操作將不得不在鎖外等待,這將會對多線程性能造成極大的影響。那么有沒有什么方式能夠很好的對容器的迭代操作和修改操作進(jìn)行分離,在修改時(shí)不影響容器的迭代操作呢?這就需要java.util.concurrent包中的各種并發(fā)容器了出場了。
并發(fā)容器CopyOnWrite
CopyOnWrite--寫時(shí)復(fù)制容器是一種常用的并發(fā)容器,它通過多線程下讀寫分離來達(dá)到提高并發(fā)性能的目的,和前面我們講解StampedLock時(shí)所用的解決方案類似:任何時(shí)候都可以進(jìn)行讀操作,寫操作則需要加鎖。不同的是,在CopyOnWrite中,對容器的修改操作加鎖后,通過copy一個(gè)新的容器來進(jìn)行修改,修改完畢后將容器替換為新的容器即可。
這種方式的好處顯而易見:通過copy一個(gè)新的容器來進(jìn)行修改,這樣讀操作就不需要加鎖,可以并發(fā)讀,因?yàn)樵谧x的過程中是采用的舊的容器,即使新容器做了修改對舊容器也沒有影響,同時(shí)也很好的解決了迭代過程中其他線程修改導(dǎo)致的并發(fā)問題。
JDK中提供的并發(fā)容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面通過CopyOnWriteArrayList的部分源碼來理解這種思想:
//添加元素
public boolean add(E e) {
//獨(dú)占鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//復(fù)制一個(gè)新的數(shù)組newElements
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
//修改后指向新的數(shù)組
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
public E get(int index) {
//未加鎖,直接獲取
return get(getArray(), index);
}
代碼很簡單,在add操作中通過一個(gè)共享的ReentrantLock來獲取鎖,這樣可以防止多線程下多個(gè)線程同時(shí)修改容器內(nèi)容。獲取鎖后通過Arrays.copyOf復(fù)制了一個(gè)新的容器,然后對新的容器進(jìn)行了修改,最后直接通過setArray將原數(shù)組引用指向了新的數(shù)組,避免了在修改過程中迭代數(shù)據(jù)出現(xiàn)錯(cuò)誤。get操作由于是讀操作,未加鎖,直接讀取就行。
CopyOnWriteArraySet類似,這里不做過多講解。
CopyOnWrite容器雖然在多線程下使用是安全的,相比較Vector也大大提高了讀寫的性能,但它也有自身的問題。
首先就是性能,在講解ArrayList的文章中提到過,ArrayList的擴(kuò)容由于使用了Arrays.copyOf每次都需要申請更大的空間以及復(fù)制現(xiàn)有的元素到新的數(shù)組,對性能存在一定影響。CopyOnWrite容器也不例外,每次修改操作都會申請新的數(shù)組空間,然后進(jìn)行替換。所以在高并發(fā)頻繁修改容器的情況下,會不斷申請新的空間,同時(shí)會造成頻繁的GC,這時(shí)使用CopyOnWrite容器并不是一個(gè)好的選擇。
其次還有一個(gè)數(shù)據(jù)一致性問題,由于在修改中copy了新的數(shù)組進(jìn)行替換,同時(shí)舊數(shù)組如果還在被使用,那么新的數(shù)據(jù)就不能被及時(shí)讀取到,這樣就造成了數(shù)據(jù)不一致,如果需要強(qiáng)數(shù)據(jù)一致性,CopyOnWrite容器也不太適合。
并發(fā)容器ConcurrentHashMap
ConcurrentHashMap容器相較于CopyOnWrite容器在并發(fā)加鎖粒度上有了更大一步的優(yōu)化,它通過修改對單個(gè)hash桶元素加鎖的達(dá)到了更細(xì)粒度的并發(fā)控制。在了解ConcurrentHashMap容器之前,推薦大家先閱讀我之前對HashMap源碼分析的文章--Java集合(5)一 HashMap與HashSet,因?yàn)樵诘讓訑?shù)據(jù)結(jié)構(gòu)上,ConcurrentHashMap和HashMap都使用了數(shù)組+鏈表+紅黑樹的方式,只是在HashMap的基礎(chǔ)上添加了并發(fā)相關(guān)的一些控制,所以這里只對ConcurrentHashMap中并發(fā)相關(guān)代碼做一些分析。

還是先從ConcurrentHashMap的寫操作開始,這里就是put方法:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //計(jì)算桶的hash值
int binCount = 0;
//循環(huán)插入元素,避免并發(fā)插入失敗
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果當(dāng)前桶無元素,則通過cas操作插入新節(jié)點(diǎn)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
//如果當(dāng)前桶正在擴(kuò)容,則協(xié)助擴(kuò)容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//hash沖突時(shí)鎖住當(dāng)前需要添加節(jié)點(diǎn)的頭元素,可能是鏈表頭節(jié)點(diǎn)或者紅黑樹的根節(jié)點(diǎn)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
在put元素的過程中,有幾個(gè)并發(fā)處理的關(guān)鍵點(diǎn):
•如果當(dāng)前桶對應(yīng)的節(jié)點(diǎn)還沒有元素插入,通過典型的無鎖cas操作嘗試插入新節(jié)點(diǎn),減少加鎖的概率,并發(fā)情況下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。
•如果當(dāng)前桶正在擴(kuò)容,則協(xié)助擴(kuò)容((fh = f.hash) == MOVED)。這里是一個(gè)重點(diǎn),ConcurrentHashMap的擴(kuò)容和HashMap不一樣,它在多線程情況下或使用多個(gè)線程同時(shí)擴(kuò)容,每個(gè)線程擴(kuò)容指定的一部分hash桶,當(dāng)前線程擴(kuò)容完指定桶之后會繼續(xù)獲取下一個(gè)擴(kuò)容任務(wù),直到擴(kuò)容全部完成。擴(kuò)容的大小和HashMap一樣,都是翻倍,這樣可以有效減少移動(dòng)的元素?cái)?shù)量,也就是使用2的冪次方的原因,在HashMap中也一樣。
•在發(fā)生hash沖突時(shí)僅僅只鎖住當(dāng)前需要添加節(jié)點(diǎn)的頭元素即可,可能是鏈表頭節(jié)點(diǎn)或者紅黑樹的根節(jié)點(diǎn),其他桶節(jié)點(diǎn)都不需要加鎖,大大減小了鎖粒度。
通過ConcurrentHashMap添加元素的過程,知道了ConcurrentHashMap容器是通過CAS + synchronized一起來實(shí)現(xiàn)并發(fā)控制的。這里有個(gè)額外的問題:為什么使用synchronized而不使用ReentrantLock?前面我的文章也對synchronized以及ReentrantLock的實(shí)現(xiàn)方式和性能做過分析,在這里我的理解是synchronized在后期優(yōu)化空間上比ReentrantLock更大。
并發(fā)容器ConcurrentSkipListMap
java.util中對應(yīng)的容器在java.util.concurrent包中基本都可以找到對應(yīng)的并發(fā)容器:List和Set有對應(yīng)的CopyOnWriteArrayList與CopyOnWriteArraySet,HashMap有對應(yīng)的ConcurrentHashMap,但是有序的TreeMap或并沒有對應(yīng)的ConcurrentTreeMap。
為什么沒有ConcurrentTreeMap呢?這是因?yàn)門reeMap內(nèi)部使用了紅黑樹來實(shí)現(xiàn),紅黑樹是一種自平衡的二叉樹,當(dāng)樹被修改時(shí),需要重新平衡,重新平衡操作可能會影響樹的大部分節(jié)點(diǎn),如果并發(fā)量非常大的情況下,這就需要在許多樹節(jié)點(diǎn)上添加互斥鎖,那并發(fā)就失去了意義。所以提供了另外一種并發(fā)下的有序map實(shí)現(xiàn):ConcurrentSkipListMap。
ConcurrentSkipListMap內(nèi)部使用跳表(SkipList)這種數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn),他的結(jié)構(gòu)相對紅黑樹來說非常簡單理解,實(shí)現(xiàn)起來也相對簡單,而且在理論上它的查找、插入、刪除時(shí)間復(fù)雜度都為log(n)。在并發(fā)上,ConcurrentSkipListMap采用無鎖的CAS+自旋來控制。
跳表簡單來說就是一個(gè)多層的鏈表,底層是一個(gè)普通的鏈表,然后逐層減少,通常通過一個(gè)簡單的算法實(shí)現(xiàn)每一層元素是下一層的元素的二分之一,這樣當(dāng)搜索元素時(shí)從最頂層開始搜索,可以說是另一種形式的二分查找。
一個(gè)簡單的獲取跳表層數(shù)概率算法實(shí)現(xiàn)如下:
int random_level() {
K = 1;
while (random(0,1))
K++;
return K;
}
通過簡單的0和1獲取概率,1層的概率為50%,2層的概率為25%,3層的概率為12.5%,這樣逐級遞減。
一個(gè)三層的跳表添加元素的過程如下:

插入值為15的節(jié)點(diǎn):

插入后:

維基百科中有一個(gè)添加節(jié)點(diǎn)的動(dòng)圖,這里也貼出來方便理解:

通過分析ConcurrentSkipListMap的put方法來理解跳表以及CAS自旋并發(fā)控制:
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前繼節(jié)點(diǎn)
if (n != null) { //查找到前繼節(jié)點(diǎn)
Object v; int c;
Node<K,V> f = n.next; //獲取后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (n != b.next) //發(fā)生競爭,兩次節(jié)點(diǎn)獲取不一致,并發(fā)導(dǎo)致
break;
if ((v = n.value) == null) { // 節(jié)點(diǎn)已經(jīng)被刪除
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n)
break;
if ((c = cpr(cmp, key, n.key)) > 0) { //進(jìn)行下一輪查找,比當(dāng)前key大
b = n;
n = f;
continue;
}
if (c == 0) { //相等時(shí)直接cas修改值
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
if (!b.casNext(n, z)) //cas修改值
break; // restart if lost race to append to b
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed(); //獲取隨機(jī)數(shù)
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0) // 獲取跳表層級
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) { //如果獲取的調(diào)表層級小于等于當(dāng)前最大層級,則直接添加,并將它們組成一個(gè)上下的鏈表
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level //否則增加一層level,在這里體現(xiàn)為Index<K,V>數(shù)組
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j) //新添加的level層的具體數(shù)據(jù)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// 逐層插入數(shù)據(jù)過程
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
這里的插入方法很復(fù)雜,可以分為3大步來理解:第一步獲取前繼節(jié)點(diǎn)后通過CAS來插入節(jié)點(diǎn);第二步對level層數(shù)進(jìn)行判斷,如果大于最大層數(shù),則插入一層;第三步插入對應(yīng)層的數(shù)據(jù)。整個(gè)插入過程全部通過CAS自旋的方式保證并發(fā)情況下的數(shù)據(jù)正確性。
總結(jié)
JDK中提供了豐富的并發(fā)容器供我們使用,文章中介紹的也并不全面,重點(diǎn)是要通過了解各種并發(fā)容器的原理,明白他們各自獨(dú)特的使用場景。這里簡單做個(gè)總結(jié):當(dāng)并發(fā)讀遠(yuǎn)多于修改的場景下需要使用List和Set時(shí),可以考慮使用CopyOnWriteArrayList和CopyOnWriteArraySet;當(dāng)需要并發(fā)使用<Key, Value>鍵值對存取數(shù)據(jù)時(shí),可以使用ConcurrentHashMap;當(dāng)要保證并發(fā)<Key, Value>鍵值對有序時(shí)可以使用ConcurrentSkipListMap。
以上所述是小編給大家介紹的Java從同步容器到并發(fā)容器的操作過程,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時(shí)回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
springboot 設(shè)置局域網(wǎng)訪問的實(shí)現(xiàn)步驟
Spring Boot是一個(gè)開源Java-based框架,用于創(chuàng)建獨(dú)立的、生產(chǎn)級別的Spring應(yīng)用,它旨在簡化Spring應(yīng)用的初始搭建及開發(fā)過程,通過提供各種自動(dòng)配置的starter包,Spring Boot使得項(xiàng)目配置變得簡單快速,感興趣的朋友一起看看吧2024-02-02
jasypt 集成SpringBoot 數(shù)據(jù)庫密碼加密操作
這篇文章主要介紹了jasypt 集成SpringBoot 數(shù)據(jù)庫密碼加密操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11
Mybatis?sqlMapConfig.xml中的mappers標(biāo)簽使用
這篇文章主要介紹了Mybatis?sqlMapConfig.xml中的mappers標(biāo)簽使用方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01
Spring導(dǎo)入properties配置文件代碼示例
這篇文章主要介紹了Spring導(dǎo)入properties配置文件代碼示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10
vue 實(shí)現(xiàn)刪除對象的元素 delete
這篇文章主要介紹了vue 實(shí)現(xiàn)刪除對象的元素delete,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03

