欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

深入線程安全容器的實(shí)現(xiàn)方法

 更新時(shí)間:2013年05月13日 10:48:36   作者:  
本篇文章是對(duì)線程安全容器的實(shí)現(xiàn)方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下

最近寫了個(gè)小程序用到了C#4.0中的線程安全集合。想起很久以前用C#2.0開發(fā)的時(shí)候?qū)懞笈_(tái)windows服務(wù),為了利用多線程實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型,經(jīng)常要封裝一些線程安全的容器,比如泛型隊(duì)列和字典等等。下面就結(jié)合部分MS的源碼和自己的開發(fā)經(jīng)驗(yàn)淺顯地分析一下如何實(shí)現(xiàn)線程安全容器以及實(shí)現(xiàn)線程安全容器容易產(chǎn)生的問題。

一、ArrayList

在C#早期版本中已經(jīng)實(shí)現(xiàn)了線程安全的ArrayList,可以通過下面的方式構(gòu)造線程安全的數(shù)組列表:

var array = ArrayList.Synchronized(new ArrayList());

我們從Synchronized方法入手,分析它的源代碼看是如何實(shí)現(xiàn)線程安全的:

復(fù)制代碼 代碼如下:

Synchronized        /// <summary>Returns an <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</summary>
        /// <returns>An <see cref="T:System.Collections.ArrayList" /> wrapper that is synchronized (thread safe).</returns>
        /// <param name="list">The <see cref="T:System.Collections.ArrayList" /> to synchronize. </param>
        /// <exception cref="T:System.ArgumentNullException">
        ///   <paramref name="list" /> is null. </exception>
        /// <filterpriority>2</filterpriority>
        [HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
        public static ArrayList Synchronized(ArrayList list)
        {
            if (list == null)
            {
                throw new ArgumentNullException("list");
            }
            return new ArrayList.SyncArrayList(list);
        }


繼續(xù)跟進(jìn)去,發(fā)現(xiàn)SyncArrayList是一個(gè)繼承自ArrayList的私有類,內(nèi)部線程安全方法的實(shí)現(xiàn)經(jīng)過分析,很多都是像下面這樣lock(注意是lock_root對(duì)象而不是數(shù)組列表實(shí)例對(duì)象)一下完事:

lock (this._root)

有心的你可以查看SyncArrayList的源碼:

復(fù)制代碼 代碼如下:

SyncArrayList        [Serializable]
        private class SyncArrayList : ArrayList
        {
            private ArrayList _list;
            private object _root;
            public override int Capacity
            {
                get
                {
                    int capacity;
                    lock (this._root)
                    {
                        capacity = this._list.Capacity;
                    }
                    return capacity;
                }
                set
                {
                    lock (this._root)
                    {
                        this._list.Capacity = value;
                    }
                }
            }
            public override int Count
            {
                get
                {
                    int count;
                    lock (this._root)
                    {
                        count = this._list.Count;
                    }
                    return count;
                }
            }
            public override bool IsReadOnly
            {
                get
                {
                    return this._list.IsReadOnly;
                }
            }
            public override bool IsFixedSize
            {
                get
                {
                    return this._list.IsFixedSize;
                }
            }
            public override bool IsSynchronized
            {
                get
                {
                    return true;
                }
            }
            public override object this[int index]
            {
                get
                {
                    object result;
                    lock (this._root)
                    {
                        result = this._list[index];
                    }
                    return result;
                }
                set
                {
                    lock (this._root)
                    {
                        this._list[index] = value;
                    }
                }
            }
            public override object SyncRoot
            {
                get
                {
                    return this._root;
                }
            }
            internal SyncArrayList(ArrayList list)
                : base(false)
            {
                this._list = list;
                this._root = list.SyncRoot;
            }
            public override int Add(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.Add(value);
                }
                return result;
            }
            public override void AddRange(ICollection c)
            {
                lock (this._root)
                {
                    this._list.AddRange(c);
                }
            }
            public override int BinarySearch(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(value);
                }
                return result;
            }
            public override int BinarySearch(object value, IComparer comparer)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(value, comparer);
                }
                return result;
            }
            public override int BinarySearch(int index, int count, object value, IComparer comparer)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(index, count, value, comparer);
                }
                return result;
            }
            public override void Clear()
            {
                lock (this._root)
                {
                    this._list.Clear();
                }
            }
            public override object Clone()
            {
                object result;
                lock (this._root)
                {
                    result = new ArrayList.SyncArrayList((ArrayList)this._list.Clone());
                }
                return result;
            }
            public override bool Contains(object item)
            {
                bool result;
                lock (this._root)
                {
                    result = this._list.Contains(item);
                }
                return result;
            }
            public override void CopyTo(Array array)
            {
                lock (this._root)
                {
                    this._list.CopyTo(array);
                }
            }
            public override void CopyTo(Array array, int index)
            {
                lock (this._root)
                {
                    this._list.CopyTo(array, index);
                }
            }
            public override void CopyTo(int index, Array array, int arrayIndex, int count)
            {
                lock (this._root)
                {
                    this._list.CopyTo(index, array, arrayIndex, count);
                }
            }
            public override IEnumerator GetEnumerator()
            {
                IEnumerator enumerator;
                lock (this._root)
                {
                    enumerator = this._list.GetEnumerator();
                }
                return enumerator;
            }
            public override IEnumerator GetEnumerator(int index, int count)
            {
                IEnumerator enumerator;
                lock (this._root)
                {
                    enumerator = this._list.GetEnumerator(index, count);
                }
                return enumerator;
            }
            public override int IndexOf(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value);
                }
                return result;
            }
            public override int IndexOf(object value, int startIndex)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value, startIndex);
                }
                return result;
            }
            public override int IndexOf(object value, int startIndex, int count)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value, startIndex, count);
                }
                return result;
            }
            public override void Insert(int index, object value)
            {
                lock (this._root)
                {
                    this._list.Insert(index, value);
                }
            }
            public override void InsertRange(int index, ICollection c)
            {
                lock (this._root)
                {
                    this._list.InsertRange(index, c);
                }
            }
            public override int LastIndexOf(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value);
                }
                return result;
            }
            public override int LastIndexOf(object value, int startIndex)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value, startIndex);
                }
                return result;
            }
            public override int LastIndexOf(object value, int startIndex, int count)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value, startIndex, count);
                }
                return result;
            }
            public override void Remove(object value)
            {
                lock (this._root)
                {
                    this._list.Remove(value);
                }
            }
            public override void RemoveAt(int index)
            {
                lock (this._root)
                {
                    this._list.RemoveAt(index);
                }
            }
            public override void RemoveRange(int index, int count)
            {
                lock (this._root)
                {
                    this._list.RemoveRange(index, count);
                }
            }
            public override void Reverse(int index, int count)
            {
                lock (this._root)
                {
                    this._list.Reverse(index, count);
                }
            }
            public override void SetRange(int index, ICollection c)
            {
                lock (this._root)
                {
                    this._list.SetRange(index, c);
                }
            }
            public override ArrayList GetRange(int index, int count)
            {
                ArrayList range;
                lock (this._root)
                {
                    range = this._list.GetRange(index, count);
                }
                return range;
            }
            public override void Sort()
            {
                lock (this._root)
                {
                    this._list.Sort();
                }
            }
            public override void Sort(IComparer comparer)
            {
                lock (this._root)
                {
                    this._list.Sort(comparer);
                }
            }
            public override void Sort(int index, int count, IComparer comparer)
            {
                lock (this._root)
                {
                    this._list.Sort(index, count, comparer);
                }
            }
            public override object[] ToArray()
            {
                object[] result;
                lock (this._root)
                {
                    result = this._list.ToArray();
                }
                return result;
            }
            public override Array ToArray(Type type)
            {
                Array result;
                lock (this._root)
                {
                    result = this._list.ToArray(type);
                }
                return result;
            }
            public override void TrimToSize()
            {
                lock (this._root)
                {
                    this._list.TrimToSize();
                }
            }
        }


ArrayList線程安全實(shí)現(xiàn)過程小結(jié):定義ArrayList的私有實(shí)現(xiàn)SyncArrayList,子類內(nèi)部通過lock同步構(gòu)造實(shí)現(xiàn)線程安全,在ArrayList中通過Synchronized對(duì)外間接調(diào)用子類。 

二、Hashtable

同樣,在C#早期版本中實(shí)現(xiàn)了線程安全的Hashtable,它也是早期開發(fā)中經(jīng)常用到的緩存容器,可以通過下面的方式構(gòu)造線程安全的哈希表:

var ht = Hashtable.Synchronized(new Hashtable());

同樣地,我們從Synchronized方法入手,分析它的源代碼看是如何實(shí)現(xiàn)線程安全的:

復(fù)制代碼 代碼如下:

Synchronized        /// <summary>Returns a synchronized (thread-safe) wrapper for the <see cref="T:System.Collections.Hashtable" />.</summary>
        /// <returns>A synchronized (thread-safe) wrapper for the <see cref="T:System.Collections.Hashtable" />.</returns>
        /// <param name="table">The <see cref="T:System.Collections.Hashtable" /> to synchronize. </param>
        /// <exception cref="T:System.ArgumentNullException">
        ///   <paramref name="table" /> is null. </exception>
        /// <filterpriority>1</filterpriority>
        [HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
        public static Hashtable Synchronized(Hashtable table)
        {
            if (table == null)
            {
                throw new ArgumentNullException("table");
            }
            return new Hashtable.SyncHashtable(table);
        }

繼續(xù)跟進(jìn)去,發(fā)現(xiàn)SyncHashtable是一個(gè)繼承自Hashtable和IEnumerable接口的私有類,內(nèi)部線程安全方法的實(shí)現(xiàn)經(jīng)過分析,很多都是像下面這樣lock(注意是lock哈希表的SyncRoot Object實(shí)例對(duì)象而不是哈希表實(shí)例)一下完事:

lock (this._table.SyncRoot)

貼一下SyncHashtable的源碼:

復(fù)制代碼 代碼如下:

 [Serializable]
        private class SyncHashtable : Hashtable, IEnumerable
        {
            protected Hashtable _table;
            public override int Count
            {
                get
                {
                    return this._table.Count;
                }
            }
            public override bool IsReadOnly
            {
                get
                {
                    return this._table.IsReadOnly;
                }
            }
            public override bool IsFixedSize
            {
                get
                {
                    return this._table.IsFixedSize;
                }
            }
            public override bool IsSynchronized
            {
                get
                {
                    return true;
                }
            }
            public override object this[object key]
            {
                [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
                get
                {
                    return this._table[key];
                }
                set
                {
                    lock (this._table.SyncRoot)
                    {
                        this._table[key] = value;
                    }
                }
            }
            public override object SyncRoot
            {
                get
                {
                    return this._table.SyncRoot;
                }
            }
            public override ICollection Keys
            {
                get
                {
                    ICollection keys;
                    lock (this._table.SyncRoot)
                    {
                        keys = this._table.Keys;
                    }
                    return keys;
                }
            }
            public override ICollection Values
            {
                get
                {
                    ICollection values;
                    lock (this._table.SyncRoot)
                    {
                        values = this._table.Values;
                    }
                    return values;
                }
            }
            internal SyncHashtable(Hashtable table)
                : base(false)
            {
                this._table = table;
            }
            internal SyncHashtable(SerializationInfo info, StreamingContext context)
                : base(info, context)
            {
                this._table = (Hashtable)info.GetValue("ParentTable", typeof(Hashtable));
                if (this._table == null)
                {
                    throw new SerializationException(Environment.GetResourceString("Serialization_InsufficientState"));
                }
            }
            [SecurityCritical]
            public override void GetObjectData(SerializationInfo info, StreamingContext context)
            {
                if (info == null)
                {
                    throw new ArgumentNullException("info");
                }
                lock (this._table.SyncRoot)
                {
                    info.AddValue("ParentTable", this._table, typeof(Hashtable));
                }
            }
            public override void Add(object key, object value)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Add(key, value);
                }
            }
            public override void Clear()
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Clear();
                }
            }
            [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
            public override bool Contains(object key)
            {
                return this._table.Contains(key);
            }
            [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
            public override bool ContainsKey(object key)
            {
                return this._table.ContainsKey(key);
            }
            public override bool ContainsValue(object key)
            {
                bool result;
                lock (this._table.SyncRoot)
                {
                    result = this._table.ContainsValue(key);
                }
                return result;
            }
            public override void CopyTo(Array array, int arrayIndex)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.CopyTo(array, arrayIndex);
                }
            }
            public override object Clone()
            {
                object result;
                lock (this._table.SyncRoot)
                {
                    result = Hashtable.Synchronized((Hashtable)this._table.Clone());
                }
                return result;
            }
            IEnumerator IEnumerable.GetEnumerator()
            {
                return this._table.GetEnumerator();
            }
            public override IDictionaryEnumerator GetEnumerator()
            {
                return this._table.GetEnumerator();
            }
            public override void Remove(object key)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Remove(key);
                }
            }
            public override void OnDeserialization(object sender)
            {
            }
            internal override KeyValuePairs[] ToKeyValuePairsArray()
            {
                return this._table.ToKeyValuePairsArray();
            }
        }


Hashtable線程安全實(shí)現(xiàn)過程小結(jié):定義Hashtable的私有實(shí)現(xiàn)SyncHashtable,子類內(nèi)部通過lock同步構(gòu)造實(shí)現(xiàn)線程安全,在Hashtable中通過Synchronized對(duì)外間接調(diào)用子類。

三、4.0中的線程安全容器

1、ConcurrentQueue

從上面的實(shí)現(xiàn)分析來說,封裝一個(gè)線程安全的容器看起來并不是什么難事,除了對(duì)線程安全容器的異常處理心有余悸,其他的似乎按步就班就可以了,不是嗎?也許還有更高明的實(shí)現(xiàn)吧?

在4.0中,多了一個(gè)System.Collections.Concurrent命名空間,懷著忐忑的心情查看C#4.0其中的一個(gè)線程安全集合ConcurrentQueue的源碼,發(fā)現(xiàn)它繼承自IProducerConsumerCollection<T>, IEnumerable<T>, ICollection, IEnumerable接口,內(nèi)部實(shí)現(xiàn)線程安全的時(shí)候,通過SpinWait和通過互鎖構(gòu)造(Interlocked)及SpinWait封裝的Segment,間接實(shí)現(xiàn)了線程安全。Segment的實(shí)現(xiàn)比較復(fù)雜,和線程安全密切相關(guān)的方法就是TryXXX那幾個(gè)方法,源碼如下:

復(fù)制代碼 代碼如下:

      private class Segment
        {
            internal T[] m_array;
            private int[] m_state;
            private ConcurrentQueue<T>.Segment m_next;
            internal readonly long m_index;
            private int m_low;
            private int m_high;
            internal ConcurrentQueue<T>.Segment Next
            {
                get
                {
                    return this.m_next;
                }
            }
            internal bool IsEmpty
            {
                get
                {
                    return this.Low > this.High;
                }
            }
            internal int Low
            {
                get
                {
                    return Math.Min(this.m_low, 32);
                }
            }
            internal int High
            {
                get
                {
                    return Math.Min(this.m_high, 31);
                }
            }
            internal Segment(long index)
            {
                this.m_array = new T[32];
                this.m_state = new int[32];
                this.m_high = -1;
                this.m_index = index;
            }
            internal void UnsafeAdd(T value)
            {
                this.m_high++;
                this.m_array[this.m_high] = value;
                this.m_state[this.m_high] = 1;
            }
            internal ConcurrentQueue<T>.Segment UnsafeGrow()
            {
                ConcurrentQueue<T>.Segment segment = new ConcurrentQueue<T>.Segment(this.m_index + 1L);
                this.m_next = segment;
                return segment;
            }
            internal void Grow(ref ConcurrentQueue<T>.Segment tail)
            {
                ConcurrentQueue<T>.Segment next = new ConcurrentQueue<T>.Segment(this.m_index + 1L);
                this.m_next = next;
                tail = this.m_next;
            }
            internal bool TryAppend(T value, ref ConcurrentQueue<T>.Segment tail)
            {
                if (this.m_high >= 31)
                {
                    return false;
                }
                int num = 32;
                try
                {
                }
                finally
                {
                    num = Interlocked.Increment(ref this.m_high);
                    if (num <= 31)
                    {
                        this.m_array[num] = value;
                        this.m_state[num] = 1;
                    }
                    if (num == 31)
                    {
                        this.Grow(ref tail);
                    }
                }
                return num <= 31;
            }
            internal bool TryRemove(out T result, ref ConcurrentQueue<T>.Segment head)
            {
                SpinWait spinWait = default(SpinWait);
                int i = this.Low;
                int high = this.High;
                while (i <= high)
                {
                    if (Interlocked.CompareExchange(ref this.m_low, i + 1, i) == i)
                    {
                        SpinWait spinWait2 = default(SpinWait);
                        while (this.m_state[i] == 0)
                        {
                            spinWait2.SpinOnce();
                        }
                        result = this.m_array[i];
                        if (i + 1 >= 32)
                        {
                            spinWait2 = default(SpinWait);
                            while (this.m_next == null)
                            {
                                spinWait2.SpinOnce();
                            }
                            head = this.m_next;
                        }
                        return true;
                    }
                    spinWait.SpinOnce();
                    i = this.Low;
                    high = this.High;
                }
                result = default(T);
                return false;
            }
            internal bool TryPeek(out T result)
            {
                result = default(T);
                int low = this.Low;
                if (low > this.High)
                {
                    return false;
                }
                SpinWait spinWait = default(SpinWait);
                while (this.m_state[low] == 0)
                {
                    spinWait.SpinOnce();
                }
                result = this.m_array[low];
                return true;
            }
            internal List<T> ToList(int start, int end)
            {
                List<T> list = new List<T>();
                for (int i = start; i <= end; i++)
                {
                    SpinWait spinWait = default(SpinWait);
                    while (this.m_state[i] == 0)
                    {
                        spinWait.SpinOnce();
                    }
                    list.Add(this.m_array[i]);
                }
                return list;
            }
        }


上面的代碼稍微分析一下就知道它的作用。ConcurrentQueue的線程安全的Enqueue方法實(shí)現(xiàn)如下:
復(fù)制代碼 代碼如下:

        /// <summary>Adds an object to the end of the <see cref="T:System.Collections.Concurrent.ConcurrentQueue`1" />.</summary>
        /// <param name="item">The object to add to the end of the <see cref="T:System.Collections.Concurrent.ConcurrentQueue`1" />. The value can be a null reference (Nothing in Visual Basic) for reference types.</param>
        public void Enqueue(T item)
        {
            SpinWait spinWait = default(SpinWait);
            while (true)
            {
                ConcurrentQueue<T>.Segment tail = this.m_tail;
                if (tail.TryAppend(item, ref this.m_tail))
                {
                    break;
                }
                spinWait.SpinOnce();
            }
        }

ConcurrentQueue<T>線程安全實(shí)現(xiàn)過程小結(jié):繼承接口,子類內(nèi)部通過同步構(gòu)造實(shí)現(xiàn)接口的線程安全,直接對(duì)外公開調(diào)用。

和ArrayList以及Hashtable線程安全的“曲折”實(shí)現(xiàn)有點(diǎn)不同,ConcurrentQueue<T>一開始就是朝著線程安全方向?qū)崿F(xiàn)去的。它沒有使用lock,因?yàn)榇蠹抑朗褂胠ock性能略差,對(duì)于讀和寫操作,應(yīng)該分開,不能一概而論。ConcurrentQueue<T>具體實(shí)現(xiàn)在性能和異常處理上應(yīng)該已經(jīng)考慮的更全面周到一點(diǎn)。

在我看來,ConcurrentQueue<T>線程安全的具體實(shí)現(xiàn)有多吸引人在其次,IProducerConsumerCollection<T>接口的抽象和提取非常值得稱道,查看源碼發(fā)現(xiàn)ConcurrentStack<T>和ConcurrentBag<T>也繼承自該接口。<<CLR via C#>>一書中在談到接口和抽象類的時(shí)候特別舉了集合和流(Stream)的例子,微軟為什么如此設(shè)計(jì),想起來果然很有深意。

復(fù)制代碼 代碼如下:

      public bool TryAdd(TKey key, TValue value)
        {
            if (key == null)
            {
                throw new ArgumentNullException("key");
            }
            TValue tValue;
            return this.TryAddInternal(key, value, false, true, out tValue);
        }

其中內(nèi)部方法TryAdd的主要實(shí)現(xiàn)如下:
復(fù)制代碼 代碼如下:

private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
        {
            checked
            {
                int hashCode = this.m_comparer.GetHashCode(key);
                ConcurrentDictionary<TKey, TValue>.Node[] buckets;
                bool flag;
                bool result;
                while (true)
                {
                    buckets = this.m_buckets;
                    int num;
                    int num2;
                    this.GetBucketAndLockNo(hashCode, out num, out num2, buckets.Length);
                    flag = false;
                    bool flag2 = false;
                    try
                    {
                        if (acquireLock)
                        {
                            Monitor.Enter(this.m_locks[num2], ref flag2);
                        }
                        if (buckets != this.m_buckets)
                        {
                            continue;
                        }
                        ConcurrentDictionary<TKey, TValue>.Node node = null;
                        for (ConcurrentDictionary<TKey, TValue>.Node node2 = buckets[num]; node2 != null; node2 = node2.m_next)
                        {
                            if (this.m_comparer.Equals(node2.m_key, key))
                            {
                                if (updateIfExists)
                                {
                                    ConcurrentDictionary<TKey, TValue>.Node node3 = new ConcurrentDictionary<TKey, TValue>.Node(node2.m_key, value, hashCode, node2.m_next);
                                    if (node == null)
                                    {
                                        buckets[num] = node3;
                                    }
                                    else
                                    {
                                        node.m_next = node3;
                                    }
                                    resultingValue = value;
                                }
                                else
                                {
                                    resultingValue = node2.m_value;
                                }
                                result = false;
                                return result;
                            }
                            node = node2;
                        }
                        buckets[num] = new ConcurrentDictionary<TKey, TValue>.Node(key, value, hashCode, buckets[num]);
                        this.m_countPerLock[num2]++;
                        if (this.m_countPerLock[num2] > buckets.Length / this.m_locks.Length)
                        {
                            flag = true;
                        }
                    }
                    finally
                    {
                        if (flag2)
                        {
                            Monitor.Exit(this.m_locks[num2]);
                        }
                    }
                    break;
                }
                if (flag)
                {
                    this.GrowTable(buckets);
                    goto IL_131;
                }
                goto IL_131;
                return result;
            IL_131:
                resultingValue = value;
                return true;
            }
        }


同步構(gòu)造Monitor瞬間吸引眼球,然后它的try…finally異常處理方式是不是也很眼熟?


 

 

2、ConcurrentDictionary<TKey, TValue>

對(duì)于線程安全的泛型字典ConcurrentDictionary<TKey, TValue>,我們也可以查看它的源碼看它的具體實(shí)現(xiàn)方式??丛创a有1200多行,實(shí)現(xiàn)稍微復(fù)雜一些。我們僅從最簡(jiǎn)單的TryAdd方法分析:

四、如法炮制

如果讓我來構(gòu)造實(shí)現(xiàn)線程安全容器,最簡(jiǎn)單直接快速高效的方式就是參考ArrayList和 Hashtable,我們完全可以模仿它們的處理方式,通過繼承一個(gè)容器,然后內(nèi)部通過lock一個(gè)SyncRoot對(duì)象,中規(guī)中矩地實(shí)現(xiàn)framework中其他容器的線程安全。比如要實(shí)現(xiàn)線程安全的泛型隊(duì)列Queue<T>,貼一下大致的偽代碼

復(fù)制代碼 代碼如下:

  private class SyncQueue<T> : Queue<T>
    {
        #region fields and properties

        private Queue<T> queue = null;
        private object syncRoot = null;
        internal object SyncRoot
        {
            get
            {
                return syncRoot;
            }
        }

        #endregion

        #region constructors

        public SyncQueue()
        {
            syncRoot = new object();
            queue = new Queue<T>();
        }

        public SyncQueue(IEnumerable<T> collection)
        {
            syncRoot = new object();
            queue = new Queue<T>(collection);
        }

        public SyncQueue(int capacity)
        {
            syncRoot = new object();
            queue = new Queue<T>(capacity);
        }

        #endregion

        #region methods

        public new void Enqueue(T item)
        {
            lock (SyncRoot)
            {
                this.Enqueue(item);
            }
        }

        public new T Dequeue()
        {
            T result = default(T);
            lock (SyncRoot)
            {
                result = this.queue.Dequeue();
            }
            return result;
        }

        public new void Clear()
        {
            lock (SyncRoot)
            {
                this.queue.Clear();
            }
        }

        public new bool Contains(T item)
        {
            var exists = false;
            lock (SyncRoot)
            {
                exists = this.queue.Contains(item);
            }
            return exists;
        }

        #endregion

    }


通過類繼承我們可以得到泛型隊(duì)列的所有特點(diǎn),需要實(shí)現(xiàn)線程安全的地方只要按需重寫它即可,對(duì)外調(diào)用也很簡(jiǎn)單,直接模仿ArrayList和Hashtable,添加Synchronized方法間接調(diào)用隊(duì)列的子類即可,多么清晰簡(jiǎn)潔啊,關(guān)鍵時(shí)刻copy-paste也很有效嘛!

你可能覺得上面這樣不動(dòng)腦的方式似乎很傻很天真,但這絕對(duì)是一種正常人都能想到的思路,誰(shuí)讓MS的數(shù)組列表和哈希表就是這么實(shí)現(xiàn)的呢?

當(dāng)然,我們還能想到的一種常見實(shí)現(xiàn)方式就是通過組合而不是類繼承,實(shí)現(xiàn)的偽代碼類似下面這樣:

復(fù)制代碼 代碼如下:

public class SyncQueue<T>
    {
        #region fields and properties

        private Queue<T> queue = null;
        private object syncRoot = null;
        internal object SyncRoot
        {
            get
            {
                return syncRoot;
            }
        }

        #endregion

        #region constructors

        public SyncQueue()
        {
            syncRoot = new object();
            queue = new Queue<T>();
        }

        public SyncQueue(IEnumerable<T> collection)
        {
            syncRoot = new object();
            queue = new Queue<T>(collection);
        }

        public SyncQueue(int capacity)
        {
            syncRoot = new object();
            queue = new Queue<T>(capacity);
        }

        #endregion

        #region methods

        public void Enqueue(T item)
        {
            lock (SyncRoot)
            {
                this.Enqueue(item);
            }
        }

        public T Dequeue()
        {
            T result = default(T);
            lock (SyncRoot)
            {
                result = this.queue.Dequeue();
            }
            return result;
        }

        public void Clear()
        {
            lock (SyncRoot)
            {
                this.queue.Clear();
            }
        }

        public bool Contains(T item)
        {
            var exists = false;
            lock (SyncRoot)
            {
                exists = this.queue.Contains(item);
            }
            return exists;
        }

        #endregion

    }


上面這種方式和類繼承的實(shí)現(xiàn)方式又有不同。它是通過在內(nèi)部包裝一個(gè)容器,然后按需進(jìn)行方法、屬性等等的線程安全處理,其他的所有特點(diǎn)依賴于那一個(gè)私有泛型隊(duì)列組合對(duì)象queue。這種情況下泛型SyncQueue和泛型隊(duì)列是組合關(guān)系,依賴性和耦合性更低,相對(duì)更靈活,封裝性更好,是一種較為通用的設(shè)計(jì),實(shí)際開發(fā)和使用中這種方式比較常見。

到這里,我們至少可以分析得出,實(shí)現(xiàn)一般的線程安全容器的思路至少有兩種:類繼承(內(nèi)部實(shí)現(xiàn)偏向使用組合)和(或)組合,線程安全的地方只要通過framework的同步構(gòu)造如lock、Interlocked等實(shí)現(xiàn)即可。

思考:如果讓您實(shí)現(xiàn)線程安全容器,您優(yōu)先會(huì)怎么實(shí)現(xiàn)呢?

五、線程安全并不真正安全

1、foreach遍歷

CacheUtil緩存實(shí)現(xiàn)的偽代碼如下:

復(fù)制代碼 代碼如下:

   public class CacheUtil
    {
        private static readonly Hashtable ht = Hashtable.Synchronized(new Hashtable());

        public static bool TryAdd(object key, object value)
        {
            ht[key] = value; //set方法是線程安全的
            return true;
        }

        public static bool TryGet(object key, out object result)
        {
            result = null;
            lock (ht.SyncRoot)
            {
                if (ht.ContainsKey(key))
                {
                    result = ht[key];
                }
            }
            return true;
        }
    }


foreach的代碼很簡(jiǎn)單,從哈希表構(gòu)造的緩存中取數(shù)據(jù)并遍歷,如下:
復(fù)制代碼 代碼如下:

          object obj = null;
            var isOK = CacheUtil.TryGet("key", out obj);
            if (isOK == false)
            {
                return;
            }
            var list = obj as IList<T>;
            if (list == null)
            {
                return;
            }
            foreach (var item in list) //遍歷
            {
                //do something
            }

上面的遍歷代碼一般情況下是不會(huì)有問題的。但是在多線程修改哈希表的Value的情況下,上面的foreach遍歷有可能發(fā)生異常。為什么呢?下面來簡(jiǎn)單分析一下:

從代碼中可以看出來,哈希表中的Value存放的是IList類型,那么值所保存的應(yīng)該是一個(gè)引用(也就是指針)。
(1)、當(dāng)線程1通過索引器得到這個(gè)IList時(shí),這個(gè)TryGet讀取操作是線程安全的。接著線程1進(jìn)行的操作是列表遍歷。在foreach進(jìn)行遍歷不為空的List的時(shí)候,遍歷的其實(shí)是存放在IList指針指向的引用。

(2)、在foreach遍歷集合的時(shí)候,這時(shí)候線程2如果正好對(duì)哈希表的key所對(duì)應(yīng)的Value進(jìn)行修改,IList的指針?biāo)赶虻囊酶淖兞?,所以線程1的遍歷操作就會(huì)拋出異常。

這是一個(gè)簡(jiǎn)單而又經(jīng)典的陷阱,在哈希表的MSDN線程安全塊有一段說明:

Enumerating through a collection is intrinsically not a thread safe procedure. Even when a collection is synchronized, other threads can still modify the collection, which causes the enumerator to throw an exception. To guarantee thread safety during enumeration, you can either lock the collection during the entire enumeration or catch the exceptions resulting from changes made by other threads.

 

2、通過索引取集合中的數(shù)據(jù)

列表通過索引取值,一個(gè)簡(jiǎn)單的示例代碼如下:

復(fù)制代碼 代碼如下:

        static int GetFirstOrDefault(ThreadSafeList<int> list)
        {
            if (list.Count > 0)
            {
                return list[0];
            }
            return 0;
        }

當(dāng)列表中的元素為1個(gè)的時(shí)候,上面的操作非常容易進(jìn)入一個(gè)無厘頭的陷阱之中。有人會(huì)問怎么會(huì)有陷阱,你看取數(shù)據(jù)之前都判斷了啊,邏輯正確了啊,這哪里還有問題嗎?

按照類似于1中的分析,GetFirstOrDefault應(yīng)該可以分為下面兩步:

(1)線程1取數(shù)據(jù),判斷l(xiāng)ist.Count的時(shí)候發(fā)現(xiàn)列表內(nèi)有1個(gè)元素,這一步線程安全,沒有任何問題,然后準(zhǔn)備返回索引為0的元素;

(2)線程2在線程1將要取索引為0的元素之前移除了列表中的唯一元素或者直接將list指向null,這樣線程1通過索引取元素就拋出異常了。

 

3、如何保證容器數(shù)據(jù)操作安全?

從上面的兩個(gè)示例,我們得知通常所看到的線程安全實(shí)際上并不一定安全。不安全的主要原因就是容器內(nèi)的數(shù)據(jù)很容易被其他線程改變,或者可以簡(jiǎn)要概括為:一段時(shí)間差引發(fā)的血案。實(shí)際上,我們平時(shí)所做的業(yè)務(wù)系統(tǒng),歸根結(jié)底很多bug或者隱藏的缺陷都是由不起眼的一小段時(shí)間差引起的。

保證容器內(nèi)的數(shù)據(jù)和操作都安全,一種簡(jiǎn)單而有效的方法就是將你所要進(jìn)行的操作進(jìn)行“事務(wù)”處理。比如示例1中哈希表的Value的遍歷操作,通常情況下,我們分作兩步:

(1)、(安全地)讀取數(shù)據(jù)

(2)、(不安全地)遍歷;

為了達(dá)到遍歷操作不拋出異常,我們可以把兩步合并為一步,抽象出一個(gè)線程安全的新方法TryGetAndEnumerate,這樣可以保證線程安全地取數(shù)據(jù)和遍歷,具體實(shí)現(xiàn)無非是lock一下SyncRoot類似的這種思路。但是這種線程安全的遍歷可能代價(jià)很高,而且極其不通用。

線程安全集合容易產(chǎn)生的問題和解決方法,請(qǐng)參考JaredPar MSFT的Why are thread safe collections so hard?,這篇文章對(duì)設(shè)計(jì)一個(gè)線程安全的容器的指導(dǎo)原則是:

1、Don't add an decision procedures(procedures like Count as decision procedures).  They lead users down the path to bad code.
2、Methods which query the object can always fail and the API should reflect this.

實(shí)際上大家都知道利用事務(wù)處理思想多用TryXXX方法一般是沒錯(cuò)的。

相關(guān)文章

最新評(píng)論