欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java @GlobalLock注解詳細(xì)分析講解

 更新時間:2022年11月20日 10:57:47   作者:氵奄不死的魚  
這篇文章主要介紹了Java @GlobalLock注解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧

GlobalLock的作用

對于某條數(shù)據(jù)進(jìn)行更新操作,如果全局事務(wù)正在進(jìn)行,當(dāng)某個本地事務(wù)需要更新該數(shù)據(jù)時,需要使用@GlobalLock確保其不會對全局事務(wù)正在操作的數(shù)據(jù)進(jìn)行修改。防止的本地事務(wù)對全局事務(wù)的數(shù)據(jù)臟寫。如果和select for update組合使用,還可以起到防止臟讀的效果。

全局鎖

首先我們知道,seata的AT模式是二段提交的,而且AT模式能夠做到事務(wù)ACID四種特性中的A原子性和D持久性,默認(rèn)情況下隔離級別也只能保證在讀未提交

那么為了保證原子性,在全局事務(wù)未提交之前,其中被修改的數(shù)據(jù)會被加上全局鎖,保證不再會被其他全局事務(wù)修改。

為什么要使用GlobalLock

但是全局鎖僅僅能防止全局事務(wù)對一個上鎖的數(shù)據(jù)再次進(jìn)行修改,在很多業(yè)務(wù)場景中我們是沒有跨系統(tǒng)的rpc調(diào)用的,通常是不會加分布式事務(wù)的。

例如有分布式事務(wù)執(zhí)行完畢A系統(tǒng)的業(yè)務(wù)邏輯,正在繼續(xù)執(zhí)行B系統(tǒng)邏輯,并且A系統(tǒng)事務(wù)已經(jīng)提交。此時A系統(tǒng)一個本地的spring事務(wù)去與分布式事務(wù)修改同一行數(shù)據(jù),是可以正常修改的

由于本地的spring事務(wù)并不受seata的全局鎖控制容易導(dǎo)致臟寫,即全局事務(wù)修改數(shù)據(jù)后,還未提交,數(shù)據(jù)又被本地事務(wù)改掉了。這很容易發(fā)生數(shù)據(jù)出錯的問題,而且十分有可能導(dǎo)致全局事務(wù)回滾時發(fā)現(xiàn) 數(shù)據(jù)已經(jīng)dirty(與uodoLog中的beforeImage不同)。那么就會回滾失敗,進(jìn)而導(dǎo)致全局鎖無法釋放,后續(xù)的操作無法進(jìn)行下去。也是比較嚴(yán)重的問題。

一種解決辦法就是,針對所有相關(guān)操作都加上AT全局事務(wù),但這顯然是沒必要的,因?yàn)槿质聞?wù)意味者需要與seata-server進(jìn)行通信,創(chuàng)建全局事務(wù),注冊分支事務(wù),記錄undoLog,判斷鎖沖突,注冊鎖。

那么對于不需要跨系統(tǒng),跨庫的的業(yè)務(wù)來說,使用GlobalTransactional實(shí)在是有點(diǎn)浪費(fèi)了

那么更加輕量的GlobalLock就能夠發(fā)揮作用了,其只需要判斷本地的修改是否與全局鎖沖突就夠了

工作原理

加上@GlobalLock之后,會進(jìn)入切面

io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

進(jìn)而進(jìn)入這個方法,處理全局鎖

    Object handleGlobalLock(final MethodInvocation methodInvocation,
        final GlobalLock globalLockAnno) throws Throwable {
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }

進(jìn)入execute方法

public Object execute(GlobalLockExecutor executor) throws Throwable {
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }
        // set my config to config holder so that it can be access in further execution
        // for example, LockRetryController can access it with config holder
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
        try {
            return executor.execute();
        } finally {
            // only unbind when this is the root caller.
            // otherwise, the outer caller would lose global lock flag
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }
            // if previous config is not null, we need to set it back
            // so that the outer logic can still use their config
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }
}

先判斷當(dāng)前是否已經(jīng)在globalLock范圍之內(nèi),如果已經(jīng)在范圍之內(nèi),那么把上層的配置取出來,用新的配置替換,并在方法執(zhí)行完畢時候,釋放鎖,或者將配置替換成之前的上層配置

如果開啟全局鎖,會在threadLocal put一個標(biāo)記

    //just put something not null
CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);

開始執(zhí)行業(yè)務(wù)方法

那么加上相關(guān)GlobalLock標(biāo)記的和普通方法的區(qū)別在哪里?

我們都知道,seata會對數(shù)據(jù)庫連接做代理,在生成PreparedStatement時會進(jìn)入

io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)

  @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        String dbType = getDbType();
        // support oracle 10.2+
        PreparedStatement targetPreparedStatement = null;
        if (BranchType.AT == RootContext.getBranchType()) {
            List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
            if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                    TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                    String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                    tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                    targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                }
            }
        }
        if (targetPreparedStatement == null) {
            targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        }
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

這里顯然不會進(jìn)入AT模式的邏輯,那么直接通過真正的數(shù)據(jù)庫連接,生成PreparedStatement,再使用PreparedStatementProxy進(jìn)行包裝,代理增強(qiáng)

在使用PreparedStatementProxy執(zhí)行sql時,會進(jìn)入seata定義的一些邏輯

 public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }

最終來到

io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object…)

   public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    dbType);
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }

如果當(dāng)前線程不需要鎖并且不不在AT模式的分支事務(wù)下,直接使用原生的preparedStatement執(zhí)行就好了

這里四種操作,通過不同的接口去執(zhí)行,接口又有多種不同的數(shù)據(jù)庫類型實(shí)現(xiàn)

插入分為不同的數(shù)據(jù)庫類型,通過spi獲取

seata提供了三種數(shù)據(jù)庫的實(shí)現(xiàn),

update,delete,select三種沒有多個實(shí)現(xiàn)類

他們在執(zhí)行時都會執(zhí)行父類的方法

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue

   protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.changeAutoCommit();
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

全局鎖的策略, 是在一個while(true)循環(huán)里不斷執(zhí)行

 protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }

如果出現(xiàn)異常是LockConflictException,進(jìn)入sleep

public void sleep(Exception e) throws LockWaitTimeoutException {
        if (--lockRetryTimes < 0) {
            throw new LockWaitTimeoutException("Global lock wait timeout", e);
        }
        try {
            Thread.sleep(lockRetryInternal);
        } catch (InterruptedException ignore) {
        }
    }

這兩個變量就是@GlobalLock注解的兩個配置,一個是重試次數(shù),一個重試之間的間隔時間。

繼續(xù)就是執(zhí)行數(shù)據(jù)庫更新操作

io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse

發(fā)現(xiàn)這里也會生成,undoLog,beforeImage和afterImage,其實(shí)想想,在GlobalLock下,是沒必要生成undoLog的。但是現(xiàn)有邏輯確實(shí)要生成,這個seata后續(xù)應(yīng)該會優(yōu)化。

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

生成beforeImage和aferImage的邏輯也比較簡單。分別在執(zhí)行更新前,查詢數(shù)據(jù)庫,和更新后查詢數(shù)據(jù)庫

可見記錄undoLog是十分影響性能的,查詢就多了兩次,如果undoLog入庫還要再多一次入庫操作。

再看prepareUndoLog

 protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
            return;
        }
        if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
            if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
            }
        }
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        if (null != lockKeys) {
            connectionProxy.appendLockKey(lockKeys);

            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }

將lockKeys,和undoLog,暫時記錄在connectionProxy中,也就是說至此還沒有將uodoLog記錄到數(shù)據(jù)庫,也沒有判斷全局鎖,這些事情都留到了事務(wù)提交

io.seata.rm.datasource.ConnectionProxy#doCommit

 private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

進(jìn)入io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks

這個 方法很簡單就是首先進(jìn)行鎖的檢查,并沒有我想象中的加索全局事務(wù)。

 private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

也就是說,使用GlobalLock會對全局鎖檢測,但是并不會對記錄加全局鎖。但是配合全局事務(wù)這樣已經(jīng)能夠保證全局事務(wù)的原子性了??梢奊lobalLock還是要和本地事務(wù)組合一起使用的,這樣才能保證,GlobalLock執(zhí)行完畢本地事務(wù)未提交的數(shù)據(jù)不會被別的本地事務(wù)/分布式事務(wù)修改掉。

到此這篇關(guān)于Java @GlobalLock注解詳細(xì)分析講解的文章就介紹到這了,更多相關(guān)Java @GlobalLock內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何使用JJWT及JWT講解和工具類

    如何使用JJWT及JWT講解和工具類

    關(guān)于JWT的文章網(wǎng)上已經(jīng)多如牛毛了,但是相信很多同學(xué)學(xué)的還是云里霧里,所以在我學(xué)習(xí)JWT之后盡量用最簡潔的描述寫下這篇文章用于日后復(fù)習(xí),與此同時也希望可以幫助同學(xué)們共同進(jìn)步
    2021-09-09
  • 解析Java并發(fā)Exchanger的使用

    解析Java并發(fā)Exchanger的使用

    Exchanger是java 5引入的并發(fā)類,Exchanger顧名思義就是用來做交換的。這里主要是兩個線程之間交換持有的對象。當(dāng)Exchanger在一個線程中調(diào)用exchange方法之后,會等待另外的線程調(diào)用同樣的exchange方法。兩個線程都調(diào)用exchange方法之后,傳入的參數(shù)就會交換。
    2021-06-06
  • ConcurrentModificationException日志關(guān)鍵字報(bào)警思考分析

    ConcurrentModificationException日志關(guān)鍵字報(bào)警思考分析

    本文將記錄和分析日志中的ConcurrentModificationException關(guān)鍵字報(bào)警,還有一些我的思考,有需要的朋友可以借鑒參考下,希望能夠有所幫助
    2023-12-12
  • Java實(shí)現(xiàn)時間日期格式轉(zhuǎn)換示例

    Java實(shí)現(xiàn)時間日期格式轉(zhuǎn)換示例

    本篇文章主要介紹了ava實(shí)現(xiàn)時間日期格式轉(zhuǎn)換示例,實(shí)現(xiàn)了各種時間輸出的類型,有興趣的可以了解一下。
    2017-01-01
  • 輕松掌握J(rèn)ava代理模式

    輕松掌握J(rèn)ava代理模式

    這篇文章主要幫助大家輕松掌握J(rèn)ava代理模式,什么是靜態(tài)代理?感興趣的小伙伴們可以參考一下
    2016-09-09
  • Collections.shuffle()方法實(shí)例解析

    Collections.shuffle()方法實(shí)例解析

    這篇文章主要介紹了Collections.shuffle()方法實(shí)例解析,分享了相關(guān)代碼示例,小編覺得還是挺不錯的,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • Spring?Data?JPA實(shí)現(xiàn)查詢結(jié)果返回map或自定義的實(shí)體類

    Spring?Data?JPA實(shí)現(xiàn)查詢結(jié)果返回map或自定義的實(shí)體類

    這篇文章主要介紹了Spring?Data?JPA實(shí)現(xiàn)查詢結(jié)果返回map或自定義的實(shí)體類,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • 詳解Spring Boot使用Maven自定義打包方式

    詳解Spring Boot使用Maven自定義打包方式

    這篇文章主要介紹了Spring Boot使用Maven自定義打包方式,本文通過多種方式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12
  • springboot配置多個數(shù)據(jù)源兩種方式實(shí)現(xiàn)

    springboot配置多個數(shù)據(jù)源兩種方式實(shí)現(xiàn)

    在我們的實(shí)際業(yè)務(wù)中可能會遇到;在一個項(xiàng)目里面讀取多個數(shù)據(jù)庫的數(shù)據(jù)來進(jìn)行展示,spring對同時配置多個數(shù)據(jù)源是支持的,本文主要介紹了springboot配置多個數(shù)據(jù)源兩種方式實(shí)現(xiàn),感興趣的可以了解一下
    2022-03-03
  • Java Selenium實(shí)現(xiàn)多窗口切換的示例代碼

    Java Selenium實(shí)現(xiàn)多窗口切換的示例代碼

    這篇文章主要介紹了Java Selenium實(shí)現(xiàn)多窗口切換的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09

最新評論