Java @GlobalLock注解詳細(xì)分析講解
GlobalLock的作用
對(duì)于某條數(shù)據(jù)進(jìn)行更新操作,如果全局事務(wù)正在進(jìn)行,當(dāng)某個(gè)本地事務(wù)需要更新該數(shù)據(jù)時(shí),需要使用@GlobalLock確保其不會(huì)對(duì)全局事務(wù)正在操作的數(shù)據(jù)進(jìn)行修改。防止的本地事務(wù)對(duì)全局事務(wù)的數(shù)據(jù)臟寫(xiě)。如果和select for update組合使用,還可以起到防止臟讀的效果。
全局鎖
首先我們知道,seata的AT模式是二段提交的,而且AT模式能夠做到事務(wù)ACID四種特性中的A原子性和D持久性,默認(rèn)情況下隔離級(jí)別也只能保證在讀未提交
那么為了保證原子性,在全局事務(wù)未提交之前,其中被修改的數(shù)據(jù)會(huì)被加上全局鎖,保證不再會(huì)被其他全局事務(wù)修改。
為什么要使用GlobalLock
但是全局鎖僅僅能防止全局事務(wù)對(duì)一個(gè)上鎖的數(shù)據(jù)再次進(jìn)行修改,在很多業(yè)務(wù)場(chǎng)景中我們是沒(méi)有跨系統(tǒng)的rpc調(diào)用的,通常是不會(huì)加分布式事務(wù)的。
例如有分布式事務(wù)執(zhí)行完畢A系統(tǒng)的業(yè)務(wù)邏輯,正在繼續(xù)執(zhí)行B系統(tǒng)邏輯,并且A系統(tǒng)事務(wù)已經(jīng)提交。此時(shí)A系統(tǒng)一個(gè)本地的spring事務(wù)去與分布式事務(wù)修改同一行數(shù)據(jù),是可以正常修改的
由于本地的spring事務(wù)并不受seata的全局鎖控制容易導(dǎo)致臟寫(xiě),即全局事務(wù)修改數(shù)據(jù)后,還未提交,數(shù)據(jù)又被本地事務(wù)改掉了。這很容易發(fā)生數(shù)據(jù)出錯(cuò)的問(wèn)題,而且十分有可能導(dǎo)致全局事務(wù)回滾時(shí)發(fā)現(xiàn) 數(shù)據(jù)已經(jīng)dirty(與uodoLog中的beforeImage不同)。那么就會(huì)回滾失敗,進(jìn)而導(dǎo)致全局鎖無(wú)法釋放,后續(xù)的操作無(wú)法進(jìn)行下去。也是比較嚴(yán)重的問(wèn)題。
一種解決辦法就是,針對(duì)所有相關(guān)操作都加上AT全局事務(wù),但這顯然是沒(méi)必要的,因?yàn)槿质聞?wù)意味者需要與seata-server進(jìn)行通信,創(chuàng)建全局事務(wù),注冊(cè)分支事務(wù),記錄undoLog,判斷鎖沖突,注冊(cè)鎖。
那么對(duì)于不需要跨系統(tǒng),跨庫(kù)的的業(yè)務(wù)來(lái)說(shuō),使用GlobalTransactional實(shí)在是有點(diǎn)浪費(fèi)了
那么更加輕量的GlobalLock就能夠發(fā)揮作用了,其只需要判斷本地的修改是否與全局鎖沖突就夠了
工作原理
加上@GlobalLock之后,會(huì)進(jìn)入切面
io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
進(jìn)而進(jìn)入這個(gè)方法,處理全局鎖
Object handleGlobalLock(final MethodInvocation methodInvocation,
final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}進(jìn)入execute方法
public Object execute(GlobalLockExecutor executor) throws Throwable {
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}
// set my config to config holder so that it can be access in further execution
// for example, LockRetryController can access it with config holder
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
return executor.execute();
} finally {
// only unbind when this is the root caller.
// otherwise, the outer caller would lose global lock flag
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
// if previous config is not null, we need to set it back
// so that the outer logic can still use their config
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}
}先判斷當(dāng)前是否已經(jīng)在globalLock范圍之內(nèi),如果已經(jīng)在范圍之內(nèi),那么把上層的配置取出來(lái),用新的配置替換,并在方法執(zhí)行完畢時(shí)候,釋放鎖,或者將配置替換成之前的上層配置
如果開(kāi)啟全局鎖,會(huì)在threadLocal put一個(gè)標(biāo)記
//just put something not null CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
開(kāi)始執(zhí)行業(yè)務(wù)方法
那么加上相關(guān)GlobalLock標(biāo)記的和普通方法的區(qū)別在哪里?
我們都知道,seata會(huì)對(duì)數(shù)據(jù)庫(kù)連接做代理,在生成PreparedStatement時(shí)會(huì)進(jìn)入
io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}這里顯然不會(huì)進(jìn)入AT模式的邏輯,那么直接通過(guò)真正的數(shù)據(jù)庫(kù)連接,生成PreparedStatement,再使用PreparedStatementProxy進(jìn)行包裝,代理增強(qiáng)
在使用PreparedStatementProxy執(zhí)行sql時(shí),會(huì)進(jìn)入seata定義的一些邏輯
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
最終來(lái)到
io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object…)
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}如果當(dāng)前線程不需要鎖并且不不在AT模式的分支事務(wù)下,直接使用原生的preparedStatement執(zhí)行就好了
這里四種操作,通過(guò)不同的接口去執(zhí)行,接口又有多種不同的數(shù)據(jù)庫(kù)類(lèi)型實(shí)現(xiàn)
插入分為不同的數(shù)據(jù)庫(kù)類(lèi)型,通過(guò)spi獲取

seata提供了三種數(shù)據(jù)庫(kù)的實(shí)現(xiàn),
update,delete,select三種沒(méi)有多個(gè)實(shí)現(xiàn)類(lèi)
他們?cè)趫?zhí)行時(shí)都會(huì)執(zhí)行父類(lèi)的方法
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}全局鎖的策略, 是在一個(gè)while(true)循環(huán)里不斷執(zhí)行
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) {
onException(lockConflict);
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}如果出現(xiàn)異常是LockConflictException,進(jìn)入sleep
public void sleep(Exception e) throws LockWaitTimeoutException {
if (--lockRetryTimes < 0) {
throw new LockWaitTimeoutException("Global lock wait timeout", e);
}
try {
Thread.sleep(lockRetryInternal);
} catch (InterruptedException ignore) {
}
}這兩個(gè)變量就是@GlobalLock注解的兩個(gè)配置,一個(gè)是重試次數(shù),一個(gè)重試之間的間隔時(shí)間。
繼續(xù)就是執(zhí)行數(shù)據(jù)庫(kù)更新操作
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse
發(fā)現(xiàn)這里也會(huì)生成,undoLog,beforeImage和afterImage,其實(shí)想想,在GlobalLock下,是沒(méi)必要生成undoLog的。但是現(xiàn)有邏輯確實(shí)要生成,這個(gè)seata后續(xù)應(yīng)該會(huì)優(yōu)化。
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}生成beforeImage和aferImage的邏輯也比較簡(jiǎn)單。分別在執(zhí)行更新前,查詢(xún)數(shù)據(jù)庫(kù),和更新后查詢(xún)數(shù)據(jù)庫(kù)
可見(jiàn)記錄undoLog是十分影響性能的,查詢(xún)就多了兩次,如果undoLog入庫(kù)還要再多一次入庫(kù)操作。
再看prepareUndoLog
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
if (beforeImage.getRows().size() != afterImage.getRows().size()) {
throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
}
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
}將lockKeys,和undoLog,暫時(shí)記錄在connectionProxy中,也就是說(shuō)至此還沒(méi)有將uodoLog記錄到數(shù)據(jù)庫(kù),也沒(méi)有判斷全局鎖,這些事情都留到了事務(wù)提交
io.seata.rm.datasource.ConnectionProxy#doCommit
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}進(jìn)入io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks
這個(gè) 方法很簡(jiǎn)單就是首先進(jìn)行鎖的檢查,并沒(méi)有我想象中的加索全局事務(wù)。
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}也就是說(shuō),使用GlobalLock會(huì)對(duì)全局鎖檢測(cè),但是并不會(huì)對(duì)記錄加全局鎖。但是配合全局事務(wù)這樣已經(jīng)能夠保證全局事務(wù)的原子性了??梢?jiàn)GlobalLock還是要和本地事務(wù)組合一起使用的,這樣才能保證,GlobalLock執(zhí)行完畢本地事務(wù)未提交的數(shù)據(jù)不會(huì)被別的本地事務(wù)/分布式事務(wù)修改掉。
到此這篇關(guān)于Java @GlobalLock注解詳細(xì)分析講解的文章就介紹到這了,更多相關(guān)Java @GlobalLock內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
ConcurrentModificationException日志關(guān)鍵字報(bào)警思考分析
本文將記錄和分析日志中的ConcurrentModificationException關(guān)鍵字報(bào)警,還有一些我的思考,有需要的朋友可以借鑒參考下,希望能夠有所幫助2023-12-12
Java實(shí)現(xiàn)時(shí)間日期格式轉(zhuǎn)換示例
本篇文章主要介紹了ava實(shí)現(xiàn)時(shí)間日期格式轉(zhuǎn)換示例,實(shí)現(xiàn)了各種時(shí)間輸出的類(lèi)型,有興趣的可以了解一下。2017-01-01
Collections.shuffle()方法實(shí)例解析
這篇文章主要介紹了Collections.shuffle()方法實(shí)例解析,分享了相關(guān)代碼示例,小編覺(jué)得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01
Spring?Data?JPA實(shí)現(xiàn)查詢(xún)結(jié)果返回map或自定義的實(shí)體類(lèi)
這篇文章主要介紹了Spring?Data?JPA實(shí)現(xiàn)查詢(xún)結(jié)果返回map或自定義的實(shí)體類(lèi),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
springboot配置多個(gè)數(shù)據(jù)源兩種方式實(shí)現(xiàn)
在我們的實(shí)際業(yè)務(wù)中可能會(huì)遇到;在一個(gè)項(xiàng)目里面讀取多個(gè)數(shù)據(jù)庫(kù)的數(shù)據(jù)來(lái)進(jìn)行展示,spring對(duì)同時(shí)配置多個(gè)數(shù)據(jù)源是支持的,本文主要介紹了springboot配置多個(gè)數(shù)據(jù)源兩種方式實(shí)現(xiàn),感興趣的可以了解一下2022-03-03
Java Selenium實(shí)現(xiàn)多窗口切換的示例代碼
這篇文章主要介紹了Java Selenium實(shí)現(xiàn)多窗口切換的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09

