.NET中的MassTransit分布式應用框架詳解
MassTransit是一款優(yōu)秀的分布式應用框架,可作為分布式應用的消息總線,也可以用作單體應用的事件總線。
引言
A free, open-source distributed application framework for .NET.
一個免費、開源的.NET 分布式應用框架。-- MassTransit 官網(wǎng)
MassTransit,直譯公共交通, 是由Chris Patterson
開發(fā)的基于消息驅(qū)動的.NET 分布式應用框架,其核心思想是借助消息來實現(xiàn)服務(wù)之間的松耦合異步通信,進而確保應用更高的可用性、可靠性和可擴展性。通過對消息模型的高度抽象,以及對主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大簡化了基于消息驅(qū)動的開發(fā)門檻,同時內(nèi)置了連接管理、消息序列化和消費者生命周期管理,以及諸如重試、限流、斷路器等異常處理機制,讓開發(fā)者更好的專注于業(yè)務(wù)實現(xiàn)。
簡而言之,MassTransit實現(xiàn)了消息代理透明化。無需面向消息代理編程進行諸如連接管理、隊列的申明和綁定等操作,即可輕松實現(xiàn)應用間消息的傳遞和消費。
快速體驗
空口無憑,創(chuàng)建一個項目快速體驗一下。
- 基于
worker
模板創(chuàng)建一個基礎(chǔ)項目:dotnet new worker -n MassTransit.Demo
- 打開項目,添加NuGet包:
MassTransit
- 定義訂單創(chuàng)建事件消息契約:
using System; namespace MassTransit.Demo { public record OrderCreatedEvent { public Guid OrderId { get; set; } } }
4.修改Worker
類,發(fā)送訂單創(chuàng)建事件:
namespace MassTransit.Demo; public class Worker : BackgroundService { readonly IBus _bus;//注冊總線 public Worker(IBus bus) { _bus = bus; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { //模擬并發(fā)送訂單創(chuàng)建事件 await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken); await Task.Delay(1000, stoppingToken); } } }
5.僅需實現(xiàn)IConsumer<OrderCreatedEvent>
泛型接口,即可實現(xiàn)消息的訂閱:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent> { private readonly ILogger<OrderCreatedEventConsumer> _logger; public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger) { _logger = logger; } public Task Consume(ConsumeContext<OrderCreatedEvent> context) { _logger.LogInformation($"Received Order:{context.Message.OrderId}"); return Task.CompletedTask; } }
6.注冊服務(wù):
using MassTransit; using MassTransit.Demo; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService<Worker>(); services.AddMassTransit(configurator => { //注冊消費者 configurator.AddConsumer<OrderCreatedEventConsumer>(); //使用基于內(nèi)存的消息路由傳輸 configurator.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); }); }); }) .Build(); await host.RunAsync();
7.運行項目,一個簡單的進程內(nèi)事件發(fā)布訂閱的應用就完成了。
如果需要使用RabbitMQ 消息代理進行消息傳輸,則僅需安裝MassTransit.RabbitMQ
NuGet包,然后指定使用RabbitMQ 傳輸消息即可。
using MassTransit; using MassTransit.Demo; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService<Worker>(); services.AddMassTransit(configurator => { configurator.AddConsumer<OrderCreatedEventConsumer>(); // configurator.UsingInMemory((context, cfg) => // { // cfg.ConfigureEndpoints(context); // }); configurator.UsingRabbitMq((context, cfg) => { cfg.Host( host: "localhost", port: 5672, virtualHost: "/", configure: hostConfig => { hostConfig.Username("guest"); hostConfig.Password("guest"); }); cfg.ConfigureEndpoints(context); }); }); }) .Build(); await host.RunAsync();
運行項目,MassTransit會自動在指定的RabbitMQ上創(chuàng)建一個類型為fanout
的MassTransit.Demo.OrderCreatedEvent
Exchange和一個與OrderCreatedEvent
同名的隊列進行消息傳輸,如下圖所示。
核心概念
MassTranist 為了實現(xiàn)消息代理的透明化和應用間消息的高效傳輸,抽象了以下概念,其中消息流轉(zhuǎn)流程如下圖所示:
- Message:消息契約,定義了消息生產(chǎn)者和消息消費者之間的契約。
- Producer:生產(chǎn)者,發(fā)送消息的一方都可以稱為生產(chǎn)者。
- SendEndpoint:發(fā)送端點,用于將消息內(nèi)容序列化,并發(fā)送到傳輸模塊。
- Transport:傳輸模塊,消息代理透明化的核心,用于和消息代理通信,負責發(fā)送和接收消息。
- ReceiveEndpoint:接收端點,用于從傳輸模塊接收消息,反序列化消息內(nèi)容,并將消息路由到消費者。
- Consumer:消費者,用于消息消費。
從上圖可知,本質(zhì)上還是發(fā)布訂閱模式的實現(xiàn),接下來就核心概念進行詳解。
Message
Message:消息,可以使用class、interface、struct和record來創(chuàng)建,消息作為一個契約,需確保創(chuàng)建后不能篡改,因此應只保留只讀屬性且不應包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName
來表示特定的消息類型。因此若在另外的項目中消費同名的消息類型,需確保消息的命名空間相同。另外需注意消息不應繼承,以避免發(fā)送基類消息類型造成的不可預期的結(jié)果。為避免此類情況,官方建議使用接口來定義消息。在MassTransit中,消息主要分為兩種類型:
- Command:命令,用于告訴服務(wù)做什么,命令被發(fā)送到指定端點,僅被一個服務(wù)接收并執(zhí)行。一般以動名詞結(jié)構(gòu)命名,如:UpdateAddress、CancelOrder。
- Event:事件,用于告訴服務(wù)什么發(fā)生了,事件被發(fā)布到多個端點,可以被多個服務(wù)消費。 一般以過去式結(jié)構(gòu)命名,如:AddressUpdated,OrderCanceled。
經(jīng)過MassTransit發(fā)送的消息,會使用信封包裝,包含一些附加信息,數(shù)據(jù)結(jié)構(gòu)舉例如下:
{ "messageId": "6c600000-873b-00ff-9a8f-08da8da85542", "requestId": null, "correlationId": null, "conversationId": "6c600000-873b-00ff-9526-08da8da85544", "initiatorId": null, "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true", "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent", "responseAddress": null, "faultAddress": null, "messageType": [ "urn:message:MassTransit.Demo:OrderCreatedEvent" ], "message": { "orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8" }, "expirationTime": null, "sentTime": "2022-09-03T12:32:15.0796943Z", "headers": {}, "host": { "machineName": "THINKPAD", "processName": "MassTransit.Demo", "processId": 24684, "assembly": "MassTransit.Demo", "assemblyVersion": "1.0.0.0", "frameworkVersion": "6.0.5", "massTransitVersion": "8.0.6.0", "operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0" } }
從以上消息實例中可以看出一個包裝后的消息包含以下核心屬性:
- messageId:全局唯一的消息ID
- messageType:消息類型
- message:消息體,也就是具體的消息實例
- sourceAddress:消息來源地址
- destinationAddress:消息目標地址
- responseAddress:響應地址,在請求響應模式中使用
- faultAddress:消息異常發(fā)送地址,用于存儲異常消費消息
- headers:消息頭,允許應用自定義擴展信息
- correlationId:關(guān)聯(lián)Id,在Saga狀態(tài)機中會用到,用來關(guān)聯(lián)系列事件
- host:宿主,消息來源應用的宿主信息
Producer
Producer,生產(chǎn)者,即用于生產(chǎn)消息。在MassTransit主要借助以下對象進行命令的發(fā)送和事件的發(fā)布。
從以上類圖可以看出,消息的發(fā)送主要核心依賴于兩個接口:
ISendEndpoint
:提供了Send
方法,用于發(fā)送命令。IPublishEndpoint
:提供了Publish
方法,用于發(fā)布事件。
但基于上圖的繼承體系,可以看出通過IBus
、ISendEndpointProvider
和ConsumeContext
進行命令的發(fā)送;通過IBus
和IPublishEndpointProvider
進行事件的發(fā)布。具體舉例如下:
發(fā)送命令
1.通過IBus
發(fā)送:
private readonly IBus _bus; public async Task Post(CreateOrderRequest request) { //通過以下方式配置對應消息類型的目標地址 EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order")); await _bus.Send(request); }
2.通過ISendEndpointProvider
發(fā)送:
private readonly ISendEndpointProvider _sendEndpointProvider; public async Task Post(CreateOrderRequest request) { var serviceAddress = new Uri("queue:create-order"); var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress); await endpoint.Send(request); }
3.通過ConsumeContext
發(fā)送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest> { public async Task Consume(ConsumeContext<CreateOrderRequest> context) { //do something else var destinationAddress = new Uri("queue:lock-stock"); var command = new LockStockRequest(context.Message.OrderId); await context.Send<LockStockRequest>(destinationAddress, command); // 也可以通過獲取`SendEndpoint`發(fā)送命令 // var endpoint = await context.GetSendEndpoint(destinationAddress); // await endpoint.Send<LockStockRequest>(command); } }
發(fā)布事件
1.通過IBus
發(fā)布:
private readonly IBus _bus; public async Task Post(CreateOrderRequest request) { //do something await _bus.Publish(request); }
2.通過IPublishEndpoint
發(fā)布:
private readonly IPublishEndpoint _publishEndpoint; public async Task Post(CreateOrderRequest request) { //do something var order = CreateOrder(request); await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id)); }
3.通過ConsumeContext
發(fā)布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest> { public async Task Consume(ConsumeContext<CreateOrderRequest> context) { var order = CreateOrder(conext.Message); await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id)); } }
Consumer
Consumer,消費者,即用于消費消息。MassTransit 包括多種消費者類型,主要分為無狀態(tài)和有狀態(tài)兩種消費者類型。
無狀態(tài)消費者
無狀態(tài)消費者,即消費者無狀態(tài),消息消費完畢,消費者就釋放。主要的消費者類型有:IConsumer<TMessage>
、JobConsumer
、IActivity
和RoutingSlip
等。其中IConsumer<TMessage>
已經(jīng)在上面的快速體驗
部分舉例說明。而JobConsumer<TMessage>
主要是對IConsumer<TMessage>
的補充,其主要應用場景在于執(zhí)行耗時任務(wù)。
而對于IActivity
和RoutingSlip
則是MassTransit Courier
的核心對象,主要用于實現(xiàn)Saga模式的分布式事務(wù)。MassTransit Courier 實現(xiàn)了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivity
和IExecuteActivity
。二者的差別在于IActivity
定義了Execute
和Compensate
兩個方法,而IExecuteActivitiy
僅定義了Execute
方法。其中Execute
代表正向操作,Compensate
代表反向補償操作。用一個簡單的下單流程:創(chuàng)建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對于具體實現(xiàn),可參閱文章:AspNetCore&MassTransit Courier實現(xiàn)分布式事務(wù)
有狀態(tài)消費者
有狀態(tài)消費者,即消費者有狀態(tài),其狀態(tài)會持久化,代表的消費者類型為MassTransitStateMachine
。MassTransitStateMachine
是MassTransit Automatonymous
庫定義的,Automatonymous
是一個.NET 狀態(tài)機庫,用于定義狀態(tài)機,包括狀態(tài)、事件和行為。MassTransitStateMachine
就是狀態(tài)機的具體抽象,可以用其編排一系列事件來實現(xiàn)狀態(tài)的流轉(zhuǎn),也可以用來實現(xiàn)Saga模式的分布式事務(wù)。并支持與EF Core和Dapper集成將狀態(tài)持久化到關(guān)系型數(shù)據(jù)庫,也支持將狀態(tài)持久化到MongoDB、Redis等數(shù)據(jù)庫。MassTransitStateMachine
對于Saga模式分布式事務(wù)的實現(xiàn)方式與RoutingSlip
不同,還是以簡單的下單流程:創(chuàng)建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示?;?code>MassTransitStateMachine 實現(xiàn)分布式事務(wù)詳參后續(xù)文章。
從上圖可知,通過MassTransitStateMachine
可以將事件的執(zhí)行順序邏輯編排在一個集中的狀態(tài)機中,通過發(fā)送命令和訂閱事件來推動狀態(tài)流轉(zhuǎn),而這也正是Saga編排模式的實現(xiàn)。
應用場景
了解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應用場景:
- 基于消息的請求響應模式:可用于同步通信
- Mediator模式:中間者模式的實現(xiàn),類似MediatR,但功能更完善
- 計劃任務(wù):可用于執(zhí)行定時任務(wù)
- Routing Slip 模式:可用于實現(xiàn)Saga模式的分布式事務(wù)
- Saga 狀態(tài)機:可用于實現(xiàn)Saga模式的分布式事務(wù)
- 本地消息表:類似DotNetCore.Cap,用于實現(xiàn)最終一致性
總體而言,MassTransit是一款優(yōu)秀的分布式應用框架,可作為分布式應用的消息總線,也可以用作單體應用的事件總線。感興趣的朋友不妨一觀。
到此這篇關(guān)于MassTransit 中的.NET 分布式應用框架的文章就介紹到這了,更多相關(guān).NET 分布式應用框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Community Server專題二:體系結(jié)構(gòu)
Community Server專題二:體系結(jié)構(gòu)...2007-03-03.NET使用StackTrace獲取方法調(diào)用信息的代碼演示
StackTrace, 位于 System.Diagnostics 命名空間下,名字很直觀,它代表一個方法調(diào)用的跟蹤堆棧,里面存放著按順序排列的棧幀對象(StackFrame),每當發(fā)生一次調(diào)用,就會壓入一個棧幀,這篇文章主要介紹了.NET使用StackTrace獲取方法調(diào)用信息,需要的朋友可以參考下2022-09-09ASP.NET MVC 3仿Server.Transfer效果的實現(xiàn)方法
這篇文章主要介紹了ASP.NET MVC 3仿Server.Transfer效果的實現(xiàn)方法,需要的朋友可以參考下2015-10-10asp.net為網(wǎng)頁動態(tài)添加關(guān)鍵詞的方法
這篇文章主要介紹了asp.net為網(wǎng)頁動態(tài)添加關(guān)鍵詞的方法,可實現(xiàn)動態(tài)添加keyword meta的功能,非常具有實用價值,需要的朋友可以參考下2015-04-04.net 運用二進制位運算進行數(shù)據(jù)庫權(quán)限管理
.net 運用二進制位運算進行數(shù)據(jù)庫權(quán)限管理 ,需要的朋友可以參考一下2013-02-02