欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列

 更新時(shí)間:2024年07月16日 10:57:42   作者:CodeOfCC  
進(jìn)程通信一般情況下比較少用,但是也有一些使用場景,有些做視頻傳輸?shù)乃坪鯐?huì)用多進(jìn)程來實(shí)現(xiàn),還有在子進(jìn)程中調(diào)用特定的庫來避免內(nèi)存泄漏,筆者最近也遇到了需要使用多進(jìn)程的場景,本文介紹了C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列,需要的朋友可以參考下

前言

進(jìn)程通信一般情況下比較少用,但是也有一些使用場景,有些做視頻傳輸?shù)乃坪鯐?huì)用多進(jìn)程來實(shí)現(xiàn),還有在子進(jìn)程中調(diào)用特定的庫來避免內(nèi)存泄漏,筆者最近也遇到了需要使用多進(jìn)程的場景。多進(jìn)程的使用最主要的就是進(jìn)程間的通信,本文參考了go語言的ipc庫,實(shí)現(xiàn)了一個(gè)基于共享內(nèi)存的跨進(jìn)程隊(duì)列。

一、實(shí)現(xiàn)原理

1、用到的主要對(duì)象

//共享內(nèi)存管理對(duì)象
MemoryMappedFile _mmf;
//跨進(jìn)程的互斥變量
Mutex _mtx;
//入隊(duì)信號(hào)量
Semaphore _semaEq;
//出隊(duì)信號(hào)量
Semaphore _semaDq;

2、創(chuàng)建共享內(nèi)存

創(chuàng)建共享內(nèi)存需要使用MemoryMappedFile.CreateFromFile實(shí)現(xiàn)跨平臺(tái)。CreateNew只能創(chuàng)建無法打開第二個(gè),OpenExisting只支持windows。

string name="共享內(nèi)存標(biāo)識(shí)名稱";
_shmPath="共享內(nèi)存文件路徑"+name;
//通過文件路徑創(chuàng)建共享內(nèi)存
_mmf = MemoryMappedFile.CreateFromFile(File.Open(_shmPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite), null, (_QueuetHeaderSize + (elementBodyMaxSize + _ElementHeaderSize) * capacity), MemoryMappedFileAccess.ReadWrite, HandleInheritability.Inheritable, false);
//創(chuàng)建互斥變量
_mtx = new Mutex(false, Name + ".mx");
//創(chuàng)建入隊(duì)信號(hào)量,capacity為隊(duì)列元素個(gè)數(shù)容量
_semaEq = new Semaphore(0, (int)capacity, name+ ".eq");
//創(chuàng)建出隊(duì)信號(hào)量,capacity為隊(duì)列元素個(gè)數(shù)容量
_semaDq = new Semaphore((int)capacity, (int)capacity, name + ".dq");

獲取讀寫對(duì)象

_mmva = _mmf.CreateViewAccessor();

值類型數(shù)組方式寫入

T[] obj;
_mmva.WriteArray<T>(position , obj, 0, obj.Length);

3、頭部信息

采用循環(huán)隊(duì)列方式實(shí)現(xiàn),判斷隊(duì)空隊(duì)滿通過count、capacity的方式(參考了C#的Queue源碼),避免占用多一個(gè)空間。

struct QueueHeader
{
    //元素大小
    public nint ElementSize;
    //隊(duì)列容量
    public nint Capacity;
    //當(dāng)前元素個(gè)數(shù)
    public nint Count;
    //隊(duì)列頭
    public nint Front;
    //隊(duì)列尾
    public nint Rear;
}

隊(duì)列頭信息需要存儲(chǔ)在共享內(nèi)存中。

QueueHeader Header
{
    get
    {
        QueueHeader header;
        _mmva.Read(0, out header);
        return header;
    }
    set
    {
        _mmva.Write(0, ref value);
    }
}

4、入隊(duì)

示例如下

bool Enqueuee<T>(T[] obj) where T : struct
{    
    //共享內(nèi)存中讀取header
    var header = Header;
    //隊(duì)列滿返回
    if (header.Count == header.Capacity) return false;
    //計(jì)算寫入的位置,頭部長度+隊(duì)尾*元素大小
    nint position = _QueuetHeaderSize + header.Rear * header.ElementSize;
    //寫入共享內(nèi)存
    _mmva.WriteArray<T>(position, obj, 0, obj.Length);
    //更新隊(duì)尾
    header.Rear = (header.Rear + 1) % header.Capacity;
    //更新長度
    header.Count++;
    //更新頭部信息到共享內(nèi)存
    Header=header;
    return true;
}

同步

 //等待出隊(duì)信號(hào)量(如果隊(duì)列滿則會(huì)等待)
 if (!_semaDq.WaitOne(timeout)) return false;
 //進(jìn)入互斥鎖
 if (!_mtx.WaitOne(timeout)) return false;
 try
 {   
     //入隊(duì)
     Enqueue(obj);
 }
 finally
 {
     //通知入隊(duì)信號(hào)量
     _semaEq.Release();
     //釋放互斥鎖
     _mtx.ReleaseMutex();
 }
 return true;

5、出隊(duì)

object Dequeue()
{   
    //共享內(nèi)存中讀取header
    var header = Header;
    //隊(duì)列空則返回
    if (header.Count == 0) return null;
    //計(jì)算讀取的位置,頭部長度+隊(duì)頭*元素大小
    long position = _QueuetHeaderSize + header.Front * header.ElementSize;
    //創(chuàng)建數(shù)據(jù)用于裝載數(shù)據(jù)
    Array arr = Array.CreateInstance(readType, msg.Header.ArrayLength);
    //將泛型轉(zhuǎn)type調(diào)用。
    var readArray = _ReadArrayGeneric.MakeGenericMethod(readType);
    //讀取共享內(nèi)存的數(shù)據(jù)
    readArray.Invoke(_mmva, [position , arr, 0, arr.Length]);
    //更新隊(duì)頭
    header.Front = (header.Front + 1) % header.Capacity;
    //更新長度
    header.Count--;
    //更新頭部信息到共享內(nèi)存
    Header = header;
    return msg;
}

同步

 //等待入隊(duì)信號(hào)量(如果隊(duì)列空則會(huì)等待)
if (!_semaEq.WaitOne(timeout)) return null;
 //進(jìn)入互斥鎖
if (!_mtx.WaitOne(timeout!)) return null;
try
{ 
   //出隊(duì)
    return  Dequeue();
}
finally
{
     //通知入隊(duì)信號(hào)量
    _semaDq.Release();
     //釋放互斥鎖
    _mtx.ReleaseMutex();
}

6、釋放資源

/// <summary>
/// 銷毀隊(duì)列,只會(huì)銷毀當(dāng)前實(shí)例,如果多個(gè)隊(duì)列打開同個(gè)名稱,其他隊(duì)列不受影響
/// </summary>
public void Dispose()
{
    _mmf.Dispose();
    _mmva.Dispose();
    _mtx.Dispose();
    _semaEq.Dispose();
    _semaDq.Dispose();
}

二、完整代碼

類的定義

/// <summary>
/// 共享隊(duì)列
/// 基于共享內(nèi)存實(shí)現(xiàn)
/// </summary>
class SharedQueue : IDisposable
{
    /// <summary>
    /// 名稱
    /// </summary>
    public string Name { get; private set; }
    /// <summary>
    /// 元素最大大小
    /// </summary>
    public long ElementMaxSize { get; private set; }
    /// <summary>
    /// 隊(duì)列容量
    /// </summary>
    public long Capacity { get; private set; }
    /// <summary>
    /// 表示是否新創(chuàng)建,是則是創(chuàng)建,否則是打開已存在的。
    /// </summary>
    public bool IsNewCreate { get; private set; }
    /// <summary>
    /// 構(gòu)造方法
    /// </summary>
    /// <param name="name">唯一名稱,系統(tǒng)級(jí)別,不同進(jìn)程創(chuàng)建相同名稱的本對(duì)象,就是同一個(gè)隊(duì)列,可以進(jìn)行數(shù)據(jù)傳輸。</param>
    /// <param name="capacity">隊(duì)列容量,元素個(gè)數(shù)總量</param>
    /// <param name="elementBodyMaxSize">隊(duì)列元素最大大小,此大小需要考慮傳輸數(shù)據(jù)Type.FullName長度</param>
    public SharedQueue(string name, nint capacity = 1, nint elementBodyMaxSize = 3145728);
    /// <summary>
    /// 發(fā)送數(shù)據(jù)
    /// </summary>
    /// <param name="obj">發(fā)送的對(duì)象,支持值類型(元類型、結(jié)構(gòu)體)、值類型數(shù)組、可json序列化的任意對(duì)象(實(shí)體類、數(shù)組、List、字典等等),無法序列化會(huì)產(chǎn)生異常。
    /// 會(huì)根據(jù)類型自動(dòng)判斷傳輸方式,值類型以及值類型數(shù)組會(huì)直接內(nèi)存拷貝,引用類型會(huì)進(jìn)行序列化。
    /// 此方法隊(duì)列滿了會(huì)阻塞,直到發(fā)送成功才返回。
    /// </param>
    /// <param name="isForceSerialize">是否強(qiáng)制序列化,結(jié)構(gòu)體不含引用的情況下會(huì)直接復(fù)制數(shù)據(jù)性能較高,但是如果結(jié)構(gòu)體成員變量有引用類型則會(huì)引發(fā)異常,此時(shí)可以強(qiáng)制序列化。</param>
    public void Send(object obj, bool isForceSerialize = false);
    /// <summary>
    /// 接收數(shù)據(jù)
    /// 此方法隊(duì)列空會(huì)阻塞,直到有數(shù)據(jù)才返回。
    /// </summary>
    /// <returns>接收的數(shù)據(jù),與send的數(shù)據(jù)類型對(duì)應(yīng)??梢酝ㄟ^type或is判斷,或者提前知道類型直接轉(zhuǎn)換</returns>
    public object Receive();
    /// <summary>
    /// 發(fā)送數(shù)據(jù)超時(shí)
    /// </summary>
    /// <param name="obj">發(fā)送的對(duì)象,支持值類型(元類型、結(jié)構(gòu)體)、值類型數(shù)組、可json序列化的任意對(duì)象(實(shí)體類、數(shù)組、List、字典等等),無法序列化會(huì)產(chǎn)生異常。
    /// 會(huì)根據(jù)類型自動(dòng)判斷傳輸方式,值類型以及值類型數(shù)組會(huì)直接內(nèi)存拷貝,引用類型會(huì)進(jìn)行序列化。</param>
    /// <param name="timeout">超時(shí)時(shí)長</param>
    /// <param name="isForceSerialize">是否強(qiáng)制序列化,結(jié)構(gòu)體不含引用的情況下會(huì)直接復(fù)制數(shù)據(jù)性能較高,但是如果結(jié)構(gòu)體成員變量有引用類型則會(huì)引發(fā)異常,此時(shí)可以強(qiáng)制序列化。</param>
    /// <returns>true發(fā)送成功,false超時(shí)</returns>
    public bool SendTimeout(object obj, TimeSpan timeout, bool isForceSerialize = false);
    /// <summary>
    /// 接收超時(shí)
    /// </summary>
    /// <param name="timeout">超時(shí)時(shí)長</param>
    /// <returns>接收的數(shù)據(jù),與send的數(shù)據(jù)類型對(duì)應(yīng)??梢酝ㄟ^type或is判斷,或者提前知道類型直接轉(zhuǎn)換。
    /// 超時(shí)返回null。
    /// </returns>
    public object? ReceiveTimeout(TimeSpan timeout);
    /// <summary>
    /// 銷毀隊(duì)列,只會(huì)銷毀當(dāng)前實(shí)例,如果多個(gè)隊(duì)列打開同個(gè)名稱,其他隊(duì)列不受影響
    /// </summary>
    public void Dispose();
}

三、使用示例

1、傳輸byte[]數(shù)據(jù)

進(jìn)程a

SharedQueue shq= new SharedQueue("shq1", 10);
byte[] a = new byte[5] { 1, 2, 3, 4, 5 };
//發(fā)送數(shù)據(jù)
shq.send(a);

進(jìn)程b

SharedQueue shq= new SharedQueue("shq1", 10);
//接收數(shù)據(jù)
var a=shq.Receive() as byte[];
Console.Write("receive: ");
foreach (var i in a)
{
    Console.Write(i);
}

2、傳輸字符串

進(jìn)程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
shq.send("12345");

進(jìn)程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as string;
Console.WriteLine("receive: " + a);

3、傳輸對(duì)象

class A
{
    public string Name;
    public int Number;
}

進(jìn)程a

SharedQueue shq= new SharedQueue("shq1", 10,64);
sq.Send(new A() { Name = "Tommy", Number = 102185784 });

進(jìn)程b

SharedQueue shq= new SharedQueue("shq1", 10,64);
var a=shq.Receive() as A;
Console.WriteLine("receive: " + a.Name + " " + a.Number);

總結(jié)

以上就是今天要講的內(nèi)容,實(shí)現(xiàn)這樣的一個(gè)對(duì)象,雖然代碼量不多,但還是有一點(diǎn)難度的,很多細(xì)節(jié)需要處理,比如泛型轉(zhuǎn)type以統(tǒng)一接口,信號(hào)量實(shí)現(xiàn)隊(duì)列和條件變量是有差異的,用CreateFromFile才能實(shí)現(xiàn)跨平臺(tái)。總的來說,有了這樣的一個(gè)隊(duì)列,跨線程通信就變的比較方便且高效了。

到此這篇關(guān)于C#基于共享內(nèi)存實(shí)現(xiàn)跨進(jìn)程隊(duì)列的文章就介紹到這了,更多相關(guān)C#跨進(jìn)程隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論