欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Seata?AT模式TM處理流程圖文示例詳解

 更新時間:2022年09月30日 16:36:06   作者:夢想實現(xiàn)家_Z  
這篇文章主要為大家介紹了Seata?AT模式TM處理流程圖文示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

TM的作用

我們根據(jù)源碼解讀畫出了下圖,該圖示展現(xiàn)了TM在整個Seata AT模式的分布式事務中所起的作用:

從上圖中可以看出,TM主要有兩個作用:

開啟分布式事務,以拿到XID作為分布式事務開啟的標識;一定是從TC拿到XID,不是從調(diào)用方傳遞過來的XID;

根據(jù)所有RM的處理結(jié)果來決定是提交分布式事務還是回滾分布式事務;

轉(zhuǎn)換成偽代碼如下:

try{
  // 開啟分布式事務
  String xid = TM.beginGlobalTransaction();
  // 執(zhí)行業(yè)務邏輯,包含遠程rpc調(diào)用
  RM1.execute(xid); -------RPC調(diào)用--------> RM2.execute(xid);
  // 提交分布式事務
  TM.commitGlobalTransaction(xid);
}catch(Exception e){
  // 回滾分布式事務
  TM.rollbackGlobalTransaction(xid);
}finally{
  // 恢復現(xiàn)場
}

源碼分解

在之前講述圖解Seata AT模式啟動流程中,我們已經(jīng)知道了TM的處理流程是通過掃描注解@GlobalTransactional來完成對業(yè)務邏輯的攔截的。

主要完成這個攔截功能的類是io.seata.spring.annotation.GlobalTransactionalInterceptor,在這個類中,我們主要看invoke方法:

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // 拿到被攔截的目標類
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // 獲取目標方法
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        // 判斷這個方法是不是Object類中的toString()、equals()等方法
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            // 通過被攔截的方法找出對應的注解GlobalTransactional和GlobalLock
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // 判斷是否開啟分布式事務,或者TM是否被降級處理,默認是沒有被降級的
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            // 分布式事務可以正常使用
            if (!localDisable) {
                // 如果注解GlobalTransactional存在,那么直接把里面的配置解析成AspectTransactional
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                    } else {
                        transactional = this.aspectTransactional;
                    }
                    // 調(diào)用handleGlobalTransaction處理
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    // 調(diào)用handleGlobalLock處理
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        // 如果是Object類中的方法的話,直接調(diào)用,不作攔截
        return methodInvocation.proceed();
    }

以上代碼就做了下面幾件事情:

判斷攔截的方法是否是一個合理的方法,像Object類中的toString()、equals()等方法是不應該被攔截的;

攔截的方法合理的話,那么要確認是否允許開啟分布式事務;

  • 如果配置了service.disableGlobalTransaction=true,那么說明不能開啟分布式事務;
  • 另一個就是配置了允許TM降級client.tm.degradeCheck=true(默認是false),那么就會開啟定時任務不斷地與TC通信,如果建立通信失敗的次數(shù)超過了閾值client.tm.degradeCheckAllowTimes,那么就會觸發(fā)TM降級,此時無法開啟新的分布式事務,降級前開啟的分布式事務沒有影響;

可以正常地準備分布式事務了,那么開始收集注解的相關信息;

  • 如果是GlobalTransactional注解,交給handleGlobalTransaction()處理;
  • 如果是GlobalLock注解,交給handleGlobalLock()處理;

需要注意的是,我們從源碼當中了解到,原來TM還可以做一個降級的配置。降級后的TM是不會開啟新的分布式事務的,這個時候只能保證本地事務的正常進行,只有當TM與TC通信恢復后,降級后的TM會立馬恢復,可以重新開啟新的分布式事務。

在TM降級期間的需要業(yè)務側(cè)自行處理因降級導致的數(shù)據(jù)臟寫和臟讀問題。

handleGlobalTransaction

處理被@GlobalTransactional標注的業(yè)務邏輯

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable {
        // 默認succeed=true
        boolean succeed = true;
        try {
            // 執(zhí)行分布式事務處理邏輯
            // 詳細內(nèi)容后面介紹
            return transactionalTemplate.execute(new TransactionalExecutor() {
                // 執(zhí)行業(yè)務邏輯
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
                // 分布式事務名稱,沒有指定的話,就用【方法名+參數(shù)類型】命名
                public String name() {
                    String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
                // 分布式事務信息,其實就是@GlobalTransactional注解里面拿到的配置
                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            // 發(fā)生異常
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                // 已經(jīng)回滾過了
                case RollbackDone:
                    throw e.getOriginalException();
                // 開啟分布式事務失敗
                case BeginFailure:
                    // 分布式事務失敗
                    succeed = false;
                    // 調(diào)用失敗處理邏輯
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                // 分布式事務提交失敗
                case CommitFailure:
                    // 分布式事務失敗
                    succeed = false;
                    // 調(diào)用失敗處理邏輯
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                // 回滾失敗
                case RollbackFailure:
                    // 調(diào)用失敗處理邏輯
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                // 回滾重試
                case RollbackRetrying:
                    // 調(diào)用失敗處理器中的回滾重試回調(diào)邏輯
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                // 啥也不是,直接拋異常
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            // 如果允許TM降級,那么這次處理完畢后,說明與TC恢復通信,可以解除降級
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

其實上面就一行代碼,使用的是模版模式,所以其實真正的重點還是應該進到模版里面去看看具體是怎么處理的。

public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. 拿到整理好的@GlobalTransactional注解里面的配置信息
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 獲取當前的分布式事務,如果為null的話,說明這是分布式事務的發(fā)起者;如果不為null,說明這是分布式事務的參與者
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();
        // 1.2 獲取分布式事務的傳播級別,其實就是按照spring的傳播級別來一套,區(qū)別就是spring事務是本地事務,這是分布式事務,原理都一樣
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            // 這個switch里面全都是處理分布式事務傳播級別的
            switch (propagation) {
                // 如果不支持分布式事務,如果當前存在事務,那么先掛起當前的分布式事務,再執(zhí)行業(yè)務邏輯
                case NOT_SUPPORTED:
                    // 分布式事務存在,先掛起
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // 執(zhí)行業(yè)務邏輯
                    return business.execute();
                // 如果是每次都要創(chuàng)建一個新的分布式事務,先把當前存在的分布式事務掛起,然后創(chuàng)建一個新分布式事務
                case REQUIRES_NEW:
                    // 如果分布式事務存在,先掛起當前分布式事務,再創(chuàng)建一個新的分布式事務
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // 之所以用break,是為了后面的代碼和其他的傳播級別一起共用,業(yè)務邏輯肯定還是要執(zhí)行的
                    break;
                // 如果支持分布式事務,如果當前不存在分布式事務,那么直接執(zhí)行業(yè)務邏輯,否則以分布式事務的方式執(zhí)行業(yè)務邏輯
                case SUPPORTS:
                    // 如果不存在分布式事務,直接執(zhí)行業(yè)務邏輯
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // 否則,以分布式事務的方式執(zhí)行業(yè)務邏輯
                    break;
                // 如果有分布式事務,就在當前分布式事務下執(zhí)行業(yè)務邏輯,否則創(chuàng)建一個新的分布式事務執(zhí)行業(yè)務邏輯
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                // 如果不允許有分布式事務,那么一旦發(fā)現(xiàn)存在分布式事務,直接拋異常;只有不存在分布式事務的時候才正常執(zhí)行
                case NEVER:
                    // 存在分布式事務,拋異常
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // 不存在分布式事務,執(zhí)行業(yè)務邏輯
                        return business.execute();
                    }
                // 一定要有分布式事務,分布式事務不存在的話,拋異常;
                case MANDATORY:
                    // 不存在分布式事務,拋異常
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
            // 上面的傳播級別的邏輯處理完畢,下面就是公共的處理邏輯
            // 1.3 如果當前分布式事務沒有的話,那么我們就要創(chuàng)建新的分布式事務,此時我們就是分布式事務的發(fā)起者,也就是TM本身,否則不能稱之為`TM`
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }
            // 開始準備干活的條件
            // 把我們這個方法的全局鎖配置放進當前線程中,并且把線程中已有的全局鎖的配置取出來
            // 我們在干完自己的活后,會把這個取出來的配置放回去的
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
            try {
                // 2. 如果我們是分布式事務的發(fā)起者的話,那么我們會和TC通信,并且拿到一個XID;如果我們不是分布式事務的發(fā)起者的話,那么這一步啥也不干
                // 這個XID可以從RootContext中獲取
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    // 執(zhí)行業(yè)務邏輯
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. 發(fā)生任何異常,我們準備啟動回滾機制
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                // 4. 一切順利,通知提交分布式事務
                commitTransaction(tx);
                return rs;
            } finally {
                //5. 恢復現(xiàn)場,把之前的配置放回去
                resumeGlobalLockConfig(previousConfig);
                // 觸發(fā)回調(diào)
                triggerAfterCompletion();
                // 清理工作
                cleanUp();
            }
        } finally {
            // 恢復之前掛起的事務
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

根據(jù)上面的源碼分析,execute方法做了以下幾件事情:

處理分布式事務的傳播級別,參照spring的事務傳播級別;

如果是分布式事務的發(fā)起者,那么需要與TC通信,并獲取XID開啟分布式事務;

如果業(yè)務邏輯處理出現(xiàn)異常,說明分布式事務需要準備回滾;如果沒有任何異常,那么準備發(fā)起分布式事務提交

分布式事務處理完畢后,準備恢復現(xiàn)場

分布式事務開啟:

 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 回調(diào),默認是空回調(diào)
            triggerBeforeBegin();
            // 發(fā)起分布式事務
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // 回調(diào),默認是空回調(diào)
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
    }
@Override
    public void begin(int timeout, String name) throws TransactionException {
        // 如果不是分布式事務發(fā)起者,那么啥也不做
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        // 如果當前已經(jīng)處于分布式事務當中,那么拋異常,因為事務發(fā)起者不可能事先處于別的分布式事務當中
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
        // 發(fā)起分布式事務
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // 把xid綁定到當前線程中
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
@Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // 發(fā)起分布式事務開啟的請求
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        // 獲取拿到的xid,表示分布式事務開啟成功
        return response.getXid();
    }

1.分布式事務的發(fā)起其實就是TM向TC請求,獲取XID,并把XID綁定到當前線程中

異?;貪L:

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        // 如果異常類型和指定的類型一致,那么發(fā)起回滾;不一致還是要提交分布式事務
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // 回滾分布式事務
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // 回滾失敗拋異常
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // 不是指定的異常類型,還是繼續(xù)提交分布式事務
            commitTransaction(tx);
        }
    }
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        // 執(zhí)行回調(diào),默認空回調(diào)
        triggerBeforeRollback();
        // 回滾
        tx.rollback();
        // 執(zhí)行回調(diào),默認空回調(diào)
        triggerAfterRollback();
        // 就算回滾沒問題,照樣拋異常,目的應該是告知開發(fā)人員此處產(chǎn)生了回滾
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }
@Override
    public void rollback() throws TransactionException {
        // 如果是分布式事務參與者,那么啥也不做,RM的回滾不在這里,這是TM的回滾
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 下面就是一個循環(huán)重試發(fā)起分布式事務回滾
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 發(fā)起回滾的核心代碼
                    status = transactionManager.rollback(xid);
                    // 回滾成功跳出循環(huán)
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重試失敗次數(shù)完成才會跳出循環(huán)
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            // 如果回滾的分布式事務就是當前的分布式事務,那么從當前線程中解綁XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        // 準備發(fā)起請求給TC,回滾指定的分布式事務
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

分布式事務回滾邏輯中有以下幾個點:

觸發(fā)回滾需要產(chǎn)生的異常和注解中指定的異常一致才會發(fā)起回滾,否則還是繼續(xù)提交;

回滾是可以設置重試次數(shù)的,只有重試都失敗了,才會導致回滾失敗,否則只要有一次成功,那么回滾就成功;

TM發(fā)起的回滾其實只是和TC發(fā)起一次分布式事務回滾的通信,并沒有數(shù)據(jù)庫的操作;

分布式事務提交:

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 回調(diào),默認空回調(diào)
            triggerBeforeCommit();
            // 分布式事務提交
            tx.commit();
            // 回調(diào),默認空回調(diào)
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 提交出異常,提交失敗
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }
@Override
    public void commit() throws TransactionException {
        // 如果只是分布式事務參與者,那么啥也不干,TM只能有一個,哈哈
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 分布式事務提交也是有重試的
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 發(fā)起分布式事務提交
                    status = transactionManager.commit(xid);
                    // 提交成功跳出循環(huán)
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重試結(jié)束,依然失敗就拋異常
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            // 如果提交的分布式事務就是當前事務,那么需要清理當前線程中的XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // 發(fā)起分布式事務提交請求,這是與TC通信
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

分布式事務回滾也是可以設置重試次數(shù)的;

分布式事務提交其實也是TM與TC進行通信,告知TC這個XID對應的分布式事務可以提交了;

handleGlobalLock

private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
        // 模版模式實現(xiàn)全局鎖
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            // 執(zhí)行業(yè)務邏輯
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
            // 獲取全局鎖配置
            // 一個是全局鎖重試間隔時間
            // 一個是全局鎖重試次數(shù)
            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }
public Object execute(GlobalLockExecutor executor) throws Throwable {
        // 判斷當前是否有全局鎖
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        // 如果沒有全局鎖,那么在當前線程中設置需要全局鎖標識
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }
        // 把全局鎖的配置設置進當前線程,并把線程中已有的全局鎖配置拿出來,后面恢復現(xiàn)場需要用
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
        try {
            // 執(zhí)行業(yè)務邏輯
            return executor.execute();
        } finally {
            // 清除線程中的全局鎖標記
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }
            // 恢復現(xiàn)場
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }

其實真正的全局鎖邏輯并不在TM當中,TM只是負責根據(jù)@GlobalLock注解把相應的全局鎖標記綁定到線程中,真正負責處理全局鎖的還是底層的RM;

小結(jié)

至此我們已經(jīng)把TM的所有工作都解讀完畢了,下面來做一個小結(jié):

1.TM主要針對兩個注解GlobalTransactional和GlobalLock來實現(xiàn)處理邏輯,原理都是基于Aop和反射;處理邏輯里面涉及到TM降級的一個情況,這是一個值得注意的點

2.處理GlobalTransactional主要分兩步:

  • 開啟分布式事務,需要與TC交互,存在rpc開銷;
  • 根據(jù)RM的處理情況決定是提交分布式事務還是回滾分布式事務,也是需要與TC交互,存在rpc開銷;在提交或回滾分布式事務中,還可以設置重試次數(shù);

3.處理GlobalLock,主要就是在當前線程中設置一個需要檢查全局鎖的標記,讓底層的RM去做全局鎖的檢測動作;

以上就是Seata AT模式TM處理流程圖文示例詳解的詳細內(nèi)容,更多關于Seata AT模式TM處理流程的資料請關注腳本之家其它相關文章!

相關文章

最新評論