Java從同步容器到并發(fā)容器的操作過(guò)程
引言
容器是Java基礎(chǔ)類庫(kù)中使用頻率最高的一部分,Java集合包中提供了大量的容器類來(lái)幫組我們簡(jiǎn)化開(kāi)發(fā),我前面的文章中對(duì)Java集合包中的關(guān)鍵容器進(jìn)行過(guò)一個(gè)系列的分析,但這些集合類都是非線程安全的,即在多線程的環(huán)境下,都需要其他額外的手段來(lái)保證數(shù)據(jù)的正確性,最簡(jiǎn)單的就是通過(guò)synchronized關(guān)鍵字將所有使用到非線程安全的容器代碼全部同步執(zhí)行。這種方式雖然可以達(dá)到線程安全的目的,但存在幾個(gè)明顯的問(wèn)題:首先編碼上存在一定的復(fù)雜性,相關(guān)的代碼段都需要添加鎖。其次這種一刀切的做法在高并發(fā)情況下性能并不理想,基本相當(dāng)于串行執(zhí)行。JDK1.5中為我們提供了一系列的并發(fā)容器,集中在java.util.concurrent包下,用來(lái)解決這兩個(gè)問(wèn)題,先從同步容器說(shuō)起。
同步容器Vector和HashTable
為了簡(jiǎn)化代碼開(kāi)發(fā)的過(guò)程,早期的JDK在java.util包中提供了Vector和HashTable兩個(gè)同步容器,這兩個(gè)容器的實(shí)現(xiàn)和早期的ArrayList和HashMap代碼實(shí)現(xiàn)基本一樣,不同在于Vector和HashTable在每個(gè)方法上都添加了synchronized關(guān)鍵字來(lái)保證同一個(gè)實(shí)例同時(shí)只有一個(gè)線程能訪問(wèn),部分源碼如下:
//Vector public synchronized int size() {}; public synchronized E get(int index) {}; //HashTable public synchronized V put(K key, V value) {}; public synchronized V remove(Object key) {};
通過(guò)對(duì)每個(gè)方法添加synchronized,保證了多次操作的串行。這種方式雖然使用起來(lái)方便了,但并沒(méi)有解決高并發(fā)下的性能問(wèn)題,與手動(dòng)鎖住ArrayList和HashMap并沒(méi)有什么區(qū)別,不論讀還是寫(xiě)都會(huì)鎖住整個(gè)容器。其次這種方式存在另一個(gè)問(wèn)題:當(dāng)多個(gè)線程進(jìn)行復(fù)合操作時(shí),是線程不安全的。可以通過(guò)下面的代碼來(lái)說(shuō)明這個(gè)問(wèn)題:
public static void deleteVector(){ int index = vectors.size() - 1; vectors.remove(index); }
代碼中對(duì)Vector進(jìn)行了兩步操作,首先獲取size,然后移除最后一個(gè)元素,多線程情況下如果兩個(gè)線程交叉執(zhí)行,A線程調(diào)用size后,B線程移除最后一個(gè)元素,這時(shí)A線程繼續(xù)remove將會(huì)拋出索引超出的錯(cuò)誤。
那么怎么解決這個(gè)問(wèn)題呢?最直接的修改方案就是對(duì)代碼塊加鎖來(lái)防止多線程同時(shí)執(zhí)行:
public static void deleteVector(){ synchronized (vectors) { int index = vectors.size() - 1; vectors.remove(index); } }
如果上面的問(wèn)題通過(guò)加鎖來(lái)解決沒(méi)有太直觀的影響,那么來(lái)看看對(duì)vectors進(jìn)行迭代的情況:
public static void foreachVector(){ synchronized (vectors) { for (int i = 0; i < vectors.size(); i++) { System.out.println(vectors.get(i).toString()); } } }
為了避免多線程情況下在迭代的過(guò)程中其他線程對(duì)vectors進(jìn)行了修改,就不得不對(duì)整個(gè)迭代過(guò)程加鎖,想象這么一個(gè)場(chǎng)景,如果迭代操作非常頻繁,或者vectors元素很大,那么所有的修改和讀取操作將不得不在鎖外等待,這將會(huì)對(duì)多線程性能造成極大的影響。那么有沒(méi)有什么方式能夠很好的對(duì)容器的迭代操作和修改操作進(jìn)行分離,在修改時(shí)不影響容器的迭代操作呢?這就需要java.util.concurrent包中的各種并發(fā)容器了出場(chǎng)了。
并發(fā)容器CopyOnWrite
CopyOnWrite--寫(xiě)時(shí)復(fù)制容器是一種常用的并發(fā)容器,它通過(guò)多線程下讀寫(xiě)分離來(lái)達(dá)到提高并發(fā)性能的目的,和前面我們講解StampedLock時(shí)所用的解決方案類似:任何時(shí)候都可以進(jìn)行讀操作,寫(xiě)操作則需要加鎖。不同的是,在CopyOnWrite中,對(duì)容器的修改操作加鎖后,通過(guò)copy一個(gè)新的容器來(lái)進(jìn)行修改,修改完畢后將容器替換為新的容器即可。
這種方式的好處顯而易見(jiàn):通過(guò)copy一個(gè)新的容器來(lái)進(jìn)行修改,這樣讀操作就不需要加鎖,可以并發(fā)讀,因?yàn)樵谧x的過(guò)程中是采用的舊的容器,即使新容器做了修改對(duì)舊容器也沒(méi)有影響,同時(shí)也很好的解決了迭代過(guò)程中其他線程修改導(dǎo)致的并發(fā)問(wèn)題。
JDK中提供的并發(fā)容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面通過(guò)CopyOnWriteArrayList的部分源碼來(lái)理解這種思想:
//添加元素 public boolean add(E e) { //獨(dú)占鎖 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //復(fù)制一個(gè)新的數(shù)組newElements Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; //修改后指向新的數(shù)組 setArray(newElements); return true; } finally { lock.unlock(); } } public E get(int index) { //未加鎖,直接獲取 return get(getArray(), index); }
代碼很簡(jiǎn)單,在add操作中通過(guò)一個(gè)共享的ReentrantLock來(lái)獲取鎖,這樣可以防止多線程下多個(gè)線程同時(shí)修改容器內(nèi)容。獲取鎖后通過(guò)Arrays.copyOf復(fù)制了一個(gè)新的容器,然后對(duì)新的容器進(jìn)行了修改,最后直接通過(guò)setArray將原數(shù)組引用指向了新的數(shù)組,避免了在修改過(guò)程中迭代數(shù)據(jù)出現(xiàn)錯(cuò)誤。get操作由于是讀操作,未加鎖,直接讀取就行。
CopyOnWriteArraySet類似,這里不做過(guò)多講解。
CopyOnWrite容器雖然在多線程下使用是安全的,相比較Vector也大大提高了讀寫(xiě)的性能,但它也有自身的問(wèn)題。
首先就是性能,在講解ArrayList的文章中提到過(guò),ArrayList的擴(kuò)容由于使用了Arrays.copyOf每次都需要申請(qǐng)更大的空間以及復(fù)制現(xiàn)有的元素到新的數(shù)組,對(duì)性能存在一定影響。CopyOnWrite容器也不例外,每次修改操作都會(huì)申請(qǐng)新的數(shù)組空間,然后進(jìn)行替換。所以在高并發(fā)頻繁修改容器的情況下,會(huì)不斷申請(qǐng)新的空間,同時(shí)會(huì)造成頻繁的GC,這時(shí)使用CopyOnWrite容器并不是一個(gè)好的選擇。
其次還有一個(gè)數(shù)據(jù)一致性問(wèn)題,由于在修改中copy了新的數(shù)組進(jìn)行替換,同時(shí)舊數(shù)組如果還在被使用,那么新的數(shù)據(jù)就不能被及時(shí)讀取到,這樣就造成了數(shù)據(jù)不一致,如果需要強(qiáng)數(shù)據(jù)一致性,CopyOnWrite容器也不太適合。
并發(fā)容器ConcurrentHashMap
ConcurrentHashMap容器相較于CopyOnWrite容器在并發(fā)加鎖粒度上有了更大一步的優(yōu)化,它通過(guò)修改對(duì)單個(gè)hash桶元素加鎖的達(dá)到了更細(xì)粒度的并發(fā)控制。在了解ConcurrentHashMap容器之前,推薦大家先閱讀我之前對(duì)HashMap源碼分析的文章--Java集合(5)一 HashMap與HashSet,因?yàn)樵诘讓訑?shù)據(jù)結(jié)構(gòu)上,ConcurrentHashMap和HashMap都使用了數(shù)組+鏈表+紅黑樹(shù)的方式,只是在HashMap的基礎(chǔ)上添加了并發(fā)相關(guān)的一些控制,所以這里只對(duì)ConcurrentHashMap中并發(fā)相關(guān)代碼做一些分析。
還是先從ConcurrentHashMap的寫(xiě)操作開(kāi)始,這里就是put方法:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //計(jì)算桶的hash值 int binCount = 0; //循環(huán)插入元素,避免并發(fā)插入失敗 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果當(dāng)前桶無(wú)元素,則通過(guò)cas操作插入新節(jié)點(diǎn) if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } //如果當(dāng)前桶正在擴(kuò)容,則協(xié)助擴(kuò)容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //hash沖突時(shí)鎖住當(dāng)前需要添加節(jié)點(diǎn)的頭元素,可能是鏈表頭節(jié)點(diǎn)或者紅黑樹(shù)的根節(jié)點(diǎn) synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
在put元素的過(guò)程中,有幾個(gè)并發(fā)處理的關(guān)鍵點(diǎn):
•如果當(dāng)前桶對(duì)應(yīng)的節(jié)點(diǎn)還沒(méi)有元素插入,通過(guò)典型的無(wú)鎖cas操作嘗試插入新節(jié)點(diǎn),減少加鎖的概率,并發(fā)情況下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。
•如果當(dāng)前桶正在擴(kuò)容,則協(xié)助擴(kuò)容((fh = f.hash) == MOVED)。這里是一個(gè)重點(diǎn),ConcurrentHashMap的擴(kuò)容和HashMap不一樣,它在多線程情況下或使用多個(gè)線程同時(shí)擴(kuò)容,每個(gè)線程擴(kuò)容指定的一部分hash桶,當(dāng)前線程擴(kuò)容完指定桶之后會(huì)繼續(xù)獲取下一個(gè)擴(kuò)容任務(wù),直到擴(kuò)容全部完成。擴(kuò)容的大小和HashMap一樣,都是翻倍,這樣可以有效減少移動(dòng)的元素?cái)?shù)量,也就是使用2的冪次方的原因,在HashMap中也一樣。
•在發(fā)生hash沖突時(shí)僅僅只鎖住當(dāng)前需要添加節(jié)點(diǎn)的頭元素即可,可能是鏈表頭節(jié)點(diǎn)或者紅黑樹(shù)的根節(jié)點(diǎn),其他桶節(jié)點(diǎn)都不需要加鎖,大大減小了鎖粒度。
通過(guò)ConcurrentHashMap添加元素的過(guò)程,知道了ConcurrentHashMap容器是通過(guò)CAS + synchronized一起來(lái)實(shí)現(xiàn)并發(fā)控制的。這里有個(gè)額外的問(wèn)題:為什么使用synchronized而不使用ReentrantLock?前面我的文章也對(duì)synchronized以及ReentrantLock的實(shí)現(xiàn)方式和性能做過(guò)分析,在這里我的理解是synchronized在后期優(yōu)化空間上比ReentrantLock更大。
并發(fā)容器ConcurrentSkipListMap
java.util中對(duì)應(yīng)的容器在java.util.concurrent包中基本都可以找到對(duì)應(yīng)的并發(fā)容器:List和Set有對(duì)應(yīng)的CopyOnWriteArrayList與CopyOnWriteArraySet,HashMap有對(duì)應(yīng)的ConcurrentHashMap,但是有序的TreeMap或并沒(méi)有對(duì)應(yīng)的ConcurrentTreeMap。
為什么沒(méi)有ConcurrentTreeMap呢?這是因?yàn)門(mén)reeMap內(nèi)部使用了紅黑樹(shù)來(lái)實(shí)現(xiàn),紅黑樹(shù)是一種自平衡的二叉樹(shù),當(dāng)樹(shù)被修改時(shí),需要重新平衡,重新平衡操作可能會(huì)影響樹(shù)的大部分節(jié)點(diǎn),如果并發(fā)量非常大的情況下,這就需要在許多樹(shù)節(jié)點(diǎn)上添加互斥鎖,那并發(fā)就失去了意義。所以提供了另外一種并發(fā)下的有序map實(shí)現(xiàn):ConcurrentSkipListMap。
ConcurrentSkipListMap內(nèi)部使用跳表(SkipList)這種數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn),他的結(jié)構(gòu)相對(duì)紅黑樹(shù)來(lái)說(shuō)非常簡(jiǎn)單理解,實(shí)現(xiàn)起來(lái)也相對(duì)簡(jiǎn)單,而且在理論上它的查找、插入、刪除時(shí)間復(fù)雜度都為log(n)。在并發(fā)上,ConcurrentSkipListMap采用無(wú)鎖的CAS+自旋來(lái)控制。
跳表簡(jiǎn)單來(lái)說(shuō)就是一個(gè)多層的鏈表,底層是一個(gè)普通的鏈表,然后逐層減少,通常通過(guò)一個(gè)簡(jiǎn)單的算法實(shí)現(xiàn)每一層元素是下一層的元素的二分之一,這樣當(dāng)搜索元素時(shí)從最頂層開(kāi)始搜索,可以說(shuō)是另一種形式的二分查找。
一個(gè)簡(jiǎn)單的獲取跳表層數(shù)概率算法實(shí)現(xiàn)如下:
int random_level() { K = 1; while (random(0,1)) K++; return K; }
通過(guò)簡(jiǎn)單的0和1獲取概率,1層的概率為50%,2層的概率為25%,3層的概率為12.5%,這樣逐級(jí)遞減。
一個(gè)三層的跳表添加元素的過(guò)程如下:
插入值為15的節(jié)點(diǎn):
插入后:
維基百科中有一個(gè)添加節(jié)點(diǎn)的動(dòng)圖,這里也貼出來(lái)方便理解:
通過(guò)分析ConcurrentSkipListMap的put方法來(lái)理解跳表以及CAS自旋并發(fā)控制:
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前繼節(jié)點(diǎn) if (n != null) { //查找到前繼節(jié)點(diǎn) Object v; int c; Node<K,V> f = n.next; //獲取后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn) if (n != b.next) //發(fā)生競(jìng)爭(zhēng),兩次節(jié)點(diǎn)獲取不一致,并發(fā)導(dǎo)致 break; if ((v = n.value) == null) { // 節(jié)點(diǎn)已經(jīng)被刪除 n.helpDelete(b, f); break; } if (b.value == null || v == n) break; if ((c = cpr(cmp, key, n.key)) > 0) { //進(jìn)行下一輪查找,比當(dāng)前key大 b = n; n = f; continue; } if (c == 0) { //相等時(shí)直接cas修改值 if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); //9. n.key > key > b.key if (!b.casNext(n, z)) //cas修改值 break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); //獲取隨機(jī)數(shù) if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) // 獲取跳表層級(jí) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { //如果獲取的調(diào)表層級(jí)小于等于當(dāng)前最大層級(jí),則直接添加,并將它們組成一個(gè)上下的鏈表 for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level //否則增加一層level,在這里體現(xiàn)為Index<K,V>數(shù)組 level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) //新添加的level層的具體數(shù)據(jù) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // 逐層插入數(shù)據(jù)過(guò)程 splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
這里的插入方法很復(fù)雜,可以分為3大步來(lái)理解:第一步獲取前繼節(jié)點(diǎn)后通過(guò)CAS來(lái)插入節(jié)點(diǎn);第二步對(duì)level層數(shù)進(jìn)行判斷,如果大于最大層數(shù),則插入一層;第三步插入對(duì)應(yīng)層的數(shù)據(jù)。整個(gè)插入過(guò)程全部通過(guò)CAS自旋的方式保證并發(fā)情況下的數(shù)據(jù)正確性。
總結(jié)
JDK中提供了豐富的并發(fā)容器供我們使用,文章中介紹的也并不全面,重點(diǎn)是要通過(guò)了解各種并發(fā)容器的原理,明白他們各自獨(dú)特的使用場(chǎng)景。這里簡(jiǎn)單做個(gè)總結(jié):當(dāng)并發(fā)讀遠(yuǎn)多于修改的場(chǎng)景下需要使用List和Set時(shí),可以考慮使用CopyOnWriteArrayList和CopyOnWriteArraySet;當(dāng)需要并發(fā)使用<Key, Value>鍵值對(duì)存取數(shù)據(jù)時(shí),可以使用ConcurrentHashMap;當(dāng)要保證并發(fā)<Key, Value>鍵值對(duì)有序時(shí)可以使用ConcurrentSkipListMap。
以上所述是小編給大家介紹的Java從同步容器到并發(fā)容器的操作過(guò)程,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
相關(guān)文章
springboot 設(shè)置局域網(wǎng)訪問(wèn)的實(shí)現(xiàn)步驟
Spring Boot是一個(gè)開(kāi)源Java-based框架,用于創(chuàng)建獨(dú)立的、生產(chǎn)級(jí)別的Spring應(yīng)用,它旨在簡(jiǎn)化Spring應(yīng)用的初始搭建及開(kāi)發(fā)過(guò)程,通過(guò)提供各種自動(dòng)配置的starter包,Spring Boot使得項(xiàng)目配置變得簡(jiǎn)單快速,感興趣的朋友一起看看吧2024-02-02SpringBoot解決mysql連接8小時(shí)問(wèn)題
服務(wù)連接mysql數(shù)據(jù)庫(kù),8小時(shí)沒(méi)有數(shù)據(jù)庫(kù)的操作時(shí)候,數(shù)據(jù)庫(kù)會(huì)主動(dòng)斷開(kāi)連接釋放資源,本文就詳細(xì)的介紹一下解決方法,感興趣的可以了解一下2023-08-08jasypt 集成SpringBoot 數(shù)據(jù)庫(kù)密碼加密操作
這篇文章主要介紹了jasypt 集成SpringBoot 數(shù)據(jù)庫(kù)密碼加密操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11Mybatis?sqlMapConfig.xml中的mappers標(biāo)簽使用
這篇文章主要介紹了Mybatis?sqlMapConfig.xml中的mappers標(biāo)簽使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。2022-01-01Spring導(dǎo)入properties配置文件代碼示例
這篇文章主要介紹了Spring導(dǎo)入properties配置文件代碼示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10vue 實(shí)現(xiàn)刪除對(duì)象的元素 delete
這篇文章主要介紹了vue 實(shí)現(xiàn)刪除對(duì)象的元素delete,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03