Spring?AOP實(shí)現(xiàn)聲明式事務(wù)機(jī)制源碼解析
一、聲明式全局事務(wù)
在Seata
示例工程中,能看到@GlobalTransactional
,如下方法示例:
@GlobalTransactional public boolean purchase(long accountId, long stockId, long quantity) { String xid = RootContext.getXID(); LOGGER.info("New Transaction Begins: " + xid); boolean stockResult = reduceAccount(accountId,stockId, quantity); if (!stockResult) { throw new RuntimeException("賬號服務(wù)調(diào)用失敗,事務(wù)回滾!"); } Long orderId = createOrder(accountId, stockId, quantity); if (orderId == null || orderId <= 0) { throw new RuntimeException("訂單服務(wù)調(diào)用失敗,事務(wù)回滾!"); } return true; }
purchase
方法上加上此注解,即表示此方法內(nèi)的reduceAccount
和createOrder
兩個(gè)微服務(wù)調(diào)用也將加入到分布式事務(wù)中,即扣除賬戶余額與創(chuàng)建訂單將具有分布式事務(wù)的數(shù)據(jù)一致性保障能力。
了解 Spring 注解事務(wù)實(shí)現(xiàn)的話,應(yīng)該也能推測出,Seata 的事務(wù)能力也可能是基于 Spring 的 AOP 機(jī)制,給標(biāo)注了@GlobalTransactional
的方法做 AOP 增加,織入額外的邏輯以完成分布式事務(wù)的能力,偽代碼大致如下:
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); try { tx.begin(xxx); ... purchase(xxx)//給purchase增加全局事務(wù)處理能力 ... tx.commit(); } catch (Exception exx) { tx.rollback(); throw exx; }
本篇就介紹Seata 如何使用 Spring AOP 來將注解變成分布式事務(wù)的代碼。
二、源碼
在上一篇《Seata1.6源碼全局事務(wù)注解@GlobalTransactional的識別》bean,并對這類 bean 添加GlobalTransactionalInterceptor
,進(jìn)行 AOP 增強(qiáng),加入分布式事務(wù)的能力。本篇延續(xù)這個(gè)話題繼續(xù),梳理 AOP 增強(qiáng)的邏輯。
通過下邊的調(diào)用堆棧幫大家梳理出 在源碼AbstractAutoProxyCreator#wrapIfNecessary
中有 createProxy
的調(diào)用。
createProxy:443, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) wrapIfNecessary:344, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) wrapIfNecessary:307, GlobalTransactionScanner (io.seata.spring.annotation) postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) createBean:542, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
AbstractAutoProxyCreator#createProxy
其中new ProxyFactory()
則是 AOP 的關(guān)鍵。
protected Object createProxy(Class<?> beanClass, @Nullable String beanName, @Nullable Object[] specificInterceptors, TargetSource targetSource) { if (this.beanFactory instanceof ConfigurableListableBeanFactory) { // 為目標(biāo) Bean 的 BeanDefinition 對象設(shè)置一個(gè)屬性 // org.springframework.aop.framework.autoproxy.AutoProxyUtils.originalTargetClass -> 目標(biāo) Bean 的 Class 對象 AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass); } // <1> 創(chuàng)建一個(gè)代理工廠 ProxyFactory proxyFactory = new ProxyFactory(); // <2> 復(fù)制當(dāng)前 ProxyConfig 的一些屬性(例如 proxyTargetClass、exposeProxy) proxyFactory.copyFrom(this); /** * <3> 判斷是否類代理,也就是是否開啟 CGLIB 代理 * 默認(rèn)配置下為 `false`,參考 {@link org.springframework.context.annotation.EnableAspectJAutoProxy} */ if (!proxyFactory.isProxyTargetClass()) { /* * <3.1> 如果這個(gè) Bean 配置了進(jìn)行類代理,則設(shè)置為 `proxyTargetClass` 為 `true` */ if (shouldProxyTargetClass(beanClass, beanName)) { proxyFactory.setProxyTargetClass(true); } else { /* * <3.2> 檢測當(dāng)前 Bean 實(shí)現(xiàn)的接口是否包含可代理的接口 * 如沒有實(shí)現(xiàn),則將 `proxyTargetClass` 設(shè)為 `true`,表示需要進(jìn)行 CGLIB 提升 */ evaluateProxyInterfaces(beanClass, proxyFactory); } } /* * <4> 對入?yún)⒌?Advisor 進(jìn)一步處理,因?yàn)槠渲锌赡苓€存在 Advice 類型,需要將他們包裝成 DefaultPointcutAdvisor 對象 * 如果配置了 `interceptorNames` 攔截器,也會添加進(jìn)來 */ Advisor[] advisors = buildAdvisors(beanName, specificInterceptors); // <5> 代理工廠添加 Advisor 數(shù)組 proxyFactory.addAdvisors(advisors); // <6> 代理工廠設(shè)置 TargetSource 對象 proxyFactory.setTargetSource(targetSource); // <7> 對 ProxyFactory 進(jìn)行加工處理,抽象方法,目前沒有子類實(shí)現(xiàn) customizeProxyFactory(proxyFactory); proxyFactory.setFrozen(this.freezeProxy); // <8> 是否這個(gè) AdvisedSupport 配置管理器已經(jīng)過濾過目標(biāo)類(默認(rèn)為 false) if (advisorsPreFiltered()) { // 設(shè)置 `preFiltered` 為 `true` // 這樣 Advisor 們就不會根據(jù) ClassFilter 進(jìn)行過濾了,而直接通過 MethodMatcher 判斷是否處理被攔截方法 proxyFactory.setPreFiltered(true); } // Use original ClassLoader if bean class not locally loaded in overriding class loader ClassLoader classLoader = getProxyClassLoader(); if (classLoader instanceof SmartClassLoader && classLoader != beanClass.getClassLoader()) { classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader(); } // <9> 通過 ProxyFactory 代理工廠創(chuàng)建代理對象 return proxyFactory.getProxy(getProxyClassLoader()); }
上邊源碼讀起來很生硬,對于我們使用來梳理核心源碼流程來說,留意 AOP 實(shí)現(xiàn)的幾個(gè)關(guān)鍵要素即可:
- 配置target,被代理者(類或方法中有標(biāo)注@GlobalTransactional的bean),最終還是要調(diào)用他么的方法
- 配置接口,即代理要具備的功能
- 配置額外的切面 addAdvisors,這里是指定
GlobalTransactionalInterceptor
- 根據(jù)ClassLoader 類加載器創(chuàng)建代理
由此我們可以推測中分布式事務(wù)的邏輯是在 GlobalTransactionalInterceptor
中,核心邏輯的實(shí)現(xiàn)應(yīng)該就是invoke
中,我們從GlobalTransactionalInterceptor#invoke
源碼中理一理:
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); // 獲取方法上的@GlobalTransactional注解中的內(nèi)容 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); // 獲取方法上的@GlobalLock注解中的內(nèi)容 final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); //判斷是否禁用或者降級狀態(tài) boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { //構(gòu)建事務(wù)描述信息,這些基礎(chǔ)配置信息很重要 transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes(), globalTransactionalAnnotation.lockStrategyMode()); } else { transactional = this.aspectTransactional; } //若是@GlobalTransactional return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { //若是@GlobalLock return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } return methodInvocation.proceed(); }
handleGlobalTransaction
中開始了重點(diǎn),transactionalTemplate
從其名字可知,這是模板方法模式,new TransactionalExecutor()
中 getTransactionInfo
是在構(gòu)建事務(wù)的一些基礎(chǔ)信息,execute()
中則是指定了事務(wù)目標(biāo)方法(如purchase
方法),
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true; try { return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } ... @Override public TransactionInfo getTransactionInfo() { // reset the value of timeout int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setTimeOut(timeout); transactionInfo.setName(name()); ... return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { ... } } finally { if (ATOMIC_DEGRADE_CHECK.get()) { EVENT_BUS.post(new DegradeCheckEvent(succeed)); } } }
execute
方法中的內(nèi)容是重點(diǎn)
public Object execute(TransactionalExecutor business) throws Throwable { // 1. Get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. GlobalTransaction tx = GlobalTransactionContext.getCurrent(); // 1.2 Handle the transaction propagation. Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null; try { //... // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'. //若全局事務(wù)上下文未就緒則new DefaultGlobalTransaction(); if (tx == null) { tx = GlobalTransactionContext.createNew(); } // set current tx config to holder GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, // else do nothing. Of course, the hooks will still be triggered. //2. 開啟全局事務(wù), // 2.1 triggerBeforeBegin() // 2.2 會跟TC通信獲取全局事務(wù)ID:xid, // 2.3 RootContext.bind(xid); // 2.4 triggerAfterBegin()的事件通知調(diào)用 beginTransaction(txInfo, tx); Object rs; try { // 執(zhí)行我們的事務(wù)方法如`purchase`方法 rs = business.execute(); } catch (Throwable ex) { // 3. 遇到 business exception 則回滾 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. 提交事務(wù),觸發(fā)事件回調(diào) // 4.1 triggerBeforeCommit(); // 4.2 tx.commit();與TC通信提交事務(wù),內(nèi)部默認(rèn)是有5次重試機(jī)會 // 4.3 triggerAfterCommit(); commitTransaction(tx, txInfo); //返回結(jié)果 return rs; } finally { //5. clear resumeGlobalLockConfig(previousConfig); //結(jié)束后的回調(diào) triggerAfterCompletion(); cleanUp(); } } finally { // If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { tx.resume(suspendedResourcesHolder); } } }
三、小結(jié):
本篇梳理了引入seata-spring-boot-starter
模塊后,其內(nèi)部會通過的自動裝配機(jī)制會在SeataAutoConfiguration
類中,掃描具有@GlobalTransactional
全局事務(wù)注解的類和方法的 bean,并通過ProxyFactory
機(jī)制對這類 bean 進(jìn)行AOP代理, 添加GlobalTransactionalInterceptor
,在其內(nèi)部invoke
中通過transactionalTemplate
加入分布式事務(wù)的能力:
- 開啟事務(wù)與 TC 進(jìn)行通信,獲取 xid ,注入事務(wù)上下文
- 調(diào)用目標(biāo)方法
- 之后根據(jù)結(jié)果是否正常執(zhí)行二階段的提交或回滾
但這里僅僅是 TM 的能力,仍未到RM的職能邊界。
以上就是Spring AOP實(shí)現(xiàn)聲明式事務(wù)機(jī)制源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Spring AOP聲明式事務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
- Spring事務(wù)框架之TransactionStatus源碼解析
- Spring事務(wù)控制策略及@Transactional失效問題解決避坑
- Spring強(qiáng)大事務(wù)兼容數(shù)據(jù)庫多種組合解決業(yè)務(wù)需求
- Spring事務(wù)@Transactional注解四種不生效案例場景分析
- Spring學(xué)習(xí)JdbcTemplate數(shù)據(jù)庫事務(wù)參數(shù)
- Spring框架JdbcTemplate數(shù)據(jù)庫事務(wù)管理完全注解方式
- Spring事務(wù)框架之TransactionDefinition源碼解析
相關(guān)文章
Java前后端任意參數(shù)類型轉(zhuǎn)換方式(Date、LocalDateTime、BigDecimal)
這篇文章主要介紹了Java前后端任意參數(shù)類型轉(zhuǎn)換方式(Date、LocalDateTime、BigDecimal),具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06Spring Boot中Elasticsearch的連接配置原理與使用詳解
在Spring Boot中,我們可以通過Elasticsearch實(shí)現(xiàn)對數(shù)據(jù)的搜索和分析,本文將介紹Spring Boot中Elasticsearch的連接配置、原理和使用方法,感興趣的可以了解一下2023-09-09解決SpringBoot整合ElasticSearch遇到的連接問題
這篇文章主要介紹了解決SpringBoot整合ElasticSearch遇到的連接問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Idea安裝Eslint插件提示:Plugin NativeScript was not installed的問題
這篇文章主要介紹了Idea安裝Eslint插件提示:Plugin NativeScript was not installed的問題,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10spring boot使用logback日志級別打印控制操作
這篇文章主要介紹了spring boot使用logback日志級別打印控制操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03