.NET?高性能緩沖隊列實現(xiàn)?BufferQueue的操作過程
前言
BufferQueue 是一個用 .NET 編寫的高性能的緩沖隊列實現(xiàn),支持多線程并發(fā)操作。
項目是從 mocha 項目中獨立出來的一個組件,經(jīng)過修改以提供更通用的緩沖隊列功能。
目前支持的緩沖區(qū)類型為內(nèi)存緩沖區(qū),后續(xù)會考慮支持更多類型的緩沖區(qū)。
適用場景
生產(chǎn)者和消費者之間的速度不一致,需要并發(fā)批量處理數(shù)據(jù)的場景。
功能說明
- 支持創(chuàng)建多個 Topic,每個 Topic 可以有多種數(shù)據(jù)類型。每一對 Topic 和數(shù)據(jù)類型對應一個獨立的緩沖區(qū)。
支持創(chuàng)建多個 Consumer Group,每個 Consumer Group 的消費進度都是獨立的。支持多個 Consumer Group 并發(fā)消費同一個 Topic。
支持同一個 Consumer Group 創(chuàng)建多個 Consumer,以負載均衡的方式消費數(shù)據(jù)。
支持數(shù)據(jù)的批量消費,可以一次性獲取多條數(shù)據(jù)。
支持 pull 模式和 push 模式兩種消費模式。
pull 模式下和 push 模式下都支持 auto commit 和 manual commit 兩種提交方式。auto commit 模式下,消費者在收到數(shù)據(jù)后自動提交消費進度,如果消費失敗不會重試。manual commit 模式下,消費者需要手動提交消費進度,如果消費失敗只要不提交進度就可以重試。
需要注意的是,當前版本出于簡化實現(xiàn)的考慮,暫不支持消費者的動態(tài)擴容和縮容,需要在創(chuàng)建消費者時指定消費者數(shù)量。
使用示例
安裝 Nuget 包:
dotnet add package BufferQueue
項目基于 Microsoft.Extensions.DependencyInjection,使用時需要先注冊服務。
BufferQueue 支持兩種消費模式:pull 模式和 push 模式。
builder.Services.AddBufferQueue(options => { options.UseMemory(bufferOptions => { // 每一對 Topic 和數(shù)據(jù)類型對應一個獨立的緩沖區(qū),可以設置 partitionNumber bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6); bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4); bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8); }) // 添加 push 模式的消費者 // 掃描指定程序集中的標記了 BufferPushCustomerAttribute 的類, // 注冊為 push 模式的消費者 .AddPushCustomers(typeof(Program).Assembly); }); // 在 HostedService 中使用 pull模式 消費數(shù)據(jù) builder.Services.AddHostedService<Foo1PullConsumerHostService>();
pull 模式的消費者示例:
public class Foo1PullConsumerHostService( IBufferQueue bufferQueue, ILogger<Foo1PullConsumerHostService> logger) : IHostedService { private readonly CancellationTokenSource _cancellationTokenSource = new(); public Task StartAsync(CancellationToken cancellationToken) { var token = CancellationTokenSource .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token) .Token; var consumers = bufferQueue.CreatePullConsumers<Foo>( new BufferPullConsumerOptions { TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100, }, consumerNumber: 4); foreach (var consumer in consumers) { _ = ConsumeAsync(consumer, token); } return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _cancellationTokenSource.Cancel(); return Task.CompletedTask; } private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken) { await foreach (var buffer in consumer.ConsumeAsync(cancellationToken)) { foreach (var foo in buffer) { // Process the foo logger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo); } } } }
push 模式的消費者示例:
通過 BufferPushCustomer 特性注冊 push 模式的消費者。
push consumer 會被注冊到 DI 容器中,可以通過構(gòu)造函數(shù)注入其他服務,可以通過設置 ServiceLifetime 來控制 consumer 的生命周期。
BufferPushCustomerAttribute 中的 concurrency 參數(shù)用于設置 push consumer 的消費并發(fā)數(shù),對應 pull consumer 的 consumerNumber。
[BufferPushCustomer( topicName: "topic-foo2", groupName: "group-foo2", batchSize: 100, serviceLifetime: ServiceLifetime.Singleton, concurrency: 2)] public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo> { public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken) { foreach (var foo in buffer) { logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo); } return Task.CompletedTask; } }
[BufferPushCustomer( "topic-bar", "group-bar", 100, ServiceLifetime.Scoped, 2)] public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar> { public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer, CancellationToken cancellationToken) { foreach (var bar in buffer) { logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar); } var commitTask = committer.CommitAsync(); if (!commitTask.IsCompletedSuccessfully) { await commitTask.AsTask(); } } }
BufferQueue 內(nèi)部設計概述
Topic 的隔離
BufferQueue 有以下的特性:
同一個數(shù)據(jù)類型 下的 不同 Topic 的 BufferQueue 互不干擾。
同一個 Topic 下的 不同數(shù)據(jù)類型 的 BufferQueue 互不干擾。
這個特性是通過以下兩層接口設計實現(xiàn)的:
IBufferQueue:根據(jù) TopicName 和 類型參數(shù) T 將請求轉(zhuǎn)發(fā)給具體的 IBufferQueue<T> 實現(xiàn)(借助 KeyedService 實現(xiàn)),其中參數(shù) T 代表 Buffer 所承載的數(shù)據(jù)實體的類型。
IBufferQueue<T>:具體的 BufferQueue 實現(xiàn),負責管理 Topic 下的數(shù)據(jù)。屬于 Buffer 模塊的內(nèi)部實現(xiàn),不對外暴露。
Partition 的設計
為了保證消費速度,BufferQueue 將數(shù)據(jù)劃分為多個 Partition,每個 Partition 都是一個獨立的隊列,每個 Partition 都有一個對應的消費者線程。
Producer 以輪詢的方式往每個 Partition 中寫入數(shù)據(jù)。
Consumer 最多不允許超過 Partition 的數(shù)量,Partition 按平均分配到組內(nèi)每個 Customer 上。
當一個 Consumer 被分配了多個 Partition 時,以輪訓的方式進行消費。
每個 Partition 上會記錄不同消費組的消費進度,不同組之間的消費進度互不干擾。
對并發(fā)的支持
Producer 支持并發(fā)寫入。
Consumer 消費時是綁定 Partition 的,為保證能正確管理 Partition 的消費進度,Consumer 不支持并發(fā)消費。
如果要增加消費速度,需創(chuàng)建多個 Consumer。
Partition 的動態(tài)擴容
Partition 的基本組成單元是 Segment,Segment 代表保存數(shù)據(jù)的數(shù)組,多個 Segment 通過鏈表的形式組合成一個 Partition。
當一個 Segment 寫滿后,通過在其后面追加一個 Segment 實現(xiàn)擴容。
Segment 中用于保存數(shù)據(jù)的數(shù)組的每一個元素稱為 Slot,每個 Slot 都有一個Partition 內(nèi)唯一的自增 Offset。
Segment 的回收機制
每次在 Partition 中新增 Segment 時,會從頭判斷此前的 Segment 是否已經(jīng)被所有消費組消費完,回收最后一個消費完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。
Benchmark
測試環(huán)境:Apple M2 Max 64GB
寫入性能測試
與 BlockingCollection 對比并發(fā),并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12, partitionNumber 為 1 和 12。
測試結(jié)果
在并發(fā)寫入時,BufferQueue 的寫入性能明顯優(yōu)于 BlockingCollection。
消費性能測試
pull 模式 consumer 與 BlockingCollection 對比并發(fā)讀取性能,并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12,partitionNumber 為 12。
測試結(jié)果
在批量消費時,隨著批量大小的增加,BufferQueue 的消費性能優(yōu)勢更加明顯。
到此這篇關于.NET 高性能緩沖隊列實現(xiàn) BufferQueue的文章就介紹到這了,更多相關.NET 高性能緩沖隊列實現(xiàn) BufferQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
解決VS2012 Express的There was a problem sending the command to
安裝Visual Studio 2012 Express之后,雙擊打開web.config文件時經(jīng)常出現(xiàn)“There was a problem sending the command to the program”的錯誤,然后VS2012 Express打開了,但web.config文件沒打開,需要再次雙擊web.config文件才能打開。很是煩人2013-02-02基于.Net中的數(shù)字與日期格式化規(guī)則助記詞的使用詳解
本篇文章是對.Net中的數(shù)字與日期格式化規(guī)則助記詞的使用進行了詳細的分析介紹,需要的朋友參考下2013-05-05詳解在Windows下如何使用AspNetCore Api 和consul
這篇文章主要介紹了詳解在Windows下如何使用AspNetCore Api 和consul,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-06-06CZGL.ProcessMetrics處理監(jiān)控數(shù)據(jù)的三種方式介紹
這篇文章介紹了CZGL.ProcessMetrics處理監(jiān)控數(shù)據(jù)的三種方式,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-04-04