Spring多線程事務處理解決方案
一、背景
本文主要介紹了spring多線程事務的解決方案,心急的小伙伴可以跳過上面的理論介紹分析部分直接看最終解決方案。
在我們日常的業(yè)務活動中,經(jīng)常會出現(xiàn)大規(guī)模的修改插入操作,比如在3.0的活動賽事創(chuàng)建,涉及到十幾張表的插入(一張表可能插入一行或者多行數(shù)據(jù)),由于單線程模型的關系,所有的sql都是串行,即后面的sql必須都要等到前面的sql執(zhí)行完成才能繼續(xù)。但是在很多場景下,sql的執(zhí)行順序并不影響業(yè)務的結果,面對這樣的場景,我們很自然的想到了使用異步的方式去處理,可是我們同時又希望整個創(chuàng)建操作是事務性的,即要全部成功,要么全部失敗,但是單純的使用異步線程并不能達到我們理想的效果。
這個時候,我們需要一種多線程下保證事務的解決方案。
代碼片段,大量的同步保存操作
public void much(){ //業(yè)務操作1 doBusiness1(); //業(yè)務操作2 doBusiness2(); //業(yè)務操作3 doBusiness3(); //業(yè)務操作4 doBusiness4(); } private void doBusiness1() { //執(zhí)行sql1 //執(zhí)行sql2 //執(zhí)行sql3 //執(zhí)行sql4 }
每個業(yè)務操作可以是相關聯(lián)的,也有可能是完全無關的,但如果做成異步的話我們就無法保證事務,怎么去解決這個問題呢?
二、理論先行
1.事務介紹
我們先確定spring事務的本質是什么,spring本身不支持事務,spring實現(xiàn)事務只是對我們原有的業(yè)務邏輯做了一層包裝,他替我們決定了什么時候開啟事務,什么情況下應該向數(shù)據(jù)庫提交,什么時候回滾,及實現(xiàn)我們設置的一些事務參數(shù),包括回滾的條件,傳播類型等。
我們所熟知的spring事務有兩種主流的解決方式,一種是聲明式事務,一種是編程式事務。
先來講我們最常用的聲明式事務。
1.1聲明式事務
聲明式事務就是我們最常用的@Transactional注解,通常我們只需要在我們想交由spring控制的事務方法上加上注解即可,這個注解有一些重要的參數(shù),由于不是本文重點,就不在此展開。這是一個經(jīng)典的spring的aop實現(xiàn),為了弄清楚在加上@Transactional注解后spring到底為我們做了什么,我們可以從兩方面入手,一是spring如何給我們生成相應的代理對象,二是這個代理對象為我們做了什么。
事務的開始是由@EnableTransactionManagement 注解產(chǎn)生,這個注解在運行時會導入TransactionManagementConfigurationSelector這個類,這個類本質上是一個ImportSelector,他根據(jù)adviceMode將特定的配置類導入進去,分別為AutoProxyRegistrar 后置處理器和ProxyTransactionManagementConfiguration Advisor。
AutoProxyRegistrar 實現(xiàn)了ImportBeanDefinitionRegistrar 重寫了registerBeanDefinitions 方法
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { boolean candidateFound = false; Set<String> annTypes = importingClassMetadata.getAnnotationTypes(); for (String annType : annTypes) { // ... AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); } // ... } @Nullable public static BeanDefinition registerAutoProxyCreatorIfNecessary( BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source); }
。該方法最終注入了InfrastructureAdvisorAutoProxyCreator。InfrastructureAdvisorAutoProxyCreator這個類就是一個bean的后置處理器,最終的作用就是處理需要的代理對象。
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) { if (bean != null) { Object cacheKey = getCacheKey(bean.getClass(), beanName); if (this.earlyProxyReferences.remove(cacheKey) != bean) { return wrapIfNecessary(bean, beanName, cacheKey); } } return bean; } protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // ... // 拿當前bean去匹配容器中的 Advisors,如果找到符合的就生成代理對象 // Create proxy if we have advice. Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null); if (specificInterceptors != DO_NOT_PROXY) { this.advisedBeans.put(cacheKey, Boolean.TRUE); Object proxy = createProxy( bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean)); this.proxyTypes.put(cacheKey, proxy.getClass()); return proxy; } this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean; }
ProxyTransactionManagementConfiguration的作用就是來生成具體的Advisor,他注冊了三個bean,
- 該類主要完成以下幾個任務:
- 創(chuàng)建TransactionAttributeSource對象:用于解析@Transactional注解并生成事務屬性。
- 創(chuàng)建TransactionInterceptor對象:用于創(chuàng)建事務通知,將事務屬性應用到目標方法,這其實就是一個事務模板,如下所示
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { //TransactionAttributeSource內部保存著當前類某個方法對應的TransactionAttribute---事務屬性源 //可以看做是一個存放TransactionAttribute與method方法映射的池子 TransactionAttributeSource tas = getTransactionAttributeSource(); //獲取當前事務方法對應的TransactionAttribute final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); //定位TransactionManager final TransactionManager tm = determineTransactionManager(txAttr); ..... //類型轉換為局部事務管理器 PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { //TransactionManager根據(jù)TransactionAttribute創(chuàng)建事務后返回 //TransactionInfo封裝了當前事務的信息--包括TransactionStatus TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { //繼續(xù)執(zhí)行過濾器鏈---過濾鏈最終會調用目標方法 //因此可以理解為這里是調用目標方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { //目標方法拋出異常則進行判斷是否需要回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除當前事務信息 cleanupTransactionInfo(txInfo); } ... //正常返回,那么就正常提交事務唄(當然還是需要判斷TransactionStatus狀態(tài)先) commitTransactionAfterReturning(txInfo); return retVal; } ...
- 創(chuàng)建TransactionAdvisor對象:將事務通知和切點(Pointcut)組合成Advisor。
- 創(chuàng)建TransactionAttributeSourceAdvisor對象:將事務屬性和切點組合成Advisor
@Configuration(proxyBeanMethods = false) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration { @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor( TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) { BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); advisor.setTransactionAttributeSource(transactionAttributeSource); advisor.setAdvice(transactionInterceptor); if (this.enableTx != null) { advisor.setOrder(this.enableTx.<Integer>getNumber("order")); } return advisor; } @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { // TransactionAttributeSource 是一個接口,具體注入的是 Annotationxxxx return new AnnotationTransactionAttributeSource(); } @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource); if (this.txManager != null) { interceptor.setTransactionManager(this.txManager); } return interceptor; } }
觀察TransactionAdvisor代碼實現(xiàn)@SuppressWarnings("serial")
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
@Nullable private TransactionAttributeSource transactionAttributeSource; private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() { @Override @Nullable protected TransactionAttributeSource getTransactionAttributeSource() { return transactionAttributeSource; } }; /** * Set the transaction attribute source which is used to find transaction * attributes. This should usually be identical to the source reference * set on the transaction interceptor itself. * @see TransactionInterceptor#setTransactionAttributeSource */ public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) { this.transactionAttributeSource = transactionAttributeSource; } /** * Set the {@link ClassFilter} to use for this pointcut. * Default is {@link ClassFilter#TRUE}. */ public void setClassFilter(ClassFilter classFilter) { this.pointcut.setClassFilter(classFilter); } @Override public Pointcut getPointcut() { return this.pointcut; }
可以見到里面已經(jīng)包含了pointcut,這就能將我們需要被增加的事務方法找出。
ProxyTransactionManagementConfiguration負責將需要包裝的bean和方法找出并包裝成advisor,InfrastructureAdvisorAutoProxyCreator根據(jù)advisor生成相應的代理對象。
小結:InfrastructureAdvisorAutoProxyCreator遍歷容器中的bean,嘗試去自動代理,匹配的工作就交由advisor中的point,如果匹配成功就為其創(chuàng)建代理對象,這個代理對象中放入了TransactionInterceptor攔截器
,等到相關方法調用時,調用的是代理對象的方法,然后通過責任鏈模式通過TransactionInterceptor處理,以此來進行事務的操作。
聲明式事務的介紹先到這里,接下來我們來介紹下編程式事務。
1.2編程式事務
編程式事務的核心就是將spring為我們做好的那些步驟拆出來,交由開發(fā)者去控制事務何時開啟、提交、回滾,他的運行本質和聲明式事務并沒有兩樣。
模板如下
public class TransactionMain { public static void main(String[] args) throws ClassNotFoundException, SQLException { test(); } private static void test() { DataSource dataSource = getDS(); JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource); //JdbcTransactionManager根據(jù)TransactionDefinition信息來進行一些連接屬性的設置 //包括隔離級別和傳播行為等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //開啟一個新事務---此時autocommit已經(jīng)被設置為了false,并且當前沒有事務,這里創(chuàng)建的是一個新事務 TransactionStatus ts = jtm.getTransaction(transactionDef); //進行業(yè)務邏輯操作 try { update(dataSource); jtm.commit(ts); }catch (Exception e){ jtm.rollback(ts); System.out.println("發(fā)生異常,我已回滾"); } } private static void update(DataSource dataSource) throws Exception { JdbcTemplate jt = new JdbcTemplate(); jt.setDataSource(dataSource); jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6"); throw new Exception("我是來搗亂的"); } }
三、方案探索
1.直接使用多線程
我們在開啟代碼中事務,并在業(yè)務邏輯中直接使用多線程,是否能保證事務?
@Transactional public void testDirect() { new Thread(()->{ Per per = new Per(); per.setName("t1"); perService.save(per); }).start(); new Thread(()->{ Per per1 = new Per(); per1.setName("t2"); perService.save(per1); throw new RuntimeException("Exception test"); }).start(); }
顯然,這種方式并不能保證事務,哪怕加上了事務注解,因為子線程拋出的異常并不能在主線程中捕獲,也不能被其他線程感知到。
2.事務模板中使用多線程
package com.user.util; import lombok.RequiredArgsConstructor; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import javax.sql.DataSource; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @Component @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個數(shù)據(jù)源 */ private final DataSource dataSource; public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) { if(executor==null){ throw new IllegalArgumentException("線程池不能為空"); } DataSourceTransactionManager transactionManager = getTransactionManager(); //是否發(fā)生了異常 AtomicBoolean ex=new AtomicBoolean(); List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size()); List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size()); tasks.forEach(task->{ taskFutureList.add(CompletableFuture.runAsync( () -> { try{ //1.開啟新事務 transactionStatusList.add(openNewTransaction(transactionManager)); //2.異步任務執(zhí)行 task.run(); }catch (Throwable throwable){ //打印異常 throwable.printStackTrace(); //其中某個異步任務執(zhí)行出現(xiàn)了異常,進行標記 ex.set(Boolean.TRUE); //其他任務還沒執(zhí)行的不需要執(zhí)行了 taskFutureList.forEach(completableFuture -> completableFuture.cancel(true)); } } , executor) ); }); try { //阻塞直到所有任務全部執(zhí)行結束---如果有任務被取消,這里會拋出異常滴,需要捕獲 CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } //發(fā)生了異常則進行回滾操作,否則提交 if(ex.get()){ System.out.println("發(fā)生異常,全部事務回滾"); transactionStatusList.forEach(transactionManager::rollback); }else { System.out.println("全部事務正常提交"); transactionStatusList.forEach(transactionManager::commit); } } private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) { //JdbcTransactionManager根據(jù)TransactionDefinition信息來進行一些連接屬性的設置 //包括隔離級別和傳播行為等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //開啟一個新事務---此時autocommit已經(jīng)被設置為了false,并且當前沒有事務,這里創(chuàng)建的是一個新事務 return transactionManager.getTransaction(transactionDef); } private DataSourceTransactionManager getTransactionManager() { return new DataSourceTransactionManager(dataSource); } }
測試
public void test(){ List<Runnable> tasks=new ArrayList<>(); tasks.add(()->{ Per per = new Per(); per.setName("t1"); perService.save(per); }); tasks.add(()->{ Per per = new Per(); per.setName("t2"); perService.save(per); }); multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool()); }
執(zhí)行結果
java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread
at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:198) ~[spring-tx-5.3.10.jar:5.3.10]
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:371) ~[spring-jdbc-5.3.10.jar:5.3.10]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:992) ~[spring-tx-5.3.10.jar:5.3.10]
at org.springframework.transaction.suppoAbstractPlatformTransactionrt.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager
結果報了這個錯,這個錯誤信息室找不到綁定在線程上的key為HikariDataSource的資源,因為事務資源都是綁定在線程上的,當事務提交或者回滾時,他需要尋找綁定在當前線程上的資源,如果找不到,就會報錯。
原理剖析:
首先我們找到綁定線程資源的關鍵方法org.springframework.transaction.support.TransactionSynchronizationManager#bindResource
/** * Bind the given resource for the given key to the current thread. * @param key the key to bind the value to (usually the resource factory) * @param value the value to bind (usually the active resource object) * @throws IllegalStateException if there is already a value bound to the thread * @see ResourceTransactionManager#getResourceFactory() */ public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found if (map == null) { map = new HashMap<>(); resources.set(map); } Object oldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void... if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { throw new IllegalStateException( "Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread"); } }
根據(jù)debug會發(fā)現(xiàn),spring在開啟事務時會自動為我們調用這個方法,綁定key為HikariDataSource,value為ConnectionHolder到threadlocal中。第二次sql執(zhí)行時會綁定key為DefaultSqlSessionFactory,value為DefaultSqlSessionFactory。
既然講到了事務資源的綁定時機,下面就順便講一下這兩種資源在何時釋放。我們再回顧一下事務的執(zhí)行流程及機制。spring處理事務的原理就是基于aop,每個需要實現(xiàn)事務的方法都要通過TransactionInterceptor這個攔截器,通過這個攔截器去實現(xiàn)事務增強。
@Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... //重點,這里注冊了一個回調,最后會調回下面 //父類實現(xiàn) return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { @Override @Nullable public Object proceedWithInvocation() throws Throwable { //原始方法 return invocation.proceed(); } @Override public Object getTarget() { return invocation.getThis(); } @Override public Object[] getArguments() { return invocation.getArguments(); } }); }
他最終會交給他的父類模板類org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction實現(xiàn),在來看一下這個類為我們處理了什么,直接看重點
/** * General delegate for around-advice-based subclasses, delegating to several other template * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager} * as well as regular {@link PlatformTransactionManager} implementations and * {@link ReactiveTransactionManager} implementations for reactive return types. * @param method the Method being invoked * @param targetClass the target class that we're invoking the method on * @param invocation the callback to use for proceeding with the target invocation * @return the return value of the method, if any * @throws Throwable propagated from the target invocation */ @Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. //說人話就是獲取事務資源,裝配事務管理器 TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. //事務相關信息,包括傳播級別,什么異常下回滾等 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. //就是他的子類注冊的回調,真正的業(yè)務邏輯 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception //回滾 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清理事務信息 cleanupTransactionInfo(txInfo); } if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } //提交 commitTransactionAfterReturning(txInfo); return retVal; } ... }
繼續(xù)往下,觀察commitTransactionAfterReturning(txInfo)的實現(xiàn),發(fā)現(xiàn)這一步是由事務管理器完成的。
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
進而可以推斷,rollback操作也是同樣的道理,有興趣的小伙伴可以自己debug一下,繼續(xù)走下去,觀察事務管理器為我們做了什么。
@Override public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } processCommit(defStatus); }
到此,spring的整個事務流程就已經(jīng)非常清晰了,我們想要實現(xiàn)多事務管理的方法也找到了,難就是去控制事務的資源。只要我們拿到了相應的事務資源,然后在創(chuàng)建自己的事務管理器控制事務何時提交或者回滾,這樣我們就可以實現(xiàn)一個多線程同時提交回滾,類似于二階段提交的操作,來達到多線程事務的統(tǒng)一。
3.多線程事務管理器
不多說,直接上代碼看最終版本
package com.controller; import lombok.Builder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.CollectionUtils; import javax.sql.DataSource; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * 多線程事務管理 */ @Component @Slf4j @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個數(shù)據(jù)源 */ private final DataSource dataSource; private final static ThreadLocal<Boolean> immediatelyCommitFlag = new ThreadLocal<>(); private final static ThreadLocal<List<TransactionStatus>> transactionStatusListThreadLocal = new ThreadLocal<>(); private final static ThreadLocal<List<TransactionResource>> transactionResourcesthreadLocal = new ThreadLocal<>(); private final static ThreadLocal<Map<Object, Object>> mainNativeResourceThreadLocal = new ThreadLocal<>(); /** * 多線程下事務執(zhí)行 * * @param tasks 任務列表 * @param immediatelyCommit 是否需要立即提交 */ public List<CompletableFuture> runAsyncButWaitUntilAllDown(List<Runnable> tasks, Boolean immediatelyCommit) { Executor executor = Executors.newCachedThreadPool(); DataSourceTransactionManager transactionManager = getTransactionManager(); //是否發(fā)生了異常 AtomicBoolean ex = new AtomicBoolean(); List<CompletableFuture> taskFutureList = new CopyOnWriteArrayList<>(); List<TransactionStatus> transactionStatusList = new CopyOnWriteArrayList<>(); List<TransactionResource> transactionResources = new CopyOnWriteArrayList<>(); //記錄原生主事務資源 //這一步可能在原生sql執(zhí)行前,也可能在原生sql執(zhí)行后,所以這個資源可能不夠充分,需要在下面繼續(xù)處理 //如果返回的是原資源集合的引用,下面一步可以不用 Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap(); if (!CollectionUtils.isEmpty(resourceMap)) { mainNativeResourceThreadLocal.set(new HashMap<>(resourceMap)); } Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap(); Executor finalExecutor = executor; AtomicInteger atomicInteger = new AtomicInteger(0); tasks.forEach(task -> { taskFutureList.add(CompletableFuture.runAsync( () -> { log.info("任務開始"); try { //1.開啟新事務 TransactionStatus transactionStatus = openNewTransaction(transactionManager); log.info("開啟新事務 successfully"); transactionStatusList.add(transactionStatus); atomicInteger.incrementAndGet(); System.out.println("atomicInteger.get()"+atomicInteger.incrementAndGet()); System.out.println(transactionStatus); //2.異步任務執(zhí)行 task.run(); log.info("異步任務執(zhí)行 successfully"); //3.繼續(xù)事務資源復制,因為在sql執(zhí)行是會產(chǎn)生新的資源對象 transactionResources.add(TransactionResource.copyTransactionResource()); } catch (Throwable throwable) { log.info("任務執(zhí)行異常"+throwable.getMessage()); log.error("任務執(zhí)行異常",throwable); //其中某個異步任務執(zhí)行出現(xiàn)了異常,進行標記 ex.set(Boolean.TRUE); //其他任務還沒執(zhí)行的不需要執(zhí)行了 taskFutureList.forEach(completableFuture -> completableFuture.cancel(true)); } } , finalExecutor) ); }); try { //阻塞直到所有任務全部執(zhí)行結束---如果有任務被取消,這里會拋出異常滴,需要捕獲 CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get(); } catch (InterruptedException | ExecutionException e) { log.info("任務被取消"); log.error("任務被取消",e); } //發(fā)生了異常則進行回滾操作,否則提交 if (ex.get()) { log.info("發(fā)生異常,全部事務回滾"); for (int i = 0; i < transactionStatusList.size(); i++) { transactionResources.get(i).autoWiredTransactionResource(); Map<Object, Object> rollBackResourceMap = TransactionSynchronizationManager.getResourceMap(); log.info("回滾前事務資源size{},本身{}",rollBackResourceMap.size(),rollBackResourceMap); transactionManager.rollback(transactionStatusList.get(i)); transactionResources.get(i).removeTransactionResource(); } } else { if (immediatelyCommit) { log.info("全部事務正常提交"); for (int i = 0; i < transactionStatusList.size(); i++) { //transactionResources.get(i).autoWiredTransactionResource(); Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap(); log.info("提交前事務資源size{},本身{}",commitResourceMap.size(),commitResourceMap); transactionManager.commit(transactionStatusList.get(i)); transactionResources.get(i).removeTransactionResource(); } } else { //緩存全部待提交數(shù)據(jù) immediatelyCommitFlag.set(immediatelyCommit); transactionResourcesthreadLocal.set(transactionResources); transactionStatusListThreadLocal.set(transactionStatusList); } } //交還給主事務 if (immediatelyCommit) { mainTransactionResourceBack(!ex.get()); } return taskFutureList; } public void multiplyThreadTransactionCommit() { try { Boolean immediatelyCommit = immediatelyCommitFlag.get(); if (immediatelyCommit) { throw new IllegalStateException("immediatelyCommit cant call multiplyThreadTransactionCommit"); } //提交 //獲取存儲的事務資源和狀態(tài) List<TransactionResource> transactionResources = transactionResourcesthreadLocal.get(); List<TransactionStatus> transactionStatusList = transactionStatusListThreadLocal.get(); if (CollectionUtils.isEmpty(transactionResources) || CollectionUtils.isEmpty(transactionStatusList)) { throw new IllegalStateException("transactionResources or transactionStatusList is empty"); } //重新提交 DataSourceTransactionManager transactionManager = getTransactionManager(); log.info("全部事務正常提交"); for (int i = 0; i < transactionStatusList.size(); i++) { transactionResources.get(i).autoWiredTransactionResource(); Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap(); log.info("提交前事務資源size{},本身{}",commitResourceMap.size(),commitResourceMap); transactionManager.commit(transactionStatusList.get(i)); transactionResources.get(i).removeTransactionResource(); } } catch (Exception e) { mainTransactionResourceBack(false); log.error("multiplyThreadTransactionCommit fail", e); } finally { transactionResourcesthreadLocal.remove(); transactionStatusListThreadLocal.remove(); immediatelyCommitFlag.remove(); } //交還給主事務 mainTransactionResourceBack(true); } //主線程事務資源返還 public void mainTransactionResourceBack(Boolean subTransactionSuccess) { if (CollectionUtils.isEmpty(mainNativeResourceThreadLocal.get())) { //清除數(shù)據(jù) mainNativeResourceThreadLocal.remove(); return; } Map<Object, Object> nativeResource = new HashMap<>(mainNativeResourceThreadLocal.get()); Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap(); log.info("當前線程資事務源size{}--------------------------------{}",resourceMap.size(), resourceMap); log.info("原生線程事務資源size{}--------------------------------{}",nativeResource.size(), nativeResource); //已經(jīng)被綁定的資源不能重復綁定 if (!CollectionUtils.isEmpty(resourceMap)) { for (Object o : resourceMap.keySet()) { if (nativeResource.containsKey(o)) { nativeResource.remove(o); } } } nativeResource.forEach((k,v)->{ if (!(k instanceof DataSource)){ log.info("nativeResource 沒有 DataSource"); } }); //交還不能綁定factory nativeResource.forEach((k,v)->{ if (k instanceof DataSource){ TransactionSynchronizationManager.bindResource(k,v); } }); Map<Object, Object> finResource = TransactionSynchronizationManager.getResourceMap(); log.info("主線程最終事務源size{}--------------------------------{}",finResource.size(), finResource); //防止未激活事務 if (!TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronizationManager.initSynchronization(); } //清除數(shù)據(jù) mainNativeResourceThreadLocal.remove(); if (!subTransactionSuccess) { throw new RuntimeException("子事務失敗,需要回滾"); } } private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) { //JdbcTransactionManager根據(jù)TransactionDefinition信息來進行一些連接屬性的設置 //包括隔離級別和傳播行為等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //開啟一個新事務---此時autocommit已經(jīng)被設置為了false,并且當前沒有事務,這里創(chuàng)建的是一個新事務 return transactionManager.getTransaction(transactionDef); } private DataSourceTransactionManager getTransactionManager() { return new DataSourceTransactionManager(dataSource); } /** * 保存當前事務資源,用于線程間的事務資源COPY操作 */ @Builder private static class TransactionResource { //事務結束后默認會移除集合中的DataSource作為key關聯(lián)的資源記錄 private Map<Object, Object> resources = new HashMap<>(); //下面五個屬性會在事務結束后被自動清理,無需我們手動清理 private Set<TransactionSynchronization> synchronizations = new HashSet<>(); private String currentTransactionName; private Boolean currentTransactionReadOnly; private Integer currentTransactionIsolationLevel; private Boolean actualTransactionActive; public static TransactionResource copyTransactionResource() { return TransactionResource.builder() //返回的是不可變集合,這里為了更加靈活,copy出一個集合過來 .resources(new HashMap<>(TransactionSynchronizationManager.getResourceMap())) //如果需要注冊事務監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認負責--spring事務內部默認也是這個值 .synchronizations(new LinkedHashSet<>()) .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName()) .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly()) .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()) .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive()) .build(); } //裝配事務資源,為提交/回滾做儲備 public void autoWiredTransactionResource() { //獲取當前線程事務資源 Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap(); for (Object o : resourceMap.keySet()) { if (resourceMap.containsKey(o)) { //移除重復事務資源key,避免綁定報錯 resources.remove(o); } } boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive(); //綁定事務資源,注意 綁定是綁定到當前主線程上,記得最后釋放交換主線程,再由主線程收回原有事務自選 resources.forEach(TransactionSynchronizationManager::bindResource); //如果需要注冊事務監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認負責--spring事務內部默認也是這個值 //避免重復激活或者事務未激活 if (!synchronizationActive) { TransactionSynchronizationManager.initSynchronization(); } TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive); TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly); } public void removeTransactionResource() { Map<Object, Object> resourceMap = new HashMap<>(TransactionSynchronizationManager.getResourceMap()); //事務結束后默認會移除集合中的DataSource作為key關聯(lián)的資源記錄 //DataSource如果重復移除,unbindResource時會因為不存在此key關聯(lián)的事務資源而報錯 resources.keySet().forEach(key -> { if (resourceMap.containsKey(key)) { TransactionSynchronizationManager.unbindResource(key); } }); } } }
驗證
@Transactional public String test(Integer par) { log.info("get(" + par + ")"); if (par == 3 || par == 5 || par == 6) { Per per2 = new Per(); per2.setName("t3"); per2.setGrou(Thread.currentThread().getName()); perService.save(per2); } List<Runnable> list = new ArrayList<>(); list.add(() -> { Per per = new Per(); per.setName("t1"); per.setGrou(Thread.currentThread().getName()); log.info("任務開始save"); perService.save(per); log.info("任務完成save"); if (par == 1) { throw new RuntimeException(); } }); list.add(() -> { Per per1 = new Per(); per1.setName("t2"); per1.setGrou(Thread.currentThread().getName()); log.info("任務開始save"); perService.save(per1); log.info("任務完成save"); if (par == 2) { throw new RuntimeException(); } }); log.info("runAsyncButWaitUntilAllDown start"); multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(list, false); if (par == 4 || par == 5 || par == 6) { Per per3 = new Per(); per3.setName("t4"); per3.setGrou(Thread.currentThread().getName()); perService.save(per3); if (par == 6) { throw new RuntimeException(); } } log.info("multiplyThreadTransactionCommit start"); multiplyThreadTransactionManager.multiplyThreadTransactionCommit(); return "ss"; }
2024-03-11 16:00:22.048 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 提交前事務資源size2,本身{org.apache.ibatis.session.defaults.DefaultSqlSessionFactory@3c1f842f=org.mybatis.spring.SqlSessionHolder@f0c0875, HikariDataSource (HikariPool-1)=org.springframework.jdbc.datasource.ConnectionHolder@3b3508e5} 2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 當前線程資事務源size0--------------------------------{} 2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 原生線程事務資源size2--------------------------------{org.apache.ibatis.session.defaults.DefaultSqlSessionFactory@3c1f842f=org.mybatis.spring.SqlSessionHolder@493bb8e6, HikariDataSource (HikariPool-1)=org.springframework.jdbc.datasource.ConnectionHolder@3950dc5b} 2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : nativeResource 沒有 DataSource 2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 主線程最終事務源size1--------------------------------{HikariDataSource (HikariPool-1)=org.springframework.jdbc.datasource.ConnectionHolder@3950dc5b}
有興趣的小伙伴可以做進一步測試。
在本公司項目組中利用這個機制優(yōu)化現(xiàn)有的業(yè)務,性能提升了約70%。
比起網(wǎng)上常見的多線程事務管理器,主要做了如下增強
1.支持在已存在事務下運行。
在很多場景下,我們可能會遇到多線程事務外還存在其他事物的場景下,我們需要支持兼容多種事務環(huán)境。
2.支持自定義提交時機。
有時候我們不希望事務立馬提交,希望他能夠和外圍事務保持一致,這時候可以將runAsyncButWaitUntilAllDown的immediatelyCommit參數(shù)寫成false,并手動調用multiplyThreadTransactionCommit方法去主動提交。
我們需要注意的地方,任何事都是有舍有得的,耗時的顯著降低是因為利用了更多的資源,比如線程資源和數(shù)據(jù)庫連接資源,尤其是數(shù)據(jù)庫連接資源,更是額外寶貴,我們一定要合理評估我們的每一項決策是否有意義,風險和回報是否成正比。
還有一點需要注意,在極高并發(fā)的情況下,多線程事務容易造成死鎖,因為當主事務開啟的情況下,他要為他下面的子線程事務開啟連接,當連接不夠時就容易造成循環(huán)等待。一個比較好的做法是提前獲得所有連接,并設置一個合理的超時時間
如果有小伙伴遇到了其他相關疑問,或者使用此代碼發(fā)現(xiàn)了問題,歡迎留言討論,共同進步。
站在巨人的肩膀上,在解決這一問題時,我也參考了很多網(wǎng)上其他的文章。我不否認我借鑒了許多,但是終究是在此基礎上有所突破,如果有感興趣的小伙伴可以去看一下,在此附上地址。
https://www.cnblogs.com/hefeng2014/p/17759000.html
https://d9bp4nr5ye.feishu.cn/wiki/OJdiwdYeXirkdBk3NV8c5evrnmh
到此這篇關于Spring多線程事務處理的文章就介紹到這了,更多相關Spring多線程事務處理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
jsp頁面中獲取servlet請求中的參數(shù)的辦法詳解
在JAVA WEB應用中,如何獲取servlet請求中的參數(shù),本文講解了jsp頁面中獲取servlet請求中的參數(shù)的辦法2018-03-03Java之多個線程順序循環(huán)執(zhí)行的幾種實現(xiàn)
這篇文章主要介紹了Java之多個線程順序循環(huán)執(zhí)行的幾種實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09關于Idea創(chuàng)建Java項目并引入lombok包的問題(lombok.jar包免費下載)
很多朋友遇到當idea創(chuàng)建java項目時,命名安裝了lombok插件卻不能使用注解,原因有兩個大家可以參考下本文,本文對每種原因分析給出了解決方案,需要的朋友參考下吧2021-06-06SpringBoot項目獲取統(tǒng)一前綴配置及獲取非確定名稱配置方法
在SpringBoot項目中,使用@ConfigurationProperties注解可獲取統(tǒng)一前綴的配置,具體做法是創(chuàng)建配置類,使用prefix屬性指定配置的前綴,本文給大家介紹SpringBoot項目獲取統(tǒng)一前綴配置以及獲取非確定名稱配置方法,感興趣的朋友跟隨小編一起看看吧2024-09-09Java連接 JDBC基礎知識(操作數(shù)據(jù)庫:增刪改查)
這篇文章主要介紹了Java連接 JDBC基礎知識,包括操作數(shù)據(jù)庫之增刪改查操作,需要的朋友可以參考下2021-04-04