C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列
前言
進(jìn)程通信一般情況下比較少用,但是也有一些使用場景,有些做視頻傳輸?shù)乃坪鯐枚噙M(jìn)程來實(shí)現(xiàn),還有在子進(jìn)程中調(diào)用特定的庫來避免內(nèi)存泄漏,筆者最近也遇到了需要使用多進(jìn)程的場景。多進(jìn)程的使用最主要的就是進(jìn)程間的通信,本文參考了go語言的ipc庫,實(shí)現(xiàn)了一個基于共享內(nèi)存的跨進(jìn)程隊(duì)列。
一、實(shí)現(xiàn)原理
1、用到的主要對象
//共享內(nèi)存管理對象 MemoryMappedFile _mmf; //跨進(jìn)程的互斥變量 Mutex _mtx; //入隊(duì)信號量 Semaphore _semaEq; //出隊(duì)信號量 Semaphore _semaDq;
2、創(chuàng)建共享內(nèi)存
創(chuàng)建共享內(nèi)存需要使用MemoryMappedFile.CreateFromFile實(shí)現(xiàn)跨平臺。CreateNew只能創(chuàng)建無法打開第二個,OpenExisting只支持windows。
string name="共享內(nèi)存標(biāo)識名稱"; _shmPath="共享內(nèi)存文件路徑"+name; //通過文件路徑創(chuàng)建共享內(nèi)存 _mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false); //創(chuàng)建互斥變量 _mtx = new Mutex(false, Name + ".mx"); //創(chuàng)建入隊(duì)信號量,capacity為隊(duì)列元素個數(shù)容量 _semaEq = new Semaphore(0, (int)capacity, name+ ".eq"); //創(chuàng)建出隊(duì)信號量,capacity為隊(duì)列元素個數(shù)容量 _semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");
獲取讀寫對象
_mmva = _mmf.CreateViewAccessor();
值類型數(shù)組方式寫入
T[] obj; _mmva.WriteArray<T>(position , obj, 0, obj.Length);
3、頭部信息
采用循環(huán)隊(duì)列方式實(shí)現(xiàn),判斷隊(duì)空隊(duì)滿通過count、capacity的方式(參考了C#的Queue源碼),避免占用多一個空間。
struct QueueHeader
{
//元素大小
public nint ElementSize;
//隊(duì)列容量
public nint Capacity;
//當(dāng)前元素個數(shù)
public nint Count;
//隊(duì)列頭
public nint Front;
//隊(duì)列尾
public nint Rear;
}
隊(duì)列頭信息需要存儲在共享內(nèi)存中。
QueueHeader Header
{
get
{
QueueHeader header;
_mmva.Read(0, out header);
return header;
}
set
{
_mmva.Write(0, ref value);
}
}
4、入隊(duì)
示例如下
bool Enqueuee<T>(T[] obj) where T : struct
{
//共享內(nèi)存中讀取header
var header = Header;
//隊(duì)列滿返回
if (header.Count == header.Capacity) return false;
//計(jì)算寫入的位置,頭部長度+隊(duì)尾*元素大小
nint position = _QueuetHeaderSize + header.Rear * header.ElementSize;
//寫入共享內(nèi)存
_mmva.WriteArray<T>(position, obj, 0, obj.Length);
//更新隊(duì)尾
header.Rear = (header.Rear + 1) % header.Capacity;
//更新長度
header.Count++;
//更新頭部信息到共享內(nèi)存
Header=header;
return true;
}
同步
//等待出隊(duì)信號量(如果隊(duì)列滿則會等待)
if (!_semaDq.WaitOne(timeout)) return false;
//進(jìn)入互斥鎖
if (!_mtx.WaitOne(timeout)) return false;
try
{
//入隊(duì)
Enqueue(obj);
}
finally
{
//通知入隊(duì)信號量
_semaEq.Release();
//釋放互斥鎖
_mtx.ReleaseMutex();
}
return true;
5、出隊(duì)
object Dequeue()
{
//共享內(nèi)存中讀取header
var header = Header;
//隊(duì)列空則返回
if (header.Count == 0) return null;
//計(jì)算讀取的位置,頭部長度+隊(duì)頭*元素大小
long position = _QueuetHeaderSize + header.Front * header.ElementSize;
//創(chuàng)建數(shù)據(jù)用于裝載數(shù)據(jù)
Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength);
//將泛型轉(zhuǎn)type調(diào)用。
var readArray = _ReadArrayGeneric.MakeGenericMethod(readType);
//讀取共享內(nèi)存的數(shù)據(jù)
readArray.Invoke(_mmva, [position , arr, 0, arr.Length]);
//更新隊(duì)頭
header.Front = (header.Front + 1) % header.Capacity;
//更新長度
header.Count--;
//更新頭部信息到共享內(nèi)存
Header = header;
return msg;
}
同步
//等待入隊(duì)信號量(如果隊(duì)列空則會等待)
if (!_semaEq.WaitOne(timeout)) return null;
//進(jìn)入互斥鎖
if (!_mtx.WaitOne(timeout!)) return null;
try
{
//出隊(duì)
return Dequeue();
}
finally
{
//通知入隊(duì)信號量
_semaDq.Release();
//釋放互斥鎖
_mtx.ReleaseMutex();
}
6、釋放資源
/// <summary>
/// 銷毀隊(duì)列,只會銷毀當(dāng)前實(shí)例,如果多個隊(duì)列打開同個名稱,其他隊(duì)列不受影響
/// </summary>
public void Dispose()
{
_mmf.Dispose();
_mmva.Dispose();
_mtx.Dispose();
_semaEq.Dispose();
_semaDq.Dispose();
}
二、完整代碼
類的定義
/// <summary>
/// 共享隊(duì)列
/// 基于共享內(nèi)存實(shí)現(xiàn)
/// </summary>
class SharedQueue : IDisposable
{
/// <summary>
/// 名稱
/// </summary>
public string Name { get; private set; }
/// <summary>
/// 元素最大大小
/// </summary>
public long ElementMaxSize { get; private set; }
/// <summary>
/// 隊(duì)列容量
/// </summary>
public long Capacity { get; private set; }
/// <summary>
/// 表示是否新創(chuàng)建,是則是創(chuàng)建,否則是打開已存在的。
/// </summary>
public bool IsNewCreate { get; private set; }
/// <summary>
/// 構(gòu)造方法
/// </summary>
/// <param name="name">唯一名稱,系統(tǒng)級別,不同進(jìn)程創(chuàng)建相同名稱的本對象,就是同一個隊(duì)列,可以進(jìn)行數(shù)據(jù)傳輸。</param>
/// <param name="capacity">隊(duì)列容量,元素個數(shù)總量</param>
/// <param name="elementBodyMaxSize">隊(duì)列元素最大大小,此大小需要考慮傳輸數(shù)據(jù)Type.FullName長度</param>
public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728);
/// <summary>
/// 發(fā)送數(shù)據(jù)
/// </summary>
/// <param name="obj">發(fā)送的對象,支持值類型(元類型、結(jié)構(gòu)體)、值類型數(shù)組、可json序列化的任意對象(實(shí)體類、數(shù)組、List、字典等等),無法序列化會產(chǎn)生異常。
/// 會根據(jù)類型自動判斷傳輸方式,值類型以及值類型數(shù)組會直接內(nèi)存拷貝,引用類型會進(jìn)行序列化。
/// 此方法隊(duì)列滿了會阻塞,直到發(fā)送成功才返回。
/// </param>
/// <param name="isForceSerialize">是否強(qiáng)制序列化,結(jié)構(gòu)體不含引用的情況下會直接復(fù)制數(shù)據(jù)性能較高,但是如果結(jié)構(gòu)體成員變量有引用類型則會引發(fā)異常,此時可以強(qiáng)制序列化。</param>
public void Send(object obj, bool isForceSerialize = false);
/// <summary>
/// 接收數(shù)據(jù)
/// 此方法隊(duì)列空會阻塞,直到有數(shù)據(jù)才返回。
/// </summary>
/// <returns>接收的數(shù)據(jù),與send的數(shù)據(jù)類型對應(yīng)??梢酝ㄟ^type或is判斷,或者提前知道類型直接轉(zhuǎn)換</returns>
public object Receive();
/// <summary>
/// 發(fā)送數(shù)據(jù)超時
/// </summary>
/// <param name="obj">發(fā)送的對象,支持值類型(元類型、結(jié)構(gòu)體)、值類型數(shù)組、可json序列化的任意對象(實(shí)體類、數(shù)組、List、字典等等),無法序列化會產(chǎn)生異常。
/// 會根據(jù)類型自動判斷傳輸方式,值類型以及值類型數(shù)組會直接內(nèi)存拷貝,引用類型會進(jìn)行序列化。</param>
/// <param name="timeout">超時時長</param>
/// <param name="isForceSerialize">是否強(qiáng)制序列化,結(jié)構(gòu)體不含引用的情況下會直接復(fù)制數(shù)據(jù)性能較高,但是如果結(jié)構(gòu)體成員變量有引用類型則會引發(fā)異常,此時可以強(qiáng)制序列化。</param>
/// <returns>true發(fā)送成功,false超時</returns>
public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false);
/// <summary>
/// 接收超時
/// </summary>
/// <param name="timeout">超時時長</param>
/// <returns>接收的數(shù)據(jù),與send的數(shù)據(jù)類型對應(yīng)??梢酝ㄟ^type或is判斷,或者提前知道類型直接轉(zhuǎn)換。
/// 超時返回null。
/// </returns>
public object? ReceiveTimeout(TimeSpan timeout);
/// <summary>
/// 銷毀隊(duì)列,只會銷毀當(dāng)前實(shí)例,如果多個隊(duì)列打開同個名稱,其他隊(duì)列不受影響
/// </summary>
public void Dispose();
}
三、使用示例
1、傳輸byte[]數(shù)據(jù)
進(jìn)程a
SharedQueue shq= new SharedQueue("shq1", 10);
byte[] a = new byte[5] { 1, 2, 3, 4, 5 };
//發(fā)送數(shù)據(jù)
shq.send(a);
進(jìn)程b
SharedQueue shq= new SharedQueue("shq1", 10);
//接收數(shù)據(jù)
var a=shq.Receive() as byte[];
Console.Write("receive: ");
foreach (var i in a)
{
Console.Write(i);
}

2、傳輸字符串
進(jìn)程a
SharedQueue shq= new SharedQueue("shq1", 10,64);
shq.send("12345");
進(jìn)程b
SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as string;
Console.WriteLine("receive: " + a);

3、傳輸對象
class A
{
public string Name;
public int Number;
}
進(jìn)程a
SharedQueue shq= new SharedQueue("shq1", 10,64);
sq.Send(new A() { Name = "Tommy", Number = 102185784 });
進(jìn)程b
SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as A;
Console.WriteLine("receive: " + a.Name + " " + a.Number);

總結(jié)
以上就是今天要講的內(nèi)容,實(shí)現(xiàn)這樣的一個對象,雖然代碼量不多,但還是有一點(diǎn)難度的,很多細(xì)節(jié)需要處理,比如泛型轉(zhuǎn)type以統(tǒng)一接口,信號量實(shí)現(xiàn)隊(duì)列和條件變量是有差異的,用CreateFromFile才能實(shí)現(xiàn)跨平臺。總的來說,有了這樣的一個隊(duì)列,跨線程通信就變的比較方便且高效了。
到此這篇關(guān)于C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列的文章就介紹到這了,更多相關(guān)C#跨進(jìn)程隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
c#實(shí)現(xiàn)適配器模式的項(xiàng)目實(shí)踐
適配器模式將一個類的接口轉(zhuǎn)換成客戶希望的另一個接口,使得原本由于接口不兼容而不能一起工作的那些類可以一起工作,本文主要介紹了c#實(shí)現(xiàn)適配器模式的項(xiàng)目實(shí)踐,感興趣的可以一起來了解一下2023-08-08
C#?委托與?Lambda?表達(dá)式轉(zhuǎn)換機(jī)制及弱事件模式下的生命周期詳解
本文介紹了C#委托和Lambda表達(dá)式的工作原理,包括委托的內(nèi)部結(jié)構(gòu)、Lambda表達(dá)式的轉(zhuǎn)換機(jī)制以及弱事件模式下的生命周期管理,感興趣的朋友一起看看吧2025-02-02
使用C#實(shí)現(xiàn)在word中插入頁眉頁腳的方法
這篇文章主要介紹了使用C#實(shí)現(xiàn)在word中插入頁眉頁腳的方法,是操作Word的常見方法,有一定的學(xué)習(xí)借鑒價值,需要的朋友可以參考下2014-08-08

