欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C#實(shí)現(xiàn)萬物皆可排序的隊(duì)列方法詳解

 更新時(shí)間:2022年07月04日 08:08:55   作者:波斯馬  
本文詳細(xì)講解了C#實(shí)現(xiàn)萬物皆可排序隊(duì)列的方法,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

需求

產(chǎn)品中需要向不同的客戶推送數(shù)據(jù),原來的實(shí)現(xiàn)是每條數(shù)據(jù)產(chǎn)生后就立即向客戶推送數(shù)據(jù),走的的是HTTP協(xié)議。因?yàn)槊織l數(shù)據(jù)都比較小,而數(shù)據(jù)生成的頻次也比較高,這就會(huì)頻繁的建立HTTP連接,而且每次HTTP傳輸中攜帶的業(yè)務(wù)數(shù)據(jù)都很小,對(duì)網(wǎng)絡(luò)的實(shí)際利用率不高。希望能夠提高網(wǎng)絡(luò)的利用率,并降低系統(tǒng)的負(fù)載。

分析

一個(gè)很自然的想法就是將多條數(shù)據(jù)一起發(fā)送,這里有幾個(gè)關(guān)鍵點(diǎn):

1、多條數(shù)據(jù)的聚合邏輯: 是攢夠幾條發(fā)送,還是按照時(shí)間周期發(fā)送。如果是攢夠幾條發(fā)送,在數(shù)據(jù)比較稀疏或者產(chǎn)生頻率不那么穩(wěn)定的時(shí)候,攢夠需要的數(shù)據(jù)條數(shù)可能比較困難,這時(shí)候還得需要一個(gè)過期時(shí)間,因?yàn)榭蛻艨赡芙邮懿涣颂嗟难舆t。既然不管怎樣都需要使用時(shí)間進(jìn)行控制,我這里索性就選擇按照時(shí)間周期發(fā)送了。思路是:自上次發(fā)送時(shí)間起,經(jīng)過了某個(gè)時(shí)長(zhǎng)之后,就發(fā)送客戶在這段時(shí)間內(nèi)產(chǎn)生的所有數(shù)據(jù)。

2、數(shù)據(jù)到期判斷方法:既然選擇了按照時(shí)間周期發(fā)送,那么就必須有辦法判斷是否到了發(fā)送時(shí)間。一個(gè)很簡(jiǎn)單的想法就是輪詢,把所有客戶輪詢一遍,看看誰的數(shù)據(jù)到期了,就發(fā)送誰的。這個(gè)算法的時(shí)間復(fù)雜度是O(N),如果客戶比較多,就會(huì)消耗過多的時(shí)間在這上邊。還有一個(gè)辦法:如果客戶按照時(shí)間排序好了,那么只需要取時(shí)間最早的客戶的數(shù)據(jù)時(shí)間判斷就好了,滿足就發(fā)送,一直向后找,直到獲取的客戶數(shù)據(jù)時(shí)間不符合條件,則退出處理,然后等一會(huì)再進(jìn)行判斷處理。這就需要有一個(gè)支持排序的數(shù)據(jù)結(jié)構(gòu),寫入數(shù)據(jù)時(shí)自動(dòng)排序,這種數(shù)據(jù)結(jié)構(gòu)的時(shí)間復(fù)雜度一般可以做到O(log(n))。對(duì)于這個(gè)數(shù)據(jù)結(jié)構(gòu)的讀寫操作原理上就是隊(duì)列的操作方式,只不過是個(gè)可排序的隊(duì)列。

3、區(qū)分客戶:不同客戶的數(shù)據(jù)接收地址不同,向具體某個(gè)客戶發(fā)送數(shù)據(jù)時(shí),應(yīng)該能比較方便的聚合他的數(shù)據(jù),最好是直接就能拿到需要發(fā)送的數(shù)據(jù)??梢允褂米值鋽?shù)據(jù)結(jié)構(gòu)來滿足這個(gè)需求,取某個(gè)客戶數(shù)據(jù)的時(shí)間復(fù)雜度可以降低到O(1)。

4、數(shù)據(jù)的安全性問題:如果程序在數(shù)據(jù)發(fā)送成功之前退出了,未發(fā)送的數(shù)據(jù)怎么辦?是還能繼續(xù)發(fā)送,還是就丟掉不管了。如果要在程序重啟后恢復(fù)未發(fā)送成功的數(shù)據(jù),則必須將數(shù)據(jù)同步到別的地方,比如持久化到磁盤。因?yàn)槲疫@里的數(shù)據(jù)安全性要求不高,丟失一些數(shù)據(jù)也是允許的,所以要發(fā)送的數(shù)據(jù)收到之后放到內(nèi)存就行了。

實(shí)現(xiàn)

上文提到可排序的數(shù)據(jù)結(jié)構(gòu),可以使用SortedList<TKey,TValue>,鍵是時(shí)間,值是這個(gè)時(shí)間產(chǎn)生了數(shù)據(jù)的客戶標(biāo)識(shí)列表。不過它的讀寫操作不是線程安全的,需要自己做同步,這里簡(jiǎn)單點(diǎn)就使用lock了。

對(duì)于不同客戶的數(shù)據(jù),為了方便獲取,使用Dictionary<TKey,TValue>來滿足,鍵是客戶的標(biāo)識(shí),值是累積的未發(fā)送客戶數(shù)據(jù)。這個(gè)數(shù)據(jù)讀寫也不是線程安全的,可以和SortedList的讀寫放到同一個(gè)lock中。

下邊是它們的定義:

SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();

插入數(shù)據(jù)的時(shí)候,需要先寫入SortedList,然后再寫入Dictionary。代碼邏輯比較簡(jiǎn)單,請(qǐng)看:

    public void Publish(TKey key, TValue value)
    {
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys!.Contains(key))
                {
                    keys.Add(key);
                }
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

對(duì)于消費(fèi)數(shù)據(jù),這里采用拉數(shù)據(jù)的模式。最開始寫的方法邏輯是:讀取一條數(shù)據(jù),處理它,然后從隊(duì)列中刪除。但是這個(gè)邏輯需要對(duì)隊(duì)列進(jìn)行讀寫,所以必須加鎖。一般處理數(shù)據(jù)比較耗時(shí),比如這里要通過HTTP發(fā)送數(shù)據(jù),加鎖的話就可能導(dǎo)致寫數(shù)據(jù)到隊(duì)列時(shí)阻塞的時(shí)間比較長(zhǎng)。所以這里實(shí)現(xiàn)的是把可以發(fā)送的數(shù)據(jù)全部提取出來,然后就釋放鎖,數(shù)據(jù)的處理放到鎖的外部實(shí)現(xiàn),這樣隊(duì)列的讀寫性能就比較好了。

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = DateTime.Now;

        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var first = _queue.First();
                var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                {
                    break;
                }

                var keys = first.Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }

這段代碼比較長(zhǎng)一些,我梳理下邏輯:取隊(duì)列的第一條數(shù)據(jù),判斷時(shí)間是否達(dá)到發(fā)送周期,未達(dá)到則直接退出,方法返回空列表。如果達(dá)到發(fā)送周期,則取出第一條數(shù)據(jù)中存儲(chǔ)的客戶標(biāo)識(shí),然后根據(jù)這些標(biāo)識(shí)獲取對(duì)應(yīng)的客戶未發(fā)送數(shù)據(jù),將這些數(shù)據(jù)按照客戶維度添加到返回列表中,將這些客戶及其數(shù)據(jù)從隊(duì)列中移除,返回有數(shù)據(jù)的列表。這里還增加了一個(gè)拉取數(shù)據(jù)的條數(shù)限制,方便根據(jù)業(yè)務(wù)實(shí)際情況進(jìn)行控制。

再來看一下怎么使用這個(gè)隊(duì)列,這里模擬多個(gè)生產(chǎn)者加一個(gè)消費(fèi)者,其實(shí)可以任意多個(gè)生產(chǎn)者和消費(fèi)者:

TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

List<Task> publishTasks = new List<Task>();

for (int i = 0; i < 4; i++)
{
    var j = i;
    publishTasks.Add(Task.Factory.StartNew(() =>
    {
        int k = 0;
        while (true)
        {
            queue.Publish($"key_{k}", $"value_{j}_{k}");
            Thread.Sleep(15);
            k++;
        }
    }, TaskCreationOptions.LongRunning));
}

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var list = queue.Pull(100);
        if (list.Count <= 0)
        {
            Thread.Sleep(100);
            continue;
        }

        foreach (var item in list)
        {
            Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
        }
    }

}, TaskCreationOptions.LongRunning);

Task.WaitAll(publishTasks.ToArray());

以上就是針對(duì)這個(gè)特定需求實(shí)現(xiàn)的一個(gè)按照時(shí)間進(jìn)行排序的隊(duì)列。

萬物皆可排序的隊(duì)列
我們很容易想到,既然可以按照時(shí)間排序,那么按照別的數(shù)據(jù)類型排序也是可以的。這個(gè)數(shù)據(jù)結(jié)構(gòu)可以應(yīng)用的場(chǎng)景很多,比如按照權(quán)重排序的隊(duì)列、按照優(yōu)先級(jí)排序的隊(duì)列、按照年齡排序的隊(duì)列、按照銀行存款排序的隊(duì)列,等等。這就是一個(gè)萬物皆可排序的隊(duì)列。

我這里把主要代碼貼出來(完整代碼和示例請(qǐng)看文末):

public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();

    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();

    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                keys.Add(key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }


    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns></returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }
}

代碼邏輯還是比較簡(jiǎn)單的,就不羅嗦了,如有問題歡迎留言交流。

再說數(shù)據(jù)安全

因?yàn)樵谶@個(gè)實(shí)現(xiàn)中所有待處理的數(shù)據(jù)都在內(nèi)存中,丟失數(shù)據(jù)會(huì)帶來一定的風(fēng)險(xiǎn),因?yàn)槲疫@個(gè)程序前邊還有一個(gè)隊(duì)列,即使程序崩潰了,也只損失沒處理的一小部分?jǐn)?shù)據(jù),業(yè)務(wù)上可以接受,所以這樣做沒有問題。如果你對(duì)這個(gè)程序感興趣,需要慎重考慮你的應(yīng)用場(chǎng)景。

來看看數(shù)據(jù)丟失可能發(fā)生的兩種情況:

一是數(shù)據(jù)還在隊(duì)列中時(shí)程序重啟了:對(duì)于這種情況,前文提到將數(shù)據(jù)同步到其它地方,比如寫入Redis、寫入數(shù)據(jù)庫、寫入磁盤等等。不過因?yàn)榫W(wǎng)絡(luò)IO、磁盤IO較慢,這往往會(huì)帶來吞吐量的大幅下降,想要保證一定的吞吐量,還得引入一些分片機(jī)制,又因?yàn)榉植际降牟豢煽?,可能還得增加一些容錯(cuò)容災(zāi)機(jī)制,比較復(fù)雜,可以參考Kafka。

二是數(shù)據(jù)處理的時(shí)候失敗了:對(duì)于這種情況,可以讓程序重試;但是如果異常導(dǎo)致程序崩潰了,數(shù)據(jù)已經(jīng)從內(nèi)存或者其它存儲(chǔ)中移除了,數(shù)據(jù)還是會(huì)發(fā)生丟失。這時(shí)候可以采用一個(gè)ACK機(jī)制,處理成功后向隊(duì)列發(fā)送一個(gè)ACK,攜帶已經(jīng)處理的數(shù)據(jù)標(biāo)識(shí),隊(duì)列根據(jù)標(biāo)識(shí)刪除數(shù)據(jù)。否則消費(fèi)者還能消費(fèi)到這些數(shù)據(jù)。

這些問題并不一定要完全解決,還是得看業(yè)務(wù)場(chǎng)景,有可能你把數(shù)據(jù)持久化到Redis就夠了,或者你也不用引入ACK機(jī)制,記錄下處理到哪一條了就行了。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • C#實(shí)現(xiàn)gRPC服務(wù)和調(diào)用示例詳解

    C#實(shí)現(xiàn)gRPC服務(wù)和調(diào)用示例詳解

    gRPC?是一種與語言無關(guān)的高性能遠(yuǎn)程過程調(diào)用?(RPC)?框架,這篇文章主要為大家詳細(xì)介紹了C#如何實(shí)現(xiàn)gRPC服務(wù)和調(diào)用,需要的可以參考一下
    2024-01-01
  • c# 委托的本質(zhì)是什么

    c# 委托的本質(zhì)是什么

    這篇文章主要介紹了c# 委托的本質(zhì)是什么,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07
  • .net實(shí)現(xiàn)序列化與反序列化實(shí)例解析

    .net實(shí)現(xiàn)序列化與反序列化實(shí)例解析

    這篇文章主要介紹了.net實(shí)現(xiàn)序列化與反序列化實(shí)例解析,需要的朋友可以參考下
    2014-08-08
  • C# Ado.net實(shí)現(xiàn)讀取SQLServer數(shù)據(jù)庫存儲(chǔ)過程列表及參數(shù)信息示例

    C# Ado.net實(shí)現(xiàn)讀取SQLServer數(shù)據(jù)庫存儲(chǔ)過程列表及參數(shù)信息示例

    這篇文章主要介紹了C# Ado.net實(shí)現(xiàn)讀取SQLServer數(shù)據(jù)庫存儲(chǔ)過程列表及參數(shù)信息,結(jié)合實(shí)例形式總結(jié)分析了C#針對(duì)SQLServer數(shù)據(jù)庫存儲(chǔ)過程及參數(shù)信息的各種常見操作技巧,需要的朋友可以參考下
    2019-02-02
  • C#中調(diào)用Servlet示例

    C#中調(diào)用Servlet示例

    這篇文章主要介紹了C#中調(diào)用Servlet示例,本文實(shí)現(xiàn)通用消息接口使用servlet作為服務(wù)器端服務(wù)接口,第三方應(yīng)用程序通過http post的方式調(diào)用servlet,實(shí)現(xiàn)與通用消息接口的調(diào)用連接,需要的朋友可以參考下
    2015-05-05
  • C#處理文本文件TXT實(shí)例詳解

    C#處理文本文件TXT實(shí)例詳解

    這篇文章主要介紹了C#處理文本文件TXT的方法,以實(shí)例形式詳細(xì)分析了txt文本文件的讀取、修改及打印等功能的實(shí)現(xiàn)技巧,需要的朋友可以參考下
    2015-02-02
  • C# XML字符串包含特殊字符的處理轉(zhuǎn)換方法小結(jié)

    C# XML字符串包含特殊字符的處理轉(zhuǎn)換方法小結(jié)

    今天用C#輸出XML文件時(shí),發(fā)現(xiàn)報(bào)錯(cuò),經(jīng)過反復(fù)檢查調(diào)試,發(fā)現(xiàn)是因?yàn)槟程巸?nèi)容含有某些特殊字符,這些特殊字符是在XML里不被允許的
    2020-07-07
  • C#中的is和as操作符區(qū)別小結(jié)

    C#中的is和as操作符區(qū)別小結(jié)

    這篇文章主要介紹了C#中的is和as操作符區(qū)別小結(jié),is是驗(yàn)證操作對(duì)象是不是自己希望的,as是將對(duì)象轉(zhuǎn)換成指定類型,需要的朋友可以參考下
    2015-01-01
  • 淺析WPF中Popup彈出層的使用

    淺析WPF中Popup彈出層的使用

    這篇文章將通過一個(gè)簡(jiǎn)單的小例子,為大家詳細(xì)介紹一下如何在WPF開發(fā)中,通過Popup實(shí)現(xiàn)鼠標(biāo)點(diǎn)擊彈出浮動(dòng)停靠窗口,有需要的小伙伴可以了解下
    2024-01-01
  • C# 調(diào)用WebApi的實(shí)現(xiàn)

    C# 調(diào)用WebApi的實(shí)現(xiàn)

    這篇文章主要介紹了C# 調(diào)用WebApi的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04

最新評(píng)論