@TransactionalEventListener的使用和實(shí)現(xiàn)原理分析
一、問題描述
平時(shí)我們在完成某些數(shù)據(jù)的入庫后,發(fā)布了一個(gè)事件,此時(shí)使用的是@EventListener,然后在這個(gè)事件中,又去對剛才入庫的數(shù)據(jù)進(jìn)行查詢,從而完成后續(xù)的操作。
例如(數(shù)據(jù)入庫=>對入庫數(shù)據(jù)進(jìn)行查詢審核),這時(shí)候會發(fā)現(xiàn),查詢不到剛才入庫的數(shù)據(jù),這是因?yàn)槭聞?wù)還沒提交完成,在同一個(gè)事務(wù)當(dāng)中,查詢不到才存入的數(shù)據(jù),那么就引出了下面的解決方式。
為了解決上述問題,Spring為我們提供了兩種方式:
(1) @TransactionalEventListener注解。
(2) 事務(wù)同步管理器TransactionSynchronizationManager。
以便我們可以在事務(wù)提交后再觸發(fā)某一事件來進(jìn)行其他操作。
二、使用場景
在項(xiàng)目當(dāng)中,我們有時(shí)候需要在執(zhí)行數(shù)據(jù)庫操作之后,發(fā)送消息或事件來異步調(diào)用其他組件執(zhí)行相應(yīng)的操作,例如:
1.數(shù)據(jù)完成導(dǎo)入之后,發(fā)布審核事件,對入庫的數(shù)據(jù)進(jìn)行審核。
2.用戶在完成注冊后發(fā)送激活碼。
3.配置修改后,發(fā)送更新配置的事件。
三、@TransactionalEventListener詳解
我們可以從命名上直接看出,它就是個(gè)EventListener,
在Spring4.2+,有一種叫做@TransactionEventListener的方式,能夠?qū)崿F(xiàn)在控制事務(wù)的同時(shí),完成對對事件的處理。
我們知道,Spring的事件監(jiān)聽機(jī)制(發(fā)布訂閱模型)實(shí)際上并不是異步的(默認(rèn)情況下),而是同步的來將代碼進(jìn)行解耦。而@TransactionEventListener仍是通過這種方式,但是加入了回調(diào)的方式來解決,這樣就能夠在事務(wù)進(jìn)行Commited,Rollback…等時(shí)候才去進(jìn)行Event的處理,來達(dá)到事務(wù)同步的目的。
// @since 4.2 注解的方式提供的相對較晚,其實(shí)API的方式在第一個(gè)版本就已經(jīng)提供了。
// 值得注意的是,在這個(gè)注解上面有一個(gè)注解:`@EventListener`,所以表明其實(shí)這個(gè)注解也是個(gè)事件監(jiān)聽器。
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener //有類似于注解繼承的效果
public @interface TransactionalEventListener {
// 這個(gè)注解取值有:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
// 各個(gè)值都代表什么意思表達(dá)什么功能,非常清晰,下面解釋了對應(yīng)的枚舉類~
// 需要注意的是:AFTER_COMMIT + AFTER_COMPLETION是可以同時(shí)生效的
// AFTER_ROLLBACK + AFTER_COMPLETION是可以同時(shí)生效的
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
// 表明若沒有事務(wù)的時(shí)候,對應(yīng)的event是否需要執(zhí)行,默認(rèn)值為false表示,沒事務(wù)就不執(zhí)行了。
boolean fallbackExecution() default false;
// 這里巧妙的用到了@AliasFor的能力,放到了@EventListener身上
// 注意:一般建議都需要指定此值,否則默認(rèn)可以處理所有類型的事件,范圍太廣了。
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};
String condition() default "";
}
public enum TransactionPhase {
// 指定目標(biāo)方法在事務(wù)commit之前執(zhí)行
BEFORE_COMMIT,
// 指定目標(biāo)方法在事務(wù)commit之后執(zhí)行
AFTER_COMMIT,
// 指定目標(biāo)方法在事務(wù)rollback之后執(zhí)行
AFTER_ROLLBACK,
// 指定目標(biāo)方法在事務(wù)完成時(shí)執(zhí)行,這里的完成是指無論事務(wù)是成功提交還是事務(wù)回滾了
AFTER_COMPLETION
}
四、代碼示例
這里主要是為了講解如何使用@TransactionalEventListener,所以就不列出所有代碼了。
@Data
public class User {
private long id;
private String name;
private Integer age;
}
業(yè)務(wù)實(shí)現(xiàn):
@Service
@Slf4j
public class UserServiceImpl extends implements UserService {
@Autowired
UserMapper userMapper;
@Autowired
ApplicationEventPublisher eventPublisher;
public void userRegister(User user){
userMapper.insertUser(user);
eventPublisher.publishEvent(new UserRegisterEvent(new Date()));
}
}
自定義事件:
@Getter
@Setter
public class UserRegisterEvent extends ApplicationEvent {
private Date registerDate;
public UserRegisterEvent(Date registerDate) {
super(registerDate);
this.registerDate = registerDate;
}
}
事件監(jiān)聽器:
@Slf4j
@Component
public class UserListener {
@Autowired
UserService userService;
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserRegisterEvent.class)
public void onUserRegisterEvent(UserRegisterEvent event) {
userService.sendActivationCode(event.getRegisterDate());
}
}
五、實(shí)現(xiàn)原理
關(guān)于事務(wù)的實(shí)現(xiàn)原理,這里其實(shí)是比較簡單的,Spring對事務(wù)監(jiān)控的處理邏輯在TransactionSynchronization中,如下是該接口的聲明:
public interface TransactionSynchronization extends Flushable {
// 在當(dāng)前事務(wù)掛起時(shí)執(zhí)行
default void suspend() {
}
// 在當(dāng)前事務(wù)重新加載時(shí)執(zhí)行
default void resume() {
}
// 在當(dāng)前數(shù)據(jù)刷新到數(shù)據(jù)庫時(shí)執(zhí)行
default void flush() {
}
// 在當(dāng)前事務(wù)commit之前執(zhí)行
default void beforeCommit(boolean readOnly) {
}
// 在當(dāng)前事務(wù)completion之前執(zhí)行
default void beforeCompletion() {
}
// 在當(dāng)前事務(wù)commit之后實(shí)質(zhì)性
default void afterCommit() {
}
// 在當(dāng)前事務(wù)completion之后執(zhí)行
default void afterCompletion(int status) {
}
}
很明顯,這里的TransactionSynchronization接口只是抽象了一些行為,用于事務(wù)事件發(fā)生時(shí)觸發(fā),這些行為在Spring事務(wù)中提供了內(nèi)在支持,即在相應(yīng)的事務(wù)事件時(shí),其會獲取當(dāng)前所有注冊的TransactionSynchronization對象,然后調(diào)用其相應(yīng)的方法。
那么這里TransactionSynchronization對象的注冊點(diǎn)對于我們了解事務(wù)事件觸發(fā)有至關(guān)重要的作用了。
這里我們首先回到事務(wù)標(biāo)簽的解析處,在前面講解事務(wù)標(biāo)簽解析時(shí),我們講到Spring會注冊一個(gè)TransactionalEventListenerFactory類型的bean到Spring容器中,這里關(guān)于標(biāo)簽的解析讀者可以閱讀本人前面的文章Spring事務(wù)用法示例與實(shí)現(xiàn)原理。
這里注冊的TransactionalEventListenerFactory實(shí)現(xiàn)了EventListenerFactory接口,這個(gè)接口的主要作用是先判斷目標(biāo)方法是否是某個(gè)監(jiān)聽器的類型,然后為目標(biāo)方法生成一個(gè)監(jiān)聽器,其會在某個(gè)bean初始化之后由Spring調(diào)用其方法用于生成監(jiān)聽器。如下是該類的實(shí)現(xiàn):
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
// 指定當(dāng)前監(jiān)聽器的順序
private int order = 50;
public void setOrder(int order) {
this.order = order;
}
@Override
public int getOrder() {
return this.order;
}
// 指定目標(biāo)方法是否是所支持的監(jiān)聽器的類型,這里的判斷邏輯就是如果目標(biāo)方法上包含有
// TransactionalEventListener注解,則說明其是一個(gè)事務(wù)事件監(jiān)聽器
@Override
public boolean supportsMethod(Method method) {
return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null);
}
// 為目標(biāo)方法生成一個(gè)事務(wù)事件監(jiān)聽器,這里ApplicationListenerMethodTransactionalAdapter實(shí)現(xiàn)了
// ApplicationEvent接口
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}
這里關(guān)于事務(wù)事件監(jiān)聽的邏輯其實(shí)已經(jīng)比較清楚了。
ApplicationListenerMethodTransactionalAdapter本質(zhì)上是實(shí)現(xiàn)了ApplicationListener接口的,也就是說,其是Spring的一個(gè)事件監(jiān)聽器,這也就是為什么進(jìn)行事務(wù)處理時(shí)需要使用ApplicationEventPublisher.publish()方法發(fā)布一下當(dāng)前事務(wù)的事件。
ApplicationListenerMethodTransactionalAdapter在監(jiān)聽到發(fā)布的事件之后會生成一個(gè)TransactionSynchronization對象,并且將該對象注冊到當(dāng)前事務(wù)邏輯中,如下是監(jiān)聽事務(wù)事件的處理邏輯:
@Override
public void onApplicationEvent(ApplicationEvent event) {
// 如果當(dāng)前TransactionManager已經(jīng)配置開啟事務(wù)事件監(jiān)聽,
// 此時(shí)才會注冊TransactionSynchronization對象
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 通過當(dāng)前事務(wù)事件發(fā)布的參數(shù),創(chuàng)建一個(gè)TransactionSynchronization對象
TransactionSynchronization transactionSynchronization =
createTransactionSynchronization(event);
// 注冊TransactionSynchronization對象到TransactionManager中
TransactionSynchronizationManager
.registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
// 如果當(dāng)前TransactionManager沒有開啟事務(wù)事件處理,但是當(dāng)前事務(wù)監(jiān)聽方法中配置了
// fallbackExecution屬性為true,說明其需要對當(dāng)前事務(wù)事件進(jìn)行監(jiān)聽,無論其是否有事務(wù)
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK
&& logger.isWarnEnabled()) {
logger.warn("Processing "
+ event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
} else {
// 走到這里說明當(dāng)前是不需要事務(wù)事件處理的,因而直接略過
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
這里需要說明的是,上述annotation屬性就是在事務(wù)監(jiān)聽方法上解析的TransactionalEventListener注解中配置的屬性。
可以看到,對于事務(wù)事件的處理,這里創(chuàng)建了一個(gè)TransactionSynchronization對象,其實(shí)主要的處理邏輯就是在返回的這個(gè)對象中,而createTransactionSynchronization()方法內(nèi)部只是創(chuàng)建了一個(gè)TransactionSynchronizationEventAdapter對象就返回了。
這里我們直接看該對象的源碼:
private static class TransactionSynchronizationEventAdapter
extends TransactionSynchronizationAdapter {
private final ApplicationListenerMethodAdapter listener;
private final ApplicationEvent event;
private final TransactionPhase phase;
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter
listener, ApplicationEvent event, TransactionPhase phase) {
this.listener = listener;
this.event = event;
this.phase = phase;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
// 在目標(biāo)方法配置的phase屬性為BEFORE_COMMIT時(shí),處理before commit事件
public void beforeCommit(boolean readOnly) {
if (this.phase == TransactionPhase.BEFORE_COMMIT) {
processEvent();
}
}
// 這里對于after completion事件的處理,雖然分為了三個(gè)if分支,但是實(shí)際上都是執(zhí)行的processEvent()
// 方法,因?yàn)閍fter completion事件是事務(wù)事件中一定會執(zhí)行的,因而這里對于commit,
// rollback和completion事件都在當(dāng)前方法中處理也是沒問題的
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEvent();
} else if (this.phase == TransactionPhase.AFTER_ROLLBACK
&& status == STATUS_ROLLED_BACK) {
processEvent();
} else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
processEvent();
}
}
// 執(zhí)行事務(wù)事件
protected void processEvent() {
this.listener.processEvent(this.event);
}
}
可以看到,對于事務(wù)事件的處理,最終都是委托給了ApplicationListenerMethodAdapter.processEvent()方法進(jìn)行的。如下是該方法的源碼:
public void processEvent(ApplicationEvent event) {
// 處理事務(wù)事件的相關(guān)參數(shù),這里主要是判斷TransactionalEventListener注解中是否配置了value
// 或classes屬性,如果配置了,則將方法參數(shù)轉(zhuǎn)換為該指定類型傳給監(jiān)聽的方法;如果沒有配置,則判斷
// 目標(biāo)方法是ApplicationEvent類型還是PayloadApplicationEvent類型,是則轉(zhuǎn)換為該類型傳入
Object[] args = resolveArguments(event);
// 這里主要是獲取TransactionalEventListener注解中的condition屬性,然后通過
// Spring expression language將其與目標(biāo)類和方法進(jìn)行匹配
if (shouldHandle(event, args)) {
// 通過處理得到的參數(shù)借助于反射調(diào)用事務(wù)監(jiān)聽方法
Object result = doInvoke(args);
if (result != null) {
// 對方法的返回值進(jìn)行處理
handleResult(result);
} else {
logger.trace("No result object given - no result to handle");
}
}
}
// 處理事務(wù)監(jiān)聽方法的參數(shù)
protected Object[] resolveArguments(ApplicationEvent event) {
// 獲取發(fā)布事務(wù)事件時(shí)傳入的參數(shù)類型
ResolvableType declaredEventType = getResolvableType(event);
if (declaredEventType == null) {
return null;
}
// 如果事務(wù)監(jiān)聽方法的參數(shù)個(gè)數(shù)為0,則直接返回
if (this.method.getParameterCount() == 0) {
return new Object[0];
}
// 如果事務(wù)監(jiān)聽方法的參數(shù)不為ApplicationEvent或PayloadApplicationEvent,則直接將發(fā)布事務(wù)
// 事件時(shí)傳入的參數(shù)當(dāng)做事務(wù)監(jiān)聽方法的參數(shù)傳入。從這里可以看出,如果事務(wù)監(jiān)聽方法的參數(shù)不是
// ApplicationEvent或PayloadApplicationEvent類型,那么其參數(shù)必須只能有一個(gè),并且這個(gè)
// 參數(shù)必須與發(fā)布事務(wù)事件時(shí)傳入的參數(shù)一致
Class<?> eventClass = declaredEventType.getRawClass();
if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) &&
event instanceof PayloadApplicationEvent) {
return new Object[] {((PayloadApplicationEvent) event).getPayload()};
} else {
// 如果參數(shù)類型為ApplicationEvent或PayloadApplicationEvent,則直接將其傳入事務(wù)事件方法
return new Object[] {event};
}
}
// 判斷事務(wù)事件方法方法是否需要進(jìn)行事務(wù)事件處理
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
if (args == null) {
return false;
}
String condition = getCondition();
if (StringUtils.hasText(condition)) {
Assert.notNull(this.evaluator, "EventExpressionEvaluator must no be null");
EvaluationContext evaluationContext = this.evaluator.createEvaluationContext(
event, this.targetClass, this.method, args, this.applicationContext);
return this.evaluator.condition(condition, this.methodKey, evaluationContext);
}
return true;
}
// 對事務(wù)事件方法的返回值進(jìn)行處理,這里的處理方式主要是將其作為一個(gè)事件繼續(xù)發(fā)布出去,這樣就可以在
// 一個(gè)統(tǒng)一的位置對事務(wù)事件的返回值進(jìn)行處理
protected void handleResult(Object result) {
// 如果返回值是數(shù)組類型,則對數(shù)組元素一個(gè)一個(gè)進(jìn)行發(fā)布
if (result.getClass().isArray()) {
Object[] events = ObjectUtils.toObjectArray(result);
for (Object event : events) {
publishEvent(event);
}
} else if (result instanceof Collection<?>) {
// 如果返回值是集合類型,則對集合進(jìn)行遍歷,并且發(fā)布集合中的每個(gè)元素
Collection<?> events = (Collection<?>) result;
for (Object event : events) {
publishEvent(event);
}
} else {
// 如果返回值是一個(gè)對象,則直接將其進(jìn)行發(fā)布
publishEvent(result);
}
}
六、總結(jié)
對于事務(wù)事件的處理,總結(jié)而言,就是為每個(gè)事務(wù)事件監(jiān)聽方法創(chuàng)建了一個(gè)TransactionSynchronizationEventAdapter對象,通過該對象在發(fā)布事務(wù)事件的時(shí)候,會在當(dāng)前線程中注冊該對象,這樣就可以保證每個(gè)線程每個(gè)監(jiān)聽器中只會對應(yīng)一個(gè)TransactionSynchronizationEventAdapter對象。
在Spring進(jìn)行事務(wù)事件的時(shí)候會調(diào)用該對象對應(yīng)的監(jiān)聽方法,從而達(dá)到對事務(wù)事件進(jìn)行監(jiān)聽的目的。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
淺析java修飾符訪問權(quán)限(動力節(jié)點(diǎn)Java學(xué)院整理)
Java有四種訪問權(quán)限,其中三種有訪問權(quán)限修飾符,分別為private,public和protected,還有一種不帶任何修飾符,下面通過本文給大家簡單介紹下java修飾符訪問權(quán)限相關(guān)知識,感興趣的朋友一起學(xué)習(xí)吧2017-04-04
java數(shù)據(jù)結(jié)構(gòu)與算法之插入排序詳解
這篇文章主要介紹了java數(shù)據(jù)結(jié)構(gòu)與算法之插入排序,結(jié)合實(shí)例形式分析了java插入排序的概念、分類、原理、實(shí)現(xiàn)方法與相關(guān)注意事項(xiàng),需要的朋友可以參考下2017-05-05
Spring @Valid @Validated實(shí)現(xiàn)驗(yàn)證
這篇文章主要介紹了Spring @Valid @Validated實(shí)現(xiàn)驗(yàn)證,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01
Java實(shí)現(xiàn)文件上傳至服務(wù)器的方法
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)文件上傳至服務(wù)器的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01

