C#請(qǐng)求唯一性校驗(yàn)支持高并發(fā)的實(shí)現(xiàn)方法
使用場(chǎng)景描述:
網(wǎng)絡(luò)請(qǐng)求中經(jīng)常會(huì)遇到發(fā)送的請(qǐng)求,服務(wù)端響應(yīng)是成功的,但是返回的時(shí)候出現(xiàn)網(wǎng)絡(luò)故障,導(dǎo)致客戶端無(wú)法接收到請(qǐng)求結(jié)果,那么客戶端程序可能判斷為網(wǎng)絡(luò)故障,而重復(fù)發(fā)送同一個(gè)請(qǐng)求。當(dāng)然如果接口中定義了請(qǐng)求結(jié)果查詢接口,那么這種重復(fù)會(huì)相對(duì)少一些。特別是交易類的數(shù)據(jù),這種操作更是需要避免重復(fù)發(fā)送請(qǐng)求。另外一種情況是用戶過(guò)于快速的點(diǎn)擊界面按鈕,產(chǎn)生連續(xù)的相同內(nèi)容請(qǐng)求,那么后端也需要進(jìn)行過(guò)濾,這種一般出現(xiàn)在系統(tǒng)對(duì)接上,無(wú)法去控制第三方系統(tǒng)的業(yè)務(wù)邏輯,需要從自身業(yè)務(wù)邏輯里面去限定。
其他需求描述:
這類請(qǐng)求一般存在時(shí)間范圍和高并發(fā)的特點(diǎn),就是短時(shí)間內(nèi)會(huì)出現(xiàn)重復(fù)的請(qǐng)求,因此對(duì)模塊需要支持高并發(fā)性。
技術(shù)實(shí)現(xiàn):
對(duì)請(qǐng)求的業(yè)務(wù)內(nèi)容進(jìn)行MD5摘要,并且將MD5摘要存儲(chǔ)到緩存中,每個(gè)請(qǐng)求數(shù)據(jù)都通過(guò)這個(gè)一個(gè)公共的調(diào)用的方法進(jìn)行判斷。
代碼實(shí)現(xiàn):
公共調(diào)用代碼 UniqueCheck 采用單例模式創(chuàng)建唯一對(duì)象,便于在多線程調(diào)用的時(shí)候,只訪問(wèn)一個(gè)統(tǒng)一的緩存庫(kù)
/* * volatile就像大家更熟悉的const一樣,volatile是一個(gè)類型修飾符(type specifier)。 * 它是被設(shè)計(jì)用來(lái)修飾被不同線程訪問(wèn)和修改的變量。 * 如果沒(méi)有volatile,基本上會(huì)導(dǎo)致這樣的結(jié)果:要么無(wú)法編寫(xiě)多線程程序,要么編譯器失去大量?jī)?yōu)化的機(jī)會(huì)。 */ private static readonly object lockHelper = new object(); private volatile static UniqueCheck _instance; /// <summary> /// 獲取單一實(shí)例 /// </summary> /// <returns></returns> public static UniqueCheck GetInstance() { if (_instance == null) { lock (lockHelper) { if (_instance == null) _instance = new UniqueCheck(); } } return _instance; }
這里需要注意volatile的修飾符,在實(shí)際測(cè)試過(guò)程中,如果沒(méi)有此修飾符,在高并發(fā)的情況下會(huì)出現(xiàn)報(bào)錯(cuò)。
自定義一個(gè)可以進(jìn)行并發(fā)處理隊(duì)列,代碼如下:ConcurrentLinkedQueue
using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace PackgeUniqueCheck { /// <summary> /// 非加鎖并發(fā)隊(duì)列,處理100個(gè)并發(fā)數(shù)以內(nèi) /// </summary> /// <typeparam name="T"></typeparam> public class ConcurrentLinkedQueue<T> { private class Node<K> { internal K Item; internal Node<K> Next; public Node(K item, Node<K> next) { this.Item = item; this.Next = next; } } private Node<T> _head; private Node<T> _tail; public ConcurrentLinkedQueue() { _head = new Node<T>(default(T), null); _tail = _head; } public bool IsEmpty { get { return (_head.Next == null); } } /// <summary> /// 進(jìn)入隊(duì)列 /// </summary> /// <param name="item"></param> public void Enqueue(T item) { Node<T> newNode = new Node<T>(item, null); while (true) { Node<T> curTail = _tail; Node<T> residue = curTail.Next; //判斷_tail是否被其他process改變 if (curTail == _tail) { //A 有其他process執(zhí)行C成功,_tail應(yīng)該指向新的節(jié)點(diǎn) if (residue == null) { //C 其他process改變了tail節(jié)點(diǎn),需要重新取tail節(jié)點(diǎn) if (Interlocked.CompareExchange<Node<T>>( ref curTail.Next, newNode, residue) == residue) { //D 嘗試修改tail Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); return; } } else { //B 幫助其他線程完成D操作 Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); } } } } /// <summary> /// 隊(duì)列取數(shù)據(jù) /// </summary> /// <param name="result"></param> /// <returns></returns> public bool TryDequeue(out T result) { Node<T> curHead; Node<T> curTail; Node<T> next; while (true) { curHead = _head; curTail = _tail; next = curHead.Next; if (curHead == _head) { if (next == null) //Queue為空 { result = default(T); return false; } if (curHead == curTail) //Queue處于Enqueue第一個(gè)node的過(guò)程中 { //嘗試幫助其他Process完成操作 Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); } else { //取next.Item必須放到CAS之前 result = next.Item; //如果_head沒(méi)有發(fā)生改變,則將_head指向next并退出 if (Interlocked.CompareExchange<Node<T>>(ref _head, next, curHead) == curHead) break; } } } return true; } /// <summary> /// 嘗試獲取最后一個(gè)對(duì)象 /// </summary> /// <param name="result"></param> /// <returns></returns> public bool TryGetTail(out T result) { result = default(T); if (_tail == null) { return false; } result = _tail.Item; return true; } } }
雖然是一個(gè)非常簡(jiǎn)單的唯一性校驗(yàn)邏輯,但是要做到高效率,高并發(fā)支持,高可靠性,以及低內(nèi)存占用,需要實(shí)現(xiàn)這樣的需求,需要做細(xì)致的模擬測(cè)試。
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Collections; namespace PackgeUniqueCheck { public class UniqueCheck { /* * volatile就像大家更熟悉的const一樣,volatile是一個(gè)類型修飾符(type specifier)。 * 它是被設(shè)計(jì)用來(lái)修飾被不同線程訪問(wèn)和修改的變量。 * 如果沒(méi)有volatile,基本上會(huì)導(dǎo)致這樣的結(jié)果:要么無(wú)法編寫(xiě)多線程程序,要么編譯器失去大量?jī)?yōu)化的機(jī)會(huì)。 */ private static readonly object lockHelper = new object(); private volatile static UniqueCheck _instance; /// <summary> /// 獲取單一實(shí)例 /// </summary> /// <returns></returns> public static UniqueCheck GetInstance() { if (_instance == null) { lock (lockHelper) { if (_instance == null) _instance = new UniqueCheck(); } } return _instance; } private UniqueCheck() { //創(chuàng)建一個(gè)線程安全的哈希表,作為字典緩存 _DataKey = Hashtable.Synchronized(new Hashtable()); Queue myqueue = new Queue(); _DataQueue = Queue.Synchronized(myqueue); _Myqueue = new ConcurrentLinkedQueue<string>(); _Timer = new Thread(DoTicket); _Timer.Start(); } #region 公共屬性設(shè)置 /// <summary> /// 設(shè)定定時(shí)線程的休眠時(shí)間長(zhǎng)度:默認(rèn)為1分鐘 /// 時(shí)間范圍:1-7200000,值為1毫秒到2小時(shí) /// </summary> /// <param name="value"></param> public void SetTimeSpan(int value) { if (value > 0&& value <=7200000) { _TimeSpan = value; } } /// <summary> /// 設(shè)定緩存Cache中的最大記錄條數(shù) /// 值范圍:1-5000000,1到500萬(wàn) /// </summary> /// <param name="value"></param> public void SetCacheMaxNum(int value) { if (value > 0 && value <= 5000000) { _CacheMaxNum = value; } } /// <summary> /// 設(shè)置是否在控制臺(tái)中顯示日志 /// </summary> /// <param name="value"></param> public void SetIsShowMsg(bool value) { Helper.IsShowMsg = value; } /// <summary> /// 線程請(qǐng)求阻塞增量 /// 值范圍:1-CacheMaxNum,建議設(shè)置為緩存最大值的10%-20% /// </summary> /// <param name="value"></param> public void SetBlockNumExt(int value) { if (value > 0 && value <= _CacheMaxNum) { _BlockNumExt = value; } } /// <summary> /// 請(qǐng)求阻塞時(shí)間 /// 值范圍:1-max,根據(jù)阻塞增量設(shè)置請(qǐng)求阻塞時(shí)間 /// 阻塞時(shí)間越長(zhǎng),阻塞增量可以設(shè)置越大,但是請(qǐng)求實(shí)時(shí)響應(yīng)就越差 /// </summary> /// <param name="value"></param> public void SetBlockSpanTime(int value) { if (value > 0) { _BlockSpanTime = value; } } #endregion #region 私有變量 /// <summary> /// 內(nèi)部運(yùn)行線程 /// </summary> private Thread _runner = null; /// <summary> /// 可處理高并發(fā)的隊(duì)列 /// </summary> private ConcurrentLinkedQueue<string> _Myqueue = null; /// <summary> /// 唯一內(nèi)容的時(shí)間健值對(duì) /// </summary> private Hashtable _DataKey = null; /// <summary> /// 內(nèi)容時(shí)間隊(duì)列 /// </summary> private Queue _DataQueue = null; /// <summary> /// 定時(shí)線程的休眠時(shí)間長(zhǎng)度:默認(rèn)為1分鐘 /// </summary> private int _TimeSpan = 3000; /// <summary> /// 定時(shí)計(jì)時(shí)器線程 /// </summary> private Thread _Timer = null; /// <summary> /// 緩存Cache中的最大記錄條數(shù) /// </summary> private int _CacheMaxNum = 500000; /// <summary> /// 線程請(qǐng)求阻塞增量 /// </summary> private int _BlockNumExt = 10000; /// <summary> /// 請(qǐng)求阻塞時(shí)間 /// </summary> private int _BlockSpanTime = 100; #endregion #region 私有方法 private void StartRun() { _runner = new Thread(DoAction); _runner.Start(); Helper.ShowMsg("內(nèi)部線程啟動(dòng)成功!"); } private string GetItem() { string tp = string.Empty; bool result = _Myqueue.TryDequeue(out tp); return tp; } /// <summary> /// 執(zhí)行循環(huán)操作 /// </summary> private void DoAction() { while (true) { while (!_Myqueue.IsEmpty) { string item = GetItem(); _DataQueue.Enqueue(item); if (!_DataKey.ContainsKey(item)) { _DataKey.Add(item, DateTime.Now); } } //Helper.ShowMsg("當(dāng)前數(shù)組已經(jīng)為空,處理線程進(jìn)入休眠狀態(tài)..."); Thread.Sleep(2); } } /// <summary> /// 執(zhí)行定時(shí)器的動(dòng)作 /// </summary> private void DoTicket() { while (true) { Helper.ShowMsg("當(dāng)前數(shù)據(jù)隊(duì)列個(gè)數(shù):" + _DataQueue.Count.ToString()); if (_DataQueue.Count > _CacheMaxNum) { while (true) { Helper.ShowMsg(string.Format("當(dāng)前隊(duì)列數(shù):{0},已經(jīng)超出最大長(zhǎng)度:{1},開(kāi)始進(jìn)行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString())); string item = _DataQueue.Dequeue().ToString(); if (!string.IsNullOrEmpty(item)) { if (_DataKey.ContainsKey(item)) { _DataKey.Remove(item); } if (_DataQueue.Count <= _CacheMaxNum) { Helper.ShowMsg("清理完成,開(kāi)始休眠清理線程..."); break; } } } } Thread.Sleep(_TimeSpan); } } /// <summary> /// 線程進(jìn)行睡眠等待 /// 如果當(dāng)前負(fù)載壓力大大超出了線程的處理能力 /// 那么需要進(jìn)行延時(shí)調(diào)用 /// </summary> private void BlockThread() { if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt) { Thread.Sleep(_BlockSpanTime); } } #endregion #region 公共方法 /// <summary> /// 開(kāi)啟服務(wù)線程 /// </summary> public void Start() { if (_runner == null) { StartRun(); } else { if (_runner.IsAlive == false) { StartRun(); } } } /// <summary> /// 關(guān)閉服務(wù)線程 /// </summary> public void Stop() { if (_runner != null) { _runner.Abort(); _runner = null; } } /// <summary> /// 添加內(nèi)容信息 /// </summary> /// <param name="item">內(nèi)容信息</param> /// <returns>true:緩存中不包含此值,隊(duì)列添加成功,false:緩存中包含此值,隊(duì)列添加失敗</returns> public bool AddItem(string item) { BlockThread(); item = Helper.MakeMd5(item); if (_DataKey.ContainsKey(item)) { return false; } else { _Myqueue.Enqueue(item); return true; } } /// <summary> /// 判斷內(nèi)容信息是否已經(jīng)存在 /// </summary> /// <param name="item">內(nèi)容信息</param> /// <returns>true:信息已經(jīng)存在于緩存中,false:信息不存在于緩存中</returns> public bool CheckItem(string item) { item = Helper.MakeMd5(item); return _DataKey.ContainsKey(item); } #endregion } }
模擬測(cè)試代碼:
private static string _example = Guid.NewGuid().ToString(); private static UniqueCheck _uck = null; static void Main(string[] args) { _uck = UniqueCheck.GetInstance(); _uck.Start(); _uck.SetIsShowMsg(false); _uck.SetCacheMaxNum(20000000); _uck.SetBlockNumExt(1000000); _uck.SetTimeSpan(6000); _uck.AddItem(_example); Thread[] threads = new Thread[20]; for (int i = 0; i < 20; i++) { threads[i] = new Thread(AddInfo); threads[i].Start(); } Thread checkthread = new Thread(CheckInfo); checkthread.Start(); string value = Console.ReadLine(); checkthread.Abort(); for (int i = 0; i < 50; i++) { threads[i].Abort(); } _uck.Stop(); } static void AddInfo() { while (true) { _uck.AddItem(Guid.NewGuid().ToString()); } } static void CheckInfo() { while (true) { Console.WriteLine("開(kāi)始時(shí)間:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); Console.WriteLine("插入結(jié)果:{0}", _uck.AddItem(_example)); Console.WriteLine("結(jié)束時(shí)間:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); //調(diào)整進(jìn)程休眠時(shí)間,可以測(cè)試高并發(fā)的情況 //Thread.Sleep(1000); } }
測(cè)試截圖:
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
解析使用enumerator模式簡(jiǎn)化異步操作的詳解
本篇文章是對(duì)使用enumerator模式簡(jiǎn)化異步操作進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05如何在C# 中查找或結(jié)束程序域中的主、子進(jìn)程
這篇文章主要介紹了如何在C# 中查找或結(jié)束程序域中的主、子進(jìn)程,幫助大家更好的理解和使用c#編程語(yǔ)言,感興趣的朋友可以了解下2020-11-11C#使用GUID(全局統(tǒng)一標(biāo)識(shí)符)
這篇文章介紹了C#使用GUID(全局統(tǒng)一標(biāo)識(shí)符)的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-04-04Winform中實(shí)現(xiàn)圖片格式轉(zhuǎn)換
這篇文章主要介紹了Winform中實(shí)現(xiàn)圖片格式轉(zhuǎn)換的示例代碼,幫助大家更好的理解和使用winform開(kāi)發(fā),感興趣的朋友可以了解下2020-12-12C#觀察者模式(Observer Pattern)實(shí)例教程
這篇文章主要介紹了C#觀察者模式(Observer Pattern),主要以一個(gè)實(shí)例的形式講述了C#觀察者模式的實(shí)現(xiàn)過(guò)程,詳細(xì)講述了接口的定義、通知及動(dòng)作的實(shí)現(xiàn)方法,需要的朋友可以參考下2014-09-09C# 開(kāi)發(fā)圓角控件(窗體)的具體實(shí)現(xiàn)
這篇文章主要介紹了C# 開(kāi)發(fā)圓角控件的具體實(shí)現(xiàn),需要的朋友可以參考下2014-02-02C#實(shí)現(xiàn)數(shù)據(jù)去重的方式總結(jié)
這篇文章主要來(lái)和大家一起來(lái)討論一下關(guān)于C#數(shù)據(jù)去重的常見(jiàn)的幾種方式,每種方法都有其特點(diǎn)和適用場(chǎng)景,感興趣的小伙伴可以了解一下2023-07-07