欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解析探秘fescar分布式事務(wù)實(shí)現(xiàn)原理

 更新時(shí)間:2022年02月28日 15:36:13   作者:kl  
這篇文章主要為大家解析探秘fescar分布式事務(wù)的實(shí)現(xiàn)原理,fescar的txc模型實(shí)現(xiàn)非常有研究的價(jià)值,所以今天我們來(lái)好好翻一翻fescar項(xiàng)目的代碼

前言

fescar發(fā)布已有時(shí)日,分布式事務(wù)一直是業(yè)界備受關(guān)注的領(lǐng)域,fescar發(fā)布一個(gè)月左右便受到了近5000個(gè)star足以說(shuō)明其熱度。當(dāng)然,在fescar出來(lái)之前,已經(jīng)有比較成熟的分布式事務(wù)的解決方案開(kāi)源了,

比較典型的方案如LCN(https://github.com/codingapi/tx-lcn)的2pc型無(wú)侵入事務(wù),目前l(fā)cn已發(fā)展到5.0,已支持和fescar事務(wù)模型類(lèi)似的TCX型事務(wù)。

還有如TCC型事務(wù)實(shí)現(xiàn)hmily(https://github.com/yu199195/hmily)、

tcc-transaction(https://github.com/changmingxie/tcc-transaction)等。在微服務(wù)架構(gòu)流行的當(dāng)下、阿里這種開(kāi)源大戶背景下,fescar的發(fā)布無(wú)疑又掀起了研究分布式事務(wù)的熱潮。fescar脫胎于阿里云商業(yè)分布式事務(wù)服務(wù)GTS,在線上環(huán)境提供這種公共服務(wù)其模式肯定經(jīng)受了非常嚴(yán)苛的考驗(yàn)。其分布式事務(wù)模型TXC又仿于傳統(tǒng)事務(wù)模型XA方案,主要區(qū)別在于資源管理器的定位一個(gè)在應(yīng)用層一個(gè)在數(shù)據(jù)庫(kù)層。博主覺(jué)得fescar的txc模型實(shí)現(xiàn)非常有研究的價(jià)值,所以今天我們來(lái)好好翻一翻fescar項(xiàng)目的代碼。本文篇幅較長(zhǎng),瀏覽并理解本文大概耗時(shí)30~60分鐘左右。

項(xiàng)目說(shuō)明

本博文所述代碼為fescar的0.1.2-SNAPSHOT版本,根據(jù)fescar后期的迭代計(jì)劃,其項(xiàng)目結(jié)構(gòu)和模塊實(shí)現(xiàn)都可能有很大的改變,特此說(shuō)明。

fescar的TXC模型

上圖為fescar官方針對(duì)TXC模型制作的示意圖。不得不說(shuō)大廠的圖制作的真的不錯(cuò),結(jié)合示意圖我們可以看到TXC實(shí)現(xiàn)的全貌。TXC的實(shí)現(xiàn)通過(guò)三個(gè)組件來(lái)完成。也就是上圖的三個(gè)深黃色部分,其作用如下,:

  • TM:全局事務(wù)管理器,在標(biāo)注開(kāi)啟fescar分布式事務(wù)的服務(wù)端開(kāi)啟,并將全局事務(wù)發(fā)送到TC事務(wù)控制端管理
  • TC:事務(wù)控制中心,控制全局事務(wù)的提交或者回滾。這個(gè)組件需要獨(dú)立部署維護(hù),目前只支持單機(jī)版本,后續(xù)迭代計(jì)劃會(huì)有集群版本
  • RM:資源管理器,主要負(fù)責(zé)分支事務(wù)的上報(bào),本地事務(wù)的管理

一段話簡(jiǎn)述其實(shí)現(xiàn)過(guò)程:服務(wù)起始方發(fā)起全局事務(wù)并注冊(cè)到TC。在調(diào)用協(xié)同服務(wù)時(shí),協(xié)同服務(wù)的事務(wù)分支事務(wù)會(huì)先完成階段一的事務(wù)提交或回滾,并生成事務(wù)回滾的undo_log日志,同時(shí)注冊(cè)當(dāng)前協(xié)同服務(wù)到TC并上報(bào)其事務(wù)狀態(tài),歸并到同一個(gè)業(yè)務(wù)的全局事務(wù)中。此時(shí)若沒(méi)有問(wèn)題繼續(xù)下一個(gè)協(xié)同服務(wù)的調(diào)用,期間任何協(xié)同服務(wù)的分支事務(wù)回滾,都會(huì)通知到TC,TC在通知全局事務(wù)包含的所有已完成一階段提交的分支事務(wù)回滾。如果所有分支事務(wù)都正常,最后回到全局事務(wù)發(fā)起方時(shí),也會(huì)通知到TC,TC在通知全局事務(wù)包含的所有分支刪除回滾日志。在這個(gè)過(guò)程中為了解決寫(xiě)隔離和度隔離的問(wèn)題會(huì)涉及到TC管理的全局鎖。

本博文的目標(biāo)是深入代碼細(xì)節(jié),探究其基本思路是如何實(shí)現(xiàn)的。首先會(huì)從項(xiàng)目的結(jié)構(gòu)來(lái)簡(jiǎn)述每個(gè)模塊的作用,繼而結(jié)合官方自帶的examples實(shí)例來(lái)探究整個(gè)分布式事務(wù)的實(shí)現(xiàn)過(guò)程。

項(xiàng)目結(jié)構(gòu)解析

項(xiàng)目拉下來(lái),用IDE打開(kāi)后的目錄結(jié)構(gòu)如下,下面先大致的看下每個(gè)模塊的實(shí)現(xiàn)

common :公共組件,提供常用輔助類(lèi),靜態(tài)變量、擴(kuò)展機(jī)制類(lèi)加載器、以及定義全局的異常等

config : 配置加載解析模塊,提供了配置的基礎(chǔ)接口,目前只有文件配置實(shí)現(xiàn),后續(xù)會(huì)有nacos等配置中心的實(shí)現(xiàn)

core : 核心模塊主要封裝了TM、RM和TC通訊用RPC相關(guān)內(nèi)容

distrbution :這個(gè)模塊目前是空的,distrbution是高性能的隊(duì)列,后期應(yīng)該應(yīng)用于事務(wù)日志的落地

dubbo :dubbo模塊主要適配dubbo通訊框架,使用dubbo的filter機(jī)制來(lái)傳統(tǒng)全局事務(wù)的信息到分支

examples :簡(jiǎn)單的演示實(shí)例模塊,等下從這個(gè)模塊入手探索

rm-datasource :資源管理模塊,比較核心的一個(gè)模塊,個(gè)人認(rèn)為這個(gè)模塊命名為core要更合理一點(diǎn)。代理了JDBC的一些類(lèi),用來(lái)解析sql生成回滾日志、協(xié)調(diào)管理本地事務(wù)

server : TC組件所在,主要協(xié)調(diào)管理全局事務(wù),負(fù)責(zé)全局事務(wù)的提交或者回滾,同時(shí)管理維護(hù)全局鎖。

spring :和spring集成的模塊,主要是aop邏輯,是整個(gè)分布式事務(wù)的入口,研究fescar的突破口

tm : 全局事務(wù)事務(wù)管理模塊,管理全局事務(wù)的邊界,全局事務(wù)開(kāi)啟回滾點(diǎn)都在這個(gè)模塊控制

通過(guò)【examples】模塊的實(shí)例看下效果

第一步、

先啟動(dòng)TC也就是【Server】模塊,main方法直接啟動(dòng)就好,默認(rèn)服務(wù)端口8091

第二步、

回到examples模塊,將訂單,業(yè)務(wù),賬戶、倉(cāng)庫(kù)四個(gè)服務(wù)的配置文件配置好,主要是mysql數(shù)據(jù)源和zookeeper連接地址,這里要注意下,默認(rèn)dubbo的zk注冊(cè)中心依賴沒(méi)有,啟動(dòng)的時(shí)候回拋找不到class的異常,需要添加如下的依賴:

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

第三步、

在BusinessServiceImpl中的模擬拋異常的地方打個(gè)斷點(diǎn),依次啟動(dòng)OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四個(gè)服務(wù)、等進(jìn)斷點(diǎn)后,查看數(shù)據(jù)庫(kù)account_tbl表,金額已減去400元,變成了599元。然后放開(kāi)斷點(diǎn)、BusinessServiceImpl模塊模擬的異常觸發(fā),全局事務(wù)回滾,account_tbl表的金額就又回滾到999元了

如上,我們已經(jīng)體驗(yàn)到fescar事務(wù)的控制能力了,下面我們具體看下它是怎么控制的。

fescar事務(wù)過(guò)程分析

首先分析配置文件

這個(gè)是一個(gè)鐵律,任何一個(gè)技術(shù)或框架要集成,配置文件肯定是一個(gè)突破口。從上面的例子我們了解到,實(shí)例模塊的配置文件中配置了一個(gè)全局事務(wù)掃描器實(shí)例,如:

<bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
    <constructor-arg value="dubbo-demo-app"/>
    <constructor-arg value="my_test_tx_group"/>
</bean>

這個(gè)實(shí)例在項(xiàng)目啟動(dòng)時(shí)會(huì)掃描所有實(shí)例,具體實(shí)現(xiàn)見(jiàn)【spring】模塊。并將標(biāo)注了@GlobalTransactional注解的方法織入GlobalTransactionalInterceptor的invoke方法邏輯。同時(shí)應(yīng)用啟動(dòng)時(shí),會(huì)初始化TM(TmRpcClient)和RM(RmRpcClient)的實(shí)例,這個(gè)時(shí)候,服務(wù)已經(jīng)和TC事務(wù)控制中心勾搭上了。在往下看就涉及到TM模塊的事務(wù)模板類(lèi)TransactionalTemplate。

【TM】模塊啟動(dòng)全局事務(wù)

全局事務(wù)的開(kāi)啟,提交、回滾都被封裝在TransactionlTemplate中完成了,代碼如:

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 2. begin transaction
    try {
        tx.begin(business.timeout(), business.name());
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
    Object rs = null;
    try {
        // Do Your Business
        rs = business.execute();
    } catch (Throwable ex) {
        // 3. any business exception, rollback.
        try {
            tx.rollback();
            // 3.1 Successfully rolled back
            throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
        } catch (TransactionException txe) {
            // 3.2 Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, ex);
        }
    }
    // 4. everything is fine, commit.
    try {
        tx.commit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
    return rs;
}

更詳細(xì)的實(shí)現(xiàn)在【TM】模塊中被分成了兩個(gè)Class實(shí)現(xiàn),如下:

DefaultGlobalTransaction :全局事務(wù)具體的開(kāi)啟,提交、回滾動(dòng)作

DefaultTransactionManager :負(fù)責(zé)使用TmRpcClient向TC控制中心發(fā)送指令,如開(kāi)啟全局事務(wù)(GlobalBeginRequest)、提交(GlobalCommitRequest)、回滾(GlobalRollbackRequest)、查詢狀態(tài)(GlobalStatusRequest)等。

以上是TM模塊核心內(nèi)容點(diǎn),TM模塊完成全局事務(wù)開(kāi)啟后,接下來(lái)就開(kāi)始看看全局事務(wù)iD,xid是如何傳遞、RM組件是如何介入的

【DUBBO】全局事務(wù)XID的傳遞

首先是xid的傳遞,目前已經(jīng)實(shí)現(xiàn)了dubbo框架實(shí)現(xiàn)的微服務(wù)架構(gòu)下的傳遞,其他的像spring cloud和motan等的想要實(shí)現(xiàn)也很容易,通過(guò)一般RPC通訊框架都有的filter機(jī)制,將xid從全局事務(wù)的發(fā)起節(jié)點(diǎn)傳遞到服務(wù)協(xié)從節(jié)點(diǎn),從節(jié)點(diǎn)接收到后綁定到當(dāng)前線程上線文環(huán)境中,用于在分支事務(wù)執(zhí)行sql時(shí)判斷是否加入全局事務(wù)。fescar的實(shí)現(xiàn)見(jiàn)【dubbo】模塊如下:

@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

上面代碼rpcXid不為空時(shí),就加入到了RootContext的ContextCore中,這里稍微深入講下。ContextCore是一個(gè)可擴(kuò)展實(shí)現(xiàn)的接口,目前默認(rèn)的實(shí)現(xiàn)是ThreadLocalContextCore,基于ThreadLocal來(lái)保存維護(hù)當(dāng)前的xid。這里fescar提供了可擴(kuò)展的機(jī)制,實(shí)現(xiàn)在【common】模塊中,通過(guò)一個(gè)自定義的類(lèi)加載器EnhancedServiceLoader加載需要擴(kuò)展的服務(wù)類(lèi),這樣只需要在擴(kuò)展類(lèi)加上@LoadLevel注解。標(biāo)記order屬性聲明高優(yōu)先級(jí)別,就可以達(dá)到擴(kuò)展實(shí)現(xiàn)的目的。

【RM】模塊本地資源管理的介入

fescar針對(duì)本地事務(wù)相關(guān)的接口,通過(guò)代理機(jī)制都實(shí)現(xiàn)了一遍代理類(lèi),如數(shù)據(jù)源(DataSourceProxy)、ConnectionProxy、StatementProxy等。這個(gè)在配置文件中也可以看出來(lái),也就是說(shuō),我們要使用fescar分布式事務(wù),一定要配置fescar提供的代理數(shù)據(jù)源。如:

配置好代理數(shù)據(jù)源后,從DataSourceProxy出發(fā),本地針對(duì)數(shù)據(jù)庫(kù)的所有操作過(guò)程我們就可以隨意控制了。從上面xid傳遞,已經(jīng)知道了xid被保存在RootContext中了,那么請(qǐng)看下面的代碼,就非常清楚了:

首先看StatementProxy的一段代碼

在看ExecuteTemplate中的代碼

和【TM】模塊中的事務(wù)管理模板類(lèi)TransactionlTemplate類(lèi)似,這里非常關(guān)鍵的邏輯代理也被封裝在了ExecuteTemplate模板類(lèi)中。因重寫(xiě)了Statement有了StatementProxy實(shí)現(xiàn),在執(zhí)行原JDBC的executeUpdate方法時(shí),會(huì)調(diào)用到ExecuteTemplate的execute邏輯。在sql真正執(zhí)行前,會(huì)判斷RootCOntext當(dāng)前上下文中是否包含xid,也就是判斷當(dāng)前是否是全局分布式事務(wù)。如果不是,就直接使用本地事務(wù),如果是,這里RM就會(huì)增加一些分布式事務(wù)相關(guān)的邏輯了。這里根據(jù)sql的不同的類(lèi)型,fescar封裝了五個(gè)不同的執(zhí)行器來(lái)處理,分別是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,結(jié)構(gòu)如下圖:

PLAINEXECUTOR:

原生的JDBC接口實(shí)現(xiàn),未做任何處理,提供給全局事務(wù)中的普通的select查詢使用

UPDATEEXECUTOR、DELETEEXECUTOR、INSERTEXECUTOR:

三個(gè)DML增刪改執(zhí)行器實(shí)現(xiàn),主要在sql執(zhí)行的前后對(duì)sql語(yǔ)句進(jìn)行了解析,實(shí)現(xiàn)了如下兩個(gè)抽象接口方法:

protected abstract TableRecords beforeImage() throws SQLException;
protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

在這個(gè)過(guò)程中通過(guò)解析sql生成了提供回滾操作的undo_log日志,日志目前是保存在msyql中的,和業(yè)務(wù)sql操作共用同一個(gè)事務(wù)。表的結(jié)構(gòu)如下:

rollback_info保存的undo_log詳細(xì)信息,是longblob類(lèi)型的,結(jié)構(gòu)如下:

{
????"branchId":3958194,
????"sqlUndoLogs":[
????????{
????????????"afterImage":{
????????????????"rows":[
????????????????????{
????????????????????????"fields":[
????????????????????????????{
????????????????????????????????"keyType":"PrimaryKey",
????????????????????????????????"name":"ID",
????????????????????????????????"type":4,
????????????????????????????????"value":10 ????????????????????????????},
????????????????????????????{
????????????????????????????????"keyType":"NULL",
????????????????????????????????"name":"COUNT",
????????????????????????????????"type":4,
????????????????????????????????"value":98 ????????????????????????????}
????????????????????????]
????????????????????}
????????????????],
????????????????"tableName":"storage_tbl" ????????????},
????????????"beforeImage":{
????????????????"rows":[
????????????????????{
????????????????????????"fields":[
????????????????????????????{
????????????????????????????????"keyType":"PrimaryKey",
????????????????????????????????"name":"ID",
????????????????????????????????"type":4,
????????????????????????????????"value":10 ????????????????????????????},
????????????????????????????{
????????????????????????????????"keyType":"NULL",
????????????????????????????????"name":"COUNT",
????????????????????????????????"type":4,
????????????????????????????????"value":100 ????????????????????????????}
????????????????????????]
????????????????????}
????????????????],
????????????????"tableName":"storage_tbl" ????????????},
????????????"sqlType":"UPDATE",
????????????"tableName":"storage_tbl" ????????}
????],
????"xid":"192.168.7.77:8091:3958193" }

這里貼的是一個(gè)update的操作,undo_log記錄的非常的詳細(xì),通過(guò)全局事務(wù)xid關(guān)聯(lián)branchid,記錄數(shù)據(jù)操作的表名,操作字段名,以及sql執(zhí)行前后的記錄數(shù),如這個(gè)記錄,表名=storage_tbl,sql執(zhí)行前ID=10,count=100,sql執(zhí)行后id=10,count=98。如果整個(gè)全局事務(wù)失敗,需要回滾的時(shí)候就可以生成:

update storage_tbl set count = 100 where id = 10;

這樣的回滾sql語(yǔ)句執(zhí)行了。

SELECTFORUPDATEEXECUTOR:

fescar的AT模式在本地事務(wù)之上默認(rèn)支持讀未提交的隔離級(jí)別,但是通過(guò)SelectForUpdateExecutor執(zhí)行器,可以支持讀已提交的隔離級(jí)別。代碼如:

@Override
public Object doExecute(Object... args) throws Throwable {
    SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;
    Connection conn = statementProxy.getConnection();
    ResultSet rs = null;
    Savepoint sp = null;
    LockRetryController lockRetryController = new LockRetryController();
    boolean originalAutoCommit = conn.getAutoCommit();
    StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
    selectSQLAppender.append(getTableMeta().getPkName());
    selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
    String whereCondition = null;
    ArrayList<Object> paramAppender = new ArrayList<>();
    if (statementProxy instanceof ParametersHolder) {
        whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
    } else {
        whereCondition = recognizer.getWhereCondition();
    }
    if (!StringUtils.isEmpty(whereCondition)) {
        selectSQLAppender.append(" WHERE " + whereCondition);
    }
    selectSQLAppender.append(" FOR UPDATE");
    String selectPKSQL = selectSQLAppender.toString();
    try {
        if (originalAutoCommit) {
            conn.setAutoCommit(false);
        }
        sp = conn.setSavepoint();
        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
        while (true) {
            // Try to get global lock of those rows selected
            Statement stPK = null;
            PreparedStatement pstPK = null;
            ResultSet rsPK = null;
            try {
                if (paramAppender.isEmpty()) {
                    stPK = statementProxy.getConnection().createStatement();
                    rsPK = stPK.executeQuery(selectPKSQL);
                } else {
                    pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                    for (int i = 0; i < paramAppender.size(); i++) {
                        pstPK.setObject(i + 1, paramAppender.get(i));
                    }
                    rsPK = pstPK.executeQuery();
                }
                TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                statementProxy.getConnectionProxy().checkLock(selectPKRows);
                break;
            } catch (LockConflictException lce) {
                conn.rollback(sp);
                lockRetryController.sleep(lce);

            } finally {
                if (rsPK != null) {
                    rsPK.close();
                }
                if (stPK != null) {
                    stPK.close();
                }
                if (pstPK != null) {
                    pstPK.close();
                }
            }
        }
    } finally {
        if (sp != null) {
            conn.releaseSavepoint(sp);
        }
        if (originalAutoCommit) {
            conn.setAutoCommit(true);
        }
    }
    return rs;
}

關(guān)鍵代碼見(jiàn):

TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);

通過(guò)selectPKRows表操作記錄拿到lockKeys,然后到TC控制器端查詢是否被全局鎖定了,如果被鎖定了,就重新嘗試,直到鎖釋放返回查詢結(jié)果。

分支事務(wù)的注冊(cè)和上報(bào)

在本地事務(wù)提交前,fescar會(huì)注冊(cè)和上報(bào)分支事務(wù)相關(guān)的信息,見(jiàn)ConnectionProxy類(lèi)的commit部分代碼:

@Override
public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }
        try {
            if (context.hasUndoLog()) { 
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } else {
        targetConnection.commit();
    }
}

從這段代碼我們可以看到,首先是判斷是了是否是全局事務(wù),如果不是,就直接提交了,如果是,就先向TC控制器注冊(cè)分支事務(wù),為了寫(xiě)隔離,在TC端會(huì)涉及到全局鎖的獲取。然后保存了用于回滾操作的undo_log日志,繼而真正提交本地事務(wù),最后向TC控制器上報(bào)事務(wù)狀態(tài)。此時(shí),階段一的本地事務(wù)已完成了。

【SERVER】模塊協(xié)調(diào)全局

關(guān)于server模塊,我們可以聚焦在DefaultCoordinator這個(gè)類(lèi),這個(gè)是AbstractTCInboundHandler控制處理器默認(rèn)實(shí)現(xiàn)。主要實(shí)現(xiàn)了全局事務(wù)開(kāi)啟,提交,回滾,狀態(tài)查詢,分支事務(wù)注冊(cè),上報(bào),鎖檢查等接口,如:

回到一開(kāi)始的TransactionlTemplate,如果整個(gè)分布式事務(wù)失敗需要回滾了,首先是TM向TC發(fā)起回滾的指令,然后TC接收到后,解析請(qǐng)求后會(huì)被路由到默認(rèn)控制器類(lèi)的doGlobalRollback方法內(nèi),最終在TC控制器端執(zhí)行的代碼如下:

@Override
public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            continue;
        }
        try {
            BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    globalSession.removeBranch(branchSession);
                    LOGGER.error("Successfully rolled back branch " + branchSession);
                    continue;
                case PhaseTwo_RollbackFailed_Unretryable:
                    GlobalStatus currentStatus = globalSession.getStatus();
                    if (currentStatus.name().startsWith("Timeout")) {
                        globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
                    } else {
                        globalSession.changeStatus(GlobalStatus.RollbackFailed);
                    }
                    globalSession.end();
                    LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                    return;
                default:
                    LOGGER.info("Failed to rollback branch " + branchSession);
                    if (!retrying) {
                        queueToRetryRollback(globalSession);
                    }
                    return;

            }
        } catch (Exception ex) {
            LOGGER.info("Exception rollbacking branch " + branchSession, ex);
            if (!retrying) {
                queueToRetryRollback(globalSession);
                if (ex instanceof TransactionException) {
                    throw (TransactionException) ex;
                } else {
                    throw new TransactionException(ex);
                }
            }

        }
    }
    GlobalStatus currentStatus = globalSession.getStatus();
    if (currentStatus.name().startsWith("Timeout")) {
        globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
    } else {
        globalSession.changeStatus(GlobalStatus.Rollbacked);
    }
    globalSession.end();
}

如上代碼可以看到,回滾時(shí)從全局事務(wù)會(huì)話中迭代每個(gè)分支事務(wù),然后通知每個(gè)分支事務(wù)回滾。分支服務(wù)接收到請(qǐng)求后,首先會(huì)被路由到RMHandlerAT中的doBranchRollback方法,繼而調(diào)用了RM中的branchRollback方法,代碼如下:

@Override
public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

RM分支事務(wù)端最后執(zhí)行的是UndoLogManager的undo方法,通過(guò)xid和branchid從數(shù)據(jù)庫(kù)查詢出回滾日志,完成數(shù)據(jù)回滾操作,整個(gè)過(guò)程都是同步完成的。如果全局事務(wù)是成功的,TC也會(huì)有類(lèi)似的上述協(xié)調(diào)過(guò)程,只不過(guò)是異步的將本次全局事務(wù)相關(guān)的undo_log清除了而已。至此,就完成了2階段的提交或回滾,也就完成了完整的全局事務(wù)事務(wù)的控制。

結(jié)語(yǔ)

如果你看到這里,那么非常感謝你,在繁忙工作之余耐心的花時(shí)間來(lái)學(xué)習(xí)。同時(shí),我相信花的時(shí)間沒(méi)白費(fèi),完整的瀏覽理解估計(jì)對(duì)fescar實(shí)現(xiàn)的大致流程了解的十之八九了。本文從構(gòu)思立題到完成大概耗時(shí)1人天左右,博主在這個(gè)過(guò)程中,對(duì)fescar的實(shí)現(xiàn)也有了更加深入的了解。由于篇幅原因,并沒(méi)有面面俱到的對(duì)每個(gè)實(shí)現(xiàn)的細(xì)節(jié)去深究,如sql是如何解析的等,更多的是在fescar的TXC模型的實(shí)現(xiàn)過(guò)程的關(guān)鍵點(diǎn)做了詳細(xì)闡述。本文已校對(duì),但由于個(gè)人知識(shí)水平及精力有限,文中不免出現(xiàn)錯(cuò)誤或理解不當(dāng)?shù)牡胤?,歡迎指正。

以上就是解析探秘fescar分布式事務(wù)實(shí)現(xiàn)原理的詳細(xì)內(nèi)容,更多關(guān)于escar分布式事務(wù)原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java京東面試題之為什么HashMap線程不安全

    Java京東面試題之為什么HashMap線程不安全

    那天,小二去京東面試,面試官老王一上來(lái)就甩給了他一道面試題:為什么 HashMap 是線程不安全的?這個(gè)問(wèn)題哪能難的住小二,這篇文章詳細(xì)解答該題目
    2021-11-11
  • Java Cache詳解及簡(jiǎn)單實(shí)現(xiàn)

    Java Cache詳解及簡(jiǎn)單實(shí)現(xiàn)

    這篇文章主要介紹了 Java Cache詳解及簡(jiǎn)單實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • JAVA基本類(lèi)型包裝類(lèi) BigDecimal BigInteger 的使用

    JAVA基本類(lèi)型包裝類(lèi) BigDecimal BigInteger 的使用

    Java 中預(yù)定義了八種基本數(shù)據(jù)類(lèi)型,包括:byte,int,long,double,float,boolean,char,short,接下來(lái)文章小編將向大家介紹其中幾個(gè)類(lèi)型的內(nèi)容,需要的朋友可以參考下文章
    2021-09-09
  • java利用udp實(shí)現(xiàn)發(fā)送數(shù)據(jù)

    java利用udp實(shí)現(xiàn)發(fā)送數(shù)據(jù)

    這篇文章主要為大家詳細(xì)介紹了java利用udp實(shí)現(xiàn)發(fā)送數(shù)據(jù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-07-07
  • 在Spring Boot2中使用CompletableFuture的方法教程

    在Spring Boot2中使用CompletableFuture的方法教程

    這篇文章主要給大家介紹了關(guān)于在Spring Boot2中使用CompletableFuture的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧
    2019-01-01
  • Spring?Boot如何在加載bean時(shí)優(yōu)先選擇我

    Spring?Boot如何在加載bean時(shí)優(yōu)先選擇我

    這篇文章主要介紹了Spring?Boot如何在加載bean時(shí)優(yōu)先選擇我,在?Spring?Boot?應(yīng)用程序中,我們可以采取三種方式實(shí)現(xiàn)自己的?bean?優(yōu)先加載,本文通過(guò)實(shí)例代碼給大家詳細(xì)講解,需要的朋友可以參考下
    2023-03-03
  • hadoop上傳文件功能實(shí)例代碼

    hadoop上傳文件功能實(shí)例代碼

    這篇文章主要介紹了hadoop上傳文件功能實(shí)例代碼,需要的朋友可以參考下
    2017-09-09
  • Java?RabbitMQ消息隊(duì)列詳解常見(jiàn)問(wèn)題

    Java?RabbitMQ消息隊(duì)列詳解常見(jiàn)問(wèn)題

    消息隊(duì)列是最古老的中間件之一,從系統(tǒng)之間有通信需求開(kāi)始,就自然產(chǎn)生了消息隊(duì)列。本文告訴什么是消息隊(duì)列,為什么需要消息隊(duì)列,常見(jiàn)的消息隊(duì)列有哪些,RabbitMQ的部署和使用
    2022-07-07
  • MyBatis傳入集合 list 數(shù)組 map參數(shù)的寫(xiě)法

    MyBatis傳入集合 list 數(shù)組 map參數(shù)的寫(xiě)法

    這篇文章主要介紹了MyBatis傳入集合 list 數(shù)組 map參數(shù)的寫(xiě)法的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2016-06-06
  • 關(guān)于java方法區(qū)詳解

    關(guān)于java方法區(qū)詳解

    這篇文章主要介紹了關(guān)于java方法區(qū)的使用解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-09-09

最新評(píng)論