Java多線程編程中的兩種常用并發(fā)容器講解
ConcurrentHashMap并發(fā)容器
ConcurrentHashMap可以做到讀取數(shù)據(jù)不加鎖,并且其內(nèi)部的結(jié)構(gòu)可以讓其在進行寫操作的時候能夠?qū)㈡i的粒度保持地盡量地小,不用對整個ConcurrentHashMap加鎖。
ConcurrentHashMap的內(nèi)部結(jié)構(gòu)
ConcurrentHashMap為了提高本身的并發(fā)能力,在內(nèi)部采用了一個叫做Segment的結(jié)構(gòu),一個Segment其實就是一個類Hash Table的結(jié)構(gòu),Segment內(nèi)部維護了一個鏈表數(shù)組,我們用下面這一幅圖來看下ConcurrentHashMap的內(nèi)部結(jié)構(gòu):
從上面的結(jié)構(gòu)我們可以了解到,ConcurrentHashMap定位一個元素的過程需要進行兩次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部,因此,這一種結(jié)構(gòu)的帶來的副作用是Hash的過程要比普通的HashMap要長,但是帶來的好處是寫操作的時候可以只對元素所在的Segment進行加鎖即可,不會影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支持Segment數(shù)量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的Segment上),所以,通過這一種結(jié)構(gòu),ConcurrentHashMap的并發(fā)能力可以大大的提高。
Segment
我們再來具體了解一下Segment的數(shù)據(jù)結(jié)構(gòu):
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile int count; transient int modCount; transient int threshold; transient volatile HashEntry<K,V>[] table; final float loadFactor; }
詳細解釋一下Segment里面的成員變量的意義:
- count:Segment中元素的數(shù)量
- modCount:對table的大小造成影響的操作的數(shù)量(比如put或者remove操作)
- threshold:閾值,Segment里面元素的數(shù)量超過這個值依舊就會對Segment進行擴容
- table:鏈表數(shù)組,數(shù)組中的每一個元素代表了一個鏈表的頭部
- loadFactor:負載因子,用于確定threshold
HashEntry
Segment中的元素是以HashEntry的形式存放在鏈表數(shù)組中的,看一下HashEntry的結(jié)構(gòu):
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
可以看到HashEntry的一個特點,除了value以外,其他的幾個變量都是final的,這樣做是為了防止鏈表結(jié)構(gòu)被破壞,出現(xiàn)ConcurrentModification的情況。
ConcurrentHashMap的初始化
下面我們來結(jié)合源代碼來具體分析一下ConcurrentHashMap的實現(xiàn),先看下初始化方法:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); }
CurrentHashMap的初始化一共有三個參數(shù),一個initialCapacity,表示初始的容量,一個loadFactor,表示負載參數(shù),最后一個是concurrentLevel,代表ConcurrentHashMap內(nèi)部的Segment的數(shù)量,ConcurrentLevel一經(jīng)指定,不可改變,后續(xù)如果ConcurrentHashMap的元素數(shù)量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數(shù)量,而只會增加Segment中鏈表數(shù)組的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment里面的元素做一次rehash就可以了。
整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據(jù)concurrentLevel來new出Segment,這里Segment的數(shù)量是不大于concurrentLevel的最大的2的指數(shù),就是說Segment的數(shù)量永遠是2的指數(shù)個,這樣的好處是方便采用移位操作來進行hash,加快hash的過程。接下來就是根據(jù)intialCapacity確定Segment的容量的大小,每一個Segment的容量大小也是2的指數(shù),同樣使為了加快hash的過程。
這邊需要特別注意一下兩個變量,分別是segmentShift和segmentMask,這兩個變量在后面將會起到很大的作用,假設(shè)構(gòu)造函數(shù)確定了Segment的數(shù)量是2的n次方,那么segmentShift就等于32減去n,而segmentMask就等于2的n次方減一。
ConcurrentHashMap的get操作
前面提到過ConcurrentHashMap的get操作是不用加鎖的,我們這里看一下其實現(xiàn):
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
看第三行,segmentFor這個函數(shù)用于確定操作應(yīng)該在哪一個segment中進行,幾乎對ConcurrentHashMap的所有操作都需要用到這個函數(shù),我們看下這個函數(shù)的實現(xiàn):
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
這個函數(shù)用了位操作來確定Segment,根據(jù)傳入的hash值向右無符號右移segmentShift位,然后和segmentMask進行與操作,結(jié)合我們之前說的segmentShift和segmentMask的值,就可以得出以下結(jié)論:假設(shè)Segment的數(shù)量是2的n次方,根據(jù)元素的hash值的高n位就可以確定元素到底在哪一個Segment中。
在確定了需要在哪一個segment中進行操作以后,接下來的事情就是調(diào)用對應(yīng)的Segment的get方法:
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
先看第二行代碼,這里對count進行了一次判斷,其中count表示Segment中元素的數(shù)量,我們可以來看一下count的定義:
transient volatile int count;
可以看到count是volatile的,實際上這里里面利用了volatile的語義:
對volatile字段的寫入操作happens-before于每一個后續(xù)的同一個字段的讀操作。
因為實際上put、remove等操作也會更新count的值,所以當競爭發(fā)生的時候,volatile的語義可以保證寫操作在讀操作之前,也就保證了寫操作對后續(xù)的讀操作都是可見的,這樣后面get的后續(xù)操作就可以拿到完整的元素內(nèi)容。
然后,在第三行,調(diào)用了getFirst()來取得鏈表的頭部:
HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; }
同樣,這里也是用位操作來確定鏈表的頭部,hash值和HashTable的長度減一做與操作,最后的結(jié)果就是hash值的低n位,其中n是HashTable的長度以2為底的結(jié)果。
在確定了鏈表的頭部以后,就可以對整個鏈表進行遍歷,看第4行,取出key對應(yīng)的value的值,如果拿出的value的值是null,則可能這個key,value對正在put的過程中,如果出現(xiàn)這種情況,那么就加鎖來保證取出的value是完整的,如果不是null,則直接返回value。
ConcurrentHashMap的put操作
看完了get操作,再看下put操作,put操作的前面也是確定Segment的過程,這里不再贅述,直接看關(guān)鍵的segment的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); } }
首先對Segment的put操作是加鎖完成的,然后在第五行,如果Segment中元素的數(shù)量超過了閾值(由構(gòu)造函數(shù)中的loadFactor算出)這需要進行對Segment擴容,并且要進行rehash,關(guān)于rehash的過程大家可以自己去了解,這里不詳細講了。
第8和第9行的操作就是getFirst的過程,確定鏈表頭部的位置。
第11行這里的這個while循環(huán)是在鏈表中尋找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果沒有找到,則進入21行這里,生成一個新的HashEntry并且把它加到整個Segment的頭部,然后再更新count的值。
ConcurrentHashMap的remove操作
Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過程,然后再調(diào)用Segment的remove方法:
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
首先remove操作也是確定需要刪除的元素的位置,不過這里刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向后面一個就完事了,我們之前已經(jīng)說過HashEntry中的next是final的,一經(jīng)賦值以后就不可修改,在定位到待刪除元素的位置以后,程序就將待刪除元素前面的那一些元素全部復制一遍,然后再一個一個重新接到鏈表上去,看一下下面這一幅圖來了解這個過程:
假設(shè)鏈表中原來的元素如上圖所示,現(xiàn)在要刪除元素3,那么刪除元素3以后的鏈表就如下圖所示:
CopyOnWriteArrayList并發(fā)容器
Copy-On-Write簡稱COW,是一種用于程序設(shè)計中的優(yōu)化策略。其基本思路是,從一開始大家都在共享同一個內(nèi)容,當某個人想要修改這個內(nèi)容的時候,才會真正把內(nèi)容Copy出去形成一個新的內(nèi)容然后再改,這是一種延時懶惰策略。從JDK1.5開始Java并發(fā)包里提供了兩個使用CopyOnWrite機制實現(xiàn)的并發(fā)容器,它們是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用,可以在非常多的并發(fā)場景中使用到。
什么是CopyOnWrite容器
CopyOnWrite容器即寫時復制的容器。通俗的理解是當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對CopyOnWrite容器進行并發(fā)的讀,而不需要加鎖,因為當前容器不會添加任何元素。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。
CopyOnWriteArrayList的實現(xiàn)原理
在使用CopyOnWriteArrayList之前,我們先閱讀其源碼了解下它是如何實現(xiàn)的。以下代碼是向CopyOnWriteArrayList中add方法的實現(xiàn)(向CopyOnWriteArrayList里添加元素),可以發(fā)現(xiàn)在添加的時候是需要加鎖的,否則多線程寫的時候會Copy出N個副本出來。
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
讀的時候不需要加鎖,如果讀的時候有多個線程正在向CopyOnWriteArrayList添加數(shù)據(jù),讀還是會讀到舊的數(shù)據(jù),因為寫的時候不會鎖住舊的CopyOnWriteArrayList。
public E get(int index) { return get(getArray(), index); }
JDK中并沒有提供CopyOnWriteMap,我們可以參考CopyOnWriteArrayList來實現(xiàn)一個,基本代碼如下:
import java.util.Collection; import java.util.Map; import java.util.Set; public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable { private volatile Map<K, V> internalMap; public CopyOnWriteMap() { internalMap = new HashMap<K, V>(); } public V put(K key, V value) { synchronized (this) { Map<K, V> newMap = new HashMap<K, V>(internalMap); V val = newMap.put(key, value); internalMap = newMap; return val; } } public V get(Object key) { return internalMap.get(key); } public void putAll(Map<? extends K, ? extends V> newData) { synchronized (this) { Map<K, V> newMap = new HashMap<K, V>(internalMap); newMap.putAll(newData); internalMap = newMap; } } }
實現(xiàn)很簡單,只要了解了CopyOnWrite機制,我們可以實現(xiàn)各種CopyOnWrite容器,并且在不同的應(yīng)用場景中使用。
CopyOnWrite的應(yīng)用場景
CopyOnWrite并發(fā)容器用于讀多寫少的并發(fā)場景。比如白名單,黑名單,商品類目的訪問和更新場景,假如我們有一個搜索網(wǎng)站,用戶在這個網(wǎng)站的搜索框中,輸入關(guān)鍵字搜索內(nèi)容,但是某些關(guān)鍵字不允許被搜索。這些不能被搜索的關(guān)鍵字會被放在一個黑名單當中,黑名單每天晚上更新一次。當用戶搜索時,會檢查當前關(guān)鍵字在不在黑名單當中,如果在,則提示不能搜索。實現(xiàn)代碼如下:
package com.ifeve.book; import java.util.Map; import com.ifeve.book.forkjoin.CopyOnWriteMap; /** * 黑名單服務(wù) * * @author fangtengfei * */ public class BlackListServiceImpl { private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>( 1000); public static boolean isBlackList(String id) { return blackListMap.get(id) == null ? false : true; } public static void addBlackList(String id) { blackListMap.put(id, Boolean.TRUE); } /** * 批量添加黑名單 * * @param ids */ public static void addBlackList(Map<String,Boolean> ids) { blackListMap.putAll(ids); } }
代碼很簡單,但是使用CopyOnWriteMap需要注意兩件事情:
1. 減少擴容開銷。根據(jù)實際需要,初始化CopyOnWriteMap的大小,避免寫時CopyOnWriteMap擴容的開銷。
2. 使用批量添加。因為每次添加,容器每次都會進行復制,所以減少添加次數(shù),可以減少容器的復制次數(shù)。如使用上面代碼里的addBlackList方法。
CopyOnWrite的缺點
CopyOnWrite容器有很多優(yōu)點,但是同時也存在兩個問題,即內(nèi)存占用問題和數(shù)據(jù)一致性問題。所以在開發(fā)的時候需要注意一下。
內(nèi)存占用問題。因為CopyOnWrite的寫時復制機制,所以在進行寫操作的時候,內(nèi)存里會同時駐扎兩個對象的內(nèi)存,舊的對象和新寫入的對象(注意:在復制的時候只是復制容器里的引用,只是在寫的時候會創(chuàng)建新對象添加到新容器里,而舊容器的對象還在使用,所以有兩份對象內(nèi)存)。如果這些對象占用的內(nèi)存比較大,比如說200M左右,那么再寫入100M數(shù)據(jù)進去,內(nèi)存就會占用300M,那么這個時候很有可能造成頻繁的Yong GC和Full GC。之前我們系統(tǒng)中使用了一個服務(wù)由于每晚使用CopyOnWrite機制更新大對象,造成了每晚15秒的Full GC,應(yīng)用響應(yīng)時間也隨之變長。
針對內(nèi)存占用問題,可以通過壓縮容器中的元素的方法來減少大對象的內(nèi)存消耗,比如,如果元素全是10進制的數(shù)字,可以考慮把它壓縮成36進制或64進制?;蛘卟皇褂肅opyOnWrite容器,而使用其他的并發(fā)容器,如ConcurrentHashMap。
數(shù)據(jù)一致性問題。CopyOnWrite容器只能保證數(shù)據(jù)的最終一致性,不能保證數(shù)據(jù)的實時一致性。所以如果你希望寫入的的數(shù)據(jù),馬上能讀到,請不要使用CopyOnWrite容器。
相關(guān)文章
基于Springboot+Mybatis對數(shù)據(jù)訪問層進行單元測試的方式分享
本文將介紹一種快高效、可復用的解決測試方案——對數(shù)據(jù)訪問層做單元測試,文章通過代碼示例介紹的非常詳細,具有一定的參考價值,需要的朋友可以參考下2023-07-07Spring中數(shù)據(jù)訪問對象Data Access Object的介紹
今天小編就為大家分享一篇關(guān)于Spring中數(shù)據(jù)訪問對象Data Access Object的介紹,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-01-01