@TransactionalEventListener的使用和實現(xiàn)原理分析
一、問題描述
平時我們在完成某些數(shù)據(jù)的入庫后,發(fā)布了一個事件,此時使用的是@EventListener,然后在這個事件中,又去對剛才入庫的數(shù)據(jù)進行查詢,從而完成后續(xù)的操作。
例如(數(shù)據(jù)入庫=>對入庫數(shù)據(jù)進行查詢審核),這時候會發(fā)現(xiàn),查詢不到剛才入庫的數(shù)據(jù),這是因為事務還沒提交完成,在同一個事務當中,查詢不到才存入的數(shù)據(jù),那么就引出了下面的解決方式。
為了解決上述問題,Spring為我們提供了兩種方式:
(1) @TransactionalEventListener注解。
(2) 事務同步管理器TransactionSynchronizationManager。
以便我們可以在事務提交后再觸發(fā)某一事件來進行其他操作。
二、使用場景
在項目當中,我們有時候需要在執(zhí)行數(shù)據(jù)庫操作之后,發(fā)送消息或事件來異步調(diào)用其他組件執(zhí)行相應的操作,例如:
1.數(shù)據(jù)完成導入之后,發(fā)布審核事件,對入庫的數(shù)據(jù)進行審核。
2.用戶在完成注冊后發(fā)送激活碼。
3.配置修改后,發(fā)送更新配置的事件。
三、@TransactionalEventListener詳解
我們可以從命名上直接看出,它就是個EventListener,
在Spring4.2+,有一種叫做@TransactionEventListener的方式,能夠?qū)崿F(xiàn)在控制事務的同時,完成對對事件的處理。
我們知道,Spring的事件監(jiān)聽機制(發(fā)布訂閱模型)實際上并不是異步的(默認情況下),而是同步的來將代碼進行解耦。而@TransactionEventListener仍是通過這種方式,但是加入了回調(diào)的方式來解決,這樣就能夠在事務進行Commited,Rollback…等時候才去進行Event的處理,來達到事務同步的目的。
// @since 4.2 注解的方式提供的相對較晚,其實API的方式在第一個版本就已經(jīng)提供了。 // 值得注意的是,在這個注解上面有一個注解:`@EventListener`,所以表明其實這個注解也是個事件監(jiān)聽器。 @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @EventListener //有類似于注解繼承的效果 public @interface TransactionalEventListener { // 這個注解取值有:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION // 各個值都代表什么意思表達什么功能,非常清晰,下面解釋了對應的枚舉類~ // 需要注意的是:AFTER_COMMIT + AFTER_COMPLETION是可以同時生效的 // AFTER_ROLLBACK + AFTER_COMPLETION是可以同時生效的 TransactionPhase phase() default TransactionPhase.AFTER_COMMIT; // 表明若沒有事務的時候,對應的event是否需要執(zhí)行,默認值為false表示,沒事務就不執(zhí)行了。 boolean fallbackExecution() default false; // 這里巧妙的用到了@AliasFor的能力,放到了@EventListener身上 // 注意:一般建議都需要指定此值,否則默認可以處理所有類型的事件,范圍太廣了。 @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] value() default {}; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] classes() default {}; String condition() default ""; }
public enum TransactionPhase { // 指定目標方法在事務commit之前執(zhí)行 BEFORE_COMMIT, // 指定目標方法在事務commit之后執(zhí)行 AFTER_COMMIT, // 指定目標方法在事務rollback之后執(zhí)行 AFTER_ROLLBACK, // 指定目標方法在事務完成時執(zhí)行,這里的完成是指無論事務是成功提交還是事務回滾了 AFTER_COMPLETION }
四、代碼示例
這里主要是為了講解如何使用@TransactionalEventListener,所以就不列出所有代碼了。
@Data public class User { private long id; private String name; private Integer age; }
業(yè)務實現(xiàn):
@Service @Slf4j public class UserServiceImpl extends implements UserService { @Autowired UserMapper userMapper; @Autowired ApplicationEventPublisher eventPublisher; public void userRegister(User user){ userMapper.insertUser(user); eventPublisher.publishEvent(new UserRegisterEvent(new Date())); } }
自定義事件:
@Getter @Setter public class UserRegisterEvent extends ApplicationEvent { private Date registerDate; public UserRegisterEvent(Date registerDate) { super(registerDate); this.registerDate = registerDate; } }
事件監(jiān)聽器:
@Slf4j @Component public class UserListener { @Autowired UserService userService; @Async @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserRegisterEvent.class) public void onUserRegisterEvent(UserRegisterEvent event) { userService.sendActivationCode(event.getRegisterDate()); } }
五、實現(xiàn)原理
關于事務的實現(xiàn)原理,這里其實是比較簡單的,Spring對事務監(jiān)控的處理邏輯在TransactionSynchronization中,如下是該接口的聲明:
public interface TransactionSynchronization extends Flushable { // 在當前事務掛起時執(zhí)行 default void suspend() { } // 在當前事務重新加載時執(zhí)行 default void resume() { } // 在當前數(shù)據(jù)刷新到數(shù)據(jù)庫時執(zhí)行 default void flush() { } // 在當前事務commit之前執(zhí)行 default void beforeCommit(boolean readOnly) { } // 在當前事務completion之前執(zhí)行 default void beforeCompletion() { } // 在當前事務commit之后實質(zhì)性 default void afterCommit() { } // 在當前事務completion之后執(zhí)行 default void afterCompletion(int status) { } }
很明顯,這里的TransactionSynchronization接口只是抽象了一些行為,用于事務事件發(fā)生時觸發(fā),這些行為在Spring事務中提供了內(nèi)在支持,即在相應的事務事件時,其會獲取當前所有注冊的TransactionSynchronization對象,然后調(diào)用其相應的方法。
那么這里TransactionSynchronization對象的注冊點對于我們了解事務事件觸發(fā)有至關重要的作用了。
這里我們首先回到事務標簽的解析處,在前面講解事務標簽解析時,我們講到Spring會注冊一個TransactionalEventListenerFactory類型的bean到Spring容器中,這里關于標簽的解析讀者可以閱讀本人前面的文章Spring事務用法示例與實現(xiàn)原理。
這里注冊的TransactionalEventListenerFactory實現(xiàn)了EventListenerFactory接口,這個接口的主要作用是先判斷目標方法是否是某個監(jiān)聽器的類型,然后為目標方法生成一個監(jiān)聽器,其會在某個bean初始化之后由Spring調(diào)用其方法用于生成監(jiān)聽器。如下是該類的實現(xiàn):
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered { // 指定當前監(jiān)聽器的順序 private int order = 50; public void setOrder(int order) { this.order = order; } @Override public int getOrder() { return this.order; } // 指定目標方法是否是所支持的監(jiān)聽器的類型,這里的判斷邏輯就是如果目標方法上包含有 // TransactionalEventListener注解,則說明其是一個事務事件監(jiān)聽器 @Override public boolean supportsMethod(Method method) { return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null); } // 為目標方法生成一個事務事件監(jiān)聽器,這里ApplicationListenerMethodTransactionalAdapter實現(xiàn)了 // ApplicationEvent接口 @Override public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); } }
這里關于事務事件監(jiān)聽的邏輯其實已經(jīng)比較清楚了。
ApplicationListenerMethodTransactionalAdapter本質(zhì)上是實現(xiàn)了ApplicationListener接口的,也就是說,其是Spring的一個事件監(jiān)聽器,這也就是為什么進行事務處理時需要使用ApplicationEventPublisher.publish()方法發(fā)布一下當前事務的事件。
ApplicationListenerMethodTransactionalAdapter在監(jiān)聽到發(fā)布的事件之后會生成一個TransactionSynchronization對象,并且將該對象注冊到當前事務邏輯中,如下是監(jiān)聽事務事件的處理邏輯:
@Override public void onApplicationEvent(ApplicationEvent event) { // 如果當前TransactionManager已經(jīng)配置開啟事務事件監(jiān)聽, // 此時才會注冊TransactionSynchronization對象 if (TransactionSynchronizationManager.isSynchronizationActive()) { // 通過當前事務事件發(fā)布的參數(shù),創(chuàng)建一個TransactionSynchronization對象 TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); // 注冊TransactionSynchronization對象到TransactionManager中 TransactionSynchronizationManager .registerSynchronization(transactionSynchronization); } else if (this.annotation.fallbackExecution()) { // 如果當前TransactionManager沒有開啟事務事件處理,但是當前事務監(jiān)聽方法中配置了 // fallbackExecution屬性為true,說明其需要對當前事務事件進行監(jiān)聽,無論其是否有事務 if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) { logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase"); } processEvent(event); } else { // 走到這里說明當前是不需要事務事件處理的,因而直接略過 if (logger.isDebugEnabled()) { logger.debug("No transaction is active - skipping " + event); } } }
這里需要說明的是,上述annotation屬性就是在事務監(jiān)聽方法上解析的TransactionalEventListener注解中配置的屬性。
可以看到,對于事務事件的處理,這里創(chuàng)建了一個TransactionSynchronization對象,其實主要的處理邏輯就是在返回的這個對象中,而createTransactionSynchronization()方法內(nèi)部只是創(chuàng)建了一個TransactionSynchronizationEventAdapter對象就返回了。
這里我們直接看該對象的源碼:
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter { private final ApplicationListenerMethodAdapter listener; private final ApplicationEvent event; private final TransactionPhase phase; public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener, ApplicationEvent event, TransactionPhase phase) { this.listener = listener; this.event = event; this.phase = phase; } @Override public int getOrder() { return this.listener.getOrder(); } // 在目標方法配置的phase屬性為BEFORE_COMMIT時,處理before commit事件 public void beforeCommit(boolean readOnly) { if (this.phase == TransactionPhase.BEFORE_COMMIT) { processEvent(); } } // 這里對于after completion事件的處理,雖然分為了三個if分支,但是實際上都是執(zhí)行的processEvent() // 方法,因為after completion事件是事務事件中一定會執(zhí)行的,因而這里對于commit, // rollback和completion事件都在當前方法中處理也是沒問題的 public void afterCompletion(int status) { if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { processEvent(); } else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { processEvent(); } else if (this.phase == TransactionPhase.AFTER_COMPLETION) { processEvent(); } } // 執(zhí)行事務事件 protected void processEvent() { this.listener.processEvent(this.event); } }
可以看到,對于事務事件的處理,最終都是委托給了ApplicationListenerMethodAdapter.processEvent()方法進行的。如下是該方法的源碼:
public void processEvent(ApplicationEvent event) { // 處理事務事件的相關參數(shù),這里主要是判斷TransactionalEventListener注解中是否配置了value // 或classes屬性,如果配置了,則將方法參數(shù)轉(zhuǎn)換為該指定類型傳給監(jiān)聽的方法;如果沒有配置,則判斷 // 目標方法是ApplicationEvent類型還是PayloadApplicationEvent類型,是則轉(zhuǎn)換為該類型傳入 Object[] args = resolveArguments(event); // 這里主要是獲取TransactionalEventListener注解中的condition屬性,然后通過 // Spring expression language將其與目標類和方法進行匹配 if (shouldHandle(event, args)) { // 通過處理得到的參數(shù)借助于反射調(diào)用事務監(jiān)聽方法 Object result = doInvoke(args); if (result != null) { // 對方法的返回值進行處理 handleResult(result); } else { logger.trace("No result object given - no result to handle"); } } } // 處理事務監(jiān)聽方法的參數(shù) protected Object[] resolveArguments(ApplicationEvent event) { // 獲取發(fā)布事務事件時傳入的參數(shù)類型 ResolvableType declaredEventType = getResolvableType(event); if (declaredEventType == null) { return null; } // 如果事務監(jiān)聽方法的參數(shù)個數(shù)為0,則直接返回 if (this.method.getParameterCount() == 0) { return new Object[0]; } // 如果事務監(jiān)聽方法的參數(shù)不為ApplicationEvent或PayloadApplicationEvent,則直接將發(fā)布事務 // 事件時傳入的參數(shù)當做事務監(jiān)聽方法的參數(shù)傳入。從這里可以看出,如果事務監(jiān)聽方法的參數(shù)不是 // ApplicationEvent或PayloadApplicationEvent類型,那么其參數(shù)必須只能有一個,并且這個 // 參數(shù)必須與發(fā)布事務事件時傳入的參數(shù)一致 Class<?> eventClass = declaredEventType.getRawClass(); if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) && event instanceof PayloadApplicationEvent) { return new Object[] {((PayloadApplicationEvent) event).getPayload()}; } else { // 如果參數(shù)類型為ApplicationEvent或PayloadApplicationEvent,則直接將其傳入事務事件方法 return new Object[] {event}; } } // 判斷事務事件方法方法是否需要進行事務事件處理 private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) { if (args == null) { return false; } String condition = getCondition(); if (StringUtils.hasText(condition)) { Assert.notNull(this.evaluator, "EventExpressionEvaluator must no be null"); EvaluationContext evaluationContext = this.evaluator.createEvaluationContext( event, this.targetClass, this.method, args, this.applicationContext); return this.evaluator.condition(condition, this.methodKey, evaluationContext); } return true; } // 對事務事件方法的返回值進行處理,這里的處理方式主要是將其作為一個事件繼續(xù)發(fā)布出去,這樣就可以在 // 一個統(tǒng)一的位置對事務事件的返回值進行處理 protected void handleResult(Object result) { // 如果返回值是數(shù)組類型,則對數(shù)組元素一個一個進行發(fā)布 if (result.getClass().isArray()) { Object[] events = ObjectUtils.toObjectArray(result); for (Object event : events) { publishEvent(event); } } else if (result instanceof Collection<?>) { // 如果返回值是集合類型,則對集合進行遍歷,并且發(fā)布集合中的每個元素 Collection<?> events = (Collection<?>) result; for (Object event : events) { publishEvent(event); } } else { // 如果返回值是一個對象,則直接將其進行發(fā)布 publishEvent(result); } }
六、總結(jié)
對于事務事件的處理,總結(jié)而言,就是為每個事務事件監(jiān)聽方法創(chuàng)建了一個TransactionSynchronizationEventAdapter對象,通過該對象在發(fā)布事務事件的時候,會在當前線程中注冊該對象,這樣就可以保證每個線程每個監(jiān)聽器中只會對應一個TransactionSynchronizationEventAdapter對象。
在Spring進行事務事件的時候會調(diào)用該對象對應的監(jiān)聽方法,從而達到對事務事件進行監(jiān)聽的目的。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
淺析java修飾符訪問權限(動力節(jié)點Java學院整理)
Java有四種訪問權限,其中三種有訪問權限修飾符,分別為private,public和protected,還有一種不帶任何修飾符,下面通過本文給大家簡單介紹下java修飾符訪問權限相關知識,感興趣的朋友一起學習吧2017-04-04java數(shù)據(jù)結(jié)構與算法之插入排序詳解
這篇文章主要介紹了java數(shù)據(jù)結(jié)構與算法之插入排序,結(jié)合實例形式分析了java插入排序的概念、分類、原理、實現(xiàn)方法與相關注意事項,需要的朋友可以參考下2017-05-05Spring @Valid @Validated實現(xiàn)驗證
這篇文章主要介紹了Spring @Valid @Validated實現(xiàn)驗證,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-01-01