詳解全局事務注解@GlobalTransactional的識別
一、聲明式全局事務
在Seata示例工程中,能看到@GlobalTransactional,如下方法示例:
@GlobalTransactional
public boolean purchase(long accountId, long stockId, long quantity) {
String xid = RootContext.getXID();
LOGGER.info("New Transaction Begins: " + xid);
boolean stockResult = reduceAccount(accountId,stockId, quantity);
if (!stockResult) {
throw new RuntimeException("賬號服務調(diào)用失敗,事務回滾!");
}
Long orderId = createOrder(accountId, stockId, quantity);
if (orderId == null || orderId <= 0) {
throw new RuntimeException("訂單服務調(diào)用失敗,事務回滾!");
}
return true;
}
purchase方法上加上此注解,即表示此方法內(nèi)的reduceAccount和createOrder兩個微服務調(diào)用也將加入到分布式事務中,即扣除賬戶余額與創(chuàng)建訂單將具有分布式事務的數(shù)據(jù)一致性保障能力。
了解 Spring 注解事務實現(xiàn)的話,應該也能推測出,Seata 的事務能力也可能是基于 Spring 的 AOP 機制,給標注了@GlobalTransactional 的方法做 AOP 增加,織入額外的邏輯以完成分布式事務的能力,偽代碼大致如下:
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
tx.begin(xxx);
...
purchase(xxx)//給purchase增加全局事務處理能力
...
tx.commit();
} catch (Exception exx) {
tx.rollback();
throw exx;
}
二、@GlobalTransactional 注解如何被識別?
2.1 運行環(huán)境
1)引入seata-spring-boot-starter模塊
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
</dependency>
在spring.factories 中有自動裝配類SeataAutoConfiguration
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ ... io.seata.spring.boot.autoconfigure.SeataAutoConfiguration ...
此類負責處理全局事務掃描及設置,其中就有@GlobalTransactional。
2)條件要求:
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration
從類的自動裝備條件,可看出要滿足 2 個條件:
@ConditionalOnProperty,表明若要生效須具備以下配置條件:
seata.enabled = true
@AutoConfigureAfter,表示從 bean 的加載順序來看,要求是在SeataCoreAutoConfiguration之后,SeataCoreAutoConfiguration又是什么呢?
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@Configuration(proxyBeanMethods = false)
public class SeataCoreAutoConfiguration {
@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();
}
}
從源碼可知,也很清晰,大概有幾個功能點:
- 同樣要求
seata.enabled = true - 另外會掃描"io.seata.spring.boot.autoconfigure.properties"
- 會提供一個
SpringApplicationContextProvider,基于 Spring 的程序中,很常見這種方式,用于提供ApplicationContext,通常用于方便獲取 bean 和添加 bean 等。
public class SpringApplicationContextProvider implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
System.setProperty("file.listener.enabled", "false");
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT, applicationContext);
}
}
2.2 功能-注入事務執(zhí)行失敗處理器FailureHandler
注入一個FailureHandler,其默認實現(xiàn)DefaultFailureHandlerImpl中只會打印錯誤日志,建議重寫,異常發(fā)生時及時告知使用者。
2.3 功能-構(gòu)建全局事務掃描器GlobalTransactionScanner
構(gòu)建全局事務掃描器GlobalTransactionScanner,注入到容器中,其內(nèi)部做 2 件事情
- 會初始化
TM、RM客戶端 - 掃描
Bean,對添加了全局事務注解的類(@GlobalTransactional、@GlobalLock、@TwoPhaseBusinessAction)生成代理對象,做AOP增強,添加對應的攔截器(攔截器內(nèi)補充分布式事務的能力)
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
Spring 中 Bean 的關(guān)鍵初始化過程:
實例化 -> 屬性注入 -> postProcessBeforeInitialization -> afterPropertiesSet/init 方法 -> postProcessAfterInitialization
從以下堆??梢钥闯觯?code>AbstractAutoProxyCreator在 bean 初始化完成之后創(chuàng)建它的代理 AOP 代理,通過wrapIfNecessary判斷是否該 bean 是否存在全局事務注解,如果有則需要增強,添加相應的攔截器。
wrapIfNecessary:269, GlobalTransactionScanner (io.seata.spring.annotation) postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
先看一個關(guān)鍵方法existsAnnotation,當掃描到 bean 之后,獲取 bean 的原始類型,然后通過此方法原始類型的類或者方法中是否有@GlobalTransactional或@GlobalLock注解
private boolean existsAnnotation(Class<?>[] classes) {
if (CollectionUtils.isNotEmpty(classes)) {
for (Class<?> clazz : classes) {
if (clazz == null) {
continue;
}
//判斷類上是否有注解@GlobalTransactional
GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) {
return true;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
//判斷方法上是否有注解@GlobalTransactional
trxAnno = method.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) {
return true;
}
//判斷方法上是否有注解@GlobalLock
GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
if (lockAnno != null) {
return true;
}
}
}
}
return false;
}
wrapIfNecessary方法中實現(xiàn)了事務注解識別的核心邏輯:
/**
* The following will be scanned, and added corresponding interceptor:
*
* TM:
* @see io.seata.spring.annotation.GlobalTransactional // TM annotation
* Corresponding interceptor:
* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
*
* GlobalLock:
* @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
* Corresponding interceptor:
* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler
*
* TCC mode:
* @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
* @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
* @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
* Corresponding interceptor:
* @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
*/
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
// seata提供的有擴展邏輯用于輔助判斷是否需要增強
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
//如果已被代理,則跳過該Bean ,PROXYED_SET是一個Set<String> 集合
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
//判斷是否TCC模式,如果是TCC,則添加TCC 攔截器
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
//非TCC模式
// 查詢Bean的 Class 類型
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判斷是否有GlobalTransactional或者GlobalLock注解,如果沒有就不會代理,直接返回bean
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
// 初始化GlobalTransactionalInterceptor
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
//添加 禁用全局事務的監(jiān)聽器
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
// 判斷是否已經(jīng)是AOP代理類,如果不是,則執(zhí)行父類的wrapIfNecessary
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 給代理對象添加攔截器
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
//標識該bean已經(jīng)代理過,本方法入口處有判斷
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
Seata 提供的有擴展邏輯用于輔助判斷是否需要增強,這里的擴展點有好幾個,這些擴展是用于安全保障,使用者可以按需采用。

三、小結(jié):
本篇梳理了引入seata-spring-boot-starter模塊后,其內(nèi)部會通過的自動裝配機制會在SeataAutoConfiguration類中,掃描具有@GlobalTransactional全局事務注解的類和方法的 bean,并對這類 bean 添加GlobalTransactionalInterceptor,進行 AOP 增強,加入分布式事務的能力,增強后的功能復雜,下篇繼續(xù),更多關(guān)于全局事務注解@GlobalTransactional的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
為什么wait和notify必須放在synchronized中使用
這篇文章主要介紹了為什么wait和notify必須放在synchronized中使用,文章圍繞主題的相關(guān)問題展開詳細介紹,具有一定的參考價值,需要的小伙伴可以參考以參考一下2022-05-05
jdk中keytool的使用以及如何提取jks文件中的公鑰和私鑰
JKS文件由公鑰和密鑰構(gòu)成利用Java?Keytool工具生成的文件,它是由公鑰和密鑰構(gòu)成的,下面這篇文章主要給大家介紹了關(guān)于jdk中keytool的使用以及如何提取jks文件中公鑰和私鑰的相關(guān)資料,需要的朋友可以參考下2024-03-03
使用多個servlet時Spring security需要指明路由匹配策略問題
這篇文章主要介紹了使用多個servlet時Spring security需要指明路由匹配策略問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08
Java使用Maven BOM統(tǒng)一管理版本號的實現(xiàn)
這篇文章主要介紹了Java使用Maven BOM統(tǒng)一管理版本號的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04

