Spring Data JPA開啟批量更新時(shí)樂觀鎖失效問題的解決方法
樂觀鎖機(jī)制
什么是樂觀鎖?
樂觀鎖的基本思想是,認(rèn)為在大多數(shù)情況下,數(shù)據(jù)訪問不會(huì)導(dǎo)致沖突。因此,樂觀鎖允許多個(gè)事務(wù)同時(shí)讀取和修改相同的數(shù)據(jù),而不進(jìn)行顯式的鎖定。在提交事務(wù)之前,會(huì)檢查是否有其他事務(wù)對(duì)該數(shù)據(jù)進(jìn)行了修改。如果沒有沖突,則提交成功;如果發(fā)現(xiàn)沖突,就需要回滾并重新嘗試。
樂觀鎖通常使用版本號(hào)或時(shí)間戳來實(shí)現(xiàn)。每個(gè)數(shù)據(jù)項(xiàng)都會(huì)包含一個(gè)表示當(dāng)前版本的標(biāo)識(shí)符。在讀取數(shù)據(jù)時(shí),會(huì)將版本標(biāo)識(shí)符保存下來。在提交更新時(shí),會(huì)檢查數(shù)據(jù)的當(dāng)前版本是否與保存的版本匹配。如果匹配,則更新成功;否則,表示數(shù)據(jù)已被其他事務(wù)修改,需要處理沖突。
樂觀鎖適用于讀操作頻率較高、寫操作沖突較少的場(chǎng)景。它減少了鎖的使用,提高了并發(fā)性能,但需要處理沖突和重試的情況。
樂觀鎖是一種廣義的思想,不是某一框架或語(yǔ)言特有的。
樂觀鎖的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 增強(qiáng)吞吐量:由于在事務(wù)持續(xù)時(shí)間的大部分時(shí)間內(nèi)沒有持有鎖,因此等待時(shí)間最少,吞吐量也是最?的。
- 最小化死鎖:死鎖是一種事務(wù)無限期地等待其他人鎖定的資源的情況,這種情況的可能性要小得多,因?yàn)閿?shù)據(jù)不會(huì)長(zhǎng)時(shí)間鎖定。
- 更好的可擴(kuò)展性:隨著分布式系統(tǒng)和微服務(wù)架構(gòu)的興起,樂觀鎖在確保系統(tǒng)能夠有效擴(kuò)展而無需管理復(fù)雜鎖機(jī)制的開銷方面發(fā)揮著關(guān)鍵作用。
缺點(diǎn)
- 沖突管理開銷:在沖突頻繁的場(chǎng)景中,管理和解決沖突可能會(huì)占用大量資源。
- 復(fù)雜性:實(shí)現(xiàn)樂觀鎖需要經(jīng)過深思熟慮的設(shè)計(jì),特別是在處理失敗的事務(wù)時(shí)。
- 過時(shí)數(shù)據(jù)的可能性:由于數(shù)據(jù)在讀取時(shí)未鎖定,因此事務(wù)可能會(huì)使用過時(shí)或過時(shí)的數(shù)據(jù),如果管理不正確,可能會(huì)導(dǎo)致邏輯錯(cuò)誤或不一致。
JPA-樂觀鎖
概述
JPA(Java Persistence API)協(xié)議對(duì)樂觀鎖的操作做了規(guī)定:通過指定@Version字段對(duì)數(shù)據(jù)增加版本號(hào)控制,進(jìn)?在更新的時(shí)候判斷版本號(hào)是否有變化。如果版本沒有變化則更新成功;如果版本有變化,就會(huì)更新失敗并拋出“OptimisticLockException”異常。我們? SQL 表示?下樂觀鎖的做法,代碼如下:
SELECT uid, name, version FROM user WHERE id = 1; UPDATE user SET name = 'jack', version = version + 1 WHERE id = 1 AND version = 1;
假設(shè)本次查詢的version=1,在更新操作時(shí),只要version與上一個(gè)版本相同,就會(huì)更新成功,并且不會(huì)出現(xiàn)互相覆蓋的問題,保證了數(shù)據(jù)的原?性。
實(shí)現(xiàn)方法
JPA 協(xié)議規(guī)定,想要實(shí)現(xiàn)樂觀鎖,可以通過@Version注解標(biāo)注在某個(gè)字段上?,而此字段需要是可以持久化到DB的字段,并且只?持如下四種類型:
int或Integershort或Shortlong或Longjava.sql.Timestamp
我比較推薦使用Integer類型的字段,語(yǔ)義比較清晰、簡(jiǎn)單。
@Version的作用
該@Version注解用于啟用實(shí)體上的樂觀鎖,確保數(shù)據(jù)庫(kù)中的數(shù)據(jù)更新不會(huì)出現(xiàn)并發(fā)修改問題。當(dāng)實(shí)體中的某個(gè)字段標(biāo)記為@Version時(shí),JPA 將使用該字段來跟蹤更改并確保一次只有一個(gè)事務(wù)可以更新特定行。
注意:Spring Data JPA ??有兩個(gè)@Version注解,請(qǐng)使?@javax.persistence.Version,?不是@org.springframework.data.annotation.Version。
它是如何工作的?
每個(gè)用注解標(biāo)記的實(shí)體都@Version將由 JPA 跟蹤其版本。這是基本機(jī)制:
- 初始化:當(dāng)實(shí)體第一次被持久化(保存到數(shù)據(jù)庫(kù))時(shí),版本字段(通常是整數(shù)或時(shí)間戳)被設(shè)置為其初始值,通常為零。
- 讀取:稍后獲取實(shí)體時(shí),JPA 會(huì)從數(shù)據(jù)庫(kù)中檢索當(dāng)前版本。
- 更新:在嘗試更新或刪除實(shí)體時(shí),JPA 會(huì)根據(jù)實(shí)體的版本檢查數(shù)據(jù)庫(kù)中的當(dāng)前版本。如果版本匹配,則操作繼續(xù),并且數(shù)據(jù)庫(kù)中的版本增加(用于更新)。
- 沖突:如果版本不匹配,則表明另一個(gè)事務(wù)同時(shí)更新了實(shí)體,導(dǎo)致 JPA 拋出
OptimisticLockException。
項(xiàng)目示例
引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!-- 驅(qū)動(dòng) --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 數(shù)據(jù)庫(kù)連接池 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> </dependency>
項(xiàng)目配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false
username: root
password: root
jpa:
database: mysql
database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
show-sql: true
hibernate:
ddl-auto: update # 一般使用update
# create: 每次運(yùn)行程序時(shí),都會(huì)重新創(chuàng)建表,故而數(shù)據(jù)會(huì)丟失
# create-drop: 每次運(yùn)行程序時(shí)會(huì)先創(chuàng)建表結(jié)構(gòu),然后待程序結(jié)束時(shí)清空表
# upadte: 每次運(yùn)行程序,沒有表時(shí)會(huì)創(chuàng)建表,如果對(duì)象發(fā)生改變會(huì)更新表結(jié)構(gòu),原有數(shù)據(jù)不會(huì)清空,只會(huì)更新(推薦使用)
# validate: 運(yùn)行程序會(huì)校驗(yàn)數(shù)據(jù)與數(shù)據(jù)庫(kù)的字段類型是否相同,字段不同會(huì)報(bào)錯(cuò)
# none: 禁用DDL處理
open-in-view: false
properties:
hibernate:
jdbc: # 開啟批量更新/寫入
batch_size: 50
batch_versioned_data: true
order_inserts: true
order_updates: true
實(shí)體添加@Version
User實(shí)體增加字段version,并添加注解@Version。當(dāng)然,數(shù)據(jù)庫(kù)也要加上version字段。
@Entity
@Table(name = "TEST_USER")
public class User {
// ......
@Version
private Integer version;
// ......
}
創(chuàng)建UserInfoRepository
創(chuàng)建UserInfoRepository,?便進(jìn)?DB操作
public interface UserInfoRepository extends JpaRepository<User, Long> {}
創(chuàng)建 UserInfoService
創(chuàng)建 UserInfoService,?來模擬Service的復(fù)雜業(yè)務(wù)邏輯。
public interface UserService {
/**
* 根據(jù) UserId 產(chǎn)?的?些業(yè)務(wù)計(jì)算邏輯
*/
User calculate(Long userId);
}
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Override
@Transactional
public User calculate(Long userId) {
User user = repository.getById(userId);
// 模擬復(fù)雜的業(yè)務(wù)計(jì)算邏輯耗時(shí)操作;
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException ignored) {
}
user.setAge(user.getAge() + 1);
return userRepository.saveAndFlush(user);
}
}
其中,我們通過 @Transactional 開啟事務(wù),并且在查詢?法后?模擬復(fù)雜業(yè)務(wù)邏輯,?來呈現(xiàn)多線程的并發(fā)問題。
測(cè)試方法
@ExtendWith(SpringExtension.class)
@DataJpaTest
@ComponentScan(basePackageClasses = UserServiceImpl.class)
class UserServiceTest {
@Autowired
private UserService userService;
@Autowired
private UserRepository userRepository;
@Test
void testVersion() {
// 加?條數(shù)據(jù)
User user1 = userRepository.save(User.builder().age(20).name("zzn").build());
// 驗(yàn)證?下數(shù)據(jù)庫(kù)??的值
Assertions.assertEquals(0, user1.getVersion());
Assertions.assertEquals(20, user1.getAge());
userService.calculate(user1.getId());
// 驗(yàn)證?下更新成功的值
User user2 = userRepository.getById(user1.getId());
Assertions.assertEquals(1, user2.getVersion());
Assertions.assertEquals(21, user2.getAge());
}
@SneakyThrows
@Test
@Rollback(false)
@Transactional(propagation = Propagation.NEVER)
void testVersionException() {
// 加?條數(shù)據(jù)
userRepository.save(User.builder().age(20).name("zzn").build());
// 模擬多線程執(zhí)?兩次
new Thread(() -> userService.calculate(1L)).start();
TimeUnit.SECONDS.sleep(1L);
// 如果兩個(gè)線程同時(shí)執(zhí)?會(huì)發(fā)?樂觀鎖異常;
Exception exception = Assertions.assertThrows(ObjectOptimisticLockingFailureException.class,
() -> userService.calculate(1L));
log.info("error info:", exception);
}
}
從上?的測(cè)試得到的結(jié)果中,我們執(zhí)?testVersion(),會(huì)發(fā)現(xiàn)在 save 的時(shí)候, Version會(huì)?動(dòng) +1,第?次初始化為 0;update 的時(shí)候也會(huì)附帶 Version 條件,我們通過下圖的 SQL,也可以看到 Version 的變化。

?當(dāng)?我們調(diào)?testVersionException()測(cè)試?法的時(shí)候,利?多線程模擬兩個(gè)并發(fā)情況,會(huì)發(fā)現(xiàn)兩個(gè)線程同時(shí)取到了歷史數(shù)據(jù),并在稍后都對(duì)歷史數(shù)據(jù)進(jìn)?了更新。
由此你會(huì)發(fā)現(xiàn),第?次測(cè)試的結(jié)果是樂觀鎖異常,更新不成功。
通過?志?會(huì)發(fā)現(xiàn),兩個(gè)SQL同時(shí)更新的時(shí)候,Version是?樣的,是它導(dǎo)致了樂觀鎖異常。
注意:樂觀鎖異常不僅僅是同?個(gè)?法多線程才會(huì)出現(xiàn)的問題,我們只是為了?便測(cè)試?采?同?個(gè)?法;不同的?法、不同的項(xiàng)?,都有可能導(dǎo)致樂觀鎖異常。樂觀鎖的本質(zhì)是 SQL 層?發(fā)?的,和使?的框架、技術(shù)沒有關(guān)系。
問題描述
一句廢話:正常情況下,一切正常!
運(yùn)行環(huán)境
Java:1.8.0 SpringBoot:2.3.12.RELEASE Spring Data JPA:2.3.9.RELEASE Hibernate:5.4.32.Final Database Driver:ojdbc6 11.2.0.3 Database Platform:Oracle 10g
問題現(xiàn)象
上述代碼示例運(yùn)行在MySQL數(shù)據(jù)庫(kù)上,一切正常,但是切換到Oracle數(shù)據(jù)庫(kù)時(shí),不開啟批量更新模式時(shí),也符合預(yù)期,但是開啟批量更新模式時(shí),不符合預(yù)期:并發(fā)更新同一實(shí)體時(shí),未拋出
ObjectOptimisticLockingFailureException異常。
| 數(shù)據(jù)庫(kù)類型 | 開啟批量 | 不開啟批量 |
|---|---|---|
| Oracle | 不生效 | 生效 |
| MySQL | 生效 | 生效 |
批量模式下,樂觀鎖異常棧:
Caused by: org.hibernate.StaleStateException: Batch update returned unexpected row count from update [0]; actual row count: 0; expected: 1; statement executed: update test_user set update_time=?, version=?, remark=? where user_id=? and version=? at org.hibernate.jdbc.Expectations$BasicExpectation.checkBatched(Expectations.java:67) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.jdbc.Expectations$BasicExpectation.verifyOutcome(Expectations.java:54) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.checkRowCounts(BatchingBatch.java:151) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.performExecution(BatchingBatch.java:126) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.doExecuteBatch(BatchingBatch.java:106) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.AbstractBatchImpl.execute(AbstractBatchImpl.java:148) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.executeBatch(JdbcCoordinatorImpl.java:198) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:633) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
非批量模式下,樂觀鎖異常棧:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [com.esunny.option.domain.user.User#990] at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:2649) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3492) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:3355) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3769) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:201) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
代碼分析
從以上兩種模式下的異常棧分析代碼路徑:
org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush
org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush
org.hibernate.internal.SessionImpl.flush
org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener
org.hibernate.event.internal.DefaultFlushEventListener.onFlush
org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions
org.hibernate.engine.spi.ActionQueue.executeActions
ActionQueue.executeActions邏輯如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java
/**
* Perform all currently queued actions.
*
* @throws HibernateException error executing queued actions.
*/
public void executeActions() throws HibernateException {
if ( hasUnresolvedEntityInsertActions() ) {
throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
}
for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
ExecutableList<?> l = listProvider.get( this );
if ( l != null && !l.isEmpty() ) {
executeActions( l );
}
}
}
/**
* Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
*
* @param list The list of Executable elements to be performed
*
* @throws HibernateException
*/
private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
// todo : consider ways to improve the double iteration of Executables here:
// 1) we explicitly iterate list here to perform Executable#execute()
// 2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
try {
for ( E e : list ) {
try {
e.execute();
}
finally {
if( e.getBeforeTransactionCompletionProcess() != null ) {
if( beforeTransactionProcesses == null ) {
beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
}
beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
}
if( e.getAfterTransactionCompletionProcess() != null ) {
if( afterTransactionProcesses == null ) {
afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
}
afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
}
}
}
}
finally {
if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
// Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
// We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
// unexpected.
Set<Serializable> propertySpaces = list.getQuerySpaces();
invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
}
}
list.clear();
session.getJdbcCoordinator().executeBatch();
}
這里在for循環(huán)里頭調(diào)用了e.execute(),同時(shí)在循環(huán)之后,finally之后調(diào)用了session.getJdbcCoordinator().executeBatch()
其中,EXECUTABLE_LISTS_MAP中的Executable包括:EntityInsertAction、EntityUpdateAction、EntityDeleteAction等。
Executable.execute邏輯如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/action/internal/EntityUpdateAction.java
@Override
public void execute() throws HibernateException {
final Serializable id = getId();
final EntityPersister persister = getPersister();
final SharedSessionContractImplementor session = getSession();
final Object instance = getInstance();
final boolean veto = preUpdate();
final SessionFactoryImplementor factory = session.getFactory();
Object previousVersion = this.previousVersion;
if ( persister.isVersionPropertyGenerated() ) {
// we need to grab the version value from the entity, otherwise
// we have issues with generated-version entities that may have
// multiple actions queued during the same flush
previousVersion = persister.getVersion( instance );
}
final Object ck;
if ( persister.canWriteToCache() ) {
final EntityDataAccess cache = persister.getCacheAccessStrategy();
ck = cache.generateCacheKey(
id,
persister,
factory,
session.getTenantIdentifier()
);
lock = cache.lockItem( session, ck, previousVersion );
}
else {
ck = null;
}
if ( !veto ) {
persister.update(
id,
state,
dirtyFields,
hasDirtyCollection,
previousState,
previousVersion,
instance,
rowId,
session
);
}
final EntityEntry entry = session.getPersistenceContextInternal().getEntry( instance );
if ( entry == null ) {
throw new AssertionFailure( "possible nonthreadsafe access to session" );
}
if ( entry.getStatus()==Status.MANAGED || persister.isVersionPropertyGenerated() ) {
// get the updated snapshot of the entity state by cloning current state;
// it is safe to copy in place, since by this time no-one else (should have)
// has a reference to the array
TypeHelper.deepCopy(
state,
persister.getPropertyTypes(),
persister.getPropertyCheckability(),
state,
session
);
if ( persister.hasUpdateGeneratedProperties() ) {
// this entity defines property generation, so process those generated
// values...
persister.processUpdateGeneratedProperties( id, instance, state, session );
if ( persister.isVersionPropertyGenerated() ) {
nextVersion = Versioning.getVersion( state, persister );
}
}
// have the entity entry doAfterTransactionCompletion post-update processing, passing it the
// update state and the new version (if one).
entry.postUpdate( instance, state, nextVersion );
}
final StatisticsImplementor statistics = factory.getStatistics();
if ( persister.canWriteToCache() ) {
if ( persister.isCacheInvalidationRequired() || entry.getStatus()!= Status.MANAGED ) {
persister.getCacheAccessStrategy().remove( session, ck);
}
else if ( session.getCacheMode().isPutEnabled() ) {
//TODO: inefficient if that cache is just going to ignore the updated state!
final CacheEntry ce = persister.buildCacheEntry( instance,state, nextVersion, getSession() );
cacheEntry = persister.getCacheEntryStructure().structure( ce );
final boolean put = cacheUpdate( persister, previousVersion, ck );
if ( put && statistics.isStatisticsEnabled() ) {
statistics.entityCachePut(
StatsHelper.INSTANCE.getRootEntityRole( persister ),
getPersister().getCacheAccessStrategy().getRegion().getName()
);
}
}
}
session.getPersistenceContextInternal().getNaturalIdHelper().manageSharedNaturalIdCrossReference(
persister,
id,
state,
previousNaturalIdValues,
CachedNaturalIdValueSource.UPDATE
);
postUpdate();
if ( statistics.isStatisticsEnabled() && !veto ) {
statistics.updateEntity( getPersister().getEntityName() );
}
}
調(diào)用了persister的update方法。
AbstractEntityPersister.update
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java
public boolean update(
final Serializable id,
final Object[] fields,
final Object[] oldFields,
final Object rowId,
final boolean[] includeProperty,
final int j,
final Object oldVersion,
final Object object,
final String sql,
final SharedSessionContractImplementor session) throws HibernateException {
final Expectation expectation = Expectations.appropriateExpectation( updateResultCheckStyles[j] );
final int jdbcBatchSizeToUse = session.getConfiguredJdbcBatchSize();
// IMPLEMENTATION NOTE: If Session#saveOrUpdate or #update is used to update an entity, then
// Hibernate does not have a database snapshot of the existing entity.
// As a result, oldFields will be null.
// Don't use a batch if oldFields == null and the jth table is optional (isNullableTable( j ),
// because there is no way to know that there is actually a row to update. If the update
// was batched in this case, the batch update would fail and there is no way to fallback to
// an insert.
final boolean useBatch =
expectation.canBeBatched() &&
isBatchable() &&
jdbcBatchSizeToUse > 1 &&
( oldFields != null || !isNullableTable( j ) );
if ( useBatch && updateBatchKey == null ) {
updateBatchKey = new BasicBatchKey(
getEntityName() + "#UPDATE",
expectation
);
}
final boolean callable = isUpdateCallable( j );
final boolean useVersion = j == 0 && isVersioned();
if ( LOG.isTraceEnabled() ) {
LOG.tracev( "Updating entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
if ( useVersion ) {
LOG.tracev( "Existing version: {0} -> New version:{1}", oldVersion, fields[getVersionProperty()] );
}
}
try {
int index = 1; // starting index
final PreparedStatement update;
if ( useBatch ) {
update = session
.getJdbcCoordinator()
.getBatch( updateBatchKey )
.getBatchStatement( sql, callable );
}
else {
update = session
.getJdbcCoordinator()
.getStatementPreparer()
.prepareStatement( sql, callable );
}
try {
index += expectation.prepare( update );
//Now write the values of fields onto the prepared statement
index = dehydrate(
id,
fields,
rowId,
includeProperty,
propertyColumnUpdateable,
j,
update,
session,
index,
true
);
// Write any appropriate versioning conditional parameters
if ( useVersion && entityMetamodel.getOptimisticLockStyle().isVersion()) {
if ( checkVersion( includeProperty ) ) {
getVersionType().nullSafeSet( update, oldVersion, index, session );
}
}
else if ( isAllOrDirtyOptLocking() && oldFields != null ) {
boolean[] versionability = getPropertyVersionability(); //TODO: is this really necessary????
boolean[] includeOldField = entityMetamodel.getOptimisticLockStyle().isAll()
? getPropertyUpdateability()
: includeProperty;
Type[] types = getPropertyTypes();
for ( int i = 0; i < entityMetamodel.getPropertySpan(); i++ ) {
boolean include = includeOldField[i] &&
isPropertyOfTable( i, j ) &&
versionability[i]; //TODO: is this really necessary????
if ( include ) {
boolean[] settable = types[i].toColumnNullness( oldFields[i], getFactory() );
types[i].nullSafeSet(
update,
oldFields[i],
index,
settable,
session
);
index += ArrayHelper.countTrue( settable );
}
}
}
if ( useBatch ) {
session.getJdbcCoordinator().getBatch( updateBatchKey ).addToBatch();
return true;
}
else {
return check(
session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update ),
id,
j,
expectation,
update,
sql
);
}
}
catch (SQLException e) {
if ( useBatch ) {
session.getJdbcCoordinator().abortBatch();
}
throw e;
}
finally {
if ( !useBatch ) {
session.getJdbcCoordinator().getResourceRegistry().release( update );
session.getJdbcCoordinator().afterStatementExecution();
}
}
}
catch (SQLException e) {
throw getFactory().getSQLExceptionHelper().convert(
e,
"could not update: " + MessageHelper.infoString( this, id, getFactory() ),
sql
);
}
}
關(guān)鍵之處:
useBatch的賦值邏輯
public boolean isBatchable() {
return optimisticLockStyle().isNone()
|| !isVersioned() && optimisticLockStyle().isVersion()
|| getFactory().getSessionFactoryOptions().isJdbcBatchVersionedData();
}
1. 配置了`spring.jpa.properties.hibernate.jdbc.batch_versioned_data`為true;
2. jdbcBatchSizeToUse > 1, 即`spring.jpa.properties.hibernate.jdbc.batch_size`大于0
- 如果useBatch為true
調(diào)用session.getJdbcCoordinator().getBatch(updateBatchKey).addToBatch();
這里的updateBatchKey為com.example.domain.User#UPDATE;此處僅是將PreparedStatement放入待執(zhí)行隊(duì)列。
之后便執(zhí)行session.getJdbcCoordinator().executeBatch()邏輯;請(qǐng)看BatchingBatch.performExecution
- 如果useBatch為false
調(diào)用session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update ),并調(diào)用check方法執(zhí)行檢查。此處檢查失敗,則會(huì)拋出樂觀鎖異常!
BatchingBatch.performExecution
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java
private void performExecution() {
LOG.debugf( "Executing batch size: %s", batchPosition );
try {
for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
try {
final PreparedStatement statement = entry.getValue();
final int[] rowCounts;
try {
getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
rowCounts = statement.executeBatch();
}
finally {
getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
}
checkRowCounts( rowCounts, statement );
}
catch ( SQLException e ) {
abortBatch();
throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
}
}
}
catch ( RuntimeException re ) {
LOG.unableToExecuteBatch( re.getMessage() );
throw re;
}
finally {
batchPosition = 0;
}
}
可以看到這里調(diào)用了statement.executeBatch(),并返回了int[] rowCounts;然后調(diào)用checkRowCounts( rowCounts, statement ); >Expectations#BasicExpectation.checkBatched 此處檢查失敗,則會(huì)拋出樂觀鎖異常!
問題原因
非批量模式下,檢查執(zhí)行結(jié)果是調(diào)用的checkNonBatched方法,該方法僅檢查更新條目數(shù)是否一致:
private void checkNonBatched(int rowCount, String statementSQL) {
if ( expectedRowCount > rowCount ) {
throw new StaleStateException(
"Unexpected row count: " + rowCount + "; expected: " + expectedRowCount
+ "; statement executed: " + statementSQL
);
}
if ( expectedRowCount < rowCount ) {
String msg = "Unexpected row count: " + rowCount + "; expected: " + expectedRowCount;
throw new TooManyRowsAffectedException( msg, expectedRowCount, rowCount );
}
}
批量模式下,檢查執(zhí)行結(jié)果是調(diào)用的checkBatched方法,檢查邏輯如下:
private void checkBatched(int rowCount, int batchPosition, String statementSQL) {
if ( rowCount == -2 ) {
LOG.debugf( "Success of batch update unknown: %s", batchPosition );
}
else if ( rowCount == -3 ) {
throw new BatchFailedException( "Batch update failed: " + batchPosition );
}
else {
if ( expectedRowCount > rowCount ) {
throw new StaleStateException(
"Batch update returned unexpected row count from update ["
+ batchPosition + "]; actual row count: " + rowCount
+ "; expected: " + expectedRowCount + "; statement executed: "
+ statementSQL
);
}
if ( expectedRowCount < rowCount ) {
String msg = "Batch update returned unexpected row count from update [" +
batchPosition + "]; actual row count: " + rowCount +
"; expected: " + expectedRowCount;
throw new BatchedTooManyRowsAffectedException( msg, expectedRowCount, rowCount, batchPosition );
}
}
}
問題便在于此!
int[] executeBatch() throws SQLException
返回值說明:
① 大于或等于零的數(shù)字,表示命令已成功處理,并且是更新計(jì)數(shù),給出了數(shù)據(jù)庫(kù)中受命令影響的行數(shù)執(zhí)行;
② SUCCESS_NO_INFO ( -2)的值,表示命令處理成功,但受影響的行數(shù)未知;
③ 如果批量更新中的命令之一無法正確執(zhí)行,此方法引發(fā)BatchUpdateException,JDBC Driver可能會(huì)也可能不會(huì)繼續(xù)處理剩余的命令。但是Driver的行為是與特定的DBMS綁定的,要么總是繼續(xù)處理命令,要么從不繼續(xù)處理命令。如果驅(qū)動(dòng)程序繼續(xù)處理,方法將返回EXECUTE_FAILED(-3)。
在實(shí)際的測(cè)試過程中發(fā)現(xiàn):
| DB類型 | 是否可以返回實(shí)際影響行數(shù) | 備注 |
|---|---|---|
| MySQL | 是 | |
| Oracle | 否 | 每個(gè)數(shù)組位置值均為-2 |
在Oracle的驅(qū)動(dòng)中沒有實(shí)現(xiàn)該功能,即提交成功后不能返回影響行數(shù),所以返回-2。
Oracle驅(qū)動(dòng)源碼如下:oracle.jdbc.driver.OraclePreparedStatement#executeBatch
public int[] executeBatch() throws SQLException {
synchronized (this.connection) {
int[] arrayOfInt = new int[this.currentRank];
/* 此處省略N行代碼 */
if ((this.sqlKind != 1) && (this.sqlKind != 4)) {
for (i = 0; i < arrayOfInt.length; i++) {
arrayOfInt[i] = -2; // 關(guān)鍵看這行
}
}
this.connection.registerHeartbeat();
return arrayOfInt;
}
}
根據(jù)StackOverflow上的說法,Oracle 11g之前的版本,executeBatch方法返回的均是-2,eg.


解決方案
Hibernate對(duì)于這個(gè)問題有自己的處理辦法,就是設(shè)置一個(gè)jdbc和數(shù)據(jù)庫(kù)的連接屬性hibernate.jdbc.use_scrollable_resultset =true。
如果你想讓你的JDBC驅(qū)動(dòng)從executeBatch()返回正確的行計(jì)數(shù) , 那么將此屬性設(shè)為true(開啟這個(gè)選項(xiàng)通常是安全的). 同時(shí),Hibernate將為自動(dòng)版本化的數(shù)據(jù)使用批量DML. 默認(rèn)值為false. eg. true | false
以上就是Spring Data JPA開啟批量更新時(shí)樂觀鎖失效問題的解決方法的詳細(xì)內(nèi)容,更多關(guān)于Spring Data JPA樂觀鎖失效的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
maven創(chuàng)建spark項(xiàng)目的pom.xml文件配置demo
這篇文章主要為大家介紹了maven創(chuàng)建spark項(xiàng)目的pom.xml文件配置demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
使用注解@Recover優(yōu)化丑陋的循環(huán)詳解
我們知道在實(shí)現(xiàn)一個(gè)功能的時(shí)候是可以使用不同的代碼來實(shí)現(xiàn)的,那么相應(yīng)的不同實(shí)現(xiàn)方法的性能肯定也是有差別的,下面這篇文章主要給大家介紹了關(guān)于使用注解@Recover優(yōu)化丑陋的循環(huán)的相關(guān)資料,需要的朋友可以參考下2022-04-04
SpringBoot Data JPA 關(guān)聯(lián)表查詢的方法
這篇文章主要介紹了SpringBoot Data JPA 關(guān)聯(lián)表查詢的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-07-07
詳解SpringBoot中@PostMapping注解的用法
在SpringBoot中,我們經(jīng)常需要編寫RESTful Web服務(wù),以便于客戶端與服務(wù)器之間的通信,@PostMapping注解可以讓我們更方便地編寫POST請(qǐng)求處理方法,在本文中,我們將介紹@PostMapping注解的作用、原理,以及如何在SpringBoot應(yīng)用程序中使用它2023-06-06
Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建(圖文教程)
這篇文章主要介紹了Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建(圖文教程),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09
解決@ConfigurationProperties注解的使用及亂碼問題
這篇文章主要介紹了解決@ConfigurationProperties注解的使用及亂碼問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10

