欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

ConcurrentHashMap線程安全及實現(xiàn)原理實例解析

 更新時間:2023年11月06日 09:37:59   作者:jacheut  
這篇文章主要介紹了ConcurrentHashMap線程安全及實現(xiàn)原理實例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

背景

  • 線程不安全的HashMap

因為多線程環(huán)境下,使用Hashmap進行put操作會引起死循環(huán),導致CPU利用率接近100%(jdk1.7,1.8引入紅黑樹優(yōu)化了),多線程put可能會導致元素丟失。所以在并發(fā)情況下不能使用HashMap。

  • 效率低下的HashTable容器

JDK1.7的實現(xiàn)

在JDK1.7版本中,ConcurrentHashMap的數(shù)據(jù)結構是由一個Segment數(shù)組和多個HashEntry組成,如下圖所示:

Segment數(shù)組的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分段技術,而每一個Segment元素存儲的是HashEntry數(shù)組+鏈表,這個和HashMap的數(shù)據(jù)存儲結構一樣。Segment(桶)

應用場景

當有一個大數(shù)組時需要在多個線程共享時就可以考慮是否把它給分層多個節(jié)點了,避免大鎖。并可以考慮通過hash算法進行一些模塊定位。 其實不止用于線程,當設計數(shù)據(jù)表的事務時(事務某種意義上也是同步機制的體現(xiàn)),可以把一個表看成一個需要同步的數(shù)組,如果操作的表數(shù)據(jù)太多時就可以考慮事務分離了(這也是為什么要避免大表的出現(xiàn)),比如把數(shù)據(jù)進行字段拆分,水平分表等.

初始化

ConcurrentHashMap的初始化是會通過位與運算來初始化Segment的大小,用ssize來表示,如下所示

int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}

如上所示,因為ssize用位于運算來計算(ssize <<=1),所以Segment的大小取值都是以2的N次方,無關concurrencyLevel的取值,當然concurrencyLevel最大只能用16位的二進制來表示,即65536,換句話說,Segment的大小最多65536個,沒有指定concurrencyLevel元素初始化,Segment的大小ssize默認為16

每一個Segment元素下的HashEntry的初始化也是按照位于運算來計算,用cap來表示,如下所示

int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
    cap <<= 1;

如上所示,HashEntry大小的計算也是2的N次方(cap <<=1), cap的初始值為1,所以HashEntry最小的容量為2

JDK1.8的實現(xiàn)

JDK1.8的實現(xiàn)已經摒棄了Segment的概念,而是直接用Node數(shù)組+鏈表+紅黑樹的數(shù)據(jù)結構來實現(xiàn),并發(fā)控制使用Synchronized和CAS來操作,整個看起來就像是優(yōu)化過且線程安全的HashMap,雖然在JDK1.8中還能看到Segment的數(shù)據(jù)結構,但是已經簡化了屬性,只是為了兼容舊版本。

基本屬性:

// 默認初始值,必須是2的幕數(shù)
private static final int DEFAULT_CAPACITY = 16;
// 負載因子
private static final float LOAD_FACTOR = 0.75f;
// 鏈表轉紅黑樹閥值,> 8 鏈表轉換為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
//樹轉鏈表閥值,小于等于6 ,僅在擴容tranfer時才可能樹轉鏈表
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大線程數(shù)
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中記錄size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED     = -1; 
// 樹根節(jié)點的hash值
static final int TREEBIN   = -2; 
// ReservationNode的hash值
static final int RESERVED  = -3; 
// 可用處理器數(shù)量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數(shù)組 第一次插入數(shù)據(jù)的時候初始化,大小是2的冪次方
transient volatile Node<K,V>[] table;
/*控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
 *當為負數(shù)時:-1代表正在初始化,-N代表有N-1個線程正在 進行擴容
 *當為0時:代表當時的table還沒有被初始化
 *當為正數(shù)時:表示初始化或者下一次進行擴容的大小
,類似于擴容閾值。它的值始終是當前ConcurrentHashMap容量的0.75倍,這與loadfactor是對應的。實際容量>=sizeCtl,則擴容。
private transient volatile int sizeCtl;

put操作

public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode()); //兩次hash,減少hash沖突,可以均勻分布
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //對這個table進行迭代
        Node<K,V> f; int n, i, fh;
        //這里就是上面構造方法沒有進行初始化,在這里進行判斷,為null就調用initTable進行初始化,屬于懶漢模式初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒有數(shù)據(jù),就直接無鎖插入
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)//如果在進行擴容,則先進行擴容操作
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //如果以上條件都不滿足,那就要進行加鎖操作,也就是存在hash沖突,鎖住鏈表或者紅黑樹的頭結點
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { //表示該節(jié)點是鏈表結構
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //這里涉及到相同的key進行put就會覆蓋原先的value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {  //插入鏈表尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {//紅黑樹結構
                        Node<K,V> p;
                        binCount = 2;
                        //紅黑樹結構旋轉插入
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { //如果鏈表的長度大于8時就會進行紅黑樹的轉換
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);//統(tǒng)計size,并且檢查是否需要擴容
    return null;
}

這個put的過程很清晰,對當前的table進行無條件自循環(huán)直到put成功,可以分成以下六步流程來概述。

  • 如果沒有初始化就先調用initTable()方法來進行初始化過程
  • 如果沒有hash沖突就直接CAS插入
  • 如果還在進行擴容操作就先進行擴容
  • 如果存在hash沖突,就加鎖來保證線程安全,這里有兩種情況,一種是鏈表形式就直接遍歷到尾端插入,一種是紅黑樹就按照紅黑樹結構插入,
  • 最后一個如果該鏈表的數(shù)量大于閾值8,就要先轉換成黑紅樹的結構,break再一次進入循環(huán)
  • 如果添加成功就調用addCount()方法統(tǒng)計size,并且檢查是否需要擴容

現(xiàn)在我們來對每一步的細節(jié)進行源碼分析,在第一步中,符合條件會進行初始化操作,我們來看看initTable()方法

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {//空的table才能進入初始化操作
        if ((sc = sizeCtl) < 0) //sizeCtl<0表示其他線程已經在初始化了或者擴容了,掛起當前線程 
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS操作SIZECTL為-1,表示初始化狀態(tài)
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化
                    table = tab = nt;
                    sc = n - (n >>> 2);//記錄下次擴容的大小
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

在第二步中沒有hash沖突就直接調用Unsafe的方法CAS插入該元素,進入第三步如果容器正在擴容,則會調用helpTransfer()方法幫助擴容,現(xiàn)在我們跟進helpTransfer()方法看看

/**
 *幫助從舊的table的元素復制到新的table中
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //新的table nextTba已經存在前提下才能幫助擴容
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);//調用擴容方法
                break;
            }
        }
        return nextTab;
    }
    return table;
}

其實helpTransfer()方法的目的就是調用多個工作線程一起幫助進行擴容,這樣的效率就會更高,而不是只有檢查到要擴容的那個線程進行擴容操作,其他線程就要等待擴容操作完成才能工作。

既然這里涉及到擴容的操作,我們也一起來看看擴容方法transfer()

擴容原理

該方法的執(zhí)行邏輯如下:

  • 計算每個線程可以處理的桶區(qū)間。默認 16.
  • 初始化臨時變量 nextTable,擴容 2 倍。
  • 死循環(huán),計算下標。完成總體判斷。

如果桶內有數(shù)據(jù),同步轉移數(shù)據(jù)。通常會像鏈表拆成 2 份。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 將 length / 8 然后除以 CPU核心數(shù)。如果得到的結果小于 16,那么就使用 16。
    // 這里的目的是讓每個 CPU 處理的桶一樣多,避免出現(xiàn)轉移任務不均勻的現(xiàn)象,如果桶較少的話,默認一個 CPU(一個線程)處理 16 個桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // stride 一步的距離
    // 新的 table 尚未初始化
    if (nextTab == null) {            // initiating
        try {
            //構建一個nextTable對象,其容量為原來容量的兩倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];        
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            // 擴容失敗, sizeCtl 使用 int 最大值。
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 更新成員變量
        nextTable = nextTab;
        // 更新轉移下標,就是 老的 tab 的 length
        transferIndex = n;
    }
    // 新 tab 的 length
    int nextn = nextTab.length;
    // 創(chuàng)建一個 fwd 節(jié)點,用于占位。當別的線程發(fā)現(xiàn)這個槽位中是 fwd 類型的節(jié)點,則跳過這個節(jié)點。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 當advance == true時,表明該節(jié)點已經處理過了
    boolean advance = true;
    // 完成狀態(tài),如果是 true,就結束此方法。
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 死循環(huán),i 表示下標,bound 表示當前線程可以處理的當前桶區(qū)間最小下標
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 控制 --i ,遍歷原h(huán)ash表中的節(jié)點
        while (advance) {
            int nextIndex, nextBound;
            // 對 i 減一,判斷是否大于等于 bound (正常情況下,如果大于 bound 不成立,說明該線程上次領取的任務已經完成了。那么,需要在下面繼續(xù)領取任務)
            // 如果對 i 減一大于等于 bound(還需要繼續(xù)做任務),或者完成了,修改推進狀態(tài)為 false,不能推進了。任務成功后修改推進狀態(tài)為 true。
            // 通常,第一次進入循環(huán),i-- 這個判斷會無法通過,從而走下面的 nextIndex 賦值操作(獲取最新的轉移下標)。其余情況都是:如果可以推進,將 i 減一,然后修改成不可推進。如果 i 對應的桶處理成功了,改成可以推進。
            if (--i >= bound || finishing)
                advance = false; // 這里設置 false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進
            // 這里的目的是:1. 當一個線程進入時,會選取最新的轉移下標。2. 當一個線程處理完自己的區(qū)間時,如果還有剩余區(qū)間的沒有別的線程處理。再次獲取區(qū)間。
            else if ((nextIndex = transferIndex) <= 0) {
                // 如果小于等于0,說明沒有區(qū)間了 ,i 改成 -1,推進狀態(tài)變成 false,不再推進,表示,擴容結束了,當前線程可以退出了
                // 這個 -1 會在下面的 if 塊里判斷,從而進入完成狀態(tài)判斷
                i = -1;
                advance = false;
            }
            // CAS 修改 transferIndex,即 length - 區(qū)間值,留下剩余的區(qū)間值供后面的線程使用
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound; // 這個值就是當前線程可以處理的最小當前區(qū)間最小下標
                i = nextIndex - 1;  // 初次對i 賦值,這個就是當前線程可以處理的當前區(qū)間的最大下標
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 已經完成所有節(jié)點復制了
            if (finishing) {
                nextTable = null;
                table = nextTab;        // table 指向nextTable
                sizeCtl = (n << 1) - (n >>> 1);     // sizeCtl閾值為原來的1.5倍
                return;     // 跳出死循環(huán),
            }
            // CAS 更擴容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴容操作
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 遍歷的節(jié)點為null,則放入到ForwardingNode 指針節(jié)點
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // f.hash == -1 表示遍歷到了ForwardingNode節(jié)點,意味著該節(jié)點已經處理過了
        // 這里是控制并發(fā)擴容的核心
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 節(jié)點加鎖
            synchronized (f) {
                // 節(jié)點復制工作
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // fh >= 0 ,表示為鏈表節(jié)點
                    if (fh >= 0) {
                        // 構造兩個鏈表  一個是原鏈表  另一個是原鏈表的反序排列
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 在nextTable i 位置處插上鏈表
                        setTabAt(nextTab, i, ln);
                        // 在nextTable i + n 位置處插上鏈表
                        setTabAt(nextTab, i + n, hn);
                        // 在table i 位置處插上ForwardingNode 表示該節(jié)點已經處理過了
                        setTabAt(tab, i, fwd);
                        // advance = true 可以執(zhí)行--i動作,遍歷節(jié)點
                        advance = true;
                    }
                    // 如果是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 擴容后樹節(jié)點個數(shù)若<=6,將樹轉鏈表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

擴容過程有點復雜,這里主要涉及到多線程并發(fā)擴容,ForwardingNode的作用就是支持擴容操作,將已處理的節(jié)點和空節(jié)點置為ForwardingNode,并發(fā)處理時多個線程經過ForwardingNode就表示已經遍歷了,就往后遍歷,下圖是多線程合作擴容的過程:

介紹完擴容過程,我們再次回到put流程,在第四步中是向鏈表或者紅黑樹里加節(jié)點,到第五步,會調用treeifyBin()方法進行鏈表轉紅黑樹的過程。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //如果整個table的數(shù)量小于64,就擴容至原來的一倍,不轉紅黑樹了
        //因為這個閾值擴容可以減少hash沖突,不必要去轉紅黑樹
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY) 
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        //封裝成TreeNode
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //通過TreeBin對象對TreeNode轉換成紅黑樹
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

到第六步表示已經數(shù)據(jù)加入成功了,現(xiàn)在調用addCount()方法計算ConcurrentHashMap的size,在原來的基礎上加一,現(xiàn)在來看看addCount()方法。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //更新baseCount,table的數(shù)量,counterCells表示元素個數(shù)的變化
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        //如果多個線程都在執(zhí)行,則CAS失敗,執(zhí)行fullAddCount,全部加入count
        if (as == null || (m = as.length - 1) < 0 || 
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
     //check>=0表示需要進行擴容操作
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            //當前線程發(fā)起庫哦哦讓操作,nextTable=null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

put的流程現(xiàn)在已經分析完了,你可以從中發(fā)現(xiàn),他在并發(fā)處理中使用的是樂觀鎖,當有沖突的時候才進行并發(fā)處理,而且流程步驟很清晰,但是細節(jié)設計的很復雜,畢竟多線程的場景也復雜。

get操作

我們現(xiàn)在要回到開始的例子中,我們對個人信息進行了新增之后,我們要獲取所新增的信息,使用String name = map.get(“name”)獲取新增的name信息,現(xiàn)在我們依舊用debug的方式來分析下ConcurrentHashMap的獲取方法get()

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); //計算兩次hash
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {//讀取首節(jié)點的Node元素
        if ((eh = e.hash) == h) { //如果該節(jié)點就是首節(jié)點就返回
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //hash值為負值表示正在擴容,這個時候查的是ForwardingNode的find方法來定位到nextTable來
        //查找,查找到就返回
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {//既不是首節(jié)點也不是ForwardingNode,那就往下遍歷
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

ConcurrentHashMap的get操作的流程很簡單,也很清晰,可以分為三個步驟來描述

  • 計算hash值,定位到該table索引位置,如果是首節(jié)點符合就返回
  • 如果遇到擴容的時候,會調用標志正在擴容節(jié)點ForwardingNode的find方法,查找該節(jié)點,匹配就返回
  • 以上都不符合的話,就往下遍歷節(jié)點,匹配就返回,否則最后就返回null

size操作

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a; //變化的數(shù)量
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

在JDK1.8版本中,對于size的計算,在擴容和addCount()方法就已經有處理了,JDK1.7是在調用size()方法才去計算,其實在并發(fā)集合中去計算size是沒有多大的意義的,因為size是實時在變的,只能計算某一刻的大小,但是某一刻太快了,人的感知是一個時間段,所以并不是很精確。

總結與思考

其實可以看出JDK1.8版本的ConcurrentHashMap的數(shù)據(jù)結構已經接近HashMap,相對而言,ConcurrentHashMap只是增加了同步的操作來控制并發(fā),從JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+紅黑樹,相對而言,總結如下思考:

  • JDK1.8的實現(xiàn)降低鎖的粒度,JDK1.7版本鎖的粒度是基于Segment的,包含多個HashEntry,而JDK1.8鎖的粒度就是HashEntry(首節(jié)點)
  • JDK1.8版本的數(shù)據(jù)結構變得更加簡單,使得操作也更加清晰流暢,因為已經使用synchronized來進行同步,所以不需要分段鎖的概念,也就不需要Segment這種數(shù)據(jù)結構了,由于粒度的降低,實現(xiàn)的復雜度也增加了
  • JDK1.8使用紅黑樹來優(yōu)化鏈表,基于長度很長的鏈表的遍歷是一個很漫長的過程,而紅黑樹的遍歷效率是很快的,代替一定閾值的鏈表,這樣形成一個最佳拍檔
  • JDK1.8為什么使用內置鎖synchronized來代替重入鎖ReentrantLock,我覺得有以下幾點:
  • 因為粒度降低了,在相對而言的低粒度加鎖方式,synchronized并不比ReentrantLock差,在粗粒度加鎖中ReentrantLock可能通過Condition來控制各個低粒度的邊界,更加的靈活,而在低粒度中,Condition的優(yōu)勢就沒有了
  • JVM的開發(fā)團隊從來都沒有放棄synchronized,而且基于JVM的synchronized優(yōu)化空間更大,使用內嵌的關鍵字比使用API更加自然
  • 減少內存開銷 :在大量的數(shù)據(jù)操作下,對于JVM的內存壓力,基于API的ReentrantLock會開銷更多的內存,雖然不是瓶頸,但是也是一個選擇依據(jù)

以上就是ConcurrentHashMap線程安全及實現(xiàn)原理實例解析的詳細內容,更多關于ConcurrentHashMap線程安全的資料請關注腳本之家其它相關文章!

相關文章

  • Java對象方法的調用執(zhí)行過程詳解

    Java對象方法的調用執(zhí)行過程詳解

    這篇文章主要介紹了Java對象方法的調用執(zhí)行過程,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • cmd編譯運行java程序的方法

    cmd編譯運行java程序的方法

    本文主要介紹了cmd編譯運行java程序的方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • spring中websocket定時任務實現(xiàn)實時推送

    spring中websocket定時任務實現(xiàn)實時推送

    本文主要介紹了spring中websocket定時任務實現(xiàn)實時推送,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-01-01
  • Java線程使用同步鎖交替執(zhí)行打印奇數(shù)偶數(shù)的方法

    Java線程使用同步鎖交替執(zhí)行打印奇數(shù)偶數(shù)的方法

    這篇文章主要介紹了Java線程使用同步鎖交替執(zhí)行打印奇數(shù)偶數(shù)的方法。小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • Java Random.nextInt()方法原理解析

    Java Random.nextInt()方法原理解析

    這篇文章主要介紹了Java Random.nextInt()方法原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • Java如何優(yōu)雅關閉異步中的ExecutorService

    Java如何優(yōu)雅關閉異步中的ExecutorService

    在并發(fā)編程領域,Java的ExecutorService是線程池管理的關鍵接口,這篇文章主要為大家介紹了如何優(yōu)雅關閉異步中的ExecutorService,需要的可以了解下
    2025-02-02
  • Java全面細致講解Cookie與Session及kaptcha驗證碼的使用

    Java全面細致講解Cookie與Session及kaptcha驗證碼的使用

    web開發(fā)階段我們主要是瀏覽器和服務器之間來進行交互。瀏覽器和服務器之間的交互就像人和人之間進行交流一樣,但是對于機器來說,在一次請求之間只是會攜帶著本次請求的數(shù)據(jù)的,但是可能多次請求之間是會有聯(lián)系的,所以提供了會話機制
    2022-06-06
  • Struts2+Hibernate實現(xiàn)數(shù)據(jù)分頁的方法

    Struts2+Hibernate實現(xiàn)數(shù)據(jù)分頁的方法

    這篇文章主要介紹了Struts2+Hibernate實現(xiàn)數(shù)據(jù)分頁的方法,結合實例形式分析了Struts2結合Hibernate實現(xiàn)數(shù)據(jù)分頁的原理,步驟與相關實現(xiàn)代碼,需要的朋友可以參考下
    2016-03-03
  • 如何從eureka獲取服務的ip和端口號進行Http的調用

    如何從eureka獲取服務的ip和端口號進行Http的調用

    這篇文章主要介紹了如何從eureka獲取服務的ip和端口號進行Http的調用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java中callable的實現(xiàn)原理

    Java中callable的實現(xiàn)原理

    本文主要介紹了Java里的callable的實現(xiàn)原理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-03-03

最新評論