Seata?AT模式啟動過程圖文示例詳解

背景
為了了解Seata AT模式的原理,我通過源碼解讀的方式畫出了Seata AT模式啟動的圖示:

如果是基于Springboot項目的話,項目啟動的使用,一般約定會先查看spring.factories文件,配置了哪些類是需要自動裝配的。Seata也是利用了這個約定,在項目啟動的時候,默認會裝配指定的類,以完成Seata相關組件的初始化。
下面我們來一起根據源碼解讀Seata AT模式啟動流程。
由上圖可知,Seata AT模式可大概分成以下三部分:
1.與底層數據庫打交道的DataSource,這部分功能處理交給了SeataDataSourceAutoConfiguration。
2.處理@GlobalTransactional注解,實現分布式事務管理功能,這部分交給SeataAutoConfiguration處理。
3.分支事務獲取、銷毀全局事務XID,這部分功能交給HttpAutoConfiguration。
SeataDataSourceAutoConfiguration
首先,我們來看看Seata是如何處理DataSource的。
// 依賴DataSource
@ConditionalOnBean(DataSource.class)
// 三個配置都要為true
@ConditionalOnExpression("${seata.enabled:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataDataSourceAutoConfiguration {
/**
* The bean seataAutoDataSourceProxyCreator.
*/
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
// 可替換
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
}
1.@ConditionalOnBean(DataSource.class)意味著我們的項目中一定要有DataSource這個Bean。
2.@ConditionalOnExpression里面表示要滿足以下三個條件才會創(chuàng)建SeataDataSourceAutoConfiguration:
seata.enabled=true
seata.enableAutoDataSourceProxy=true
seata.enable-auto-data-source-proxy=true
3.@AutoConfigureAfter表示當前Bean創(chuàng)建一定在指定的SeataCoreAutoConfiguration之后。
根據以上分析,我們在引入Seata AT模式的時候,一定要先創(chuàng)建項目的DataSource Bean對象,其次保證相關的配置滿足要求,那么才能正確地保證DataSource被Seata代理。
下面繼續(xù)看SeataAutoDataSourceProxyCreator的創(chuàng)建:
@ConditionalOnMissingBean表示這個Bean的創(chuàng)建其實是可以開發(fā)人員自定義的,如果開發(fā)人員沒有自定義,那么就由Seata自己創(chuàng)建。
在SeataAutoDataSourceProxyCreator類中,它繼承了AbstractAutoProxyCreator,也就是AOP功能的標準實現。這個時候,我們主要關注wrapIfNecessary方法的實現:
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 不是DataSource對象不代理
if (!(bean instanceof DataSource)) {
return bean;
}
// 如果是DataSource對象,但是不是SeataDataSourceProxy對象
if (!(bean instanceof SeataDataSourceProxy)) {
// 先調用父類包裝一層
Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
// 如果代理后的對象和代理前的對象是同一個對象
// 說明要么這個對象之前已經被代理過
// 要么SeataDataSourceProxy被開發(fā)人員excluded
if (bean == enhancer) {
return bean;
}
// 如果是正常的DataSource對象的話,那么就會被自動構建成SeataDataSourceProxy,并返回
DataSource origin = (DataSource) bean;
SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
DataSourceProxyHolder.put(origin, proxy);
return enhancer;
}
/*
* things get dangerous when you try to register SeataDataSourceProxy bean by yourself!
* if you insist on doing so, you must make sure your method return type is DataSource,
* because this processor will never return any subclass of SeataDataSourceProxy
*/
// Seata是不建議用戶自己構建SeataDataSourceProxy對象的,即使用戶自己構建了SeataDataSourceProxy對象,Seata也會重新處理
LOGGER.warn("Manually register SeataDataSourceProxy(or its subclass) bean is discouraged! bean name: {}", beanName);
// 獲取用戶包裝好的代理對象
SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
// 獲取原生DataSource
DataSource origin = proxy.getTargetDataSource();
// 重新包裝,并返回
Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
// this mean origin is either excluded by user or had been proxy before
if (origin == originEnhancer) {
return origin;
}
// else, put <origin, proxy> to holder and return originEnhancer
DataSourceProxyHolder.put(origin, proxy);
// 返回包裝好的代理對象SeataDataSourceProxy
return originEnhancer;
}
}
1.通過以上代碼解讀,有一個點我們需要注意,就是開發(fā)人員不需要自己的構建SeataDataSourceProxy對象,使用原生的DataSource即可,Seata會幫助我們構建SeataDataSourceProxy對象。
SeatAutoConfiguration
SeatAutoConfiguration主要功能就是創(chuàng)建GlobalTransactionScanner對象,所以核心功能全部在GlobalTransactionScanner里面。
// 配置seata.enabled=true
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
// 裝配順序
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
@Bean(BEAN_NAME_FAILURE_HANDLER)
// 失敗處理器,可替換
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
// 開發(fā)人員可自定義GlobalTransactionScanner
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
ConfigurableListableBeanFactory beanFactory,
@Autowired(required = false) List<ScannerChecker> scannerCheckers) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
// set bean factory
GlobalTransactionScanner.setBeanFactory(beanFactory);
// add checkers
// '/META-INF/services/io.seata.spring.annotation.ScannerChecker'
GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));
// spring beans
GlobalTransactionScanner.addScannerCheckers(scannerCheckers);
// add scannable packages
GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());
// add excludeBeanNames
GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());
//set accessKey and secretKey
GlobalTransactionScanner.setAccessKey(seataProperties.getAccessKey());
GlobalTransactionScanner.setSecretKey(seataProperties.getSecretKey());
// create global transaction scanner
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
}
1.裝配SeataAutoConfiguration要求配置中seata.enabled=true;
2.我們可以自定義FailureHandler;這個失敗處理器是專門給TM使用的;
3.同樣我們也可以自定義GlobalTransactionScanner,不過基本上不會這么做,除非有特殊需求;
GlobalTransactionScanner里面基本上做兩個事情:
- 代理所有被
@GlobalTransactional或@GlobalLock注解的方法; - 使用Neety初始化
TM Client和RM Client,以便實現和TC通信;TC也就是我們的Seata Server;
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
}
AbstractAutoProxyCreator:通過wrapIfNecessary方法代理所有被@GlobalTransactional或@GlobalLock注解的方法;
ConfigurationChangeListener:通過onChangeEvent方法監(jiān)聽配置service.disableGlobalTransaction的變化;InitializingBean:通過afterPropertiesSet方法初始化TM Client和RM Client;ApplicationContextAware:通過setApplicationContext方法獲取IOC容器;DisposableBean:當GlobalTransactionScanner被銷毀時,通過destroy方法來回收資源;
我們重點關注wrapIfNecessary和afterPropertiesSet方法:
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// 檢查Bean是否符合被代理的要求
// 1. 不能是配置類,比如以Configuration、Properties、Config結尾的Bean名稱
// 2. Bean所在的包名在掃描范圍內
// 3. 不能被@Scope注解
if (!doCheckers(bean, beanName)) {
return bean;
}
try {
synchronized (PROXYED_SET) {
// 如果已經被代理,就跳過
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
// 檢查是否被TCC注解
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// 初始化TCC Fence Clean Task
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
// 創(chuàng)建TCC代理類
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
// 如果不是TCC代理,那么先獲取當前類和它實現的接口
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// 判斷當前類及相關方法是否被@GlobalTransactional或@GlobalLock注解
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
// 沒有被注解,不代理
return bean;
}
// 準備創(chuàng)建方法攔截器
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
// 攔截器創(chuàng)建完畢
interceptor = globalTransactionalInterceptor;
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
// 如果bean不是代理對象,那么不做方法攔截,直接返回
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 準備把方法攔截器插入進去
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// 獲取所有的方法攔截器,包括GlobalTransactionalInterceptor
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
// 依次添加進目標對象中
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
// 返回被代理的bean
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
通過上述源碼分析可知:Seata是根據類、接口和方法上的@GlobalTransactional或@GlobalLock注解來判斷是否需要針對目標方法做攔截的。
@Override
public void afterPropertiesSet() {
// 如果不允許全局事務
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
// 添加監(jiān)聽器,監(jiān)聽配置的變化
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
// 準備初始化TM Client、RM Client
initClient();
}
}
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//初始化TM Client
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//初始化RM Client
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
至此,SeatAutoConfiguration的工作處理完畢;
HttpAutoConfiguration
HttpAutoConfiguration的工作比較簡單,我們想象一下,RM如何知道它屬于哪一個分布式事務?這就需要一個統一的標識來決定所有的分支事務都屬于同一個分布式事務,這個標識在Seata中叫做XID;
XID由TM開啟分布式事務時生成,通過RPC的方式從一個分支事務傳遞到另一個分支事務,所以我們在RM端需要一個從RPC中解析獲取XID的功能,以及在業(yè)務邏輯處理完畢后,銷毀當前線程中XID的功能。
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication
public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {
// 注冊攔截器
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new TransactionPropagationInterceptor());
}
// 添加異常解析處理器
@Override
public void extendHandlerExceptionResolvers(List<HandlerExceptionResolver> exceptionResolvers) {
exceptionResolvers.add(new HttpHandlerExceptionResolver());
}
}
public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);
// 前置處理邏輯
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
// 獲取當前線程XID
String xid = RootContext.getXID();
// 從rpc中獲取XID
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
}
// 如果線程中沒有XID,并且從請求中拿到了XID,那么把請求中的XID綁定到當前線程
if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
RootContext.bind(rpcXid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[{}] to RootContext", rpcXid);
}
}
return true;
}
// 后置處理邏輯
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 業(yè)務邏輯處理完畢,從當前線程中刪除XID
if (RootContext.inGlobalTransaction()) {
XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
}
}
}
public class HttpHandlerExceptionResolver extends AbstractHandlerExceptionResolver {
// 發(fā)生異常后,刪除當前線程中的XID
@Override
protected ModelAndView doResolveException(HttpServletRequest request, HttpServletResponse httpServletResponse, Object o, Exception e) {
XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
return null;
}
}
小結
通過以上源碼分析和圖解Seata AT模式,我們可以了解到以下幾點:
1.GlobalTransactionInterceptor屬于TM側,它主要負責通過TM Client開啟分布式事務、提交分布式事務以及回滾分布式事務;屬于大總管。
2.SeataDataSourceProxy屬于RM側,它主要負責分支事務的開啟,提交以及回滾,屬于真正干活的小兵。
3.TM Client和RM Client純屬于兩個通信工具,負責與TC端建立通信。
4.TransactionPropagationInterceptor和HttpHandlerExceptionResolver服務于分支事務,負責全局事務XID的獲取以及業(yè)務邏輯處理完畢的善后事宜。
以上就是Seata AT模式啟動過程圖文示例詳解的詳細內容,更多關于Seata AT模式啟動過程的資料請關注腳本之家其它相關文章!
相關文章
JDK與JRE的下載和安裝以及配置JDK環(huán)境變量圖文教程
JRE也就是(Java?RuntimeEnvironment)Java運行環(huán)境,是運行JAVA程序所必須的環(huán)境的集合,包含各種類庫,下面這篇文章主要給大家介紹了關于JDK與JRE的下載和安裝以及配置JDK環(huán)境變量的相關資料,需要的朋友可以參考下2023-12-12
使用MockMvc進行controller層單元測試 事務自動回滾的完整案例
這篇文章主要介紹了使用MockMvc進行controller層單元測試 事務自動回滾的完整案例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
myeclipse安裝Spring Tool Suite(STS)插件的方法步驟
這篇文章主要介紹了myeclipse安裝Spring Tool Suite(STS)插件的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-08-08

