Spring多線程事務(wù)處理解決方案
一、背景
本文主要介紹了spring多線程事務(wù)的解決方案,心急的小伙伴可以跳過上面的理論介紹分析部分直接看最終解決方案。
在我們?nèi)粘5臉I(yè)務(wù)活動(dòng)中,經(jīng)常會(huì)出現(xiàn)大規(guī)模的修改插入操作,比如在3.0的活動(dòng)賽事創(chuàng)建,涉及到十幾張表的插入(一張表可能插入一行或者多行數(shù)據(jù)),由于單線程模型的關(guān)系,所有的sql都是串行,即后面的sql必須都要等到前面的sql執(zhí)行完成才能繼續(xù)。但是在很多場(chǎng)景下,sql的執(zhí)行順序并不影響業(yè)務(wù)的結(jié)果,面對(duì)這樣的場(chǎng)景,我們很自然的想到了使用異步的方式去處理,可是我們同時(shí)又希望整個(gè)創(chuàng)建操作是事務(wù)性的,即要全部成功,要么全部失敗,但是單純的使用異步線程并不能達(dá)到我們理想的效果。
這個(gè)時(shí)候,我們需要一種多線程下保證事務(wù)的解決方案。
代碼片段,大量的同步保存操作
public void much(){
//業(yè)務(wù)操作1
doBusiness1();
//業(yè)務(wù)操作2
doBusiness2();
//業(yè)務(wù)操作3
doBusiness3();
//業(yè)務(wù)操作4
doBusiness4();
}
private void doBusiness1() {
//執(zhí)行sql1
//執(zhí)行sql2
//執(zhí)行sql3
//執(zhí)行sql4
}每個(gè)業(yè)務(wù)操作可以是相關(guān)聯(lián)的,也有可能是完全無關(guān)的,但如果做成異步的話我們就無法保證事務(wù),怎么去解決這個(gè)問題呢?
二、理論先行
1.事務(wù)介紹
我們先確定spring事務(wù)的本質(zhì)是什么,spring本身不支持事務(wù),spring實(shí)現(xiàn)事務(wù)只是對(duì)我們?cè)械臉I(yè)務(wù)邏輯做了一層包裝,他替我們決定了什么時(shí)候開啟事務(wù),什么情況下應(yīng)該向數(shù)據(jù)庫提交,什么時(shí)候回滾,及實(shí)現(xiàn)我們?cè)O(shè)置的一些事務(wù)參數(shù),包括回滾的條件,傳播類型等。
我們所熟知的spring事務(wù)有兩種主流的解決方式,一種是聲明式事務(wù),一種是編程式事務(wù)。
先來講我們最常用的聲明式事務(wù)。
1.1聲明式事務(wù)
聲明式事務(wù)就是我們最常用的@Transactional注解,通常我們只需要在我們想交由spring控制的事務(wù)方法上加上注解即可,這個(gè)注解有一些重要的參數(shù),由于不是本文重點(diǎn),就不在此展開。這是一個(gè)經(jīng)典的spring的aop實(shí)現(xiàn),為了弄清楚在加上@Transactional注解后spring到底為我們做了什么,我們可以從兩方面入手,一是spring如何給我們生成相應(yīng)的代理對(duì)象,二是這個(gè)代理對(duì)象為我們做了什么。
事務(wù)的開始是由@EnableTransactionManagement 注解產(chǎn)生,這個(gè)注解在運(yùn)行時(shí)會(huì)導(dǎo)入TransactionManagementConfigurationSelector這個(gè)類,這個(gè)類本質(zhì)上是一個(gè)ImportSelector,他根據(jù)adviceMode將特定的配置類導(dǎo)入進(jìn)去,分別為AutoProxyRegistrar 后置處理器和ProxyTransactionManagementConfiguration Advisor。
AutoProxyRegistrar 實(shí)現(xiàn)了ImportBeanDefinitionRegistrar 重寫了registerBeanDefinitions 方法
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
for (String annType : annTypes) {
// ...
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
}
// ...
}
@Nullable
public static BeanDefinition registerAutoProxyCreatorIfNecessary(
BeanDefinitionRegistry registry, @Nullable Object source) {
return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}。該方法最終注入了InfrastructureAdvisorAutoProxyCreator。InfrastructureAdvisorAutoProxyCreator這個(gè)類就是一個(gè)bean的后置處理器,最終的作用就是處理需要的代理對(duì)象。
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// ...
// 拿當(dāng)前bean去匹配容器中的 Advisors,如果找到符合的就生成代理對(duì)象
// Create proxy if we have advice.
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}ProxyTransactionManagementConfiguration的作用就是來生成具體的Advisor,他注冊(cè)了三個(gè)bean,
- 該類主要完成以下幾個(gè)任務(wù):
- 創(chuàng)建TransactionAttributeSource對(duì)象:用于解析@Transactional注解并生成事務(wù)屬性。
- 創(chuàng)建TransactionInterceptor對(duì)象:用于創(chuàng)建事務(wù)通知,將事務(wù)屬性應(yīng)用到目標(biāo)方法,這其實(shí)就是一個(gè)事務(wù)模板,如下所示
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
//TransactionAttributeSource內(nèi)部保存著當(dāng)前類某個(gè)方法對(duì)應(yīng)的TransactionAttribute---事務(wù)屬性源
//可以看做是一個(gè)存放TransactionAttribute與method方法映射的池子
TransactionAttributeSource tas = getTransactionAttributeSource();
//獲取當(dāng)前事務(wù)方法對(duì)應(yīng)的TransactionAttribute
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//定位TransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
.....
//類型轉(zhuǎn)換為局部事務(wù)管理器
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
//TransactionManager根據(jù)TransactionAttribute創(chuàng)建事務(wù)后返回
//TransactionInfo封裝了當(dāng)前事務(wù)的信息--包括TransactionStatus
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
//繼續(xù)執(zhí)行過濾器鏈---過濾鏈最終會(huì)調(diào)用目標(biāo)方法
//因此可以理解為這里是調(diào)用目標(biāo)方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
//目標(biāo)方法拋出異常則進(jìn)行判斷是否需要回滾
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清除當(dāng)前事務(wù)信息
cleanupTransactionInfo(txInfo);
}
...
//正常返回,那么就正常提交事務(wù)唄(當(dāng)然還是需要判斷TransactionStatus狀態(tài)先)
commitTransactionAfterReturning(txInfo);
return retVal;
}
...- 創(chuàng)建TransactionAdvisor對(duì)象:將事務(wù)通知和切點(diǎn)(Pointcut)組合成Advisor。
- 創(chuàng)建TransactionAttributeSourceAdvisor對(duì)象:將事務(wù)屬性和切點(diǎn)組合成Advisor
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource);
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
// TransactionAttributeSource 是一個(gè)接口,具體注入的是 Annotationxxxx
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}觀察TransactionAdvisor代碼實(shí)現(xiàn)@SuppressWarnings("serial")
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
@Nullable
private TransactionAttributeSource transactionAttributeSource;
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return transactionAttributeSource;
}
};
/**
* Set the transaction attribute source which is used to find transaction
* attributes. This should usually be identical to the source reference
* set on the transaction interceptor itself.
* @see TransactionInterceptor#setTransactionAttributeSource
*/
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionAttributeSource = transactionAttributeSource;
}
/**
* Set the {@link ClassFilter} to use for this pointcut.
* Default is {@link ClassFilter#TRUE}.
*/
public void setClassFilter(ClassFilter classFilter) {
this.pointcut.setClassFilter(classFilter);
}
@Override
public Pointcut getPointcut() {
return this.pointcut;
}可以見到里面已經(jīng)包含了pointcut,這就能將我們需要被增加的事務(wù)方法找出。
ProxyTransactionManagementConfiguration負(fù)責(zé)將需要包裝的bean和方法找出并包裝成advisor,InfrastructureAdvisorAutoProxyCreator根據(jù)advisor生成相應(yīng)的代理對(duì)象。
小結(jié):InfrastructureAdvisorAutoProxyCreator遍歷容器中的bean,嘗試去自動(dòng)代理,匹配的工作就交由advisor中的point,如果匹配成功就為其創(chuàng)建代理對(duì)象,這個(gè)代理對(duì)象中放入了TransactionInterceptor攔截器
,等到相關(guān)方法調(diào)用時(shí),調(diào)用的是代理對(duì)象的方法,然后通過責(zé)任鏈模式通過TransactionInterceptor處理,以此來進(jìn)行事務(wù)的操作。
聲明式事務(wù)的介紹先到這里,接下來我們來介紹下編程式事務(wù)。
1.2編程式事務(wù)
編程式事務(wù)的核心就是將spring為我們做好的那些步驟拆出來,交由開發(fā)者去控制事務(wù)何時(shí)開啟、提交、回滾,他的運(yùn)行本質(zhì)和聲明式事務(wù)并沒有兩樣。
模板如下
public class TransactionMain {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
test();
}
private static void test() {
DataSource dataSource = getDS();
JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
//JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
//包括隔離級(jí)別和傳播行為等
DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
//開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
TransactionStatus ts = jtm.getTransaction(transactionDef);
//進(jìn)行業(yè)務(wù)邏輯操作
try {
update(dataSource);
jtm.commit(ts);
}catch (Exception e){
jtm.rollback(ts);
System.out.println("發(fā)生異常,我已回滾");
}
}
private static void update(DataSource dataSource) throws Exception {
JdbcTemplate jt = new JdbcTemplate();
jt.setDataSource(dataSource);
jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
throw new Exception("我是來搗亂的");
}
}三、方案探索
1.直接使用多線程
我們?cè)陂_啟代碼中事務(wù),并在業(yè)務(wù)邏輯中直接使用多線程,是否能保證事務(wù)?
@Transactional
public void testDirect() {
new Thread(()->{
Per per = new Per();
per.setName("t1");
perService.save(per);
}).start();
new Thread(()->{
Per per1 = new Per();
per1.setName("t2");
perService.save(per1);
throw new RuntimeException("Exception test");
}).start();
}顯然,這種方式并不能保證事務(wù),哪怕加上了事務(wù)注解,因?yàn)樽泳€程拋出的異常并不能在主線程中捕獲,也不能被其他線程感知到。
2.事務(wù)模板中使用多線程
package com.user.util;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
/**
* 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源
*/
private final DataSource dataSource;
public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
if(executor==null){
throw new IllegalArgumentException("線程池不能為空");
}
DataSourceTransactionManager transactionManager = getTransactionManager();
//是否發(fā)生了異常
AtomicBoolean ex=new AtomicBoolean();
List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
tasks.forEach(task->{
taskFutureList.add(CompletableFuture.runAsync(
() -> {
try{
//1.開啟新事務(wù)
transactionStatusList.add(openNewTransaction(transactionManager));
//2.異步任務(wù)執(zhí)行
task.run();
}catch (Throwable throwable){
//打印異常
throwable.printStackTrace();
//其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記
ex.set(Boolean.TRUE);
//其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
}
}
, executor)
);
});
try {
//阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//發(fā)生了異常則進(jìn)行回滾操作,否則提交
if(ex.get()){
System.out.println("發(fā)生異常,全部事務(wù)回滾");
transactionStatusList.forEach(transactionManager::rollback);
}else {
System.out.println("全部事務(wù)正常提交");
transactionStatusList.forEach(transactionManager::commit);
}
}
private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
//JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
//包括隔離級(jí)別和傳播行為等
DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
//開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
return transactionManager.getTransaction(transactionDef);
}
private DataSourceTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
}測(cè)試
public void test(){
List<Runnable> tasks=new ArrayList<>();
tasks.add(()->{
Per per = new Per();
per.setName("t1");
perService.save(per);
});
tasks.add(()->{
Per per = new Per();
per.setName("t2");
perService.save(per);
});
multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}執(zhí)行結(jié)果
java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread
at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:198) ~[spring-tx-5.3.10.jar:5.3.10]
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:371) ~[spring-jdbc-5.3.10.jar:5.3.10]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:992) ~[spring-tx-5.3.10.jar:5.3.10]
at org.springframework.transaction.suppoAbstractPlatformTransactionrt.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager
結(jié)果報(bào)了這個(gè)錯(cuò),這個(gè)錯(cuò)誤信息室找不到綁定在線程上的key為HikariDataSource的資源,因?yàn)槭聞?wù)資源都是綁定在線程上的,當(dāng)事務(wù)提交或者回滾時(shí),他需要尋找綁定在當(dāng)前線程上的資源,如果找不到,就會(huì)報(bào)錯(cuò)。
原理剖析:
首先我們找到綁定線程資源的關(guān)鍵方法org.springframework.transaction.support.TransactionSynchronizationManager#bindResource
/**
* Bind the given resource for the given key to the current thread.
* @param key the key to bind the value to (usually the resource factory)
* @param value the value to bind (usually the active resource object)
* @throws IllegalStateException if there is already a value bound to the thread
* @see ResourceTransactionManager#getResourceFactory()
*/
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
// set ThreadLocal Map if none found
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// Transparently suppress a ResourceHolder that was marked as void...
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
if (oldValue != null) {
throw new IllegalStateException(
"Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
}
}根據(jù)debug會(huì)發(fā)現(xiàn),spring在開啟事務(wù)時(shí)會(huì)自動(dòng)為我們調(diào)用這個(gè)方法,綁定key為HikariDataSource,value為ConnectionHolder到threadlocal中。第二次sql執(zhí)行時(shí)會(huì)綁定key為DefaultSqlSessionFactory,value為DefaultSqlSessionFactory。


既然講到了事務(wù)資源的綁定時(shí)機(jī),下面就順便講一下這兩種資源在何時(shí)釋放。我們?cè)倩仡櫼幌率聞?wù)的執(zhí)行流程及機(jī)制。spring處理事務(wù)的原理就是基于aop,每個(gè)需要實(shí)現(xiàn)事務(wù)的方法都要通過TransactionInterceptor這個(gè)攔截器,通過這個(gè)攔截器去實(shí)現(xiàn)事務(wù)增強(qiáng)。
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
//重點(diǎn),這里注冊(cè)了一個(gè)回調(diào),最后會(huì)調(diào)回下面
//父類實(shí)現(xiàn)
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
//原始方法
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}他最終會(huì)交給他的父類模板類org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction實(shí)現(xiàn),在來看一下這個(gè)類為我們處理了什么,直接看重點(diǎn)
/**
* General delegate for around-advice-based subclasses, delegating to several other template
* methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
* as well as regular {@link PlatformTransactionManager} implementations and
* {@link ReactiveTransactionManager} implementations for reactive return types.
* @param method the Method being invoked
* @param targetClass the target class that we're invoking the method on
* @param invocation the callback to use for proceeding with the target invocation
* @return the return value of the method, if any
* @throws Throwable propagated from the target invocation
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
//說人話就是獲取事務(wù)資源,裝配事務(wù)管理器
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
//事務(wù)相關(guān)信息,包括傳播級(jí)別,什么異常下回滾等
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//就是他的子類注冊(cè)的回調(diào),真正的業(yè)務(wù)邏輯
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//回滾
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清理事務(wù)信息
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
//提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
...
}繼續(xù)往下,觀察commitTransactionAfterReturning(txInfo)的實(shí)現(xiàn),發(fā)現(xiàn)這一步是由事務(wù)管理器完成的。
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}進(jìn)而可以推斷,rollback操作也是同樣的道理,有興趣的小伙伴可以自己debug一下,繼續(xù)走下去,觀察事務(wù)管理器為我們做了什么。
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}到此,spring的整個(gè)事務(wù)流程就已經(jīng)非常清晰了,我們想要實(shí)現(xiàn)多事務(wù)管理的方法也找到了,難就是去控制事務(wù)的資源。只要我們拿到了相應(yīng)的事務(wù)資源,然后在創(chuàng)建自己的事務(wù)管理器控制事務(wù)何時(shí)提交或者回滾,這樣我們就可以實(shí)現(xiàn)一個(gè)多線程同時(shí)提交回滾,類似于二階段提交的操作,來達(dá)到多線程事務(wù)的統(tǒng)一。
3.多線程事務(wù)管理器
不多說,直接上代碼看最終版本
package com.controller;
import lombok.Builder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils;
import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多線程事務(wù)管理
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
/**
* 如果是多數(shù)據(jù)源的情況下,需要指定具體是哪一個(gè)數(shù)據(jù)源
*/
private final DataSource dataSource;
private final static ThreadLocal<Boolean> immediatelyCommitFlag = new ThreadLocal<>();
private final static ThreadLocal<List<TransactionStatus>> transactionStatusListThreadLocal = new ThreadLocal<>();
private final static ThreadLocal<List<TransactionResource>> transactionResourcesthreadLocal = new ThreadLocal<>();
private final static ThreadLocal<Map<Object, Object>> mainNativeResourceThreadLocal = new ThreadLocal<>();
/**
* 多線程下事務(wù)執(zhí)行
*
* @param tasks 任務(wù)列表
* @param immediatelyCommit 是否需要立即提交
*/
public List<CompletableFuture> runAsyncButWaitUntilAllDown(List<Runnable> tasks, Boolean immediatelyCommit) {
Executor executor = Executors.newCachedThreadPool();
DataSourceTransactionManager transactionManager = getTransactionManager();
//是否發(fā)生了異常
AtomicBoolean ex = new AtomicBoolean();
List<CompletableFuture> taskFutureList = new CopyOnWriteArrayList<>();
List<TransactionStatus> transactionStatusList = new CopyOnWriteArrayList<>();
List<TransactionResource> transactionResources = new CopyOnWriteArrayList<>();
//記錄原生主事務(wù)資源
//這一步可能在原生sql執(zhí)行前,也可能在原生sql執(zhí)行后,所以這個(gè)資源可能不夠充分,需要在下面繼續(xù)處理
//如果返回的是原資源集合的引用,下面一步可以不用
Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
if (!CollectionUtils.isEmpty(resourceMap)) {
mainNativeResourceThreadLocal.set(new HashMap<>(resourceMap));
}
Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
Executor finalExecutor = executor;
AtomicInteger atomicInteger = new AtomicInteger(0);
tasks.forEach(task -> {
taskFutureList.add(CompletableFuture.runAsync(
() -> {
log.info("任務(wù)開始");
try {
//1.開啟新事務(wù)
TransactionStatus transactionStatus = openNewTransaction(transactionManager);
log.info("開啟新事務(wù) successfully");
transactionStatusList.add(transactionStatus);
atomicInteger.incrementAndGet();
System.out.println("atomicInteger.get()"+atomicInteger.incrementAndGet());
System.out.println(transactionStatus);
//2.異步任務(wù)執(zhí)行
task.run();
log.info("異步任務(wù)執(zhí)行 successfully");
//3.繼續(xù)事務(wù)資源復(fù)制,因?yàn)樵趕ql執(zhí)行是會(huì)產(chǎn)生新的資源對(duì)象
transactionResources.add(TransactionResource.copyTransactionResource());
} catch (Throwable throwable) {
log.info("任務(wù)執(zhí)行異常"+throwable.getMessage());
log.error("任務(wù)執(zhí)行異常",throwable);
//其中某個(gè)異步任務(wù)執(zhí)行出現(xiàn)了異常,進(jìn)行標(biāo)記
ex.set(Boolean.TRUE);
//其他任務(wù)還沒執(zhí)行的不需要執(zhí)行了
taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
}
}
, finalExecutor)
);
});
try {
//阻塞直到所有任務(wù)全部執(zhí)行結(jié)束---如果有任務(wù)被取消,這里會(huì)拋出異常滴,需要捕獲
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
} catch (InterruptedException | ExecutionException e) {
log.info("任務(wù)被取消");
log.error("任務(wù)被取消",e);
}
//發(fā)生了異常則進(jìn)行回滾操作,否則提交
if (ex.get()) {
log.info("發(fā)生異常,全部事務(wù)回滾");
for (int i = 0; i < transactionStatusList.size(); i++) {
transactionResources.get(i).autoWiredTransactionResource();
Map<Object, Object> rollBackResourceMap = TransactionSynchronizationManager.getResourceMap();
log.info("回滾前事務(wù)資源size{},本身{}",rollBackResourceMap.size(),rollBackResourceMap);
transactionManager.rollback(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
} else {
if (immediatelyCommit) {
log.info("全部事務(wù)正常提交");
for (int i = 0; i < transactionStatusList.size(); i++) {
//transactionResources.get(i).autoWiredTransactionResource();
Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap();
log.info("提交前事務(wù)資源size{},本身{}",commitResourceMap.size(),commitResourceMap);
transactionManager.commit(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
} else {
//緩存全部待提交數(shù)據(jù)
immediatelyCommitFlag.set(immediatelyCommit);
transactionResourcesthreadLocal.set(transactionResources);
transactionStatusListThreadLocal.set(transactionStatusList);
}
}
//交還給主事務(wù)
if (immediatelyCommit) {
mainTransactionResourceBack(!ex.get());
}
return taskFutureList;
}
public void multiplyThreadTransactionCommit() {
try {
Boolean immediatelyCommit = immediatelyCommitFlag.get();
if (immediatelyCommit) {
throw new IllegalStateException("immediatelyCommit cant call multiplyThreadTransactionCommit");
}
//提交
//獲取存儲(chǔ)的事務(wù)資源和狀態(tài)
List<TransactionResource> transactionResources = transactionResourcesthreadLocal.get();
List<TransactionStatus> transactionStatusList = transactionStatusListThreadLocal.get();
if (CollectionUtils.isEmpty(transactionResources) || CollectionUtils.isEmpty(transactionStatusList)) {
throw new IllegalStateException("transactionResources or transactionStatusList is empty");
}
//重新提交
DataSourceTransactionManager transactionManager = getTransactionManager();
log.info("全部事務(wù)正常提交");
for (int i = 0; i < transactionStatusList.size(); i++) {
transactionResources.get(i).autoWiredTransactionResource();
Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap();
log.info("提交前事務(wù)資源size{},本身{}",commitResourceMap.size(),commitResourceMap);
transactionManager.commit(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
} catch (Exception e) {
mainTransactionResourceBack(false);
log.error("multiplyThreadTransactionCommit fail", e);
} finally {
transactionResourcesthreadLocal.remove();
transactionStatusListThreadLocal.remove();
immediatelyCommitFlag.remove();
}
//交還給主事務(wù)
mainTransactionResourceBack(true);
}
//主線程事務(wù)資源返還
public void mainTransactionResourceBack(Boolean subTransactionSuccess) {
if (CollectionUtils.isEmpty(mainNativeResourceThreadLocal.get())) {
//清除數(shù)據(jù)
mainNativeResourceThreadLocal.remove();
return;
}
Map<Object, Object> nativeResource = new HashMap<>(mainNativeResourceThreadLocal.get());
Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
log.info("當(dāng)前線程資事務(wù)源size{}--------------------------------{}",resourceMap.size(), resourceMap);
log.info("原生線程事務(wù)資源size{}--------------------------------{}",nativeResource.size(), nativeResource);
//已經(jīng)被綁定的資源不能重復(fù)綁定
if (!CollectionUtils.isEmpty(resourceMap)) {
for (Object o : resourceMap.keySet()) {
if (nativeResource.containsKey(o)) {
nativeResource.remove(o);
}
}
}
nativeResource.forEach((k,v)->{
if (!(k instanceof DataSource)){
log.info("nativeResource 沒有 DataSource");
}
});
//交還不能綁定factory
nativeResource.forEach((k,v)->{
if (k instanceof DataSource){
TransactionSynchronizationManager.bindResource(k,v);
}
});
Map<Object, Object> finResource = TransactionSynchronizationManager.getResourceMap();
log.info("主線程最終事務(wù)源size{}--------------------------------{}",finResource.size(), finResource);
//防止未激活事務(wù)
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.initSynchronization();
}
//清除數(shù)據(jù)
mainNativeResourceThreadLocal.remove();
if (!subTransactionSuccess) {
throw new RuntimeException("子事務(wù)失敗,需要回滾");
}
}
private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
//JdbcTransactionManager根據(jù)TransactionDefinition信息來進(jìn)行一些連接屬性的設(shè)置
//包括隔離級(jí)別和傳播行為等
DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
//開啟一個(gè)新事務(wù)---此時(shí)autocommit已經(jīng)被設(shè)置為了false,并且當(dāng)前沒有事務(wù),這里創(chuàng)建的是一個(gè)新事務(wù)
return transactionManager.getTransaction(transactionDef);
}
private DataSourceTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
/**
* 保存當(dāng)前事務(wù)資源,用于線程間的事務(wù)資源COPY操作
*/
@Builder
private static class TransactionResource {
//事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
private Map<Object, Object> resources = new HashMap<>();
//下面五個(gè)屬性會(huì)在事務(wù)結(jié)束后被自動(dòng)清理,無需我們手動(dòng)清理
private Set<TransactionSynchronization> synchronizations = new HashSet<>();
private String currentTransactionName;
private Boolean currentTransactionReadOnly;
private Integer currentTransactionIsolationLevel;
private Boolean actualTransactionActive;
public static TransactionResource copyTransactionResource() {
return TransactionResource.builder()
//返回的是不可變集合,這里為了更加靈活,copy出一個(gè)集合過來
.resources(new HashMap<>(TransactionSynchronizationManager.getResourceMap()))
//如果需要注冊(cè)事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值
.synchronizations(new LinkedHashSet<>())
.currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
.currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
.currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
.actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
.build();
}
//裝配事務(wù)資源,為提交/回滾做儲(chǔ)備
public void autoWiredTransactionResource() {
//獲取當(dāng)前線程事務(wù)資源
Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
for (Object o : resourceMap.keySet()) {
if (resourceMap.containsKey(o)) {
//移除重復(fù)事務(wù)資源key,避免綁定報(bào)錯(cuò)
resources.remove(o);
}
}
boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
//綁定事務(wù)資源,注意 綁定是綁定到當(dāng)前主線程上,記得最后釋放交換主線程,再由主線程收回原有事務(wù)自選
resources.forEach(TransactionSynchronizationManager::bindResource);
//如果需要注冊(cè)事務(wù)監(jiān)聽者,這里記得修改--我們這里不需要,就采用默認(rèn)負(fù)責(zé)--spring事務(wù)內(nèi)部默認(rèn)也是這個(gè)值
//避免重復(fù)激活或者事務(wù)未激活
if (!synchronizationActive) {
TransactionSynchronizationManager.initSynchronization();
}
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
}
public void removeTransactionResource() {
Map<Object, Object> resourceMap = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
//事務(wù)結(jié)束后默認(rèn)會(huì)移除集合中的DataSource作為key關(guān)聯(lián)的資源記錄
//DataSource如果重復(fù)移除,unbindResource時(shí)會(huì)因?yàn)椴淮嬖诖薻ey關(guān)聯(lián)的事務(wù)資源而報(bào)錯(cuò)
resources.keySet().forEach(key -> {
if (resourceMap.containsKey(key)) {
TransactionSynchronizationManager.unbindResource(key);
}
});
}
}
}驗(yàn)證
@Transactional
public String test(Integer par) {
log.info("get(" + par + ")");
if (par == 3 || par == 5 || par == 6) {
Per per2 = new Per();
per2.setName("t3");
per2.setGrou(Thread.currentThread().getName());
perService.save(per2);
}
List<Runnable> list = new ArrayList<>();
list.add(() -> {
Per per = new Per();
per.setName("t1");
per.setGrou(Thread.currentThread().getName());
log.info("任務(wù)開始save");
perService.save(per);
log.info("任務(wù)完成save");
if (par == 1) {
throw new RuntimeException();
}
});
list.add(() -> {
Per per1 = new Per();
per1.setName("t2");
per1.setGrou(Thread.currentThread().getName());
log.info("任務(wù)開始save");
perService.save(per1);
log.info("任務(wù)完成save");
if (par == 2) {
throw new RuntimeException();
}
});
log.info("runAsyncButWaitUntilAllDown start");
multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(list, false);
if (par == 4 || par == 5 || par == 6) {
Per per3 = new Per();
per3.setName("t4");
per3.setGrou(Thread.currentThread().getName());
perService.save(per3);
if (par == 6) {
throw new RuntimeException();
}
}
log.info("multiplyThreadTransactionCommit start");
multiplyThreadTransactionManager.multiplyThreadTransactionCommit();
return "ss";
}2024-03-11 16:00:22.048 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 提交前事務(wù)資源size2,本身{org.apache.ibatis.session.defaults.DefaultSqlSessionFactory@3c1f842f=org.mybatis.spring.SqlSessionHolder@f0c0875, HikariDataSource (HikariPool-1)=org.springframework.jdbc.datasource.ConnectionHolder@3b3508e5}
2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 當(dāng)前線程資事務(wù)源size0--------------------------------{}
2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 原生線程事務(wù)資源size2--------------------------------{org.apache.ibatis.session.defaults.DefaultSqlSessionFactory@3c1f842f=org.mybatis.spring.SqlSessionHolder@493bb8e6, HikariDataSource (HikariPool-1)=org.springframework.jdbc.datasource.ConnectionHolder@3950dc5b}
2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : nativeResource 沒有 DataSource
2024-03-11 16:00:22.141 INFO 44900 --- [ Thread-1] c.c.MultiplyThreadTransactionManager : 主線程最終事務(wù)源size1--------------------------------{HikariDataSource (HikariPool-1)=org.springframework.jdbc.datasource.ConnectionHolder@3950dc5b}有興趣的小伙伴可以做進(jìn)一步測(cè)試。
在本公司項(xiàng)目組中利用這個(gè)機(jī)制優(yōu)化現(xiàn)有的業(yè)務(wù),性能提升了約70%。
比起網(wǎng)上常見的多線程事務(wù)管理器,主要做了如下增強(qiáng)
1.支持在已存在事務(wù)下運(yùn)行。
在很多場(chǎng)景下,我們可能會(huì)遇到多線程事務(wù)外還存在其他事物的場(chǎng)景下,我們需要支持兼容多種事務(wù)環(huán)境。
2.支持自定義提交時(shí)機(jī)。
有時(shí)候我們不希望事務(wù)立馬提交,希望他能夠和外圍事務(wù)保持一致,這時(shí)候可以將runAsyncButWaitUntilAllDown的immediatelyCommit參數(shù)寫成false,并手動(dòng)調(diào)用multiplyThreadTransactionCommit方法去主動(dòng)提交。
我們需要注意的地方,任何事都是有舍有得的,耗時(shí)的顯著降低是因?yàn)槔昧烁嗟馁Y源,比如線程資源和數(shù)據(jù)庫連接資源,尤其是數(shù)據(jù)庫連接資源,更是額外寶貴,我們一定要合理評(píng)估我們的每一項(xiàng)決策是否有意義,風(fēng)險(xiǎn)和回報(bào)是否成正比。
還有一點(diǎn)需要注意,在極高并發(fā)的情況下,多線程事務(wù)容易造成死鎖,因?yàn)楫?dāng)主事務(wù)開啟的情況下,他要為他下面的子線程事務(wù)開啟連接,當(dāng)連接不夠時(shí)就容易造成循環(huán)等待。一個(gè)比較好的做法是提前獲得所有連接,并設(shè)置一個(gè)合理的超時(shí)時(shí)間
如果有小伙伴遇到了其他相關(guān)疑問,或者使用此代碼發(fā)現(xiàn)了問題,歡迎留言討論,共同進(jìn)步。
站在巨人的肩膀上,在解決這一問題時(shí),我也參考了很多網(wǎng)上其他的文章。我不否認(rèn)我借鑒了許多,但是終究是在此基礎(chǔ)上有所突破,如果有感興趣的小伙伴可以去看一下,在此附上地址。
https://www.cnblogs.com/hefeng2014/p/17759000.html
https://d9bp4nr5ye.feishu.cn/wiki/OJdiwdYeXirkdBk3NV8c5evrnmh
到此這篇關(guān)于Spring多線程事務(wù)處理的文章就介紹到這了,更多相關(guān)Spring多線程事務(wù)處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
jsp頁面中獲取servlet請(qǐng)求中的參數(shù)的辦法詳解
在JAVA WEB應(yīng)用中,如何獲取servlet請(qǐng)求中的參數(shù),本文講解了jsp頁面中獲取servlet請(qǐng)求中的參數(shù)的辦法2018-03-03
Java之多個(gè)線程順序循環(huán)執(zhí)行的幾種實(shí)現(xiàn)
這篇文章主要介紹了Java之多個(gè)線程順序循環(huán)執(zhí)行的幾種實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
關(guān)于Idea創(chuàng)建Java項(xiàng)目并引入lombok包的問題(lombok.jar包免費(fèi)下載)
很多朋友遇到當(dāng)idea創(chuàng)建java項(xiàng)目時(shí),命名安裝了lombok插件卻不能使用注解,原因有兩個(gè)大家可以參考下本文,本文對(duì)每種原因分析給出了解決方案,需要的朋友參考下吧2021-06-06
SpringBoot項(xiàng)目獲取統(tǒng)一前綴配置及獲取非確定名稱配置方法
在SpringBoot項(xiàng)目中,使用@ConfigurationProperties注解可獲取統(tǒng)一前綴的配置,具體做法是創(chuàng)建配置類,使用prefix屬性指定配置的前綴,本文給大家介紹SpringBoot項(xiàng)目獲取統(tǒng)一前綴配置以及獲取非確定名稱配置方法,感興趣的朋友跟隨小編一起看看吧2024-09-09
Java連接 JDBC基礎(chǔ)知識(shí)(操作數(shù)據(jù)庫:增刪改查)
這篇文章主要介紹了Java連接 JDBC基礎(chǔ)知識(shí),包括操作數(shù)據(jù)庫之增刪改查操作,需要的朋友可以參考下2021-04-04

