欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

講解如何利用 Python完成 Saga 分布式事務(wù)

 更新時間:2021年09月08日 15:11:13   作者:葉東富  
這篇文章主要介紹了如何利用 Python 完成一個 Saga 的分布式事務(wù),需要的朋友可以參考下面文章具體的內(nèi)容

銀行跨行轉(zhuǎn)賬業(yè)務(wù)是一個典型分布式事務(wù)場景,假設(shè) A 需要跨行轉(zhuǎn)賬給 B,那么就涉及兩個銀行的數(shù)據(jù),無法通過一個數(shù)據(jù)庫的本地事務(wù)保證轉(zhuǎn)賬的 ACID,只能夠通過分布式事務(wù)來解決。

1、分布式事務(wù)

分布式事務(wù)在分布式環(huán)境下,為了滿足可用性、性能與降級服務(wù)的需要,降低一致性與隔離性的要求,一方面遵循 BASE 理論:

  • 基本業(yè)務(wù)可用性( Basic Availability )
  • 柔性狀態(tài)( Soft state )
  • 最終一致性( Eventual consistency )

另一方面,分布式事務(wù)也部分遵循 ACID 規(guī)范:

  • 原子性:嚴(yán)格遵循
  • 一致性:事務(wù)完成后的一致性嚴(yán)格遵循;事務(wù)中的一致性可適當(dāng)放寬
  • 隔離性:并行事務(wù)間不可影響;事務(wù)中間結(jié)果可見性允許安全放寬
  • 持久性:嚴(yán)格遵循

2、SAGA

Saga 是這一篇數(shù)據(jù)庫論文SAGAS提到的一個分布式事務(wù)方案。其核心思想是將長事務(wù)拆分為多個本地短事務(wù),由 Saga 事務(wù)協(xié)調(diào)器協(xié)調(diào),如果各個本地事務(wù)成功完成那就正常完成,如果某個步驟失敗,則根據(jù)相反順序一次調(diào)用補償操作。

目前可用于 SAGA 的開源框架,主要為 Java 語言,其中以 seata 為代表。我們的例子采用 go 語言,使用的分布式事務(wù)框架為https://github.com/yedf/dtm,它對分布式事務(wù)的支持非常優(yōu)雅。下面來詳細(xì)講解 SAGA 的組成:

DTM 事務(wù)框架里,有 3 個角色,與經(jīng)典的 XA 分布式事務(wù)一樣:

  • AP/應(yīng)用程序,發(fā)起全局事務(wù),定義全局事務(wù)包含哪些事務(wù)分支
  • RM/資源管理器,負(fù)責(zé)分支事務(wù)各項資源的管理
  • TM/事務(wù)管理器,負(fù)責(zé)協(xié)調(diào)全局事務(wù)的正確執(zhí)行,包括 SAGA 正向 /逆向操作的執(zhí)行

下面看一個成功完成的 SAGA 時序圖,就很容易理解 SAGA 分布式事務(wù):

3、SAGA 實踐

對于我們要進行的銀行轉(zhuǎn)賬的例子,我們將在正向操作中,進行轉(zhuǎn)入轉(zhuǎn)出,在補償操作中,做相反的調(diào)整。

首先我們創(chuàng)建賬戶余額表:

CREATE TABLE dtm_busi.`user_account` ( 
  `id` int(11) AUTO_INCREMENT PRIMARY KEY, 
  `user_id` int(11) not NULL UNIQUE , 
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00', 
  `create_time` datetime DEFAULT now(), 
  `update_time` datetime DEFAULT now() 
); 


我們先編寫核心業(yè)務(wù)代碼,調(diào)整用戶的賬戶余額

def saga_adjust_balance(cursor, uid, amount): 
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount)) 
  if affected == 0: 
    raise Exception("update error, balance not enough") 


下面我們來編寫具體的正向操作 /補償操作的處理函數(shù)

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  saga_adjust_balance(c, out_uid, -30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransOutCompensate") 
def trans_out_compensate(): 
  saga_adjust_balance(c, out_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  saga_adjust_balance(c, in_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInCompensate") 
def trans_in_compensate(): 
  saga_adjust_balance(c, in_uid, -30) 
  return {"dtm_result": "SUCCESS"} 


到此各個子事務(wù)的處理函數(shù)已經(jīng) OK 了,然后是開啟 SAGA 事務(wù),進行分支調(diào)用

# 這是 dtm 服務(wù)地址 
dtm = "http://localhost:8080/api/dtmsvr" 
# 這是業(yè)務(wù)微服務(wù)地址 
svc = "http://localhost:5000/api" 
 
    req = {"amount": 30} 
    s = saga.Saga(dtm, utils.gen_gid(dtm)) 
    s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate") 
    s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate") 
    s.submit() 


至此,一個完整的 SAGA 分布式事務(wù)編寫完成。

如果您想要完整運行一個成功的示例,那么參考這個例子yedf/dtmcli-py-sample,將它運行起來非常簡單

# 部署啟動 dtm 
# 需要 docker 版本 18 以上 
git clone https://github.com/yedf/dtm 
cd dtm 
docker-compose up 
 
# 另起一個命令行 
git clone https://github.com/yedf/dtmcli-py-sample 
cd dtmcli-py-sample 
pip3 install flask dtmcli requests 
flask run 
 
# 另起一個命令行 
curl localhost:5000/api/fireSaga 

4、處理網(wǎng)絡(luò)異常

假設(shè)提交給 dtm 的事務(wù)中,調(diào)用轉(zhuǎn)入操作時,出現(xiàn)短暫的故障怎么辦?按照 SAGA 事務(wù)的協(xié)議,dtm 會重試未完成的操作,這時我們要如何處理?故障有可能是轉(zhuǎn)入操作完成后出網(wǎng)絡(luò)故障,也有可能是轉(zhuǎn)入操作完成中出現(xiàn)機器宕機。如何處理才能夠保障賬戶余額的調(diào)整是正確無問題的?

這類網(wǎng)絡(luò)異常的妥當(dāng)處理,是分布式事務(wù)中的大難題,異常情況包括三類:重復(fù)請求、空補償、懸掛,都需要正確處理

DTM 提供了子事務(wù)屏障功能,保證上述異常情況下的業(yè)務(wù)邏輯,只會有一次正確順序下的成功提交。(子事務(wù)屏障詳情參考分布式事務(wù)最經(jīng)典的七種解決方案的子事務(wù)屏障環(huán)節(jié))

我們把處理函數(shù)調(diào)整為:

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, out_uid, -30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "SUCCESS"} 


這里的 barrier_from_req(request).call(cursor, busi_callback)調(diào)用會使用子事務(wù)屏障技術(shù),保證 busi_callback 回調(diào)函數(shù)僅被提交一次

您可以嘗試多次調(diào)用這個 TransIn 服務(wù),僅有一次余額調(diào)整。

5、處理回滾

假如銀行將金額準(zhǔn)備轉(zhuǎn)入用戶 2 時,發(fā)現(xiàn)用戶 2 的賬戶異常,返回失敗,會怎么樣?我們調(diào)整處理函數(shù),讓轉(zhuǎn)入操作返回失敗

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  return {"dtm_result": "FAILURE"} 


我們給出事務(wù)失敗交互的時序圖:

這里有一點,TransIn 的正向操作什么都沒有做,就返回了失敗,此時調(diào)用 TransIn 的補償操作,會不會導(dǎo)致反向調(diào)整出錯了呢?

不用擔(dān)心,前面的子事務(wù)屏障技術(shù),能夠保證 TransIn 的錯誤如果發(fā)生在提交之前,則補償為空操作;TransIn 的錯誤如果發(fā)生在提交之后,則補償操作會將數(shù)據(jù)提交一次。

我們可以將返回錯誤的 TransIn 改成:

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, in_uid, 30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "FAILURE"} 


最后的結(jié)果余額依舊會是對的,原理可以參考:分布式事務(wù)最經(jīng)典的七種解決方案的子事務(wù)屏障環(huán)節(jié)

6、小結(jié)

在這篇文章里,我們介紹了 SAGA 的理論知識,也通過一個例子,完整給出了編寫一個 SAGA 事務(wù)的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。相信讀者通過這邊文章,對 SAGA 已經(jīng)有了深入的理解。

文中使用的 dtm 是新開源的 Golang 分布式事務(wù)管理框架,功能強大,支持 TCC 、SAGA 、XA 、事務(wù)消息等事務(wù)模式,支持 Go 、python 、PHP 、node 、csharp 等語言的。同時提供了非常簡單易用的接口。

以上就是利用 Python 輕松完成一個 Saga 分布式事務(wù)的詳細(xì)內(nèi)容,更多關(guān)于Python完成一個 Saga 分布式事務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論