如何實現(xiàn)定時推送的具體方案
詳細內(nèi)容
詳細內(nèi)容大概分為4個部分,1.應用場景 2.遇到問題 3.設計 4.實現(xiàn) 5.運行效果
1.應用場景
需要定時推送數(shù)據(jù),且輕量化的實現(xiàn)。
2.遇到問題
- 如果啟動一個定時器去定時輪詢
- (1)輪詢效率比較低
- (2)每次掃庫,已經(jīng)被執(zhí)行過記錄,仍然會被掃描(只是不會出現(xiàn)在結(jié)果集中),會做重復工作
- (3)時效性不夠好,如果每小時輪詢一次,最差的情況下會有時間誤差
- 如何利用“延時消息”,對于每個任務只觸發(fā)一次,保證效率的同時保證實時性,是今天要討論的問題。
3.設計
高效延時消息,包含兩個重要的數(shù)據(jù)結(jié)構(gòu):
- 環(huán)形隊列,例如可以創(chuàng)建一個包含3600個slot的環(huán)形隊列(本質(zhì)是個數(shù)組)
- 任務集合,環(huán)上每一個slot是一個Set
同時,啟動一個timer,這個timer每隔1s,在上述環(huán)形隊列中移動一格,有一個Current Index指針來標識正在檢測的slot。
Task結(jié)構(gòu)中有兩個很重要的屬性:
- Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執(zhí)行任務
- Task-Function:需要執(zhí)行的任務指針
假設當前Current Index指向第一格,當有延時消息到達之后,例如希望3610秒之后,觸發(fā)一個延時消息任務,只需:
- 計算這個Task應該放在哪一個slot,現(xiàn)在指向1,3610秒之后,應該是第11格,所以這個Task應該放在第11個slot的Set中
- 計算這個Task的Cycle-Num,由于環(huán)形隊列是3600格(每秒移動一格,正好1小時),這個任務是3610秒后執(zhí)行,所以應該繞3610/3600=1圈之后再執(zhí)行,于是Cycle-Num=1
Current Index不停的移動,每秒移動到一個新slot,這個slot中對應的Set,每個Task看Cycle-Num是不是0:
- 如果不是0,說明還需要多移動幾圈,將Cycle-Num減1
- 如果是0,說明馬上要執(zhí)行這個Task了,取出Task-Funciton執(zhí)行(可以用單獨的線程來執(zhí)行Task),并把這個Task從Set中刪除
使用了“延時消息”方案之后,“訂單48小時后關(guān)閉評價”的需求,只需將在訂單關(guān)閉時,觸發(fā)一個48小時之后的延時消息即可:
- 無需再輪詢?nèi)坑唵危矢?/li>
- 一個訂單,任務只執(zhí)行一次
- 時效性好,精確到秒(控制timer移動頻率可以控制精度)
4.實現(xiàn)
首先寫一個方案要理清楚自己的項目結(jié)構(gòu),我做了如下分層。
Interfaces , 這層里主要約束延遲消息隊列的隊列和消息任務行。
public interface IRingQueue<T> { /// <summary> /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles] /// </summary> /// <param name="delayTime">The specified task is executed after N seconds.</param> /// <param name="action">Definitions of callback</param> void Add(long delayTime,Action<T> action); /// <summary> /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles] /// </summary> /// <param name="delayTime">The specified task is executed after N seconds.</param> /// <param name="action">Definitions of callback.</param> /// <param name="data">Parameters used in the callback function.</param> void Add(long delayTime, Action<T> action, T data); /// <summary> /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles] /// </summary> /// <param name="delayTime"></param> /// <param name="action">Definitions of callback</param> /// <param name="data">Parameters used in the callback function.</param> /// <param name="id">Task ID, used when deleting tasks.</param> void Add(long delayTime, Action<T> action, T data, long id); /// <summary> /// Remove tasks [need to know: where the task is, which specific task]. /// </summary> /// <param name="index">Task slot location</param> /// <param name="id">Task ID, used when deleting tasks.</param> void Remove(long id); /// <summary> /// Launch queue. /// </summary> void Start(); } public interface ITask { }
Achieves,這層里實現(xiàn)之前定義的接口,這里寫成抽象類是為了后面方便擴展。
using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; using DelayMessageApp.Interfaces; namespace DelayMessageApp.Achieves.Base { public abstract class BaseQueue<T> : IRingQueue<T> { private long _pointer = 0L; private ConcurrentBag<BaseTask<T>>[] _arraySlot; private int ArrayMax; /// <summary> /// Ring queue. /// </summary> public ConcurrentBag<BaseTask<T>>[] ArraySlot { get { return _arraySlot ?? (_arraySlot = new ConcurrentBag<BaseTask<T>>[ArrayMax]); } } public BaseQueue(int arrayMax) { if (arrayMax < 60 && arrayMax % 60 == 0) throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 ."); ArrayMax = arrayMax; } public void Add(long delayTime, Action<T> action) { Add(delayTime, action, default(T)); } public void Add(long delayTime,Action<T> action,T data) { Add(delayTime, action, data,0); } public void Add(long delayTime, Action<T> action, T data,long id) { NextSlot(delayTime, out long cycle, out long pointer); ArraySlot[pointer] = ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag<BaseTask<T>>()); var baseTask = new BaseTask<T>(cycle, action, data,id); ArraySlot[pointer].Add(baseTask); } /// <summary> /// Remove tasks based on ID. /// </summary> /// <param name="id"></param> public void Remove(long id) { try { Parallel.ForEach(ArraySlot, (ConcurrentBag<BaseTask<T>> collection, ParallelLoopState state) => { var resulTask = collection.FirstOrDefault(p => p.Id == id); if (resulTask != null) { collection.TryTake(out resulTask); state.Break(); } }); } catch (Exception e) { Console.WriteLine(e); } } public void Start() { while (true) { RightMovePointer(); Thread.Sleep(1000); Console.WriteLine(DateTime.Now.ToString()); } } /// <summary> /// Calculate the information of the next slot. /// </summary> /// <param name="delayTime">Delayed execution time.</param> /// <param name="cycle">Number of turns.</param> /// <param name="index">Task location.</param> private void NextSlot(long delayTime, out long cycle,out long index) { try { var circle = delayTime / ArrayMax; var second = delayTime % ArrayMax; var current_pointer = GetPointer(); var queue_index = 0L; if (delayTime - ArrayMax > ArrayMax) { circle = 1; } else if (second > ArrayMax) { circle += 1; } if (delayTime - circle * ArrayMax < ArrayMax) { second = delayTime - circle * ArrayMax; } if (current_pointer + delayTime >= ArrayMax) { cycle = (int)((current_pointer + delayTime) / ArrayMax); if (current_pointer + second - ArrayMax < 0) { queue_index = current_pointer + second; } else if (current_pointer + second - ArrayMax > 0) { queue_index = current_pointer + second - ArrayMax; } } else { cycle = 0; queue_index = current_pointer + second; } index = queue_index; } catch (Exception e) { Console.WriteLine(e); throw; } } /// <summary> /// Get the current location of the pointer. /// </summary> /// <returns></returns> private long GetPointer() { return Interlocked.Read(ref _pointer); } /// <summary> /// Reset pointer position. /// </summary> private void ReSetPointer() { Interlocked.Exchange(ref _pointer, 0); } /// <summary> /// Pointer moves clockwise. /// </summary> private void RightMovePointer() { try { if (GetPointer() >= ArrayMax - 1) { ReSetPointer(); } else { Interlocked.Increment(ref _pointer); } var pointer = GetPointer(); var taskCollection = ArraySlot[pointer]; if (taskCollection == null || taskCollection.Count == 0) return; Parallel.ForEach(taskCollection, (BaseTask<T> task) => { if (task.Cycle > 0) { task.SubCycleNumber(); } if (task.Cycle <= 0) { taskCollection.TryTake(out task); task.TaskAction(task.Data); } }); } catch (Exception e) { Console.WriteLine(e); throw; } } } } using System; using System.Threading; using DelayMessageApp.Interfaces; namespace DelayMessageApp.Achieves.Base { public class BaseTask<T> : ITask { private long _cycle; private long _id; private T _data; public Action<T> TaskAction { get; set; } public long Cycle { get { return Interlocked.Read(ref _cycle); } set { Interlocked.Exchange(ref _cycle, value); } } public long Id { get { return _id; } set { _id = value; } } public T Data { get { return _data; } set { _data = value; } } public BaseTask(long cycle, Action<T> action, T data,long id) { Cycle = cycle; TaskAction = action; Data = data; Id = id; } public BaseTask(long cycle, Action<T> action,T data) { Cycle = cycle; TaskAction = action; Data = data; } public BaseTask(long cycle, Action<T> action) { Cycle = cycle; TaskAction = action; } public void SubCycleNumber() { Interlocked.Decrement(ref _cycle); } } }
Logic,這層主要實現(xiàn)調(diào)用邏輯,調(diào)用者最終只需要關(guān)心把任務放進隊列并指定什么時候執(zhí)行就行了,根本不需要關(guān)心其它的任何信息。
public static void Start() { //1.Initialize queues of different granularity. IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>(); //2.Open thread. var lstTasks = new List<Task> { Task.Factory.StartNew(minuteRingQueue.Start) }; //3.Add tasks performed in different periods. minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) => { Console.WriteLine(newsObj.News); }), new NewsModel() { News = "Trump's visit to China!" }); minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) => { Console.WriteLine(newsObj.News); }), new NewsModel() { News = "Putin Pu's visit to China!" }); minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) => { Console.WriteLine(newsObj.News); }), new NewsModel() { News = "Eisenhower's visit to China!" }); minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) => { Console.WriteLine(newsObj.News); }), new NewsModel() { News = "Xi Jinping's visit to the US!" }); //3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop. //F5 Run the program and see the effect. Task.WaitAll(lstTasks.ToArray()); Console.Read(); }
Models,這層就是用來在延遲任務中帶入的數(shù)據(jù)模型類而已了。自己用的時候換成任意自定義類型都可以。
5.運行效果
到此這篇關(guān)于如何實現(xiàn)定時推送的具體方案的文章就介紹到這了,希望對大家有所幫助,更多相關(guān)C#內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持腳本之家!
相關(guān)文章
C#實現(xiàn)DataTable,List和Json轉(zhuǎn)換的方法
這篇文章主要介紹了C#實現(xiàn)DataTable,List和Json轉(zhuǎn)換的方法,結(jié)合實例形式分析了DataTable、list、DataReader、DataSet等轉(zhuǎn)換成JSON的相關(guān)實現(xiàn)技巧,需要的朋友可以參考下2016-08-08c#讀寫App.config,ConfigurationManager.AppSettings 不生效的解決方法
這篇文章主要介紹了c#讀寫App.config,ConfigurationManager.AppSettings 不生效的解決方法,需要的朋友可以參考下2015-10-10C#?將Excel轉(zhuǎn)為PDF時自定義表格紙張大小的代碼思路
這篇文章主要介紹了C#?將Excel轉(zhuǎn)為PDF時自定義表格紙張大小的代碼思路,轉(zhuǎn)換前的頁面大小設置為該版本中寫入的新功能,在舊版本和免費版本中暫不支持,感興趣的朋友跟隨小編一起看看實例代碼2021-11-11datatable生成excel和excel插入圖片示例詳解
excel導出在C#代碼中應用己經(jīng)很廣泛了,下面講了datatable生成excel、復制sheet頁、刪除sheet頁、選中sheet頁、另存excel文件、excel中插入圖片等功能2014-01-01