Seata?AT模式TM處理流程圖文示例詳解

TM的作用
我們根據(jù)源碼解讀畫出了下圖,該圖示展現(xiàn)了TM在整個(gè)Seata AT模式的分布式事務(wù)中所起的作用:

從上圖中可以看出,TM主要有兩個(gè)作用:
開啟分布式事務(wù),以拿到XID作為分布式事務(wù)開啟的標(biāo)識;一定是從TC拿到XID,不是從調(diào)用方傳遞過來的XID;
根據(jù)所有RM的處理結(jié)果來決定是提交分布式事務(wù)還是回滾分布式事務(wù);
轉(zhuǎn)換成偽代碼如下:
try{
// 開啟分布式事務(wù)
String xid = TM.beginGlobalTransaction();
// 執(zhí)行業(yè)務(wù)邏輯,包含遠(yuǎn)程rpc調(diào)用
RM1.execute(xid); -------RPC調(diào)用--------> RM2.execute(xid);
// 提交分布式事務(wù)
TM.commitGlobalTransaction(xid);
}catch(Exception e){
// 回滾分布式事務(wù)
TM.rollbackGlobalTransaction(xid);
}finally{
// 恢復(fù)現(xiàn)場
}
源碼分解
在之前講述圖解Seata AT模式啟動流程中,我們已經(jīng)知道了TM的處理流程是通過掃描注解@GlobalTransactional來完成對業(yè)務(wù)邏輯的攔截的。
主要完成這個(gè)攔截功能的類是io.seata.spring.annotation.GlobalTransactionalInterceptor,在這個(gè)類中,我們主要看invoke方法:
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
// 拿到被攔截的目標(biāo)類
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
// 獲取目標(biāo)方法
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
// 判斷這個(gè)方法是不是Object類中的toString()、equals()等方法
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
// 通過被攔截的方法找出對應(yīng)的注解GlobalTransactional和GlobalLock
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
// 判斷是否開啟分布式事務(wù),或者TM是否被降級處理,默認(rèn)是沒有被降級的
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
// 分布式事務(wù)可以正常使用
if (!localDisable) {
// 如果注解GlobalTransactional存在,那么直接把里面的配置解析成AspectTransactional
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
// 調(diào)用handleGlobalTransaction處理
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
// 調(diào)用handleGlobalLock處理
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
// 如果是Object類中的方法的話,直接調(diào)用,不作攔截
return methodInvocation.proceed();
}
以上代碼就做了下面幾件事情:
判斷攔截的方法是否是一個(gè)合理的方法,像Object類中的toString()、equals()等方法是不應(yīng)該被攔截的;
攔截的方法合理的話,那么要確認(rèn)是否允許開啟分布式事務(wù);
- 如果配置了
service.disableGlobalTransaction=true,那么說明不能開啟分布式事務(wù); - 另一個(gè)就是配置了允許TM降級
client.tm.degradeCheck=true(默認(rèn)是false),那么就會開啟定時(shí)任務(wù)不斷地與TC通信,如果建立通信失敗的次數(shù)超過了閾值client.tm.degradeCheckAllowTimes,那么就會觸發(fā)TM降級,此時(shí)無法開啟新的分布式事務(wù),降級前開啟的分布式事務(wù)沒有影響;
可以正常地準(zhǔn)備分布式事務(wù)了,那么開始收集注解的相關(guān)信息;
- 如果是GlobalTransactional注解,交給
handleGlobalTransaction()處理; - 如果是GlobalLock注解,交給
handleGlobalLock()處理;
需要注意的是,我們從源碼當(dāng)中了解到,原來TM還可以做一個(gè)降級的配置。降級后的TM是不會開啟新的分布式事務(wù)的,這個(gè)時(shí)候只能保證本地事務(wù)的正常進(jìn)行,只有當(dāng)TM與TC通信恢復(fù)后,降級后的TM會立馬恢復(fù),可以重新開啟新的分布式事務(wù)。
在TM降級期間的需要業(yè)務(wù)側(cè)自行處理因降級導(dǎo)致的數(shù)據(jù)臟寫和臟讀問題。
handleGlobalTransaction
處理被@GlobalTransactional標(biāo)注的業(yè)務(wù)邏輯
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
// 默認(rèn)succeed=true
boolean succeed = true;
try {
// 執(zhí)行分布式事務(wù)處理邏輯
// 詳細(xì)內(nèi)容后面介紹
return transactionalTemplate.execute(new TransactionalExecutor() {
// 執(zhí)行業(yè)務(wù)邏輯
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
// 分布式事務(wù)名稱,沒有指定的話,就用【方法名+參數(shù)類型】命名
public String name() {
String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
// 分布式事務(wù)信息,其實(shí)就是@GlobalTransactional注解里面拿到的配置
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(aspectTransactional.getPropagation());
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getRollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
// 發(fā)生異常
TransactionalExecutor.Code code = e.getCode();
switch (code) {
// 已經(jīng)回滾過了
case RollbackDone:
throw e.getOriginalException();
// 開啟分布式事務(wù)失敗
case BeginFailure:
// 分布式事務(wù)失敗
succeed = false;
// 調(diào)用失敗處理邏輯
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
// 分布式事務(wù)提交失敗
case CommitFailure:
// 分布式事務(wù)失敗
succeed = false;
// 調(diào)用失敗處理邏輯
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
// 回滾失敗
case RollbackFailure:
// 調(diào)用失敗處理邏輯
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
// 回滾重試
case RollbackRetrying:
// 調(diào)用失敗處理器中的回滾重試回調(diào)邏輯
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
// 啥也不是,直接拋異常
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
// 如果允許TM降級,那么這次處理完畢后,說明與TC恢復(fù)通信,可以解除降級
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
其實(shí)上面就一行代碼,使用的是模版模式,所以其實(shí)真正的重點(diǎn)還是應(yīng)該進(jìn)到模版里面去看看具體是怎么處理的。
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 拿到整理好的@GlobalTransactional注解里面的配置信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 獲取當(dāng)前的分布式事務(wù),如果為null的話,說明這是分布式事務(wù)的發(fā)起者;如果不為null,說明這是分布式事務(wù)的參與者
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 獲取分布式事務(wù)的傳播級別,其實(shí)就是按照spring的傳播級別來一套,區(qū)別就是spring事務(wù)是本地事務(wù),這是分布式事務(wù),原理都一樣
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
// 這個(gè)switch里面全都是處理分布式事務(wù)傳播級別的
switch (propagation) {
// 如果不支持分布式事務(wù),如果當(dāng)前存在事務(wù),那么先掛起當(dāng)前的分布式事務(wù),再執(zhí)行業(yè)務(wù)邏輯
case NOT_SUPPORTED:
// 分布式事務(wù)存在,先掛起
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// 執(zhí)行業(yè)務(wù)邏輯
return business.execute();
// 如果是每次都要創(chuàng)建一個(gè)新的分布式事務(wù),先把當(dāng)前存在的分布式事務(wù)掛起,然后創(chuàng)建一個(gè)新分布式事務(wù)
case REQUIRES_NEW:
// 如果分布式事務(wù)存在,先掛起當(dāng)前分布式事務(wù),再創(chuàng)建一個(gè)新的分布式事務(wù)
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// 之所以用break,是為了后面的代碼和其他的傳播級別一起共用,業(yè)務(wù)邏輯肯定還是要執(zhí)行的
break;
// 如果支持分布式事務(wù),如果當(dāng)前不存在分布式事務(wù),那么直接執(zhí)行業(yè)務(wù)邏輯,否則以分布式事務(wù)的方式執(zhí)行業(yè)務(wù)邏輯
case SUPPORTS:
// 如果不存在分布式事務(wù),直接執(zhí)行業(yè)務(wù)邏輯
if (notExistingTransaction(tx)) {
return business.execute();
}
// 否則,以分布式事務(wù)的方式執(zhí)行業(yè)務(wù)邏輯
break;
// 如果有分布式事務(wù),就在當(dāng)前分布式事務(wù)下執(zhí)行業(yè)務(wù)邏輯,否則創(chuàng)建一個(gè)新的分布式事務(wù)執(zhí)行業(yè)務(wù)邏輯
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
// 如果不允許有分布式事務(wù),那么一旦發(fā)現(xiàn)存在分布式事務(wù),直接拋異常;只有不存在分布式事務(wù)的時(shí)候才正常執(zhí)行
case NEVER:
// 存在分布式事務(wù),拋異常
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// 不存在分布式事務(wù),執(zhí)行業(yè)務(wù)邏輯
return business.execute();
}
// 一定要有分布式事務(wù),分布式事務(wù)不存在的話,拋異常;
case MANDATORY:
// 不存在分布式事務(wù),拋異常
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 上面的傳播級別的邏輯處理完畢,下面就是公共的處理邏輯
// 1.3 如果當(dāng)前分布式事務(wù)沒有的話,那么我們就要創(chuàng)建新的分布式事務(wù),此時(shí)我們就是分布式事務(wù)的發(fā)起者,也就是TM本身,否則不能稱之為`TM`
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// 開始準(zhǔn)備干活的條件
// 把我們這個(gè)方法的全局鎖配置放進(jìn)當(dāng)前線程中,并且把線程中已有的全局鎖的配置取出來
// 我們在干完自己的活后,會把這個(gè)取出來的配置放回去的
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. 如果我們是分布式事務(wù)的發(fā)起者的話,那么我們會和TC通信,并且拿到一個(gè)XID;如果我們不是分布式事務(wù)的發(fā)起者的話,那么這一步啥也不干
// 這個(gè)XID可以從RootContext中獲取
beginTransaction(txInfo, tx);
Object rs;
try {
// 執(zhí)行業(yè)務(wù)邏輯
rs = business.execute();
} catch (Throwable ex) {
// 3. 發(fā)生任何異常,我們準(zhǔn)備啟動回滾機(jī)制
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. 一切順利,通知提交分布式事務(wù)
commitTransaction(tx);
return rs;
} finally {
//5. 恢復(fù)現(xiàn)場,把之前的配置放回去
resumeGlobalLockConfig(previousConfig);
// 觸發(fā)回調(diào)
triggerAfterCompletion();
// 清理工作
cleanUp();
}
} finally {
// 恢復(fù)之前掛起的事務(wù)
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
根據(jù)上面的源碼分析,execute方法做了以下幾件事情:
處理分布式事務(wù)的傳播級別,參照spring的事務(wù)傳播級別;
如果是分布式事務(wù)的發(fā)起者,那么需要與TC通信,并獲取XID開啟分布式事務(wù);
如果業(yè)務(wù)邏輯處理出現(xiàn)異常,說明分布式事務(wù)需要準(zhǔn)備回滾;如果沒有任何異常,那么準(zhǔn)備發(fā)起分布式事務(wù)提交
分布式事務(wù)處理完畢后,準(zhǔn)備恢復(fù)現(xiàn)場
分布式事務(wù)開啟:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 回調(diào),默認(rèn)是空回調(diào)
triggerBeforeBegin();
// 發(fā)起分布式事務(wù)
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// 回調(diào),默認(rèn)是空回調(diào)
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
@Override
public void begin(int timeout, String name) throws TransactionException {
// 如果不是分布式事務(wù)發(fā)起者,那么啥也不做
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
// 如果當(dāng)前已經(jīng)處于分布式事務(wù)當(dāng)中,那么拋異常,因?yàn)槭聞?wù)發(fā)起者不可能事先處于別的分布式事務(wù)當(dāng)中
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
// 發(fā)起分布式事務(wù)
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
// 把xid綁定到當(dāng)前線程中
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 發(fā)起分布式事務(wù)開啟的請求
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
// 獲取拿到的xid,表示分布式事務(wù)開啟成功
return response.getXid();
}
1.分布式事務(wù)的發(fā)起其實(shí)就是TM向TC請求,獲取XID,并把XID綁定到當(dāng)前線程中
異常回滾:
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
// 如果異常類型和指定的類型一致,那么發(fā)起回滾;不一致還是要提交分布式事務(wù)
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
// 回滾分布式事務(wù)
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// 回滾失敗拋異常
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// 不是指定的異常類型,還是繼續(xù)提交分布式事務(wù)
commitTransaction(tx);
}
}
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
// 執(zhí)行回調(diào),默認(rèn)空回調(diào)
triggerBeforeRollback();
// 回滾
tx.rollback();
// 執(zhí)行回調(diào),默認(rèn)空回調(diào)
triggerAfterRollback();
// 就算回滾沒問題,照樣拋異常,目的應(yīng)該是告知開發(fā)人員此處產(chǎn)生了回滾
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
@Override
public void rollback() throws TransactionException {
// 如果是分布式事務(wù)參與者,那么啥也不做,RM的回滾不在這里,這是TM的回滾
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of rollback
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
// 下面就是一個(gè)循環(huán)重試發(fā)起分布式事務(wù)回滾
int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
// 發(fā)起回滾的核心代碼
status = transactionManager.rollback(xid);
// 回滾成功跳出循環(huán)
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
// 重試失敗次數(shù)完成才會跳出循環(huán)
if (retry == 0) {
throw new TransactionException("Failed to report global rollback", ex);
}
}
}
} finally {
// 如果回滾的分布式事務(wù)就是當(dāng)前的分布式事務(wù),那么從當(dāng)前線程中解綁XID
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);
}
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
// 準(zhǔn)備發(fā)起請求給TC,回滾指定的分布式事務(wù)
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
分布式事務(wù)回滾邏輯中有以下幾個(gè)點(diǎn):
觸發(fā)回滾需要產(chǎn)生的異常和注解中指定的異常一致才會發(fā)起回滾,否則還是繼續(xù)提交;
回滾是可以設(shè)置重試次數(shù)的,只有重試都失敗了,才會導(dǎo)致回滾失敗,否則只要有一次成功,那么回滾就成功;
TM發(fā)起的回滾其實(shí)只是和TC發(fā)起一次分布式事務(wù)回滾的通信,并沒有數(shù)據(jù)庫的操作;
分布式事務(wù)提交:
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 回調(diào),默認(rèn)空回調(diào)
triggerBeforeCommit();
// 分布式事務(wù)提交
tx.commit();
// 回調(diào),默認(rèn)空回調(diào)
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 提交出異常,提交失敗
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}
@Override
public void commit() throws TransactionException {
// 如果只是分布式事務(wù)參與者,那么啥也不干,TM只能有一個(gè),哈哈
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
// 分布式事務(wù)提交也是有重試的
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
// 發(fā)起分布式事務(wù)提交
status = transactionManager.commit(xid);
// 提交成功跳出循環(huán)
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
// 重試結(jié)束,依然失敗就拋異常
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
// 如果提交的分布式事務(wù)就是當(dāng)前事務(wù),那么需要清理當(dāng)前線程中的XID
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
// 發(fā)起分布式事務(wù)提交請求,這是與TC通信
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
分布式事務(wù)回滾也是可以設(shè)置重試次數(shù)的;
分布式事務(wù)提交其實(shí)也是TM與TC進(jìn)行通信,告知TC這個(gè)XID對應(yīng)的分布式事務(wù)可以提交了;
handleGlobalLock
private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
// 模版模式實(shí)現(xiàn)全局鎖
return globalLockTemplate.execute(new GlobalLockExecutor() {
// 執(zhí)行業(yè)務(wù)邏輯
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
// 獲取全局鎖配置
// 一個(gè)是全局鎖重試間隔時(shí)間
// 一個(gè)是全局鎖重試次數(shù)
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
public Object execute(GlobalLockExecutor executor) throws Throwable {
// 判斷當(dāng)前是否有全局鎖
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
// 如果沒有全局鎖,那么在當(dāng)前線程中設(shè)置需要全局鎖標(biāo)識
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}
// 把全局鎖的配置設(shè)置進(jìn)當(dāng)前線程,并把線程中已有的全局鎖配置拿出來,后面恢復(fù)現(xiàn)場需要用
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
// 執(zhí)行業(yè)務(wù)邏輯
return executor.execute();
} finally {
// 清除線程中的全局鎖標(biāo)記
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
// 恢復(fù)現(xiàn)場
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}
其實(shí)真正的全局鎖邏輯并不在TM當(dāng)中,TM只是負(fù)責(zé)根據(jù)@GlobalLock注解把相應(yīng)的全局鎖標(biāo)記綁定到線程中,真正負(fù)責(zé)處理全局鎖的還是底層的RM;
小結(jié)
至此我們已經(jīng)把TM的所有工作都解讀完畢了,下面來做一個(gè)小結(jié):
1.TM主要針對兩個(gè)注解GlobalTransactional和GlobalLock來實(shí)現(xiàn)處理邏輯,原理都是基于Aop和反射;處理邏輯里面涉及到TM降級的一個(gè)情況,這是一個(gè)值得注意的點(diǎn)
2.處理GlobalTransactional主要分兩步:
- 開啟分布式事務(wù),需要與TC交互,存在rpc開銷;
- 根據(jù)RM的處理情況決定是提交分布式事務(wù)還是回滾分布式事務(wù),也是需要與TC交互,存在rpc開銷;在提交或回滾分布式事務(wù)中,還可以設(shè)置重試次數(shù);
3.處理GlobalLock,主要就是在當(dāng)前線程中設(shè)置一個(gè)需要檢查全局鎖的標(biāo)記,讓底層的RM去做全局鎖的檢測動作;
以上就是Seata AT模式TM處理流程圖文示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Seata AT模式TM處理流程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java OpenSSL生成的RSA公私鑰進(jìn)行數(shù)據(jù)加解密詳細(xì)介紹
這篇文章主要介紹了Java OpenSSL生成的RSA公私鑰進(jìn)行數(shù)據(jù)加解密詳細(xì)介紹的相關(guān)資料,這里提供實(shí)例代碼及說明具體如何實(shí)現(xiàn),需要的朋友可以參考下2016-12-12
springboot2.X整合prometheus監(jiān)控的實(shí)例講解
這篇文章主要介紹了springboot2.X整合prometheus監(jiān)控的實(shí)例講解,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03
Java 如何快速,優(yōu)雅的實(shí)現(xiàn)導(dǎo)出Excel
這篇文章主要介紹了Java 如何快速,優(yōu)雅的實(shí)現(xiàn)導(dǎo)出Excel,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下2021-03-03
基于java file 文件操作operate file of java的應(yīng)用
本篇文章介紹了,基于java file 文件操作operate file of java的應(yīng)用。需要的朋友參考下2013-05-05
詳解Mybatis極其(最)簡(好)單(用)的一個(gè)分頁插件
這篇文章主要介紹了詳解Mybatis極其(最)簡(好)單(用)的一個(gè)分頁插件,非常具有實(shí)用價(jià)值,需要的朋友可以參考下。2016-12-12
淺談java switch如果case后面沒有break,會出現(xiàn)什么情況?
這篇文章主要介紹了淺談java switch如果case后面沒有break,會出現(xiàn)什么情況?具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨想小編過來看看吧2020-09-09
springboot 項(xiàng)目使用jasypt加密數(shù)據(jù)源的方法
Jasypt 是一個(gè) Java 庫,它允許開發(fā)者以最小的努力為他/她的項(xiàng)目添加基本的加密功能,而且不需要對密碼學(xué)的工作原理有深刻的了解。接下來通過本文給大家介紹springboot 項(xiàng)目使用jasypt加密數(shù)據(jù)源的問題,一起看看吧2021-11-11

