如何實現(xiàn)定時推送的具體方案
詳細內(nèi)容
詳細內(nèi)容大概分為4個部分,1.應(yīng)用場景 2.遇到問題 3.設(shè)計 4.實現(xiàn) 5.運行效果
1.應(yīng)用場景
需要定時推送數(shù)據(jù),且輕量化的實現(xiàn)。
2.遇到問題
- 如果啟動一個定時器去定時輪詢
- (1)輪詢效率比較低
- (2)每次掃庫,已經(jīng)被執(zhí)行過記錄,仍然會被掃描(只是不會出現(xiàn)在結(jié)果集中),會做重復(fù)工作
- (3)時效性不夠好,如果每小時輪詢一次,最差的情況下會有時間誤差
- 如何利用“延時消息”,對于每個任務(wù)只觸發(fā)一次,保證效率的同時保證實時性,是今天要討論的問題。
3.設(shè)計
高效延時消息,包含兩個重要的數(shù)據(jù)結(jié)構(gòu):
- 環(huán)形隊列,例如可以創(chuàng)建一個包含3600個slot的環(huán)形隊列(本質(zhì)是個數(shù)組)
- 任務(wù)集合,環(huán)上每一個slot是一個Set
同時,啟動一個timer,這個timer每隔1s,在上述環(huán)形隊列中移動一格,有一個Current Index指針來標識正在檢測的slot。
Task結(jié)構(gòu)中有兩個很重要的屬性:
- Cycle-Num:當(dāng)Current Index第幾圈掃描到這個Slot時,執(zhí)行任務(wù)
- Task-Function:需要執(zhí)行的任務(wù)指針
假設(shè)當(dāng)前Current Index指向第一格,當(dāng)有延時消息到達之后,例如希望3610秒之后,觸發(fā)一個延時消息任務(wù),只需:
- 計算這個Task應(yīng)該放在哪一個slot,現(xiàn)在指向1,3610秒之后,應(yīng)該是第11格,所以這個Task應(yīng)該放在第11個slot的Set中
- 計算這個Task的Cycle-Num,由于環(huán)形隊列是3600格(每秒移動一格,正好1小時),這個任務(wù)是3610秒后執(zhí)行,所以應(yīng)該繞3610/3600=1圈之后再執(zhí)行,于是Cycle-Num=1
Current Index不停的移動,每秒移動到一個新slot,這個slot中對應(yīng)的Set,每個Task看Cycle-Num是不是0:
- 如果不是0,說明還需要多移動幾圈,將Cycle-Num減1
- 如果是0,說明馬上要執(zhí)行這個Task了,取出Task-Funciton執(zhí)行(可以用單獨的線程來執(zhí)行Task),并把這個Task從Set中刪除
使用了“延時消息”方案之后,“訂單48小時后關(guān)閉評價”的需求,只需將在訂單關(guān)閉時,觸發(fā)一個48小時之后的延時消息即可:
- 無需再輪詢?nèi)坑唵危矢?/li>
- 一個訂單,任務(wù)只執(zhí)行一次
- 時效性好,精確到秒(控制timer移動頻率可以控制精度)
4.實現(xiàn)
首先寫一個方案要理清楚自己的項目結(jié)構(gòu),我做了如下分層。

Interfaces , 這層里主要約束延遲消息隊列的隊列和消息任務(wù)行。
public interface IRingQueue<T>
{
/// <summary>
/// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
/// </summary>
/// <param name="delayTime">The specified task is executed after N seconds.</param>
/// <param name="action">Definitions of callback</param>
void Add(long delayTime,Action<T> action);
/// <summary>
/// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
/// </summary>
/// <param name="delayTime">The specified task is executed after N seconds.</param>
/// <param name="action">Definitions of callback.</param>
/// <param name="data">Parameters used in the callback function.</param>
void Add(long delayTime, Action<T> action, T data);
/// <summary>
/// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
/// </summary>
/// <param name="delayTime"></param>
/// <param name="action">Definitions of callback</param>
/// <param name="data">Parameters used in the callback function.</param>
/// <param name="id">Task ID, used when deleting tasks.</param>
void Add(long delayTime, Action<T> action, T data, long id);
/// <summary>
/// Remove tasks [need to know: where the task is, which specific task].
/// </summary>
/// <param name="index">Task slot location</param>
/// <param name="id">Task ID, used when deleting tasks.</param>
void Remove(long id);
/// <summary>
/// Launch queue.
/// </summary>
void Start();
}
public interface ITask
{
}
Achieves,這層里實現(xiàn)之前定義的接口,這里寫成抽象類是為了后面方便擴展。
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DelayMessageApp.Interfaces;
namespace DelayMessageApp.Achieves.Base
{
public abstract class BaseQueue<T> : IRingQueue<T>
{
private long _pointer = 0L;
private ConcurrentBag<BaseTask<T>>[] _arraySlot;
private int ArrayMax;
/// <summary>
/// Ring queue.
/// </summary>
public ConcurrentBag<BaseTask<T>>[] ArraySlot
{
get { return _arraySlot ?? (_arraySlot = new ConcurrentBag<BaseTask<T>>[ArrayMax]); }
}
public BaseQueue(int arrayMax)
{
if (arrayMax < 60 && arrayMax % 60 == 0)
throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 .");
ArrayMax = arrayMax;
}
public void Add(long delayTime, Action<T> action)
{
Add(delayTime, action, default(T));
}
public void Add(long delayTime,Action<T> action,T data)
{
Add(delayTime, action, data,0);
}
public void Add(long delayTime, Action<T> action, T data,long id)
{
NextSlot(delayTime, out long cycle, out long pointer);
ArraySlot[pointer] = ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
var baseTask = new BaseTask<T>(cycle, action, data,id);
ArraySlot[pointer].Add(baseTask);
}
/// <summary>
/// Remove tasks based on ID.
/// </summary>
/// <param name="id"></param>
public void Remove(long id)
{
try
{
Parallel.ForEach(ArraySlot, (ConcurrentBag<BaseTask<T>> collection, ParallelLoopState state) =>
{
var resulTask = collection.FirstOrDefault(p => p.Id == id);
if (resulTask != null)
{
collection.TryTake(out resulTask);
state.Break();
}
});
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
public void Start()
{
while (true)
{
RightMovePointer();
Thread.Sleep(1000);
Console.WriteLine(DateTime.Now.ToString());
}
}
/// <summary>
/// Calculate the information of the next slot.
/// </summary>
/// <param name="delayTime">Delayed execution time.</param>
/// <param name="cycle">Number of turns.</param>
/// <param name="index">Task location.</param>
private void NextSlot(long delayTime, out long cycle,out long index)
{
try
{
var circle = delayTime / ArrayMax;
var second = delayTime % ArrayMax;
var current_pointer = GetPointer();
var queue_index = 0L;
if (delayTime - ArrayMax > ArrayMax)
{
circle = 1;
}
else if (second > ArrayMax)
{
circle += 1;
}
if (delayTime - circle * ArrayMax < ArrayMax)
{
second = delayTime - circle * ArrayMax;
}
if (current_pointer + delayTime >= ArrayMax)
{
cycle = (int)((current_pointer + delayTime) / ArrayMax);
if (current_pointer + second - ArrayMax < 0)
{
queue_index = current_pointer + second;
}
else if (current_pointer + second - ArrayMax > 0)
{
queue_index = current_pointer + second - ArrayMax;
}
}
else
{
cycle = 0;
queue_index = current_pointer + second;
}
index = queue_index;
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
/// <summary>
/// Get the current location of the pointer.
/// </summary>
/// <returns></returns>
private long GetPointer()
{
return Interlocked.Read(ref _pointer);
}
/// <summary>
/// Reset pointer position.
/// </summary>
private void ReSetPointer()
{
Interlocked.Exchange(ref _pointer, 0);
}
/// <summary>
/// Pointer moves clockwise.
/// </summary>
private void RightMovePointer()
{
try
{
if (GetPointer() >= ArrayMax - 1)
{
ReSetPointer();
}
else
{
Interlocked.Increment(ref _pointer);
}
var pointer = GetPointer();
var taskCollection = ArraySlot[pointer];
if (taskCollection == null || taskCollection.Count == 0) return;
Parallel.ForEach(taskCollection, (BaseTask<T> task) =>
{
if (task.Cycle > 0)
{
task.SubCycleNumber();
}
if (task.Cycle <= 0)
{
taskCollection.TryTake(out task);
task.TaskAction(task.Data);
}
});
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
}
}
using System;
using System.Threading;
using DelayMessageApp.Interfaces;
namespace DelayMessageApp.Achieves.Base
{
public class BaseTask<T> : ITask
{
private long _cycle;
private long _id;
private T _data;
public Action<T> TaskAction { get; set; }
public long Cycle
{
get { return Interlocked.Read(ref _cycle); }
set { Interlocked.Exchange(ref _cycle, value); }
}
public long Id
{
get { return _id; }
set { _id = value; }
}
public T Data
{
get { return _data; }
set { _data = value; }
}
public BaseTask(long cycle, Action<T> action, T data,long id)
{
Cycle = cycle;
TaskAction = action;
Data = data;
Id = id;
}
public BaseTask(long cycle, Action<T> action,T data)
{
Cycle = cycle;
TaskAction = action;
Data = data;
}
public BaseTask(long cycle, Action<T> action)
{
Cycle = cycle;
TaskAction = action;
}
public void SubCycleNumber()
{
Interlocked.Decrement(ref _cycle);
}
}
}
Logic,這層主要實現(xiàn)調(diào)用邏輯,調(diào)用者最終只需要關(guān)心把任務(wù)放進隊列并指定什么時候執(zhí)行就行了,根本不需要關(guān)心其它的任何信息。
public static void Start()
{
//1.Initialize queues of different granularity.
IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();
//2.Open thread.
var lstTasks = new List<Task>
{
Task.Factory.StartNew(minuteRingQueue.Start)
};
//3.Add tasks performed in different periods.
minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Trump's visit to China!" });
minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Putin Pu's visit to China!" });
minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Eisenhower's visit to China!" });
minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) =>
{
Console.WriteLine(newsObj.News);
}), new NewsModel() { News = "Xi Jinping's visit to the US!" });
//3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.
//F5 Run the program and see the effect.
Task.WaitAll(lstTasks.ToArray());
Console.Read();
}
Models,這層就是用來在延遲任務(wù)中帶入的數(shù)據(jù)模型類而已了。自己用的時候換成任意自定義類型都可以。
5.運行效果

到此這篇關(guān)于如何實現(xiàn)定時推送的具體方案的文章就介紹到這了,希望對大家有所幫助,更多相關(guān)C#內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持腳本之家!
相關(guān)文章
C#實現(xiàn)DataTable,List和Json轉(zhuǎn)換的方法
這篇文章主要介紹了C#實現(xiàn)DataTable,List和Json轉(zhuǎn)換的方法,結(jié)合實例形式分析了DataTable、list、DataReader、DataSet等轉(zhuǎn)換成JSON的相關(guān)實現(xiàn)技巧,需要的朋友可以參考下2016-08-08
c#讀寫App.config,ConfigurationManager.AppSettings 不生效的解決方法
這篇文章主要介紹了c#讀寫App.config,ConfigurationManager.AppSettings 不生效的解決方法,需要的朋友可以參考下2015-10-10
C#?將Excel轉(zhuǎn)為PDF時自定義表格紙張大小的代碼思路
這篇文章主要介紹了C#?將Excel轉(zhuǎn)為PDF時自定義表格紙張大小的代碼思路,轉(zhuǎn)換前的頁面大小設(shè)置為該版本中寫入的新功能,在舊版本和免費版本中暫不支持,感興趣的朋友跟隨小編一起看看實例代碼2021-11-11
datatable生成excel和excel插入圖片示例詳解
excel導(dǎo)出在C#代碼中應(yīng)用己經(jīng)很廣泛了,下面講了datatable生成excel、復(fù)制sheet頁、刪除sheet頁、選中sheet頁、另存excel文件、excel中插入圖片等功能2014-01-01

