Spring TransactionalEventListener事務(wù)未提交讀取不到數(shù)據(jù)的解決
一、背景
業(yè)務(wù)處理過程,發(fā)現(xiàn)了以下問題,代碼一是原代碼能正常執(zhí)行,代碼二是經(jīng)過迭代一次非正常執(zhí)行代碼
- 代碼一:以下代碼開啟線程后,代碼正常執(zhí)行
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); @Transactional public Long test() { // ...... // 插入記錄 Long studentId = studentService.insert(student); // 異步線程 writeStatisticsData(studentId); return studentId; } private void writeStatisticsData(Long studentId) { executor.execute(() -> { Student student = studentService.findById(studentId); //........ }); }
- 代碼二:以下代碼開啟線程后,代碼不正常執(zhí)行
@Transactional public Long test() { // ...... // 插入記錄 Long studentId = studentService.insert(student); // 異步線程 writeStatisticsData(studentId); // 插入學(xué)生地址記錄 Long addressId = addressService.insert(address); return studentId; } private void writeStatisticsData(Long studentId) { executor.execute(() -> { Student student = studentService.findById(studentId); //........ }); }
二、問題分析
這里使用了spring事務(wù),顯然需要考慮事務(wù)的隔離級別
2.1、mysql隔離級別
查看mysql隔離級別
SELECT @@tx_isolation; READ-COMMITTED
讀提交,即在事務(wù)A插入數(shù)據(jù)過程中,事務(wù)B在A提交之前讀取A插入的數(shù)據(jù)讀取不到,而B在A提交之后再去讀就會讀取到A插入的數(shù)據(jù),也即Read Committed不能保證在一個事務(wù)中每次讀都能讀到相同的數(shù)據(jù),因為在每次讀數(shù)據(jù)之后其他并發(fā)事務(wù)可能會對剛才讀到的數(shù)據(jù)進(jìn)行修改。
2.2、問題原因分析
- 代碼一正常運行的原因
由于mysql事務(wù)的隔離級別是讀提交,test方法在開啟異步線程后,異步線程也開啟了事務(wù),同時以讀者身份去讀 test 方法中插入的 student 記錄,但此時 test 方法已經(jīng)提交了事務(wù),所以可以讀取到 student 記錄(即在異步方法中可以讀取到 student 記錄),但此代碼有風(fēng)險,若事務(wù)提交的時間晚一點,異步線程也有可能讀取不到 student 記錄。
- 代碼二不能正常運行的原因
經(jīng)過上面分析,很明顯異步方法中不能讀取到 student 記錄,由于代碼二在異步線程下面又執(zhí)行了其他操作,延時了test方法中事務(wù)的提交,所以代碼二不能正常運行。
三、解決問題方案
解決思路是在事務(wù)提交后再做其他的處理(如異步發(fā)消息處理等),這里還是從Spring執(zhí)行事務(wù)的過程中入手,Spring事務(wù)的處理過程不再分析,這里直接看Spring事務(wù)增強(qiáng)器TransactionInterceptor的核心處理流程,源碼如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // 獲取事務(wù)屬性 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); //加載配置中配置的TransactionManager final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 聲明式事務(wù)的處理 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; //...... retVal = invocation.proceedWithInvocation(); //...... commitTransactionAfterReturning(txInfo); return retVal; } else { // 編程式事務(wù)的處理...... } //...... }
這里主要看聲明式事務(wù)的處理,因為編程式事務(wù)的處理及提交都是用戶在編碼中進(jìn)行控制。在聲明式事務(wù)處理中,當(dāng)方法執(zhí)行完后,會執(zhí)行 commitTransactionAfterReturning 方法來進(jìn)行提交事務(wù),該方法在 TransactionAspectSupport 類中,源碼如下:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
再看 commit 方法,該方法在 AbstractPlatformTransactionManager 類中,源碼如下:
public final void commit(TransactionStatus status) throws TransactionException { // 這里省略很多代碼,如事務(wù)回滾...... processCommit(defStatus); } private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { // 提交事務(wù) doCommit(status); } //...... } catch (......) { // 事務(wù)異常處理...... } try { // 事務(wù)提交成功后的處理-----這里是重點 triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } } private void triggerAfterCommit(DefaultTransactionStatus status) { if (status.isNewSynchronization()) { TransactionSynchronizationUtils.triggerAfterCommit(); } }
最終會走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中
public static void triggerAfterCommit() { invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations()); } public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) { if (synchronizations != null) { for (TransactionSynchronization synchronization : synchronizations) { synchronization.afterCommit(); } } }
上面會把緩存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按順序來執(zhí)行 afterCommit 方法,其中 TransactionSynchronization 以集合形式緩存在 TransactionSynchronizationManager 的 ThreadLocal 中。
3.1、方式一
經(jīng)過上面分析,只需要代碼中重新生成個 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解決方案,如下:
private void writeStatisticsData(Long studentId) { if(TransactionSynchronizationManager.isActualTransactionActive()) { // 當(dāng)前存在事務(wù) TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { executor.execute(() -> {Student student = studentService.findById(studentId); //........ }); }}); } else { // 當(dāng)前不存在事務(wù) executor.execute(() -> {Student student = studentService.findById(studentId); //........ }); } }
3.2、方式二
使用 @TransactionalEventListener 結(jié)合 Spring事件監(jiān)聽機(jī)制,該注解自從Spring4.2版本開始有的,如下:
// 事件 public class StudentEvent extends ApplicationEvent { public StudentEvent(Long studentId) { super(studentId); } } // 監(jiān)聽器 public class StudentEventListener{ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void writeStatisticsData(StudentEvent studentEvent) { executor.execute(() -> { Student student = studentService.findById(studentEvent.getSource()); //........ }); } } @Service public class StudentService { // Spring4.2之后,ApplicationEventPublisher自動被注入到容器中,采用Autowired即可獲取 @Autowired private ApplicationEventPublisher applicationEventPublisher; @Transactional public Long test() { // ...... // 插入記錄 Long studentId = studentService.insert(student); // 發(fā)布事件 applicationEventPublisher.publishEvent(new StudentEvent(studentId)); // 插入學(xué)生地址記錄 Long addressId = addressService.insert(address); return studentId; } }
原理分析
Spring Bean在加載配置文件時,會使用 AnnotationDrivenBeanDefinitionParser 來解析 annotation-driven 標(biāo)簽,如下:
public class TxNamespaceHandler extends NamespaceHandlerSupport { //...... @Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); } }
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser { @Override public BeanDefinition parse(Element element, ParserContext parserContext) { // 重點——將TransactionalEventListenerFactory加入到容器中 registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null; } private void registerTransactionalEventListenerFactory(ParserContext parserContext) { RootBeanDefinition def = new RootBeanDefinition(); def.setBeanClass(TransactionalEventListenerFactory.class); parserContext.registerBeanComponent(new BeanComponentDefinition(def, TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)); } }
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered { //省略部分代碼...... @Override public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); } }
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter { //省略部分代碼...... @Override public void onApplicationEvent(ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive()) { // 事務(wù)存在時,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存集合中 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); } else if (this.annotation.fallbackExecution()) { //....... } processEvent(event); } else { // 當(dāng)前不存在事務(wù)什么也不做 } }
上述 @TransactionalEventListener 本質(zhì)上是一個 @EventListener,TransactionalEventListenerFactory類會將每一個掃描到的方法有TransactionalEventListener注解包裝成ApplicationListenerMethodTransactionalAdapter對象,通過ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若當(dāng)前存在事務(wù),就會生成TransactionSynchronization并加入到 TransactionSynchronizationManager的緩存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 結(jié)合 Spring事件監(jiān)聽機(jī)制,并使用到異步方式感覺有點別扭,這里是為了說明問題)。
四、使用案例
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build()); @Override @Transactional(rollbackFor = Exception.class) public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) { // 數(shù)據(jù)庫代碼... // 異步代碼 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { log.warn("afterCommit..."); executorService.execute(() -> { // 異步業(yè)務(wù) execApiCache(api); }); }}); return ResultUtil.buildSucc(); }
Ps:setDaemon(false) 注意這里守護(hù)線程標(biāo)記必須設(shè)置為 false,否則主線程執(zhí)行完,異步線程沒執(zhí)行完的話,異步線程會馬上被中斷、關(guān)閉,所以這里不能設(shè)置成守護(hù)(用戶)線程。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringSecurity自動登錄流程與實現(xiàn)詳解
這篇文章主要介紹了SpringSecurity自動登錄流程與實現(xiàn)詳解,所謂的自動登錄是在訪問鏈接時瀏覽器自動攜帶上了Cookie中的Token交給后端校驗,如果刪掉了Cookie或者過期了同樣是需要再次驗證的,需要的朋友可以參考下2024-01-01SpringBoot整合Redis實現(xiàn)訪問量統(tǒng)計的示例代碼
本文主要介紹了SpringBoot整合Redis實現(xiàn)訪問量統(tǒng)計的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02mybatis打印的sql日志不寫入到log文件的問題及解決
這篇文章主要介紹了mybatis打印的sql日志不寫入到log文件的問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08SpringBoot中使用com.alibaba.druid.filter.config.ConfigTools對數(shù)據(jù)庫
這篇文章主要介紹了SpringBoot中使用com.alibaba.druid.filter.config.ConfigTools對數(shù)據(jù)庫密碼加密的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01