使用@TransactionalEventListener監(jiān)聽事務(wù)教程
@TransactionalEventListener監(jiān)聽事務(wù)
項目背景
最近在項目遇到一個問題
A方法體內(nèi)有 INSERT、UPDATE或者DELETE操作,最后會發(fā)送一段MQ給外部,外部接收到MQ后會再發(fā)送一段請求過來,系統(tǒng)收到請求后會執(zhí)行B方法,B方法會依賴A方法修改后的結(jié)果,這就有一個問題,如果A方法事務(wù)沒有提交;且B方法的請求過來了會查詢到事務(wù)未提交前的狀態(tài),這就會有問題
解決辦法:@TransactionalEventListener
在Spring4.2+,有一種叫做TransactionEventListener的方式,能夠控制在事務(wù)的時候Event事件的處理方式。 我們知道,Spring的發(fā)布訂閱模型實際上并不是異步的,而是同步的來將代碼進(jìn)行解耦。而TransactionEventListener仍是通過這種方式,只不過加入了回調(diào)的方式來解決,這樣就能夠在事務(wù)進(jìn)行Commited,Rollback…等的時候才會去進(jìn)行Event的處理。
具體實現(xiàn)
//創(chuàng)建一個事件類 package com.qk.cas.config; import org.springframework.context.ApplicationEvent; public class MyTransactionEvent extends ApplicationEvent { private static final long serialVersionUID = 1L; private IProcesser processer; public MyTransactionEvent(IProcesser processer) { super(processer); this.processer = processer; } public IProcesser getProcesser() { return this.processer; } @FunctionalInterface public interface IProcesser { void handle(); } } //創(chuàng)建一個監(jiān)聽類 package com.qk.cas.config; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @Component public class MyTransactionListener { @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void hanldeOrderCreatedEvent(MyTransactionEvent event) { event.getProcesser().handle(); } } //MQ方法的變動 @Autowired private ApplicationEventPublisher eventPublisher; @Autowired private AmqpTemplate rabbitTemplate; public void sendCreditResult(String applyNo, String jsonString) { eventPublisher.publishEvent(new MyTransactionEvent(() -> { LOGGER.info("MQ。APPLY_NO:[{}]。KEY:[{}]。通知報文:[{}]", applyNo, Queues.CREDIT_RESULT, jsonString); rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString); })); }
拓展
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有當(dāng)前事務(wù)提交之后,才會執(zhí)行事件監(jiān)聽的方法,其中參數(shù)phase默認(rèn)為AFTER_COMMIT,共有四個枚舉:
public enum TransactionPhase { /** * Fire the event before transaction commit. * @see TransactionSynchronization#beforeCommit(boolean) */ BEFORE_COMMIT, /** * Fire the event after the commit has completed successfully. * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and * therefore executes in the same after-completion sequence of events, * (and not in {@link TransactionSynchronization#afterCommit()}). * @see TransactionSynchronization#afterCompletion(int) * @see TransactionSynchronization#STATUS_COMMITTED */ AFTER_COMMIT, /** * Fire the event if the transaction has rolled back. * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and * therefore executes in the same after-completion sequence of events. * @see TransactionSynchronization#afterCompletion(int) * @see TransactionSynchronization#STATUS_ROLLED_BACK */ AFTER_ROLLBACK, /** * Fire the event after the transaction has completed. * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or * {@link #AFTER_ROLLBACK} to intercept transaction commit * or rollback, respectively. * @see TransactionSynchronization#afterCompletion(int) */ AFTER_COMPLETION }
注解@TransactionalEventListener
例如 用戶注冊之后需要計算用戶的邀請關(guān)系,遞歸操作。如果注冊的時候包含多步驗證,生成基本初始化數(shù)據(jù),這時候我們通過mq發(fā)送消息來處理這個邀請關(guān)系,會出現(xiàn)一個問題,就是用戶還沒注冊數(shù)據(jù)還沒入庫,邀請關(guān)系就開始執(zhí)行,但是查不到數(shù)據(jù),導(dǎo)致出錯。
@TransactionalEventListener 可以實現(xiàn)事務(wù)的監(jiān)聽,可以在提交之后再進(jìn)行操作。
監(jiān)聽的對象
package com.jinglitong.springshop.interceptor; import com.jinglitong.springshop.entity.Customer; import org.springframework.context.ApplicationEvent; public class RegCustomerEvent extends ApplicationEvent{ public RegCustomerEvent(Customer customer){ super(customer); } }
監(jiān)聽到之后的操作
package com.jinglitong.springshop.interceptor; import com.alibaba.fastjson.JSON; import com.jinglitong.springshop.entity.Customer; import com.jinglitong.springshop.entity.MqMessageRecord; import com.jinglitong.springshop.servcie.MqMessageRecordService; import com.jinglitong.springshop.util.AliMQServiceUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; import java.util.Date; import java.util.HashMap; import java.util.Map; @Component @Slf4j public class RegCustomerListener { @Value("${aliyun.mq.order.topic}") private String topic; @Value("${aliyun.mq.regist.product}") private String registGroup; @Value("${aliyun.mq.regist.tag}") private String registTag; @Autowired MqMessageRecordService mqMessageRecordService; @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) { Customer cust = (Customer) regCustomerEvent.getSource(); Map<String, String> map = new HashMap<String, String>(); map.put("custId", cust.getZid()); map.put("account", cust.getAccount()); log.info("put regist notice to Mq start"); String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup); MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup); if(StringUtils.isEmpty(hdResult)) { insert.setStatus(false); }else { insert.setStatus(true); } mqMessageRecordService.insertRecord(insert); log.info("put regist notice to Mq end"); log.info("regist notice userId : " + cust.getAccount()); } private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) { MqMessageRecord msg = new MqMessageRecord(); msg.setFlowId(custId); msg.setGroupName(groupId); msg.setTopic(topic); msg.setTag(tag); msg.setMsgId(result); msg.setDataBody(jsonStr); msg.setSendType(3); msg.setGroupType(1); msg.setCreateTime(new Date()); return msg; } } @Autowired private ApplicationEventPublisher applicationEventPublisher; applicationEventPublisher.publishEvent(new RegCustomerEvent (XXX));
這樣可以確保數(shù)據(jù)入庫之后再進(jìn)行異步計算
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java 語言守護(hù)線程 Daemon Thread使用示例詳解
這篇文章主要為大家介紹了Java 語言守護(hù)線程 Daemon Thread使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10springboot druid數(shù)據(jù)庫配置密碼加密的實現(xiàn)
Druid是阿里開發(fā)的數(shù)據(jù)庫連接池,本文主要介紹了springboot druid數(shù)據(jù)庫配置密碼加密的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-06-06