C#多線程系列之工作流實(shí)現(xiàn)
前言
前面學(xué)習(xí)了很多多線程和任務(wù)的基礎(chǔ)知識(shí),這里要來實(shí)踐一下啦。通過本篇教程,你可以寫出一個(gè)簡單的工作流引擎。
本篇教程內(nèi)容完成是基于任務(wù)的,只需要看過筆者的三篇關(guān)于異步的文章,掌握 C# 基礎(chǔ),即可輕松完成。
由于本篇文章編寫的工作流程序,主要使用任務(wù),有些邏輯過程會(huì)比較難理解,多測試一下就好。代碼主要還是 C# 基礎(chǔ),為什么說簡單?
- 不包含 async 、await
- 幾乎不含包含多線程(有個(gè)讀寫鎖)
- 不包含表達(dá)式樹
- 幾乎不含反射(有個(gè)小地方需要反射一下,但是非常簡單)
- 沒有復(fù)雜的算法
因?yàn)槭腔谌蝿?wù)(Task)的,所以可以輕松設(shè)計(jì)組合流程,組成復(fù)雜的工作流。
由于只是講述基礎(chǔ),所以不會(huì)包含很多種流程控制,這里只實(shí)現(xiàn)一些簡單的。
先說明,別用到業(yè)務(wù)上。。。這個(gè)工作流非常簡單,就幾個(gè)功能,這個(gè)工作流是基于筆者的多線程系列文章的知識(shí)點(diǎn)。寫這個(gè)東西是為了講解任務(wù)操作,讓讀者更加深入理解任務(wù)。
代碼地址:https://github.com/whuanle/CZGL.FLow
節(jié)點(diǎn)
在開始前,我們來設(shè)計(jì)幾種流程控制的東西。
將一個(gè) 步驟/流程/節(jié)點(diǎn) 稱為 step。
Then
一個(gè)普通的節(jié)點(diǎn),包含一個(gè)任務(wù)。
多個(gè) Then 節(jié)點(diǎn),可以組成一條連續(xù)的工作流。

Parallel
并行節(jié)點(diǎn),可以設(shè)置多個(gè)并行節(jié)點(diǎn)放到 Parallel 中,以及在里面為任一個(gè)節(jié)點(diǎn)創(chuàng)建新的分支。

Schedule
定時(shí)節(jié)點(diǎn),創(chuàng)建后會(huì)在一定時(shí)間后執(zhí)行節(jié)點(diǎn)中的任務(wù)。

Delay
讓當(dāng)前任務(wù)阻塞一段時(shí)間。

試用一下
順序節(jié)點(diǎn)
打開你的 VS ,創(chuàng)建項(xiàng)目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。
創(chuàng)建一個(gè)類 MyFlow1,繼承 IDoFlow。
public class MyFlow1 : IDoFlow
{
public int Id => 1;
public string Name => "隨便起個(gè)名字";
public int Version => 1;
public IDoFlowBuilder Build(IDoFlowBuilder builder)
{
throw new NotImplementedException();
}
}你可以創(chuàng)建多個(gè)工作流任務(wù),每個(gè)工作流的 Id 必須唯一。Name 和 Version 隨便填,因?yàn)檫@里筆者沒有對這幾個(gè)字段做邏輯。
IDoFlowBuilder 是構(gòu)建工作流的一個(gè)接口。
我們來寫一個(gè)工作流測試一下。
/// <summary>
/// 普通節(jié)點(diǎn) Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
public int Id => 1;
public string Name => "test";
public int Version => 1;
public IDoFlowBuilder Build(IDoFlowBuilder builder)
{
builder.StartWith(() =>
{
Console.WriteLine("工作流開始");
}).Then(() =>
{
Console.WriteLine("下一個(gè)節(jié)點(diǎn)");
}).Then(() =>
{
Console.WriteLine("最后一個(gè)節(jié)點(diǎn)");
});
return builder;
}
} Main 方法中:
static void Main(string[] args)
{
FlowCore.RegisterWorkflow<MyFlow1>();
// FlowCore.RegisterWorkflow(new MyFlow1());
FlowCore.Start(1);
Console.ReadKey();
}.StartWith() 方法開始一個(gè)工作流;
FlowCore.RegisterWorkflow<T>() 注冊一個(gè)工作流;
FlowCore.Start();執(zhí)行一個(gè)工作流;
并行任務(wù)
其代碼如下:
/// <summary>
/// 并行節(jié)點(diǎn) Parallel 使用方法
/// </summary>
public class MyFlow2 : IDoFlow
{
public int Id => 2;
public string Name => "test";
public int Version => 1;
public IDoFlowBuilder Build(IDoFlowBuilder builder)
{
builder.StartWith()
.Parallel(steps =>
{
// 每個(gè)并行任務(wù)也可以設(shè)計(jì)后面繼續(xù)執(zhí)行其它任務(wù)
steps.Do(() =>
{
Console.WriteLine("并行1");
}).Do(() =>
{
Console.WriteLine("并行2");
});
steps.Do(() =>
{
Console.WriteLine("并行3");
});
// 并行任務(wù)設(shè)計(jì)完成后,必須調(diào)用此方法
// 此方法必須放在所有并行任務(wù) .Do() 的最后
steps.EndParallel();
// 如果 .Do() 在 EndParallel() 后,那么不會(huì)等待此任務(wù)
steps.Do(() => { Console.WriteLine("并行異步"); });
// 開啟新的分支
steps.StartWith()
.Then(() =>
{
Console.WriteLine("新的分支" + Task.CurrentId);
}).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });
}, false)
.Then(() =>
{
Console.WriteLine("11111111111111111 ");
});
return builder;
}
}Main 方法中:
static void Main(string[] args)
{
FlowCore.RegisterWorkflow<MyFlow2>();
FlowCore.Start(2);
Console.ReadKey();
}通過以上示例,可以大概了解本篇文章中我們要寫的程序。
編寫工作流
建立一個(gè)類庫項(xiàng)目,名為 DoFlow。
建立 Extensions、Interfaces、Services 三個(gè)目錄。
接口構(gòu)建器
新建 IStepBuilder 接口文件到 Interfaces 目錄,其內(nèi)容如下:
using System;
namespace DoFlow.Interfaces
{
public interface IStepBuilder
{
/// <summary>
/// 普通節(jié)點(diǎn)
/// </summary>
/// <param name="stepBuilder"></param>
/// <returns></returns>
IStepBuilder Then(Action action);
/// <summary>
/// 多個(gè)節(jié)點(diǎn)
/// <para>默認(rèn)下,需要等待所有的任務(wù)完成,這個(gè)step才算完成</para>
/// </summary>
/// <param name="action"></param>
/// <param name="anyWait">任意一個(gè)任務(wù)完成即可跳轉(zhuǎn)到下一個(gè)step</param>
/// <returns></returns>
IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);
/// <summary>
/// 節(jié)點(diǎn)將在某個(gè)時(shí)間間隔后執(zhí)行
/// <para>異步,不會(huì)阻塞當(dāng)前工作流的運(yùn)行,計(jì)劃任務(wù)將在一段時(shí)間后觸發(fā)</para>
/// </summary>
/// <returns></returns>
IStepBuilder Schedule(Action action, TimeSpan time);
/// <summary>
/// 阻塞一段時(shí)間
/// </summary>
/// <param name="time"></param>
/// <returns></returns>
IStepBuilder Delay(TimeSpan time);
}
}新建 IStepParallel 文件到 Interfaces 目錄。
using System;
namespace DoFlow.Interfaces
{
/// <summary>
/// 并行任務(wù)
/// <para>默認(rèn)情況下,只有這個(gè)節(jié)點(diǎn)的所有并行任務(wù)都完成后,這個(gè)節(jié)點(diǎn)才算完成</para>
/// </summary>
public interface IStepParallel
{
/// <summary>
/// 一個(gè)并行任務(wù)
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
IStepParallel Do(Action action);
/// <summary>
/// 開始一個(gè)分支
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
IStepBuilder StartWith(Action action = null);
/// <summary>
/// 必須使用此方法結(jié)束一個(gè)并行任務(wù)
/// </summary>
void EndParallel();
}
/// <summary>
/// 并行任務(wù)
/// <para>任意一個(gè)任務(wù)完成后,就可以跳轉(zhuǎn)到下一個(gè) step</para>
/// </summary>
public interface IStepParallelAny : IStepParallel
{
}
}工作流構(gòu)建器
新建 IDoFlowBuilder 接口文件到 Interfaces 目錄。
using System;
using System.Threading.Tasks;
namespace DoFlow.Interfaces
{
/// <summary>
/// 構(gòu)建工作流任務(wù)
/// </summary>
public interface IDoFlowBuilder
{
/// <summary>
/// 開始一個(gè) step
/// </summary>
IStepBuilder StartWith(Action action = null);
void EndWith(Action action);
Task ThatTask { get; }
}
}新建 IDoFlow 接口文件到 Interfaces 目錄。
namespace DoFlow.Interfaces
{
/// <summary>
/// 工作流
/// <para>無參數(shù)傳遞</para>
/// </summary>
public interface IDoFlow
{
/// <summary>
/// 全局唯一標(biāo)識(shí)
/// </summary>
int Id { get; }
/// <summary>
/// 標(biāo)識(shí)此工作流的名稱
/// </summary>
string Name { get; }
/// <summary>
/// 標(biāo)識(shí)此工作流的版本
/// </summary>
int Version { get; }
IDoFlowBuilder Build(IDoFlowBuilder builder);
}
}依賴注入
新建 DependencyInjectionService 文件到 Services 目錄。
用于實(shí)現(xiàn)依賴注入和解耦。
using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;
namespace DoFlow.Services
{
/// <summary>
/// 依賴注入服務(wù)
/// </summary>
public static class DependencyInjectionService
{
private static IServiceCollection _servicesList;
private static IServiceProvider _services;
static DependencyInjectionService()
{
IServiceCollection services = new ServiceCollection();
_servicesList = services;
// 注入引擎需要的服務(wù)
InitExtension.StartInitExtension();
var serviceProvider = services.BuildServiceProvider();
_services = serviceProvider;
}
/// <summary>
/// 添加一個(gè)注入到容器服務(wù)
/// </summary>
/// <typeparam name="TService"></typeparam>
/// <typeparam name="TImplementation"></typeparam>
public static void AddService<TService, TImplementation>()
where TService : class
where TImplementation : class, TService
{
_servicesList.AddTransient<TService, TImplementation>();
}
/// <summary>
/// 獲取需要的服務(wù)
/// </summary>
/// <typeparam name="TIResult"></typeparam>
/// <returns></returns>
public static TIResult GetService<TIResult>()
{
TIResult Tservice = _services.GetService<TIResult>();
return Tservice;
}
}
}添加一個(gè) InitExtension 文件到 Extensions 目錄。
using DoFlow.Interfaces;
using DoFlow.Services;
namespace DoFlow.Extensions
{
public static class InitExtension
{
private static bool IsInit = false;
public static void StartInitExtension()
{
if (IsInit) return;
IsInit = true;
DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
}
}
}實(shí)現(xiàn)工作流解析
以下文件均在 Services 目錄建立。
新建 StepBuilder 文件,用于解析節(jié)點(diǎn),構(gòu)建任務(wù)。
using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;
namespace DoFlow.Services
{
/// <summary>
/// 節(jié)點(diǎn)工作引擎
/// </summary>
public class StepBuilder : IStepBuilder
{
private Task _task;
/// <summary>
/// 延遲執(zhí)行
/// </summary>
/// <param name="time"></param>
/// <returns></returns>
public IStepBuilder Delay(TimeSpan time)
{
Task.Delay(time).Wait();
return this;
}
/// <summary>
/// 并行 step
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
{
IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
Task task = new Task(() =>
{
action.Invoke(parallel);
});
_task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
{
task.Start();
});
_task = task;
return this;
}
/// <summary>
/// 計(jì)劃任務(wù)
/// </summary>
/// <param name="action"></param>
/// <param name="time"></param>
/// <returns></returns>
public IStepBuilder Schedule(Action action, TimeSpan time)
{
Task.Factory.StartNew(() =>
{
Task.Delay(time).Wait();
action.Invoke();
});
return this;
}
/// <summary>
/// 普通 step
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public IStepBuilder Then(Action action)
{
Task task = new Task(action);
_task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
{
task.Start();
task.Wait();
});
_task = task;
return this;
}
public void SetTask(Task task)
{
_task = task;
}
}
}新建 StepParallel 文件,里面有兩個(gè)類,用于實(shí)現(xiàn)同步任務(wù)。
using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace DoFlow.Services
{
/// <summary>
/// 第一層所有任務(wù)結(jié)束后才能跳轉(zhuǎn)下一個(gè) step
/// </summary>
public class StepParallelWhenAll : IStepParallel
{
private Task _task;
private readonly List<Task> _tasks = new List<Task>();
public StepParallelWhenAll()
{
_task = new Task(() => { },TaskCreationOptions.AttachedToParent);
}
public IStepParallel Do(Action action)
{
_tasks.Add(Task.Run(action));
return this;
}
public void EndParallel()
{
_task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
{
Task.WhenAll(_tasks).Wait();
});
}
public IStepBuilder StartWith(Action action = null)
{
Task task =
action is null ? new Task(() => { })
: new Task(action);
var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
_task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });
return _stepBuilder;
}
}
/// <summary>
/// 完成任意一個(gè)任務(wù)即可跳轉(zhuǎn)到下一個(gè) step
/// </summary>
public class StepParallelWhenAny : IStepParallelAny
{
private Task _task;
private readonly List<Task> _tasks = new List<Task>();
public StepParallelWhenAny()
{
_task = Task.Run(() => { });
}
public IStepParallel Do(Action action)
{
_tasks.Add(Task.Run(action));
return this;
}
public void EndParallel()
{
_task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
{
Task.WhenAny(_tasks).Wait();
});
}
public IStepBuilder StartWith(Action action = null)
{
Task task =
action is null ? new Task(() => { })
: new Task(action);
var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
_task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });
return _stepBuilder;
}
}
}新建 DoFlowBuilder 文件,用于構(gòu)建工作流。
using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;
namespace DoFlow.Services
{
public class DoFlowBuilder : IDoFlowBuilder
{
private Task _task;
public Task ThatTask => _task;
public void EndWith(Action action)
{
_task.Start();
}
public IStepBuilder StartWith(Action action = null)
{
if (action is null)
_task = new Task(() => { });
else _task = new Task(action);
IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
((StepBuilder)_stepBuilder).SetTask(_task);
return _stepBuilder;
}
}
}新建 FlowEngine 文件,用于執(zhí)行工作流。
using DoFlow.Interfaces;
namespace DoFlow.Services
{
/// <summary>
/// 工作流引擎
/// </summary>
public class FlowEngine
{
private readonly IDoFlow _flow;
public FlowEngine(IDoFlow flow)
{
_flow = flow;
}
/// <summary>
/// 開始一個(gè)工作流
/// </summary>
public void Start()
{
IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
_flow.Build(builder).ThatTask.Start();
}
}
}新建 FlowCore 文件,用于存儲(chǔ)和索引工作流。使用讀寫鎖解決并發(fā)字典問題。
using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;
namespace DoFlow.Services
{
public static class FlowCore
{
private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();
// 讀寫鎖
private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();
/// <summary>
/// 注冊工作流
/// </summary>
/// <param name="flow"></param>
public static bool RegisterWorkflow(IDoFlow flow)
{
try
{
readerWriterLockSlim.EnterReadLock();
if (flowEngines.ContainsKey(flow.Id))
return false;
flowEngines.Add(flow.Id, new FlowEngine(flow));
return true;
}
finally
{
readerWriterLockSlim.ExitReadLock();
}
}
/// <summary>
/// 注冊工作流
/// </summary>
/// <param name="flow"></param>
public static bool RegisterWorkflow<TDoFlow>()
{
Type type = typeof(TDoFlow);
IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
try
{
readerWriterLockSlim.EnterReadLock();
if (flowEngines.ContainsKey(flow.Id))
return false;
flowEngines.Add(flow.Id, new FlowEngine(flow));
return true;
}
finally
{
readerWriterLockSlim.ExitReadLock();
}
}
/// <summary>
/// 要啟動(dòng)的工作流
/// </summary>
/// <param name="id"></param>
public static bool Start(int id)
{
FlowEngine engine;
// 讀寫鎖
try
{
readerWriterLockSlim.EnterUpgradeableReadLock();
if (!flowEngines.ContainsKey(id))
return default;
try
{
readerWriterLockSlim.EnterWriteLock();
engine = flowEngines[id];
}
catch { return default; }
finally
{
readerWriterLockSlim.ExitWriteLock();
}
}
catch { return default; }
finally
{
readerWriterLockSlim.ExitUpgradeableReadLock();
}
engine.Start();
return true;
}
}
}就這樣程序?qū)懲炅恕?/p>
到此這篇關(guān)于C#多線程系列之工作流實(shí)現(xiàn)的文章就介紹到這了。希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
詳解C#如何利用TcpListener和TcpClient實(shí)現(xiàn)Tcp通訊
TcpListener 和 TcpClient 是在 System.Net.Sockets.Socket 類的基礎(chǔ)上做的進(jìn)一步封裝,使用 GetStream 方法返回網(wǎng)絡(luò)流,下面我們就來詳細(xì)一下如何使用TcpListener和TcpClient實(shí)現(xiàn)Tcp通訊吧2023-12-12
C#筆記之EF Code First 數(shù)據(jù)模型 數(shù)據(jù)遷移
EF 中 Code First 的數(shù)據(jù)遷移網(wǎng)上有很多資料,我這份并沒什么特別。Code First 創(chuàng)建視圖網(wǎng)上也有很多資料,但好像很麻煩,而且親測好像是無效的方法(可能是我太笨,沒搞成功),我摸索出了一種簡單有效的方法,這里分享給大家2021-09-09

