欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring TransactionalEventListener事務(wù)未提交讀取不到數(shù)據(jù)的解決

 更新時間:2021年09月16日 09:10:14   作者:程序員牧碼  
這篇文章主要介紹了Spring TransactionalEventListener事務(wù)未提交讀取不到數(shù)據(jù)的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

一、背景

業(yè)務(wù)處理過程,發(fā)現(xiàn)了以下問題,代碼一是原代碼能正常執(zhí)行,代碼二是經(jīng)過迭代一次非正常執(zhí)行代碼

  • 代碼一:以下代碼開啟線程后,代碼正常執(zhí)行
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
 
@Transactional
public Long test() {
  // ......
  // 插入記錄
  Long studentId = studentService.insert(student);
  // 異步線程
  writeStatisticsData(studentId);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
  • 代碼二:以下代碼開啟線程后,代碼不正常執(zhí)行
@Transactional
public Long test() {
  // ......
  // 插入記錄
  Long studentId = studentService.insert(student);
   // 異步線程
  writeStatisticsData(studentId);
  // 插入學(xué)生地址記錄
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}

二、問題分析

這里使用了spring事務(wù),顯然需要考慮事務(wù)的隔離級別

2.1、mysql隔離級別

查看mysql隔離級別

SELECT @@tx_isolation;
READ-COMMITTED

讀提交,即在事務(wù)A插入數(shù)據(jù)過程中,事務(wù)B在A提交之前讀取A插入的數(shù)據(jù)讀取不到,而B在A提交之后再去讀就會讀取到A插入的數(shù)據(jù),也即Read Committed不能保證在一個事務(wù)中每次讀都能讀到相同的數(shù)據(jù),因為在每次讀數(shù)據(jù)之后其他并發(fā)事務(wù)可能會對剛才讀到的數(shù)據(jù)進(jìn)行修改。

2.2、問題原因分析

  • 代碼一正常運行的原因

由于mysql事務(wù)的隔離級別是讀提交,test方法在開啟異步線程后,異步線程也開啟了事務(wù),同時以讀者身份去讀 test 方法中插入的 student 記錄,但此時 test 方法已經(jīng)提交了事務(wù),所以可以讀取到 student 記錄(即在異步方法中可以讀取到 student 記錄),但此代碼有風(fēng)險,若事務(wù)提交的時間晚一點,異步線程也有可能讀取不到 student 記錄。

  • 代碼二不能正常運行的原因

經(jīng)過上面分析,很明顯異步方法中不能讀取到 student 記錄,由于代碼二在異步線程下面又執(zhí)行了其他操作,延時了test方法中事務(wù)的提交,所以代碼二不能正常運行。

三、解決問題方案

解決思路是在事務(wù)提交后再做其他的處理(如異步發(fā)消息處理等),這里還是從Spring執(zhí)行事務(wù)的過程中入手,Spring事務(wù)的處理過程不再分析,這里直接看Spring事務(wù)增強(qiáng)器TransactionInterceptor的核心處理流程,源碼如下:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  // 獲取事務(wù)屬性
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  //加載配置中配置的TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  
  // 聲明式事務(wù)的處理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // 編程式事務(wù)的處理......
  }
  //......
}

這里主要看聲明式事務(wù)的處理,因為編程式事務(wù)的處理及提交都是用戶在編碼中進(jìn)行控制。在聲明式事務(wù)處理中,當(dāng)方法執(zhí)行完后,會執(zhí)行 commitTransactionAfterReturning 方法來進(jìn)行提交事務(wù),該方法在 TransactionAspectSupport 類中,源碼如下:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

再看 commit 方法,該方法在 AbstractPlatformTransactionManager 類中,源碼如下:

public final void commit(TransactionStatus status) throws TransactionException {
    // 這里省略很多代碼,如事務(wù)回滾......
		processCommit(defStatus);
}
 
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
        prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				}
				if (status.hasSavepoint()) {
					status.releaseHeldSavepoint();
				} else if (status.isNewTransaction()) {
          // 提交事務(wù)
					doCommit(status);
				}
				//......
			} catch (......) {
				// 事務(wù)異常處理......
			}
 
			
			try {
        // 事務(wù)提交成功后的處理-----這里是重點
				triggerAfterCommit(status);
			} finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}
		}
		finally {
			cleanupAfterCompletion(status);
    }
}
 
private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();
  }
}

最終會走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中

public static void triggerAfterCommit() {
  invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
 
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
  if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
    synchronization.afterCommit();
      }
  }
}

上面會把緩存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按順序來執(zhí)行 afterCommit 方法,其中 TransactionSynchronization 以集合形式緩存在 TransactionSynchronizationManager 的 ThreadLocal 中。

3.1、方式一

經(jīng)過上面分析,只需要代碼中重新生成個 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解決方案,如下:

private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            // 當(dāng)前存在事務(wù)
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
              }});
        } else {
            // 當(dāng)前不存在事務(wù)
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
        }
}

3.2、方式二

使用 @TransactionalEventListener 結(jié)合 Spring事件監(jiān)聽機(jī)制,該注解自從Spring4.2版本開始有的,如下:

// 事件
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}
 
// 監(jiān)聽器
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
    });
  }
}
 
@Service
public class StudentService {
  // Spring4.2之后,ApplicationEventPublisher自動被注入到容器中,采用Autowired即可獲取
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  
  @Transactional
  public Long test() {
    // ......
    // 插入記錄
    Long studentId = studentService.insert(student);
    // 發(fā)布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入學(xué)生地址記錄
    Long addressId = addressService.insert(address);
    return studentId;
  }
}

原理分析

Spring Bean在加載配置文件時,會使用 AnnotationDrivenBeanDefinitionParser 來解析 annotation-driven 標(biāo)簽,如下:

public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}
}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
  
  @Override
	public BeanDefinition parse(Element element, ParserContext parserContext) {
    // 重點——將TransactionalEventListenerFactory加入到容器中
		registerTransactionalEventListenerFactory(parserContext);
		String mode = element.getAttribute("mode");
		if ("aspectj".equals(mode)) {
			// mode="aspectj"
			registerTransactionAspect(element, parserContext);
		}
		else {
			// mode="proxy"
			AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
		}
		return null;
	}
  
  private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
		RootBeanDefinition def = new RootBeanDefinition();
		def.setBeanClass(TransactionalEventListenerFactory.class);
		parserContext.registerBeanComponent(new BeanComponentDefinition(def,
				TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
	}
}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
  
  //省略部分代碼......
  
	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
	}
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
  
   //省略部分代碼......
  
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 事務(wù)存在時,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存集合中
			TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
			TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
		} else if (this.annotation.fallbackExecution()) {
			//.......
			}
			processEvent(event);
		} else {
			// 當(dāng)前不存在事務(wù)什么也不做
		}
	}

上述 @TransactionalEventListener 本質(zhì)上是一個 @EventListener,TransactionalEventListenerFactory類會將每一個掃描到的方法有TransactionalEventListener注解包裝成ApplicationListenerMethodTransactionalAdapter對象,通過ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若當(dāng)前存在事務(wù),就會生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 結(jié)合 Spring事件監(jiān)聽機(jī)制,并使用到異步方式感覺有點別扭,這里是為了說明問題)。

四、使用案例

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
 
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
 
    // 數(shù)據(jù)庫代碼...
 
    // 異步代碼
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                // 異步業(yè)務(wù)
                execApiCache(api);
            });
    }});
 
    return ResultUtil.buildSucc();
}

Ps:setDaemon(false) 注意這里守護(hù)線程標(biāo)記必須設(shè)置為 false,否則主線程執(zhí)行完,異步線程沒執(zhí)行完的話,異步線程會馬上被中斷、關(guān)閉,所以這里不能設(shè)置成守護(hù)(用戶)線程。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • SpringSecurity自動登錄流程與實現(xiàn)詳解

    SpringSecurity自動登錄流程與實現(xiàn)詳解

    這篇文章主要介紹了SpringSecurity自動登錄流程與實現(xiàn)詳解,所謂的自動登錄是在訪問鏈接時瀏覽器自動攜帶上了Cookie中的Token交給后端校驗,如果刪掉了Cookie或者過期了同樣是需要再次驗證的,需要的朋友可以參考下
    2024-01-01
  • SpringBoot整合Redis實現(xiàn)訪問量統(tǒng)計的示例代碼

    SpringBoot整合Redis實現(xiàn)訪問量統(tǒng)計的示例代碼

    本文主要介紹了SpringBoot整合Redis實現(xiàn)訪問量統(tǒng)計的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • Java_異常類(錯誤和異常,兩者的區(qū)別介紹)

    Java_異常類(錯誤和異常,兩者的區(qū)別介紹)

    下面小編就為大家?guī)硪黄狫ava_異常類(錯誤和異常,兩者的區(qū)別介紹) 。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-09-09
  • Spring AOP統(tǒng)一功能處理示例代碼

    Spring AOP統(tǒng)一功能處理示例代碼

    AOP面向切面編程,它是一種思想,它是對某一類事情的集中處理,而AOP是一種思想,而Spring AOP是一個框架,提供了一種對AOP思想的實現(xiàn),它們的關(guān)系和loC與DI類似,這篇文章主要介紹了Spring AOP統(tǒng)一功能處理示例代碼,需要的朋友可以參考下
    2023-01-01
  • java基于UDP實現(xiàn)圖片群發(fā)功能

    java基于UDP實現(xiàn)圖片群發(fā)功能

    這篇文章主要為大家詳細(xì)介紹了java基于UDP實現(xiàn)圖片群發(fā)功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • Java實現(xiàn)解數(shù)獨的小程序

    Java實現(xiàn)解數(shù)獨的小程序

    最近在學(xué)習(xí)Java,然后上個月迷上了九宮格數(shù)獨,玩了幾天,覺得實在有趣,就想著能不能用編程來解決,于是就自己寫了個,還真解決了。下面這篇文章就給大家主要介紹了Java實現(xiàn)解數(shù)獨的小程序,需要的朋友可以參考借鑒。
    2017-01-01
  • 詳解Java中的時間處理與時間標(biāo)準(zhǔn)

    詳解Java中的時間處理與時間標(biāo)準(zhǔn)

    這篇文章主要為大家詳細(xì)介紹了三個時間標(biāo)準(zhǔn)GMT,CST,UTC的規(guī)定,以及Java進(jìn)行時間處理的相關(guān)知識,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-09-09
  • mybatis打印的sql日志不寫入到log文件的問題及解決

    mybatis打印的sql日志不寫入到log文件的問題及解決

    這篇文章主要介紹了mybatis打印的sql日志不寫入到log文件的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • SpringBoot中使用com.alibaba.druid.filter.config.ConfigTools對數(shù)據(jù)庫密碼加密的方法

    SpringBoot中使用com.alibaba.druid.filter.config.ConfigTools對數(shù)據(jù)庫

    這篇文章主要介紹了SpringBoot中使用com.alibaba.druid.filter.config.ConfigTools對數(shù)據(jù)庫密碼加密的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-01-01
  • 創(chuàng)建Java線程安全類的七種方法

    創(chuàng)建Java線程安全類的七種方法

    線程安全是指某個方法或某段代碼,在多線程中能夠正確的執(zhí)行,不會出現(xiàn)數(shù)據(jù)不一致或數(shù)據(jù)污染的情況,我們把這樣的程序稱之為線程安全的,反之則為非線程安全的,下面這篇文章主要給大家介紹了關(guān)于創(chuàng)建Java線程安全類的七種方法,需要的朋友可以參考下
    2022-06-06

最新評論