如何用C#實(shí)現(xiàn)SAGA分布式事務(wù)
背景
銀行跨行轉(zhuǎn)賬業(yè)務(wù)是一個典型分布式事務(wù)場景,假設(shè) A 需要跨行轉(zhuǎn)賬給 B,那么就涉及兩個銀行的數(shù)據(jù),無法通過一個數(shù)據(jù)庫的本地事務(wù)保證轉(zhuǎn)賬的 ACID ,只能夠通過分布式事務(wù)來解決。
市面上使用比較多的分布式事務(wù)框架,支持 SAGA 的,大部分都是 JAVA 為主的,沒有提供 C# 的對接方式,或者是對接難度大,一定程度上讓人望而卻步。
下面就基于這個框架來實(shí)踐一下銀行轉(zhuǎn)賬的例子。
前置工作
dotnet add package Dtmcli --version 0.3.0
成功的 SAGA
先來看一下一個成功完成的 SAGA 時(shí)序圖。
上圖的微服務(wù)1,對應(yīng)我們示例的 OutApi,也就是轉(zhuǎn)錢出去的那個服務(wù)。
微服務(wù)2,對應(yīng)我們示例的 InApi,也就是轉(zhuǎn)錢進(jìn)來的那個服務(wù)。
下面是兩個服務(wù)的正向操作和補(bǔ)償操作的處理。
OutApi
app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => { // 進(jìn)行 數(shù)據(jù)庫操作 Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); }); app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) => { // 進(jìn)行 數(shù)據(jù)庫操作 Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)出【{req.Amount}】補(bǔ)償操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); });
InApi
app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) => { Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); }); app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) => { Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)入【{req.Amount}】補(bǔ)償操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); });
注:示例為了簡單,沒有進(jìn)行實(shí)際的數(shù)據(jù)庫操作。
到此各個子事務(wù)的處理已經(jīng) OK 了,然后是開啟 SAGA 事務(wù),進(jìn)行分支調(diào)用
var userOutReq = new TransRequest() { UserId = "1", Amount = -30 }; var userInReq = new TransRequest() { UserId = "2", Amount = 30 }; var ct = new CancellationToken(); var gid = await dtmClient.GenGid(ct); var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq) ; var flag = await saga.Submit(ct); Console.WriteLine($"case1, {gid} saga 提交結(jié)果 = {flag}");
到這里,一個完整的 SAGA 分布式事務(wù)就編寫完成了。
搭建好 dtm 的環(huán)境后,運(yùn)行上面的例子,會看到下面的輸出。
當(dāng)然,上面的情況太理想了,轉(zhuǎn)出轉(zhuǎn)入都是一次性就成功了。
但是實(shí)際上我們會遇到許許多多的問題,最常見的應(yīng)該就是網(wǎng)絡(luò)故障了。
下面來看一個異常的 SAGA 示例
異常的 SAGA
做一個假設(shè),用戶1的轉(zhuǎn)出是正常的,但是用戶2在轉(zhuǎn)入的時(shí)候出現(xiàn)了問題。
由于事務(wù)已經(jīng)提交給 dtm 了,按照 SAGA 事務(wù)的協(xié)議,dtm 會重試未完成的操作。
這個時(shí)候用戶2 這邊會出現(xiàn)什么樣的情況呢?
轉(zhuǎn)入其實(shí)成功了,但是 dtm 收到錯誤 (網(wǎng)絡(luò)故障等)轉(zhuǎn)入沒有成功,直接告訴 dtm 失敗了 (應(yīng)用異常等)
無論是那一種,dtm 都會進(jìn)行重試操作。這個時(shí)候會發(fā)生什么呢?我們繼續(xù)往下看。
先看一下事務(wù)失敗交互的時(shí)序圖
再通過調(diào)整上面成功的例子,來比較直觀的看看出現(xiàn)的情況。
在 InApi 加多一個轉(zhuǎn)入失敗的處理接口
app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) => { Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)入【{req.Amount}】正向操作--失敗,gid={gid}, branch_id={branch_id}, op={op}"); //return Results.BadRequest(); return Results.Ok(TransResponse.BuildFailureResponse()); });
失敗的返回有兩種,一種是狀態(tài)碼大于 400,一種是狀態(tài)碼是 200 并且響應(yīng)體包含 FAILURE,上面的例子是第二種
調(diào)整一下調(diào)用方,把轉(zhuǎn)入正向操作替換成上面這個返回錯誤的接口。
var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);
運(yùn)行結(jié)果如下:
在這個例子中,只考慮補(bǔ)償/重試成功的情況下。
用戶1 轉(zhuǎn)出的 30 塊錢最終是回到了他的帳號上,他沒有出現(xiàn)損失。
用戶2 就有點(diǎn)苦逼了,轉(zhuǎn)入沒有成功,返回了失敗,還觸發(fā)了轉(zhuǎn)入的補(bǔ)償機(jī)制,結(jié)果就是把用戶2 還沒進(jìn)帳的 30 塊錢給多扣了,這個就是上面的情況2,常見的空補(bǔ)償問題。
這個時(shí)候就要在進(jìn)行轉(zhuǎn)入補(bǔ)償?shù)臅r(shí)候做一系列的判斷,轉(zhuǎn)入有沒有成功,轉(zhuǎn)出有沒有失敗等等,把業(yè)務(wù)變的十分復(fù)雜。
如果出現(xiàn)了上述的情況1,會發(fā)生什么呢?
用戶2 第一次已經(jīng)成功轉(zhuǎn)入 30 塊錢,返回的也是成功,但是網(wǎng)絡(luò)出了點(diǎn)問題,導(dǎo)致 dtm 認(rèn)為失敗了,它就會進(jìn)行重試,相當(dāng)于用戶2 還會收到第二個轉(zhuǎn)入 30 塊錢的請求!也就是說這次轉(zhuǎn)帳,用戶2 會進(jìn)賬 60 塊錢,翻倍了,也就是說這個請求不是冪等。
同樣的,要處理這個問題,在進(jìn)行轉(zhuǎn)入的正向操作中也要進(jìn)行一系列的判斷,同樣會把復(fù)雜度上升一個級別。
前面有提到 dtm 提供了子事務(wù)屏障的功能,保證了冪等、空補(bǔ)償?shù)瘸R妴栴}。
再來看看這個子事務(wù)屏障的功能有沒有幫我們簡化上面異常處理。
子事務(wù)屏障
子事務(wù)屏障,需要根據(jù) trans_type,gid,branch_id 和 op 四個內(nèi)容進(jìn)行創(chuàng)建。
這4個內(nèi)容 dtm 在回調(diào)時(shí)會放在 querysting 上面。
客戶端里面提供了 IBranchBarrierFactory 來供我們使用。
空補(bǔ)償
針對上面的異常情況(用戶2 憑空消失 30 塊錢),對轉(zhuǎn)入的補(bǔ)償進(jìn)行子事務(wù)屏障的改造。
app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) => { var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op); using var db = Db.GeConn(); await barrier.Call(db, async (tx) => { // 轉(zhuǎn)入失敗的情況下,不應(yīng)該輸出下面這個 Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)入【{req.Amount}】補(bǔ)償操作,gid={gid}, branch_id={branch_id}, op={op}"); // tx 參數(shù)是事務(wù),可和本地事務(wù)一起提交回滾 await Task.CompletedTask; }); Console.WriteLine($"子事務(wù)屏障-補(bǔ)償操作,gid={gid}, branch_id={branch_id}, op={op}"); return Results.Ok(TransResponse.BuildSucceedResponse()); });
Call 方法就是關(guān)鍵所在了,需要傳入一個 DbConnection 和真正的業(yè)務(wù)操作,這里的業(yè)務(wù)操作就是在控制臺輸出補(bǔ)償操作的信息。
同樣的,我們再調(diào)整一下調(diào)用方,把轉(zhuǎn)入補(bǔ)償操作替換成上面帶子事務(wù)屏障的接口。
var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq) ;
再來運(yùn)行這個例子。
會發(fā)現(xiàn)轉(zhuǎn)入的補(bǔ)償操作并沒執(zhí)行,控制臺沒有輸出補(bǔ)償信息,而是輸出了
Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False
這個就表明了,這個請求是個空補(bǔ)償,是不應(yīng)該執(zhí)行業(yè)務(wù)方法的,既空操作。
再來看一下,轉(zhuǎn)入成功的,但是 dtm 收到了失敗的信號,不斷重試造成重復(fù)請求的情況。
冪等
針對用戶2 轉(zhuǎn)入兩次 30 塊錢的異常情況,對轉(zhuǎn)入的正向操作進(jìn)行子事務(wù)屏障的改造。
app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) => { Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)入【{req.Amount}】請求來了!??! gid={gid}, branch_id={branch_id}, op={op}"); var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op); using var db = Db.GeConn(); await barrier.Call(db, async (tx) => { var c = Interlocked.Increment(ref _errCount); // 模擬一個超時(shí)執(zhí)行 if (c > 0 && c < 2) await Task.Delay(10000); Console.WriteLine($"用戶【{req.UserId}】轉(zhuǎn)入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}"); await Task.CompletedTask; }); return Results.Ok(TransResponse.BuildSucceedResponse()); });
這里通過一個超時(shí)執(zhí)行來讓 dtm 進(jìn)行轉(zhuǎn)入正向操作的重試。
同樣的,我們再調(diào)整一下調(diào)用方,把轉(zhuǎn)入的正向操作也替換成上面帶子事務(wù)屏障的接口。
var saga = new Saga(dtmClient, gid) .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq) .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq) ;
再來運(yùn)行這個例子。
可以看到轉(zhuǎn)入的正向操作確實(shí)是觸發(fā)了多次,第一次實(shí)際上是成功,只是響應(yīng)比較慢,導(dǎo)致 dtm 認(rèn)為是失敗了,觸發(fā)了第二次請求,但是第二次請求并沒有執(zhí)行業(yè)務(wù)操作,而是輸出了
Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True
這個就表明了,這個請求是個重復(fù)請求,是不應(yīng)該執(zhí)行業(yè)務(wù)方法的,保證了冪等。
到這里,可以看出,子事務(wù)屏障確實(shí)解決了冪等和空補(bǔ)償?shù)膯栴},大大降低了業(yè)務(wù)判斷的復(fù)雜度和出錯的可能性。
寫在最后
在這篇文章里,也通過幾個例子,完整給出了編寫一個 SAGA 事務(wù)的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。希望對研究分布式事務(wù)的您有所幫助。
本文示例代碼: DtmSagaSample
到此這篇關(guān)于如何用C#實(shí)現(xiàn)SAGA分布式事務(wù)的文章就介紹到這了,更多相關(guān)C#實(shí)現(xiàn)SAGA內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C#中comboBox實(shí)現(xiàn)三級聯(lián)動
給大家分享了C#中comboBox實(shí)現(xiàn)三級聯(lián)動的全部代碼,代碼經(jīng)過測試,有興趣的朋友跟著做一下。2018-03-03WPF實(shí)現(xiàn)雷達(dá)圖(仿英雄聯(lián)盟)的示例代碼
這篇文章主要介紹了如何利用WPF實(shí)現(xiàn)雷達(dá)圖(仿英雄聯(lián)盟)的繪制,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)或工作有一定幫助,需要的可以參考一下2022-07-07如何用C#找出數(shù)組中只出現(xiàn)了一次的數(shù)字
數(shù)組從字面上理解就是存放一組數(shù),下面這篇文章主要給大家介紹了關(guān)于如何用C#找出數(shù)組中只出現(xiàn)了一次的數(shù)字,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-12-12