C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列
前言
進(jìn)程通信一般情況下比較少用,但是也有一些使用場景,有些做視頻傳輸?shù)乃坪鯐?huì)用多進(jìn)程來實(shí)現(xiàn),還有在子進(jìn)程中調(diào)用特定的庫來避免內(nèi)存泄漏,筆者最近也遇到了需要使用多進(jìn)程的場景。多進(jìn)程的使用最主要的就是進(jìn)程間的通信,本文參考了go語言的ipc庫,實(shí)現(xiàn)了一個(gè)基于共享內(nèi)存的跨進(jìn)程隊(duì)列。
一、實(shí)現(xiàn)原理
1、用到的主要對(duì)象
//共享內(nèi)存管理對(duì)象 MemoryMappedFile _mmf; //跨進(jìn)程的互斥變量 Mutex _mtx; //入隊(duì)信號(hào)量 Semaphore _semaEq; //出隊(duì)信號(hào)量 Semaphore _semaDq;
2、創(chuàng)建共享內(nèi)存
創(chuàng)建共享內(nèi)存需要使用MemoryMappedFile.CreateFromFile
實(shí)現(xiàn)跨平臺(tái)。CreateNew只能創(chuàng)建無法打開第二個(gè),OpenExisting只支持windows。
string name="共享內(nèi)存標(biāo)識(shí)名稱"; _shmPath="共享內(nèi)存文件路徑"+name; //通過文件路徑創(chuàng)建共享內(nèi)存 _mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false); //創(chuàng)建互斥變量 _mtx = new Mutex(false, Name + ".mx"); //創(chuàng)建入隊(duì)信號(hào)量,capacity為隊(duì)列元素個(gè)數(shù)容量 _semaEq = new Semaphore(0, (int)capacity, name+ ".eq"); //創(chuàng)建出隊(duì)信號(hào)量,capacity為隊(duì)列元素個(gè)數(shù)容量 _semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");
獲取讀寫對(duì)象
_mmva = _mmf.CreateViewAccessor();
值類型數(shù)組方式寫入
T[] obj; _mmva.WriteArray<T>(position , obj, 0, obj.Length);
3、頭部信息
采用循環(huán)隊(duì)列方式實(shí)現(xiàn),判斷隊(duì)空隊(duì)滿通過count、capacity的方式(參考了C#的Queue源碼),避免占用多一個(gè)空間。
struct QueueHeader { //元素大小 public nint ElementSize; //隊(duì)列容量 public nint Capacity; //當(dāng)前元素個(gè)數(shù) public nint Count; //隊(duì)列頭 public nint Front; //隊(duì)列尾 public nint Rear; }
隊(duì)列頭信息需要存儲(chǔ)在共享內(nèi)存中。
QueueHeader Header { get { QueueHeader header; _mmva.Read(0, out header); return header; } set { _mmva.Write(0, ref value); } }
4、入隊(duì)
示例如下
bool Enqueuee<T>(T[] obj) where T : struct { //共享內(nèi)存中讀取header var header = Header; //隊(duì)列滿返回 if (header.Count == header.Capacity) return false; //計(jì)算寫入的位置,頭部長度+隊(duì)尾*元素大小 nint position = _QueuetHeaderSize + header.Rear * header.ElementSize; //寫入共享內(nèi)存 _mmva.WriteArray<T>(position, obj, 0, obj.Length); //更新隊(duì)尾 header.Rear = (header.Rear + 1) % header.Capacity; //更新長度 header.Count++; //更新頭部信息到共享內(nèi)存 Header=header; return true; }
同步
//等待出隊(duì)信號(hào)量(如果隊(duì)列滿則會(huì)等待) if (!_semaDq.WaitOne(timeout)) return false; //進(jìn)入互斥鎖 if (!_mtx.WaitOne(timeout)) return false; try { //入隊(duì) Enqueue(obj); } finally { //通知入隊(duì)信號(hào)量 _semaEq.Release(); //釋放互斥鎖 _mtx.ReleaseMutex(); } return true;
5、出隊(duì)
object Dequeue() { //共享內(nèi)存中讀取header var header = Header; //隊(duì)列空則返回 if (header.Count == 0) return null; //計(jì)算讀取的位置,頭部長度+隊(duì)頭*元素大小 long position = _QueuetHeaderSize + header.Front * header.ElementSize; //創(chuàng)建數(shù)據(jù)用于裝載數(shù)據(jù) Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength); //將泛型轉(zhuǎn)type調(diào)用。 var readArray = _ReadArrayGeneric.MakeGenericMethod(readType); //讀取共享內(nèi)存的數(shù)據(jù) readArray.Invoke(_mmva, [position , arr, 0, arr.Length]); //更新隊(duì)頭 header.Front = (header.Front + 1) % header.Capacity; //更新長度 header.Count--; //更新頭部信息到共享內(nèi)存 Header = header; return msg; }
同步
//等待入隊(duì)信號(hào)量(如果隊(duì)列空則會(huì)等待) if (!_semaEq.WaitOne(timeout)) return null; //進(jìn)入互斥鎖 if (!_mtx.WaitOne(timeout!)) return null; try { //出隊(duì) return Dequeue(); } finally { //通知入隊(duì)信號(hào)量 _semaDq.Release(); //釋放互斥鎖 _mtx.ReleaseMutex(); }
6、釋放資源
/// <summary> /// 銷毀隊(duì)列,只會(huì)銷毀當(dāng)前實(shí)例,如果多個(gè)隊(duì)列打開同個(gè)名稱,其他隊(duì)列不受影響 /// </summary> public void Dispose() { _mmf.Dispose(); _mmva.Dispose(); _mtx.Dispose(); _semaEq.Dispose(); _semaDq.Dispose(); }
二、完整代碼
類的定義
/// <summary> /// 共享隊(duì)列 /// 基于共享內(nèi)存實(shí)現(xiàn) /// </summary> class SharedQueue : IDisposable { /// <summary> /// 名稱 /// </summary> public string Name { get; private set; } /// <summary> /// 元素最大大小 /// </summary> public long ElementMaxSize { get; private set; } /// <summary> /// 隊(duì)列容量 /// </summary> public long Capacity { get; private set; } /// <summary> /// 表示是否新創(chuàng)建,是則是創(chuàng)建,否則是打開已存在的。 /// </summary> public bool IsNewCreate { get; private set; } /// <summary> /// 構(gòu)造方法 /// </summary> /// <param name="name">唯一名稱,系統(tǒng)級(jí)別,不同進(jìn)程創(chuàng)建相同名稱的本對(duì)象,就是同一個(gè)隊(duì)列,可以進(jìn)行數(shù)據(jù)傳輸。</param> /// <param name="capacity">隊(duì)列容量,元素個(gè)數(shù)總量</param> /// <param name="elementBodyMaxSize">隊(duì)列元素最大大小,此大小需要考慮傳輸數(shù)據(jù)Type.FullName長度</param> public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728); /// <summary> /// 發(fā)送數(shù)據(jù) /// </summary> /// <param name="obj">發(fā)送的對(duì)象,支持值類型(元類型、結(jié)構(gòu)體)、值類型數(shù)組、可json序列化的任意對(duì)象(實(shí)體類、數(shù)組、List、字典等等),無法序列化會(huì)產(chǎn)生異常。 /// 會(huì)根據(jù)類型自動(dòng)判斷傳輸方式,值類型以及值類型數(shù)組會(huì)直接內(nèi)存拷貝,引用類型會(huì)進(jìn)行序列化。 /// 此方法隊(duì)列滿了會(huì)阻塞,直到發(fā)送成功才返回。 /// </param> /// <param name="isForceSerialize">是否強(qiáng)制序列化,結(jié)構(gòu)體不含引用的情況下會(huì)直接復(fù)制數(shù)據(jù)性能較高,但是如果結(jié)構(gòu)體成員變量有引用類型則會(huì)引發(fā)異常,此時(shí)可以強(qiáng)制序列化。</param> public void Send(object obj, bool isForceSerialize = false); /// <summary> /// 接收數(shù)據(jù) /// 此方法隊(duì)列空會(huì)阻塞,直到有數(shù)據(jù)才返回。 /// </summary> /// <returns>接收的數(shù)據(jù),與send的數(shù)據(jù)類型對(duì)應(yīng)??梢酝ㄟ^type或is判斷,或者提前知道類型直接轉(zhuǎn)換</returns> public object Receive(); /// <summary> /// 發(fā)送數(shù)據(jù)超時(shí) /// </summary> /// <param name="obj">發(fā)送的對(duì)象,支持值類型(元類型、結(jié)構(gòu)體)、值類型數(shù)組、可json序列化的任意對(duì)象(實(shí)體類、數(shù)組、List、字典等等),無法序列化會(huì)產(chǎn)生異常。 /// 會(huì)根據(jù)類型自動(dòng)判斷傳輸方式,值類型以及值類型數(shù)組會(huì)直接內(nèi)存拷貝,引用類型會(huì)進(jìn)行序列化。</param> /// <param name="timeout">超時(shí)時(shí)長</param> /// <param name="isForceSerialize">是否強(qiáng)制序列化,結(jié)構(gòu)體不含引用的情況下會(huì)直接復(fù)制數(shù)據(jù)性能較高,但是如果結(jié)構(gòu)體成員變量有引用類型則會(huì)引發(fā)異常,此時(shí)可以強(qiáng)制序列化。</param> /// <returns>true發(fā)送成功,false超時(shí)</returns> public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false); /// <summary> /// 接收超時(shí) /// </summary> /// <param name="timeout">超時(shí)時(shí)長</param> /// <returns>接收的數(shù)據(jù),與send的數(shù)據(jù)類型對(duì)應(yīng)??梢酝ㄟ^type或is判斷,或者提前知道類型直接轉(zhuǎn)換。 /// 超時(shí)返回null。 /// </returns> public object? ReceiveTimeout(TimeSpan timeout); /// <summary> /// 銷毀隊(duì)列,只會(huì)銷毀當(dāng)前實(shí)例,如果多個(gè)隊(duì)列打開同個(gè)名稱,其他隊(duì)列不受影響 /// </summary> public void Dispose(); }
三、使用示例
1、傳輸byte[]數(shù)據(jù)
進(jìn)程a
SharedQueue shq= new SharedQueue("shq1", 10); byte[] a = new byte[5] { 1, 2, 3, 4, 5 }; //發(fā)送數(shù)據(jù) shq.send(a);
進(jìn)程b
SharedQueue shq= new SharedQueue("shq1", 10); //接收數(shù)據(jù) var a=shq.Receive() as byte[]; Console.Write("receive: "); foreach (var i in a) { Console.Write(i); }
2、傳輸字符串
進(jìn)程a
SharedQueue shq= new SharedQueue("shq1", 10,64); shq.send("12345");
進(jìn)程b
SharedQueue shq= new SharedQueue("shq1", 10,64); var a=shq.Receive() as string; Console.WriteLine("receive: " + a);
3、傳輸對(duì)象
class A { public string Name; public int Number; }
進(jìn)程a
SharedQueue shq= new SharedQueue("shq1", 10,64); sq.Send(new A() { Name = "Tommy", Number = 102185784 });
進(jìn)程b
SharedQueue shq= new SharedQueue("shq1", 10,64); var a=shq.Receive() as A; Console.WriteLine("receive: " + a.Name + " " + a.Number);
總結(jié)
以上就是今天要講的內(nèi)容,實(shí)現(xiàn)這樣的一個(gè)對(duì)象,雖然代碼量不多,但還是有一點(diǎn)難度的,很多細(xì)節(jié)需要處理,比如泛型轉(zhuǎn)type以統(tǒng)一接口,信號(hào)量實(shí)現(xiàn)隊(duì)列和條件變量是有差異的,用CreateFromFile才能實(shí)現(xiàn)跨平臺(tái)。總的來說,有了這樣的一個(gè)隊(duì)列,跨線程通信就變的比較方便且高效了。
到此這篇關(guān)于C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列的文章就介紹到這了,更多相關(guān)C#跨進(jìn)程隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
c#實(shí)現(xiàn)適配器模式的項(xiàng)目實(shí)踐
適配器模式將一個(gè)類的接口轉(zhuǎn)換成客戶希望的另一個(gè)接口,使得原本由于接口不兼容而不能一起工作的那些類可以一起工作,本文主要介紹了c#實(shí)現(xiàn)適配器模式的項(xiàng)目實(shí)踐,感興趣的可以一起來了解一下2023-08-08Unity實(shí)現(xiàn)物體左右移動(dòng)效果
這篇文章主要為大家詳細(xì)介紹了Unity實(shí)現(xiàn)物體左右移動(dòng)效果,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-08-08C#?委托與?Lambda?表達(dá)式轉(zhuǎn)換機(jī)制及弱事件模式下的生命周期詳解
本文介紹了C#委托和Lambda表達(dá)式的工作原理,包括委托的內(nèi)部結(jié)構(gòu)、Lambda表達(dá)式的轉(zhuǎn)換機(jī)制以及弱事件模式下的生命周期管理,感興趣的朋友一起看看吧2025-02-02使用C#實(shí)現(xiàn)在word中插入頁眉頁腳的方法
這篇文章主要介紹了使用C#實(shí)現(xiàn)在word中插入頁眉頁腳的方法,是操作Word的常見方法,有一定的學(xué)習(xí)借鑒價(jià)值,需要的朋友可以參考下2014-08-08