.NET?高性能緩沖隊(duì)列實(shí)現(xiàn)?BufferQueue的操作過程
前言
BufferQueue 是一個(gè)用 .NET 編寫的高性能的緩沖隊(duì)列實(shí)現(xiàn),支持多線程并發(fā)操作。
項(xiàng)目是從 mocha 項(xiàng)目中獨(dú)立出來的一個(gè)組件,經(jīng)過修改以提供更通用的緩沖隊(duì)列功能。
目前支持的緩沖區(qū)類型為內(nèi)存緩沖區(qū),后續(xù)會考慮支持更多類型的緩沖區(qū)。
適用場景
生產(chǎn)者和消費(fèi)者之間的速度不一致,需要并發(fā)批量處理數(shù)據(jù)的場景。
功能說明
- 支持創(chuàng)建多個(gè) Topic,每個(gè) Topic 可以有多種數(shù)據(jù)類型。每一對 Topic 和數(shù)據(jù)類型對應(yīng)一個(gè)獨(dú)立的緩沖區(qū)。
支持創(chuàng)建多個(gè) Consumer Group,每個(gè) Consumer Group 的消費(fèi)進(jìn)度都是獨(dú)立的。支持多個(gè) Consumer Group 并發(fā)消費(fèi)同一個(gè) Topic。
支持同一個(gè) Consumer Group 創(chuàng)建多個(gè) Consumer,以負(fù)載均衡的方式消費(fèi)數(shù)據(jù)。
支持?jǐn)?shù)據(jù)的批量消費(fèi),可以一次性獲取多條數(shù)據(jù)。
支持 pull 模式和 push 模式兩種消費(fèi)模式。
pull 模式下和 push 模式下都支持 auto commit 和 manual commit 兩種提交方式。auto commit 模式下,消費(fèi)者在收到數(shù)據(jù)后自動提交消費(fèi)進(jìn)度,如果消費(fèi)失敗不會重試。manual commit 模式下,消費(fèi)者需要手動提交消費(fèi)進(jìn)度,如果消費(fèi)失敗只要不提交進(jìn)度就可以重試。
需要注意的是,當(dāng)前版本出于簡化實(shí)現(xiàn)的考慮,暫不支持消費(fèi)者的動態(tài)擴(kuò)容和縮容,需要在創(chuàng)建消費(fèi)者時(shí)指定消費(fèi)者數(shù)量。
使用示例
安裝 Nuget 包:
dotnet add package BufferQueue
項(xiàng)目基于 Microsoft.Extensions.DependencyInjection,使用時(shí)需要先注冊服務(wù)。
BufferQueue 支持兩種消費(fèi)模式:pull 模式和 push 模式。
builder.Services.AddBufferQueue(options => { options.UseMemory(bufferOptions => { // 每一對 Topic 和數(shù)據(jù)類型對應(yīng)一個(gè)獨(dú)立的緩沖區(qū),可以設(shè)置 partitionNumber bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6); bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4); bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8); }) // 添加 push 模式的消費(fèi)者 // 掃描指定程序集中的標(biāo)記了 BufferPushCustomerAttribute 的類, // 注冊為 push 模式的消費(fèi)者 .AddPushCustomers(typeof(Program).Assembly); }); // 在 HostedService 中使用 pull模式 消費(fèi)數(shù)據(jù) builder.Services.AddHostedService<Foo1PullConsumerHostService>();
pull 模式的消費(fèi)者示例:
public class Foo1PullConsumerHostService( IBufferQueue bufferQueue, ILogger<Foo1PullConsumerHostService> logger) : IHostedService { private readonly CancellationTokenSource _cancellationTokenSource = new(); public Task StartAsync(CancellationToken cancellationToken) { var token = CancellationTokenSource .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token) .Token; var consumers = bufferQueue.CreatePullConsumers<Foo>( new BufferPullConsumerOptions { TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100, }, consumerNumber: 4); foreach (var consumer in consumers) { _ = ConsumeAsync(consumer, token); } return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _cancellationTokenSource.Cancel(); return Task.CompletedTask; } private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken) { await foreach (var buffer in consumer.ConsumeAsync(cancellationToken)) { foreach (var foo in buffer) { // Process the foo logger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo); } } } }
push 模式的消費(fèi)者示例:
通過 BufferPushCustomer 特性注冊 push 模式的消費(fèi)者。
push consumer 會被注冊到 DI 容器中,可以通過構(gòu)造函數(shù)注入其他服務(wù),可以通過設(shè)置 ServiceLifetime 來控制 consumer 的生命周期。
BufferPushCustomerAttribute 中的 concurrency 參數(shù)用于設(shè)置 push consumer 的消費(fèi)并發(fā)數(shù),對應(yīng) pull consumer 的 consumerNumber。
[BufferPushCustomer( topicName: "topic-foo2", groupName: "group-foo2", batchSize: 100, serviceLifetime: ServiceLifetime.Singleton, concurrency: 2)] public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo> { public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken) { foreach (var foo in buffer) { logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo); } return Task.CompletedTask; } }
[BufferPushCustomer( "topic-bar", "group-bar", 100, ServiceLifetime.Scoped, 2)] public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar> { public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer, CancellationToken cancellationToken) { foreach (var bar in buffer) { logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar); } var commitTask = committer.CommitAsync(); if (!commitTask.IsCompletedSuccessfully) { await commitTask.AsTask(); } } }
BufferQueue 內(nèi)部設(shè)計(jì)概述
Topic 的隔離
BufferQueue 有以下的特性:
同一個(gè)數(shù)據(jù)類型 下的 不同 Topic 的 BufferQueue 互不干擾。
同一個(gè) Topic 下的 不同數(shù)據(jù)類型 的 BufferQueue 互不干擾。
這個(gè)特性是通過以下兩層接口設(shè)計(jì)實(shí)現(xiàn)的:
IBufferQueue:根據(jù) TopicName 和 類型參數(shù) T 將請求轉(zhuǎn)發(fā)給具體的 IBufferQueue<T> 實(shí)現(xiàn)(借助 KeyedService 實(shí)現(xiàn)),其中參數(shù) T 代表 Buffer 所承載的數(shù)據(jù)實(shí)體的類型。
IBufferQueue<T>:具體的 BufferQueue 實(shí)現(xiàn),負(fù)責(zé)管理 Topic 下的數(shù)據(jù)。屬于 Buffer 模塊的內(nèi)部實(shí)現(xiàn),不對外暴露。
Partition 的設(shè)計(jì)
為了保證消費(fèi)速度,BufferQueue 將數(shù)據(jù)劃分為多個(gè) Partition,每個(gè) Partition 都是一個(gè)獨(dú)立的隊(duì)列,每個(gè) Partition 都有一個(gè)對應(yīng)的消費(fèi)者線程。
Producer 以輪詢的方式往每個(gè) Partition 中寫入數(shù)據(jù)。
Consumer 最多不允許超過 Partition 的數(shù)量,Partition 按平均分配到組內(nèi)每個(gè) Customer 上。
當(dāng)一個(gè) Consumer 被分配了多個(gè) Partition 時(shí),以輪訓(xùn)的方式進(jìn)行消費(fèi)。
每個(gè) Partition 上會記錄不同消費(fèi)組的消費(fèi)進(jìn)度,不同組之間的消費(fèi)進(jìn)度互不干擾。
對并發(fā)的支持
Producer 支持并發(fā)寫入。
Consumer 消費(fèi)時(shí)是綁定 Partition 的,為保證能正確管理 Partition 的消費(fèi)進(jìn)度,Consumer 不支持并發(fā)消費(fèi)。
如果要增加消費(fèi)速度,需創(chuàng)建多個(gè) Consumer。
Partition 的動態(tài)擴(kuò)容
Partition 的基本組成單元是 Segment,Segment 代表保存數(shù)據(jù)的數(shù)組,多個(gè) Segment 通過鏈表的形式組合成一個(gè) Partition。
當(dāng)一個(gè) Segment 寫滿后,通過在其后面追加一個(gè) Segment 實(shí)現(xiàn)擴(kuò)容。
Segment 中用于保存數(shù)據(jù)的數(shù)組的每一個(gè)元素稱為 Slot,每個(gè) Slot 都有一個(gè)Partition 內(nèi)唯一的自增 Offset。
Segment 的回收機(jī)制
每次在 Partition 中新增 Segment 時(shí),會從頭判斷此前的 Segment 是否已經(jīng)被所有消費(fèi)組消費(fèi)完,回收最后一個(gè)消費(fèi)完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。
Benchmark
測試環(huán)境:Apple M2 Max 64GB
寫入性能測試
與 BlockingCollection 對比并發(fā),并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12, partitionNumber 為 1 和 12。
測試結(jié)果
在并發(fā)寫入時(shí),BufferQueue 的寫入性能明顯優(yōu)于 BlockingCollection。
消費(fèi)性能測試
pull 模式 consumer 與 BlockingCollection 對比并發(fā)讀取性能,并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12,partitionNumber 為 12。
測試結(jié)果
在批量消費(fèi)時(shí),隨著批量大小的增加,BufferQueue 的消費(fèi)性能優(yōu)勢更加明顯。
到此這篇關(guān)于.NET 高性能緩沖隊(duì)列實(shí)現(xiàn) BufferQueue的文章就介紹到這了,更多相關(guān).NET 高性能緩沖隊(duì)列實(shí)現(xiàn) BufferQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決VS2012 Express的There was a problem sending the command to
安裝Visual Studio 2012 Express之后,雙擊打開web.config文件時(shí)經(jīng)常出現(xiàn)“There was a problem sending the command to the program”的錯(cuò)誤,然后VS2012 Express打開了,但web.config文件沒打開,需要再次雙擊web.config文件才能打開。很是煩人2013-02-02基于.Net中的數(shù)字與日期格式化規(guī)則助記詞的使用詳解
本篇文章是對.Net中的數(shù)字與日期格式化規(guī)則助記詞的使用進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05詳解在Windows下如何使用AspNetCore Api 和consul
這篇文章主要介紹了詳解在Windows下如何使用AspNetCore Api 和consul,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-06-06CZGL.ProcessMetrics處理監(jiān)控?cái)?shù)據(jù)的三種方式介紹
這篇文章介紹了CZGL.ProcessMetrics處理監(jiān)控?cái)?shù)據(jù)的三種方式,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04