欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot實(shí)現(xiàn)事務(wù)鉤子函數(shù)的示例

 更新時(shí)間:2025年08月19日 09:44:28   作者:mb685e2ead6a5e7  
本文主要介紹了SpringBoot實(shí)現(xiàn)事務(wù)鉤子函數(shù)的示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

一、案例背景

拿支付系統(tǒng)相關(guān)的業(yè)務(wù)來舉例。在支付系統(tǒng)中,我們需要記錄每個(gè)賬戶的資金流水(記錄用戶A因?yàn)槟膫€(gè)操作扣了錢,因?yàn)槟膫€(gè)操作加了錢),這樣我們才能對每個(gè)賬戶的做到心中有數(shù),對于支付系統(tǒng)而言,資金流水的數(shù)據(jù)可謂是最重要的。因此,為了防止支付系統(tǒng)的老大徇私舞弊,CTO提了一個(gè)流水存檔的需求:要求支付系統(tǒng)對每個(gè)賬戶的資金流水做一份存檔,要求支付系統(tǒng)在寫流水的時(shí)候,把流水相關(guān)的信息以消息的形式推送到kafka,由存檔系統(tǒng)消費(fèi)這個(gè)消息并落地到庫里(這個(gè)庫只有存檔系統(tǒng)擁有寫權(quán)限)。整個(gè)需求的流程如下所示:

整個(gè)需求的流程還是比較簡單的,考慮到后續(xù)會有其他事業(yè)部也要進(jìn)行數(shù)據(jù)存檔操作,CTO建議支付系統(tǒng)團(tuán)隊(duì)內(nèi)部開發(fā)一個(gè)二方庫,這個(gè)二方庫的主要功能就是發(fā)送消息到kafka中去。

二、確定方案

既然要求開發(fā)一個(gè)二方庫,因此,我們需要考慮如下幾件事情:

1、技術(shù)棧使用的springboot,因此,這里最好以starter的方式提供

2、二方庫需要發(fā)送消息給kafka,最好是二方庫內(nèi)部基于kafka生產(chǎn)者的api創(chuàng)建生產(chǎn)者,不要使用Spring自帶的kafkaTemplate,因?yàn)榧煞接锌赡芤呀?jīng)使用了kafkaTemplate。不能與集成方造成沖突。

3、減少對接方的集成難度、學(xué)習(xí)成本,最好是提供一個(gè)簡單實(shí)用的api,業(yè)務(wù)側(cè)能簡單上手。

4、發(fā)送消息這個(gè)操作需要支持事務(wù),盡量不影響主業(yè)務(wù)

在上述的幾件事情中,最需要注意的應(yīng)該就是第4點(diǎn):發(fā)送消息這個(gè)操作需要支持事務(wù),盡量不影響主業(yè)務(wù)。這是什么意思呢?首先,盡量不影響主業(yè)務(wù),這個(gè)最簡單的方式就是使用異步機(jī)制。其次,需要支持事務(wù)是指:假設(shè)我們的api是在事務(wù)方法內(nèi)部調(diào)用的,那么我們需要保證事務(wù)提交后再執(zhí)行這個(gè)api。那么,我們的流水落地api應(yīng)該要有這樣的功能:

內(nèi)部可以判斷當(dāng)前是否存在事務(wù),如果存在事務(wù),則需要等事務(wù)提交后再異步發(fā)送消息給kafka。如果不存在事務(wù)則直接異步發(fā)送消息給kafka。而且這樣的判斷邏輯得放在二方庫內(nèi)部才行。那現(xiàn)在擺在我們面前的問題就是:我要如何判斷當(dāng)前是否存在事務(wù),以及如何在事務(wù)提交后再觸發(fā)我們自定義的邏輯呢?

三、TransactionSynchronizationManager顯神威

這個(gè)類內(nèi)部所有的變量、方法都是static修飾的,也就是說它其實(shí)是一個(gè)工具類。是一個(gè)事務(wù)同步器。下述是流水落地API的偽代碼,這段代碼就解決了我們上述提到的疑問:

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendLog() {
    // 判斷當(dāng)前是否存在事務(wù)
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        // 無事務(wù),異步發(fā)送消息給kafka
        
        executor.submit(() -> {
            // 發(fā)送消息給kafka
            try {
                // 發(fā)送消息給kafka
            } catch (Exception e) {
                // 記錄異常信息,發(fā)郵件或者進(jìn)入待處理列表,讓開發(fā)人員感知異常
            }
        });
        return;
    }

    // 有事務(wù),則添加一個(gè)事務(wù)同步器,并重寫afterCompletion方法(此方法在事務(wù)提交后會做回調(diào))
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                // 事務(wù)提交后,再異步發(fā)送消息給kafka
                executor.submit(() -> {
                    try {
                     // 發(fā)送消息給kafka
                    } catch (Exception e) {
                     // 記錄異常信息,發(fā)郵件或者進(jìn)入待處理列表,讓開發(fā)人員感知異常
                    }
                });
            }
        }

    });

}

代碼比較簡單,其主要是TransactionSynchronizationManager的使用。

3.1、判斷是否存在事務(wù)?TransactionSynchronizationManager.isSynchronizationActive() 方法顯神威

我們先看下這個(gè)方法的源碼:

// TransactionSynchronizationManager.java類內(nèi)部的部分代碼

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");

public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}

很明顯,synchronizations是一個(gè)線程變量(ThreadLocal)。那它是在什么時(shí)候set進(jìn)去的呢?這里的話,可以參考下這個(gè)方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源碼如下所示:

/**
  * Activate transaction synchronization for the current thread.
  * Called by a transaction manager on transaction begin.
  * @throws IllegalStateException if synchronization is already active
  */
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
}

由源碼中的注釋也可以知道,它是在事務(wù)管理器開啟事務(wù)時(shí)調(diào)用的。換句話說,只要我們的程序執(zhí)行到帶有事務(wù)特性的方法時(shí),就會在線程變量中放入一個(gè)LinkedHashSet,用來標(biāo)識當(dāng)前存在事務(wù)。只要isSynchronizationActive返回true,則代表當(dāng)前有事務(wù)。因此,結(jié)合這兩個(gè)方法我們是指能解決我們最開始提出的疑問:**要如何判斷當(dāng)前是否存在事務(wù)**

3.2、如何在事務(wù)提交后觸發(fā)自定義邏輯?TransactionSynchronizationManager.registerSynchronization()方法顯神威

我們來看下這個(gè)方法的源代碼:

/**
  * Register a new transaction synchronization for the current thread.
  * Typically called by resource management code.
  * <p>Note that synchronizations can implement the
  * {@link org.springframework.core.Ordered} interface.
  * They will be executed in an order according to their order value (if any).
  * @param synchronization the synchronization object to register
  * @throws IllegalStateException if transaction synchronization is not active
  * @see org.springframework.core.Ordered
  */
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
}

這里又使用到了synchronizations線程變量,我們在判斷是否存在事務(wù)時(shí),就是判斷這個(gè)線程變量內(nèi)部是否有值。那我們現(xiàn)在想在事務(wù)提交后觸發(fā)自定義邏輯和這個(gè)有什么關(guān)系呢?我們在上面構(gòu)建流水落地api的偽代碼中有向synchronizations內(nèi)部添加了一個(gè)TransactionSynchronizationAdapter,內(nèi)部并重寫了afterCompletion方法,其代碼如下所示:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

    @Override
    public void afterCompletion(int status) {
        if (status == TransactionSynchronization.STATUS_COMMITTED) {
            // 事務(wù)提交后,再異步發(fā)送消息給kafka
            executor.submit(() -> {
                    try {
                     // 發(fā)送消息給kafka
                    } catch (Exception e) {
                     // 記錄異常信息,發(fā)郵件或者進(jìn)入待處理列表,讓開發(fā)人員感知異常
                    }
            });
        }
    }

});

我們結(jié)合registerSynchronization的源碼來看,其實(shí)這段代碼主要就是向線程變量內(nèi)部的LinkedHashSet添加了一個(gè)對象而已,但就是這么一個(gè)操作,讓Spring在事務(wù)執(zhí)行的過程中變得“有事情可做”。這是什么意思呢?是因?yàn)镾pring在執(zhí)行事務(wù)方法時(shí),對于操作事務(wù)的每一個(gè)階段都有一個(gè)回調(diào)操作,比如:trigger系列的回調(diào)

invoke系列的回調(diào)

而我們現(xiàn)在的需求就是在事務(wù)提交后觸發(fā)自定義的函數(shù),那就是在invokeAfterCommit和invokeAfterCompletion這兩個(gè)方法來選了。首先,這兩個(gè)方法都會拿到所有TransactionSynchronization的集合(其中會包括我們上述添加的TransactionSynchronizationAdapter)。但是要注意一點(diǎn):invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合還有一個(gè)int類型的參數(shù),而這個(gè)int類型的參數(shù)其實(shí)是當(dāng)前事務(wù)的一種狀態(tài)。也就是說,如果我們重寫了invokeAfterCompletion方法,我們除了能拿到集合外,還能拿到當(dāng)前事務(wù)的狀態(tài)。因此,此時(shí)我們可以根據(jù)這個(gè)狀態(tài)來做不同的事情,比如:可以在事務(wù)提交時(shí)做自定義處理,也可以在事務(wù)回滾時(shí)做自定義處理等等。

四、總結(jié)

上面有說到,我們判斷當(dāng)前是否存在事務(wù)、添加鉤子函數(shù)都是依賴線程變量的。因此,我們在使用過程中,一定要避免切換線程。否則會出現(xiàn)不生效的情況。

到此這篇關(guān)于SpringBoot實(shí)現(xiàn)事務(wù)鉤子函數(shù)的示例的文章就介紹到這了,更多相關(guān)SpringBoot 事務(wù)鉤子函數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解springboot設(shè)置cors跨域請求的兩種方式

    詳解springboot設(shè)置cors跨域請求的兩種方式

    這篇文章主要介紹了詳解springboot設(shè)置cors跨域請求的兩種方式,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-11-11
  • grpc-java?k8s下的負(fù)載均衡處理方法

    grpc-java?k8s下的負(fù)載均衡處理方法

    這篇文章主要為大家介紹了grpc-java?k8s下的負(fù)載均衡的處理方法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-02-02
  • Java線程本地變量導(dǎo)致的緩存問題解決方法

    Java線程本地變量導(dǎo)致的緩存問題解決方法

    使用緩存可以緩解大流量壓力,顯著提高程序的性能,我們在使用緩存系統(tǒng)時(shí),尤其是大并發(fā)情況下,經(jīng)常會遇到一些疑難雜癥,這篇文章主要給大家介紹了關(guān)于Java線程本地變量導(dǎo)致的緩存問題的解決方法,需要的朋友可以參考下,
    2024-08-08
  • java必學(xué)必會之線程(1)

    java必學(xué)必會之線程(1)

    java必學(xué)必會之線程第一篇,介紹了線程的基本概念、線程的創(chuàng)建和啟動,想要學(xué)好java線程的朋友一定要好好閱讀這篇文章
    2015-12-12
  • Java迭代器遍歷list的方法及代碼分析

    Java迭代器遍歷list的方法及代碼分析

    在本篇內(nèi)容里系小編給大家分享的是一篇關(guān)于Java迭代器遍歷list的方法總結(jié)內(nèi)容,有需要的朋友們可以參考學(xué)習(xí)下。
    2022-11-11
  • java自定義注解接口實(shí)現(xiàn)方案

    java自定義注解接口實(shí)現(xiàn)方案

    java注解是附加在代碼中的一些元信息,用于一些工具在編譯、運(yùn)行時(shí)進(jìn)行解析和使用,起到說明、配置的功能,本文將詳細(xì)介紹,此功能的實(shí)現(xiàn)方法
    2012-11-11
  • 深入淺出講解Java8函數(shù)式編程

    深入淺出講解Java8函數(shù)式編程

    不管是前端還是后端開發(fā)人員,學(xué)習(xí)一些函數(shù)式編程的思想和概念,對于手頭的開發(fā)工作和以后的職業(yè)發(fā)展,都是大有裨益的,下面這篇文章主要給大家介紹了關(guān)于Java8函數(shù)式編程的相關(guān)資料,需要的朋友可以參考下
    2022-01-01
  • lombok?找不到get/set方法的原因及分析

    lombok?找不到get/set方法的原因及分析

    這篇文章主要介紹了lombok?找不到get/set方法的原因及分析,具有很好的參考價(jià)值,希望對大家有所幫助。
    2022-06-06
  • Java排序算法之SleepSort排序示例

    Java排序算法之SleepSort排序示例

    這篇文章主要介紹了Java排序算法之SleepSort排序,結(jié)合實(shí)例形式分析了SleepSort排序的實(shí)現(xiàn)步驟與相關(guān)操作技巧,需要的朋友可以參考下
    2017-01-01
  • SpringBoot啟動失敗的解決方法:A component required a bean of type ‘xxxxxxx‘ that could not be found.

    SpringBoot啟動失敗的解決方法:A component required a&nb

    這篇文章主要介紹了解決SpringBoot啟動失敗:A component required a bean of type ‘xxxxxxx‘ that could not be found.,目前解決方法有兩種,一種是不注入bean的方式,另一種是使用@Component的方式,本文給大家詳細(xì)講解,需要的朋友可以參考下
    2023-02-02

最新評論