如何實(shí)現(xiàn)定時(shí)推送的具體方案
詳細(xì)內(nèi)容
詳細(xì)內(nèi)容大概分為4個(gè)部分,1.應(yīng)用場(chǎng)景 2.遇到問(wèn)題 3.設(shè)計(jì) 4.實(shí)現(xiàn) 5.運(yùn)行效果
1.應(yīng)用場(chǎng)景
需要定時(shí)推送數(shù)據(jù),且輕量化的實(shí)現(xiàn)。
2.遇到問(wèn)題
- 如果啟動(dòng)一個(gè)定時(shí)器去定時(shí)輪詢
- (1)輪詢效率比較低
- (2)每次掃庫(kù),已經(jīng)被執(zhí)行過(guò)記錄,仍然會(huì)被掃描(只是不會(huì)出現(xiàn)在結(jié)果集中),會(huì)做重復(fù)工作
- (3)時(shí)效性不夠好,如果每小時(shí)輪詢一次,最差的情況下會(huì)有時(shí)間誤差
- 如何利用“延時(shí)消息”,對(duì)于每個(gè)任務(wù)只觸發(fā)一次,保證效率的同時(shí)保證實(shí)時(shí)性,是今天要討論的問(wèn)題。
3.設(shè)計(jì)
高效延時(shí)消息,包含兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu):
- 環(huán)形隊(duì)列,例如可以創(chuàng)建一個(gè)包含3600個(gè)slot的環(huán)形隊(duì)列(本質(zhì)是個(gè)數(shù)組)
- 任務(wù)集合,環(huán)上每一個(gè)slot是一個(gè)Set
同時(shí),啟動(dòng)一個(gè)timer,這個(gè)timer每隔1s,在上述環(huán)形隊(duì)列中移動(dòng)一格,有一個(gè)Current Index指針來(lái)標(biāo)識(shí)正在檢測(cè)的slot。
Task結(jié)構(gòu)中有兩個(gè)很重要的屬性:
- Cycle-Num:當(dāng)Current Index第幾圈掃描到這個(gè)Slot時(shí),執(zhí)行任務(wù)
- Task-Function:需要執(zhí)行的任務(wù)指針
假設(shè)當(dāng)前Current Index指向第一格,當(dāng)有延時(shí)消息到達(dá)之后,例如希望3610秒之后,觸發(fā)一個(gè)延時(shí)消息任務(wù),只需:
- 計(jì)算這個(gè)Task應(yīng)該放在哪一個(gè)slot,現(xiàn)在指向1,3610秒之后,應(yīng)該是第11格,所以這個(gè)Task應(yīng)該放在第11個(gè)slot的Set中
- 計(jì)算這個(gè)Task的Cycle-Num,由于環(huán)形隊(duì)列是3600格(每秒移動(dòng)一格,正好1小時(shí)),這個(gè)任務(wù)是3610秒后執(zhí)行,所以應(yīng)該繞3610/3600=1圈之后再執(zhí)行,于是Cycle-Num=1
Current Index不停的移動(dòng),每秒移動(dòng)到一個(gè)新slot,這個(gè)slot中對(duì)應(yīng)的Set,每個(gè)Task看Cycle-Num是不是0:
- 如果不是0,說(shuō)明還需要多移動(dòng)幾圈,將Cycle-Num減1
- 如果是0,說(shuō)明馬上要執(zhí)行這個(gè)Task了,取出Task-Funciton執(zhí)行(可以用單獨(dú)的線程來(lái)執(zhí)行Task),并把這個(gè)Task從Set中刪除
使用了“延時(shí)消息”方案之后,“訂單48小時(shí)后關(guān)閉評(píng)價(jià)”的需求,只需將在訂單關(guān)閉時(shí),觸發(fā)一個(gè)48小時(shí)之后的延時(shí)消息即可:
- 無(wú)需再輪詢?nèi)坑唵?,效率?/li>
- 一個(gè)訂單,任務(wù)只執(zhí)行一次
- 時(shí)效性好,精確到秒(控制timer移動(dòng)頻率可以控制精度)
4.實(shí)現(xiàn)
首先寫(xiě)一個(gè)方案要理清楚自己的項(xiàng)目結(jié)構(gòu),我做了如下分層。

Interfaces , 這層里主要約束延遲消息隊(duì)列的隊(duì)列和消息任務(wù)行。
public interface IRingQueue<T>
{
/// <summary>
/// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
/// </summary>
/// <param name="delayTime">The specified task is executed after N seconds.</param>
/// <param name="action">Definitions of callback</param>
void Add(long delayTime,Action<T> action);
/// <summary>
/// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
/// </summary>
/// <param name="delayTime">The specified task is executed after N seconds.</param>
/// <param name="action">Definitions of callback.</param>
/// <param name="data">Parameters used in the callback function.</param>
void Add(long delayTime, Action<T> action, T data);
/// <summary>
/// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
/// </summary>
/// <param name="delayTime"></param>
/// <param name="action">Definitions of callback</param>
/// <param name="data">Parameters used in the callback function.</param>
/// <param name="id">Task ID, used when deleting tasks.</param>
void Add(long delayTime, Action<T> action, T data, long id);
/// <summary>
/// Remove tasks [need to know: where the task is, which specific task].
/// </summary>
/// <param name="index">Task slot location</param>
/// <param name="id">Task ID, used when deleting tasks.</param>
void Remove(long id);
/// <summary>
/// Launch queue.
/// </summary>
void Start();
}
public interface ITask
{
}
Achieves,這層里實(shí)現(xiàn)之前定義的接口,這里寫(xiě)成抽象類(lèi)是為了后面方便擴(kuò)展。
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DelayMessageApp.Interfaces;
namespace DelayMessageApp.Achieves.Base
{
public abstract class BaseQueue<T> : IRingQueue<T>
{
private long _pointer = 0L;
private ConcurrentBag<BaseTask<T>>[] _arraySlot;
private int ArrayMax;
/// <summary>
/// Ring queue.
/// </summary>
public ConcurrentBag<BaseTask<T>>[] ArraySlot
{
get { return _arraySlot ?? (_arraySlot = new ConcurrentBag<BaseTask<T>>[ArrayMax]); }
}
public BaseQueue(int arrayMax)
{
if (arrayMax < 60 && arrayMax % 60 == 0)
throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 .");
ArrayMax = arrayMax;
}
public void Add(long delayTime, Action<T> action)
{
Add(delayTime, action, default(T));
}
public void Add(long delayTime,Action<T> action,T data)
{
Add(delayTime, action, data,0);
}
public void Add(long delayTime, Action<T> action, T data,long id)
{
NextSlot(delayTime, out long cycle, out long pointer);
ArraySlot[pointer] = ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
var baseTask = new BaseTask<T>(cycle, action, data,id);
ArraySlot[pointer].Add(baseTask);
}
/// <summary>
/// Remove tasks based on ID.
/// </summary>
/// <param name="id"></param>
public void Remove(long id)
{
try
{
Parallel.ForEach(ArraySlot, (ConcurrentBag<BaseTask<T>> collection, ParallelLoopState state) =>
{
var resulTask = collection.FirstOrDefault(p => p.Id == id);
if (resulTask != null)
{
collection.TryTake(out resulTask);
state.Break();
}
});
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
public void Start()
{
while (true)
{
RightMovePointer();
Thread.Sleep(1000);
Console.WriteLine(DateTime.Now.ToString());
}
}
/// <summary>
/// Calculate the information of the next slot.
/// </summary>
/// <param name="delayTime">Delayed execution time.</param>
/// <param name="cycle">Number of turns.</param>
/// <param name="index">Task location.</param>
private void NextSlot(long delayTime, out long cycle,out long index)
{
try
{
var circle = delayTime / ArrayMax;
var second = delayTime % ArrayMax;
var current_pointer = GetPointer();
var queue_index = 0L;
if (delayTime - ArrayMax > ArrayMax)
{
circle = 1;
}
else if (second > ArrayMax)
{
circle += 1;
}
if (delayTime - circle * ArrayMax < ArrayMax)
{
second = delayTime - circle * ArrayMax;
}
if (current_pointer + delayTime >= ArrayMax)
{
cycle = (int)((current_pointer + delayTime) / ArrayMax);
if (current_pointer + second - ArrayMax < 0)
{
queue_index = current_pointer + second;
}
else if (current_pointer + second - ArrayMax > 0)
{
queue_index = current_pointer + second - ArrayMax;
}
}
else
{
cycle = 0;
queue_index = current_pointer + second;
}
index = queue_index;
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
/// <summary>
/// Get the current location of the pointer.
/// </summary>
/// <returns></returns>
private long GetPointer()
{
return Interlocked.Read(ref _pointer);
}
/// <summary>
/// Reset pointer position.
/// </summary>
private void ReSetPointer()
{
Interlocked.Exchange(ref _pointer, 0);
}
/// <summary>
/// Pointer moves clockwise.
/// </summary>
private void RightMovePointer()
{
try
{
if (GetPointer() >= ArrayMax - 1)
{
ReSetPointer();
}
else
{
Interlocked.Increment(ref _pointer);
}
var pointer = GetPointer();
var taskCollection = ArraySlot[pointer];
if (taskCollection == null || taskCollection.Count == 0) return;
Parallel.ForEach(taskCollection, (BaseTask<T> task) =>
{
if (task.Cycle > 0)
{
task.SubCycleNumber();
}
if (task.Cycle <= 0)
{
taskCollection.TryTake(out task);
task.TaskAction(task.Data);
}
});
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
using System;
using System.Threading;
using DelayMessageApp.Interfaces;
namespace DelayMessageApp.Achieves.Base
{
public class BaseTask<T> : ITask
{
private long _cycle;
private long _id;
private T _data;
public Action<T> TaskAction { get; set; }
public long Cycle
{
get { return Interlocked.Read(ref _cycle); }
set { Interlocked.Exchange(ref _cycle, value); }
}
public long Id
{
get { return _id; }
set { _id = value; }
}
public T Data
{
get { return _data; }
set { _data = value; }
}
public BaseTask(long cycle, Action<T> action, T data,long id)
{
Cycle = cycle;
TaskAction = action;
Data = data;
Id = id;
}
public BaseTask(long cycle, Action<T> action,T data)
{
Cycle = cycle;
TaskAction = action;
Data = data;
}
public BaseTask(long cycle, Action<T> action)
{
Cycle = cycle;
TaskAction = action;
}
public void SubCycleNumber()
{
Interlocked.Decrement(ref _cycle);
}
}
}
Logic,這層主要實(shí)現(xiàn)調(diào)用邏輯,調(diào)用者最終只需要關(guān)心把任務(wù)放進(jìn)隊(duì)列并指定什么時(shí)候執(zhí)行就行了,根本不需要關(guān)心其它的任何信息。
public static void Start()
{
//1.Initialize queues of different granularity.
IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();
//2.Open thread.
var lstTasks = new List<Task>
{
Task.Factory.StartNew(minuteRingQueue.Start)
};
//3.Add tasks performed in different periods.
minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Trump's visit to China!" });
minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Putin Pu's visit to China!" });
minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Eisenhower's visit to China!" });
minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Xi Jinping's visit to the US!" });
//3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.
//F5 Run the program and see the effect.
Task.WaitAll(lstTasks.ToArray());
Console.Read();
}
Models,這層就是用來(lái)在延遲任務(wù)中帶入的數(shù)據(jù)模型類(lèi)而已了。自己用的時(shí)候換成任意自定義類(lèi)型都可以。
5.運(yùn)行效果

到此這篇關(guān)于如何實(shí)現(xiàn)定時(shí)推送的具體方案的文章就介紹到這了,希望對(duì)大家有所幫助,更多相關(guān)C#內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持腳本之家!
相關(guān)文章
C#實(shí)現(xiàn)DataTable,List和Json轉(zhuǎn)換的方法
這篇文章主要介紹了C#實(shí)現(xiàn)DataTable,List和Json轉(zhuǎn)換的方法,結(jié)合實(shí)例形式分析了DataTable、list、DataReader、DataSet等轉(zhuǎn)換成JSON的相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2016-08-08
c#讀寫(xiě)App.config,ConfigurationManager.AppSettings 不生效的解決方法
這篇文章主要介紹了c#讀寫(xiě)App.config,ConfigurationManager.AppSettings 不生效的解決方法,需要的朋友可以參考下2015-10-10
C#?將Excel轉(zhuǎn)為PDF時(shí)自定義表格紙張大小的代碼思路
這篇文章主要介紹了C#?將Excel轉(zhuǎn)為PDF時(shí)自定義表格紙張大小的代碼思路,轉(zhuǎn)換前的頁(yè)面大小設(shè)置為該版本中寫(xiě)入的新功能,在舊版本和免費(fèi)版本中暫不支持,感興趣的朋友跟隨小編一起看看實(shí)例代碼2021-11-11
datatable生成excel和excel插入圖片示例詳解
excel導(dǎo)出在C#代碼中應(yīng)用己經(jīng)很廣泛了,下面講了datatable生成excel、復(fù)制sheet頁(yè)、刪除sheet頁(yè)、選中sheet頁(yè)、另存excel文件、excel中插入圖片等功能2014-01-01
C#編程實(shí)現(xiàn)獲取文件夾中所有文件的文件名
這篇文章主要介紹了C#編程實(shí)現(xiàn)獲取文件夾中所有文件的文件名,可實(shí)現(xiàn)獲取特定目錄下制定類(lèi)型文件名稱(chēng)的功能,涉及C#針對(duì)文件與目錄的遍歷、查詢等操作相關(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11

