Spring Data JPA開(kāi)啟批量更新時(shí)樂(lè)觀鎖失效問(wèn)題的解決方法
樂(lè)觀鎖機(jī)制
什么是樂(lè)觀鎖?
樂(lè)觀鎖的基本思想是,認(rèn)為在大多數(shù)情況下,數(shù)據(jù)訪問(wèn)不會(huì)導(dǎo)致沖突。因此,樂(lè)觀鎖允許多個(gè)事務(wù)同時(shí)讀取和修改相同的數(shù)據(jù),而不進(jìn)行顯式的鎖定。在提交事務(wù)
之前,會(huì)檢查是否有其他事務(wù)對(duì)該數(shù)據(jù)進(jìn)行了修改。如果沒(méi)有沖突,則提交成功;如果發(fā)現(xiàn)沖突,就需要回滾并重新嘗試。
樂(lè)觀鎖通常使用版本號(hào)或時(shí)間戳來(lái)實(shí)現(xiàn)。每個(gè)數(shù)據(jù)項(xiàng)都會(huì)包含一個(gè)表示當(dāng)前版本的標(biāo)識(shí)符。在讀取數(shù)據(jù)時(shí),會(huì)將版本標(biāo)識(shí)符保存下來(lái)。在提交更新時(shí),會(huì)檢查數(shù)據(jù)的當(dāng)前版本是否與保存的版本匹配。如果匹配,則更新成功;否則,表示數(shù)據(jù)已被其他事務(wù)修改,需要處理沖突。
樂(lè)觀鎖適用于讀操作頻率較高、寫操作沖突較少的場(chǎng)景。它減少了鎖的使用,提高了并發(fā)性能,但需要處理沖突和重試的情況。
樂(lè)觀鎖是一種廣義的思想,不是某一框架或語(yǔ)言特有的。
樂(lè)觀鎖的優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
- 增強(qiáng)吞吐量:由于在事務(wù)持續(xù)時(shí)間的大部分時(shí)間內(nèi)沒(méi)有持有鎖,因此等待時(shí)間最少,吞吐量也是最?的。
- 最小化死鎖:死鎖是一種事務(wù)無(wú)限期地等待其他人鎖定的資源的情況,這種情況的可能性要小得多,因?yàn)閿?shù)據(jù)不會(huì)長(zhǎng)時(shí)間鎖定。
- 更好的可擴(kuò)展性:隨著分布式系統(tǒng)和微服務(wù)架構(gòu)的興起,樂(lè)觀鎖在確保系統(tǒng)能夠有效擴(kuò)展而無(wú)需管理復(fù)雜鎖機(jī)制的開(kāi)銷方面發(fā)揮著關(guān)鍵作用。
缺點(diǎn)
- 沖突管理開(kāi)銷:在沖突頻繁的場(chǎng)景中,管理和解決沖突可能會(huì)占用大量資源。
- 復(fù)雜性:實(shí)現(xiàn)樂(lè)觀鎖需要經(jīng)過(guò)深思熟慮的設(shè)計(jì),特別是在處理失敗的事務(wù)時(shí)。
- 過(guò)時(shí)數(shù)據(jù)的可能性:由于數(shù)據(jù)在讀取時(shí)未鎖定,因此事務(wù)可能會(huì)使用過(guò)時(shí)或過(guò)時(shí)的數(shù)據(jù),如果管理不正確,可能會(huì)導(dǎo)致邏輯錯(cuò)誤或不一致。
JPA-樂(lè)觀鎖
概述
JPA(Java Persistence API)
協(xié)議對(duì)樂(lè)觀鎖的操作做了規(guī)定:通過(guò)指定@Version
字段對(duì)數(shù)據(jù)增加版本號(hào)控制,進(jìn)?在更新的時(shí)候判斷版本號(hào)是否有變化。如果版本沒(méi)有變化則更新成功;如果版本有變化,就會(huì)更新失敗并拋出“OptimisticLockException”
異常。我們? SQL 表示?下樂(lè)觀鎖的做法,代碼如下:
SELECT uid, name, version FROM user WHERE id = 1; UPDATE user SET name = 'jack', version = version + 1 WHERE id = 1 AND version = 1;
假設(shè)本次查詢的version=1
,在更新操作時(shí),只要version
與上一個(gè)版本相同,就會(huì)更新成功,并且不會(huì)出現(xiàn)互相覆蓋的問(wèn)題,保證了數(shù)據(jù)的原?性。
實(shí)現(xiàn)方法
JPA 協(xié)議規(guī)定,想要實(shí)現(xiàn)樂(lè)觀鎖,可以通過(guò)@Version
注解標(biāo)注在某個(gè)字段上?,而此字段需要是可以持久化到DB的字段,并且只?持如下四種類型:
int
或Integer
short
或Short
long
或Long
java.sql.Timestamp
我比較推薦使用Integer類型的字段,語(yǔ)義比較清晰、簡(jiǎn)單。
@Version的作用
該@Version
注解用于啟用實(shí)體上的樂(lè)觀鎖,確保數(shù)據(jù)庫(kù)中的數(shù)據(jù)更新不會(huì)出現(xiàn)并發(fā)修改問(wèn)題。當(dāng)實(shí)體中的某個(gè)字段標(biāo)記為@Version
時(shí),JPA 將使用該字段來(lái)跟蹤更改并確保一次只有一個(gè)事務(wù)可以更新特定行。
注意:Spring Data JPA ??有兩個(gè)@Version注解,請(qǐng)使?@javax.persistence.Version,?不是@org.springframework.data.annotation.Version。
它是如何工作的?
每個(gè)用注解標(biāo)記的實(shí)體都@Version
將由 JPA 跟蹤其版本。這是基本機(jī)制:
- 初始化:當(dāng)實(shí)體第一次被持久化(保存到數(shù)據(jù)庫(kù))時(shí),版本字段(通常是整數(shù)或時(shí)間戳)被設(shè)置為其初始值,通常為零。
- 讀取:稍后獲取實(shí)體時(shí),JPA 會(huì)從數(shù)據(jù)庫(kù)中檢索當(dāng)前版本。
- 更新:在嘗試更新或刪除實(shí)體時(shí),JPA 會(huì)根據(jù)實(shí)體的版本檢查數(shù)據(jù)庫(kù)中的當(dāng)前版本。如果版本匹配,則操作繼續(xù),并且數(shù)據(jù)庫(kù)中的版本增加(用于更新)。
- 沖突:如果版本不匹配,則表明另一個(gè)事務(wù)同時(shí)更新了實(shí)體,導(dǎo)致 JPA 拋出
OptimisticLockException
。
項(xiàng)目示例
引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!-- 驅(qū)動(dòng) --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 數(shù)據(jù)庫(kù)連接池 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> </dependency>
項(xiàng)目配置
spring: datasource: url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false username: root password: root jpa: database: mysql database-platform: org.hibernate.dialect.MySQL5InnoDBDialect show-sql: true hibernate: ddl-auto: update # 一般使用update # create: 每次運(yùn)行程序時(shí),都會(huì)重新創(chuàng)建表,故而數(shù)據(jù)會(huì)丟失 # create-drop: 每次運(yùn)行程序時(shí)會(huì)先創(chuàng)建表結(jié)構(gòu),然后待程序結(jié)束時(shí)清空表 # upadte: 每次運(yùn)行程序,沒(méi)有表時(shí)會(huì)創(chuàng)建表,如果對(duì)象發(fā)生改變會(huì)更新表結(jié)構(gòu),原有數(shù)據(jù)不會(huì)清空,只會(huì)更新(推薦使用) # validate: 運(yùn)行程序會(huì)校驗(yàn)數(shù)據(jù)與數(shù)據(jù)庫(kù)的字段類型是否相同,字段不同會(huì)報(bào)錯(cuò) # none: 禁用DDL處理 open-in-view: false properties: hibernate: jdbc: # 開(kāi)啟批量更新/寫入 batch_size: 50 batch_versioned_data: true order_inserts: true order_updates: true
實(shí)體添加@Version
User
實(shí)體增加字段version
,并添加注解@Version
。當(dāng)然,數(shù)據(jù)庫(kù)也要加上version
字段。
@Entity @Table(name = "TEST_USER") public class User { // ...... @Version private Integer version; // ...... }
創(chuàng)建UserInfoRepository
創(chuàng)建UserInfoRepository,?便進(jìn)?DB操作
public interface UserInfoRepository extends JpaRepository<User, Long> {}
創(chuàng)建 UserInfoService
創(chuàng)建 UserInfoService,?來(lái)模擬Service的復(fù)雜業(yè)務(wù)邏輯。
public interface UserService { /** * 根據(jù) UserId 產(chǎn)?的?些業(yè)務(wù)計(jì)算邏輯 */ User calculate(Long userId); } @Service public class UserServiceImpl implements UserService { @Autowired private UserRepository userRepository; @Override @Transactional public User calculate(Long userId) { User user = repository.getById(userId); // 模擬復(fù)雜的業(yè)務(wù)計(jì)算邏輯耗時(shí)操作; try { TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException ignored) { } user.setAge(user.getAge() + 1); return userRepository.saveAndFlush(user); } }
其中,我們通過(guò) @Transactional 開(kāi)啟事務(wù),并且在查詢?法后?模擬復(fù)雜業(yè)務(wù)邏輯,?來(lái)呈現(xiàn)多線程的并發(fā)問(wèn)題。
測(cè)試方法
@ExtendWith(SpringExtension.class) @DataJpaTest @ComponentScan(basePackageClasses = UserServiceImpl.class) class UserServiceTest { @Autowired private UserService userService; @Autowired private UserRepository userRepository; @Test void testVersion() { // 加?條數(shù)據(jù) User user1 = userRepository.save(User.builder().age(20).name("zzn").build()); // 驗(yàn)證?下數(shù)據(jù)庫(kù)??的值 Assertions.assertEquals(0, user1.getVersion()); Assertions.assertEquals(20, user1.getAge()); userService.calculate(user1.getId()); // 驗(yàn)證?下更新成功的值 User user2 = userRepository.getById(user1.getId()); Assertions.assertEquals(1, user2.getVersion()); Assertions.assertEquals(21, user2.getAge()); } @SneakyThrows @Test @Rollback(false) @Transactional(propagation = Propagation.NEVER) void testVersionException() { // 加?條數(shù)據(jù) userRepository.save(User.builder().age(20).name("zzn").build()); // 模擬多線程執(zhí)?兩次 new Thread(() -> userService.calculate(1L)).start(); TimeUnit.SECONDS.sleep(1L); // 如果兩個(gè)線程同時(shí)執(zhí)?會(huì)發(fā)?樂(lè)觀鎖異常; Exception exception = Assertions.assertThrows(ObjectOptimisticLockingFailureException.class, () -> userService.calculate(1L)); log.info("error info:", exception); } }
從上?的測(cè)試得到的結(jié)果中,我們執(zhí)?testVersion()
,會(huì)發(fā)現(xiàn)在 save
的時(shí)候, Version
會(huì)?動(dòng) +1,第?次初始化為 0;update 的時(shí)候也會(huì)附帶 Version
條件,我們通過(guò)下圖的 SQL,也可以看到 Version 的變化。
?當(dāng)?我們調(diào)?testVersionException()
測(cè)試?法的時(shí)候,利?多線程模擬兩個(gè)并發(fā)情況,會(huì)發(fā)現(xiàn)兩個(gè)線程同時(shí)取到了歷史數(shù)據(jù),并在稍后都對(duì)歷史數(shù)據(jù)進(jìn)?了更新。
由此你會(huì)發(fā)現(xiàn),第?次測(cè)試的結(jié)果是樂(lè)觀鎖異常,更新不成功。
通過(guò)?志?會(huì)發(fā)現(xiàn),兩個(gè)SQL同時(shí)更新的時(shí)候,Version
是?樣的,是它導(dǎo)致了樂(lè)觀鎖異常。
注意:樂(lè)觀鎖異常不僅僅是同?個(gè)?法多線程才會(huì)出現(xiàn)的問(wèn)題,我們只是為了?便測(cè)試?采?同?個(gè)?法;不同的?法、不同的項(xiàng)?,都有可能導(dǎo)致樂(lè)觀鎖異常。樂(lè)觀鎖的本質(zhì)是 SQL 層?發(fā)?的,和使?的框架、技術(shù)沒(méi)有關(guān)系。
問(wèn)題描述
一句廢話:正常情況下,一切正常!
運(yùn)行環(huán)境
Java:1.8.0 SpringBoot:2.3.12.RELEASE Spring Data JPA:2.3.9.RELEASE Hibernate:5.4.32.Final Database Driver:ojdbc6 11.2.0.3 Database Platform:Oracle 10g
問(wèn)題現(xiàn)象
上述代碼示例運(yùn)行在MySQL數(shù)據(jù)庫(kù)上,一切正常,但是切換到Oracle數(shù)據(jù)庫(kù)時(shí),不開(kāi)啟批量更新模式時(shí),也符合預(yù)期,但是開(kāi)啟批量更新模式時(shí),不符合預(yù)期:并發(fā)更新同一實(shí)體時(shí),未拋出
ObjectOptimisticLockingFailureException
異常。
數(shù)據(jù)庫(kù)類型 | 開(kāi)啟批量 | 不開(kāi)啟批量 |
---|---|---|
Oracle | 不生效 | 生效 |
MySQL | 生效 | 生效 |
批量模式下,樂(lè)觀鎖異常棧:
Caused by: org.hibernate.StaleStateException: Batch update returned unexpected row count from update [0]; actual row count: 0; expected: 1; statement executed: update test_user set update_time=?, version=?, remark=? where user_id=? and version=? at org.hibernate.jdbc.Expectations$BasicExpectation.checkBatched(Expectations.java:67) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.jdbc.Expectations$BasicExpectation.verifyOutcome(Expectations.java:54) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.checkRowCounts(BatchingBatch.java:151) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.performExecution(BatchingBatch.java:126) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.doExecuteBatch(BatchingBatch.java:106) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.AbstractBatchImpl.execute(AbstractBatchImpl.java:148) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.executeBatch(JdbcCoordinatorImpl.java:198) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:633) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
非批量模式下,樂(lè)觀鎖異常棧:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [com.esunny.option.domain.user.User#990] at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:2649) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3492) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:3355) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3769) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:201) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
代碼分析
從以上兩種模式下的異常棧分析代碼路徑:
org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush org.hibernate.internal.SessionImpl.flush org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener org.hibernate.event.internal.DefaultFlushEventListener.onFlush org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions org.hibernate.engine.spi.ActionQueue.executeActions
ActionQueue.executeActions邏輯如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java
/** * Perform all currently queued actions. * * @throws HibernateException error executing queued actions. */ public void executeActions() throws HibernateException { if ( hasUnresolvedEntityInsertActions() ) { throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." ); } for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) { ExecutableList<?> l = listProvider.get( this ); if ( l != null && !l.isEmpty() ) { executeActions( l ); } } } /** * Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list * * @param list The list of Executable elements to be performed * * @throws HibernateException */ private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException { // todo : consider ways to improve the double iteration of Executables here: // 1) we explicitly iterate list here to perform Executable#execute() // 2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces. try { for ( E e : list ) { try { e.execute(); } finally { if( e.getBeforeTransactionCompletionProcess() != null ) { if( beforeTransactionProcesses == null ) { beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); } beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess()); } if( e.getAfterTransactionCompletionProcess() != null ) { if( afterTransactionProcesses == null ) { afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); } afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess()); } } } } finally { if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) { // Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs. // We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are // unexpected. Set<Serializable> propertySpaces = list.getQuerySpaces(); invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) ); } } list.clear(); session.getJdbcCoordinator().executeBatch(); }
這里在for循環(huán)里頭調(diào)用了e.execute()
,同時(shí)在循環(huán)之后,finally之后調(diào)用了session.getJdbcCoordinator().executeBatch()
其中,EXECUTABLE_LISTS_MAP
中的Executable
包括:EntityInsertAction
、EntityUpdateAction
、EntityDeleteAction
等。
Executable.execute邏輯如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/action/internal/EntityUpdateAction.java
@Override public void execute() throws HibernateException { final Serializable id = getId(); final EntityPersister persister = getPersister(); final SharedSessionContractImplementor session = getSession(); final Object instance = getInstance(); final boolean veto = preUpdate(); final SessionFactoryImplementor factory = session.getFactory(); Object previousVersion = this.previousVersion; if ( persister.isVersionPropertyGenerated() ) { // we need to grab the version value from the entity, otherwise // we have issues with generated-version entities that may have // multiple actions queued during the same flush previousVersion = persister.getVersion( instance ); } final Object ck; if ( persister.canWriteToCache() ) { final EntityDataAccess cache = persister.getCacheAccessStrategy(); ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() ); lock = cache.lockItem( session, ck, previousVersion ); } else { ck = null; } if ( !veto ) { persister.update( id, state, dirtyFields, hasDirtyCollection, previousState, previousVersion, instance, rowId, session ); } final EntityEntry entry = session.getPersistenceContextInternal().getEntry( instance ); if ( entry == null ) { throw new AssertionFailure( "possible nonthreadsafe access to session" ); } if ( entry.getStatus()==Status.MANAGED || persister.isVersionPropertyGenerated() ) { // get the updated snapshot of the entity state by cloning current state; // it is safe to copy in place, since by this time no-one else (should have) // has a reference to the array TypeHelper.deepCopy( state, persister.getPropertyTypes(), persister.getPropertyCheckability(), state, session ); if ( persister.hasUpdateGeneratedProperties() ) { // this entity defines property generation, so process those generated // values... persister.processUpdateGeneratedProperties( id, instance, state, session ); if ( persister.isVersionPropertyGenerated() ) { nextVersion = Versioning.getVersion( state, persister ); } } // have the entity entry doAfterTransactionCompletion post-update processing, passing it the // update state and the new version (if one). entry.postUpdate( instance, state, nextVersion ); } final StatisticsImplementor statistics = factory.getStatistics(); if ( persister.canWriteToCache() ) { if ( persister.isCacheInvalidationRequired() || entry.getStatus()!= Status.MANAGED ) { persister.getCacheAccessStrategy().remove( session, ck); } else if ( session.getCacheMode().isPutEnabled() ) { //TODO: inefficient if that cache is just going to ignore the updated state! final CacheEntry ce = persister.buildCacheEntry( instance,state, nextVersion, getSession() ); cacheEntry = persister.getCacheEntryStructure().structure( ce ); final boolean put = cacheUpdate( persister, previousVersion, ck ); if ( put && statistics.isStatisticsEnabled() ) { statistics.entityCachePut( StatsHelper.INSTANCE.getRootEntityRole( persister ), getPersister().getCacheAccessStrategy().getRegion().getName() ); } } } session.getPersistenceContextInternal().getNaturalIdHelper().manageSharedNaturalIdCrossReference( persister, id, state, previousNaturalIdValues, CachedNaturalIdValueSource.UPDATE ); postUpdate(); if ( statistics.isStatisticsEnabled() && !veto ) { statistics.updateEntity( getPersister().getEntityName() ); } }
調(diào)用了persister的update方法。
AbstractEntityPersister.update
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java
public boolean update( final Serializable id, final Object[] fields, final Object[] oldFields, final Object rowId, final boolean[] includeProperty, final int j, final Object oldVersion, final Object object, final String sql, final SharedSessionContractImplementor session) throws HibernateException { final Expectation expectation = Expectations.appropriateExpectation( updateResultCheckStyles[j] ); final int jdbcBatchSizeToUse = session.getConfiguredJdbcBatchSize(); // IMPLEMENTATION NOTE: If Session#saveOrUpdate or #update is used to update an entity, then // Hibernate does not have a database snapshot of the existing entity. // As a result, oldFields will be null. // Don't use a batch if oldFields == null and the jth table is optional (isNullableTable( j ), // because there is no way to know that there is actually a row to update. If the update // was batched in this case, the batch update would fail and there is no way to fallback to // an insert. final boolean useBatch = expectation.canBeBatched() && isBatchable() && jdbcBatchSizeToUse > 1 && ( oldFields != null || !isNullableTable( j ) ); if ( useBatch && updateBatchKey == null ) { updateBatchKey = new BasicBatchKey( getEntityName() + "#UPDATE", expectation ); } final boolean callable = isUpdateCallable( j ); final boolean useVersion = j == 0 && isVersioned(); if ( LOG.isTraceEnabled() ) { LOG.tracev( "Updating entity: {0}", MessageHelper.infoString( this, id, getFactory() ) ); if ( useVersion ) { LOG.tracev( "Existing version: {0} -> New version:{1}", oldVersion, fields[getVersionProperty()] ); } } try { int index = 1; // starting index final PreparedStatement update; if ( useBatch ) { update = session .getJdbcCoordinator() .getBatch( updateBatchKey ) .getBatchStatement( sql, callable ); } else { update = session .getJdbcCoordinator() .getStatementPreparer() .prepareStatement( sql, callable ); } try { index += expectation.prepare( update ); //Now write the values of fields onto the prepared statement index = dehydrate( id, fields, rowId, includeProperty, propertyColumnUpdateable, j, update, session, index, true ); // Write any appropriate versioning conditional parameters if ( useVersion && entityMetamodel.getOptimisticLockStyle().isVersion()) { if ( checkVersion( includeProperty ) ) { getVersionType().nullSafeSet( update, oldVersion, index, session ); } } else if ( isAllOrDirtyOptLocking() && oldFields != null ) { boolean[] versionability = getPropertyVersionability(); //TODO: is this really necessary???? boolean[] includeOldField = entityMetamodel.getOptimisticLockStyle().isAll() ? getPropertyUpdateability() : includeProperty; Type[] types = getPropertyTypes(); for ( int i = 0; i < entityMetamodel.getPropertySpan(); i++ ) { boolean include = includeOldField[i] && isPropertyOfTable( i, j ) && versionability[i]; //TODO: is this really necessary???? if ( include ) { boolean[] settable = types[i].toColumnNullness( oldFields[i], getFactory() ); types[i].nullSafeSet( update, oldFields[i], index, settable, session ); index += ArrayHelper.countTrue( settable ); } } } if ( useBatch ) { session.getJdbcCoordinator().getBatch( updateBatchKey ).addToBatch(); return true; } else { return check( session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update ), id, j, expectation, update, sql ); } } catch (SQLException e) { if ( useBatch ) { session.getJdbcCoordinator().abortBatch(); } throw e; } finally { if ( !useBatch ) { session.getJdbcCoordinator().getResourceRegistry().release( update ); session.getJdbcCoordinator().afterStatementExecution(); } } } catch (SQLException e) { throw getFactory().getSQLExceptionHelper().convert( e, "could not update: " + MessageHelper.infoString( this, id, getFactory() ), sql ); } }
關(guān)鍵之處:
useBatch的賦值邏輯
public boolean isBatchable() { return optimisticLockStyle().isNone() || !isVersioned() && optimisticLockStyle().isVersion() || getFactory().getSessionFactoryOptions().isJdbcBatchVersionedData(); } 1. 配置了`spring.jpa.properties.hibernate.jdbc.batch_versioned_data`為true; 2. jdbcBatchSizeToUse > 1, 即`spring.jpa.properties.hibernate.jdbc.batch_size`大于0
- 如果useBatch為true
調(diào)用session.getJdbcCoordinator().getBatch(updateBatchKey).addToBatch();
這里的updateBatchKey
為com.example.domain.User#UPDATE
;此處僅是將PreparedStatement
放入待執(zhí)行隊(duì)列。
之后便執(zhí)行session.getJdbcCoordinator().executeBatch()
邏輯;請(qǐng)看BatchingBatch.performExecution
- 如果useBatch為false
調(diào)用session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update )
,并調(diào)用check
方法執(zhí)行檢查。此處檢查失敗,則會(huì)拋出樂(lè)觀鎖異常!
BatchingBatch.performExecution
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java
private void performExecution() { LOG.debugf( "Executing batch size: %s", batchPosition ); try { for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) { try { final PreparedStatement statement = entry.getValue(); final int[] rowCounts; try { getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart(); rowCounts = statement.executeBatch(); } finally { getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd(); } checkRowCounts( rowCounts, statement ); } catch ( SQLException e ) { abortBatch(); throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() ); } } } catch ( RuntimeException re ) { LOG.unableToExecuteBatch( re.getMessage() ); throw re; } finally { batchPosition = 0; } }
可以看到這里調(diào)用了statement.executeBatch(),并返回了int[] rowCounts;然后調(diào)用checkRowCounts( rowCounts, statement ); >Expectations#BasicExpectation.checkBatched 此處檢查失敗,則會(huì)拋出樂(lè)觀鎖異常!
問(wèn)題原因
非批量模式下,檢查執(zhí)行結(jié)果是調(diào)用的checkNonBatched
方法,該方法僅檢查更新條目數(shù)是否一致:
private void checkNonBatched(int rowCount, String statementSQL) { if ( expectedRowCount > rowCount ) { throw new StaleStateException( "Unexpected row count: " + rowCount + "; expected: " + expectedRowCount + "; statement executed: " + statementSQL ); } if ( expectedRowCount < rowCount ) { String msg = "Unexpected row count: " + rowCount + "; expected: " + expectedRowCount; throw new TooManyRowsAffectedException( msg, expectedRowCount, rowCount ); } }
批量模式下,檢查執(zhí)行結(jié)果是調(diào)用的checkBatched
方法,檢查邏輯如下:
private void checkBatched(int rowCount, int batchPosition, String statementSQL) { if ( rowCount == -2 ) { LOG.debugf( "Success of batch update unknown: %s", batchPosition ); } else if ( rowCount == -3 ) { throw new BatchFailedException( "Batch update failed: " + batchPosition ); } else { if ( expectedRowCount > rowCount ) { throw new StaleStateException( "Batch update returned unexpected row count from update [" + batchPosition + "]; actual row count: " + rowCount + "; expected: " + expectedRowCount + "; statement executed: " + statementSQL ); } if ( expectedRowCount < rowCount ) { String msg = "Batch update returned unexpected row count from update [" + batchPosition + "]; actual row count: " + rowCount + "; expected: " + expectedRowCount; throw new BatchedTooManyRowsAffectedException( msg, expectedRowCount, rowCount, batchPosition ); } } }
問(wèn)題便在于此!
int[] executeBatch() throws SQLException
返回值說(shuō)明:
① 大于或等于零的數(shù)字,表示命令已成功處理,并且是更新計(jì)數(shù),給出了數(shù)據(jù)庫(kù)中受命令影響的行數(shù)執(zhí)行;
② SUCCESS_NO_INFO ( -2)的值,表示命令處理成功,但受影響的行數(shù)未知;
③ 如果批量更新中的命令之一無(wú)法正確執(zhí)行,此方法引發(fā)BatchUpdateException,JDBC Driver可能會(huì)也可能不會(huì)繼續(xù)處理剩余的命令。但是Driver的行為是與特定的DBMS綁定的,要么總是繼續(xù)處理命令,要么從不繼續(xù)處理命令。如果驅(qū)動(dòng)程序繼續(xù)處理,方法將返回EXECUTE_FAILED(-3)。
在實(shí)際的測(cè)試過(guò)程中發(fā)現(xiàn):
DB類型 | 是否可以返回實(shí)際影響行數(shù) | 備注 |
---|---|---|
MySQL | 是 | |
Oracle | 否 | 每個(gè)數(shù)組位置值均為-2 |
在Oracle的驅(qū)動(dòng)中沒(méi)有實(shí)現(xiàn)該功能,即提交成功后不能返回影響行數(shù),所以返回-2。
Oracle驅(qū)動(dòng)源碼如下:oracle.jdbc.driver.OraclePreparedStatement#executeBatch
public int[] executeBatch() throws SQLException { synchronized (this.connection) { int[] arrayOfInt = new int[this.currentRank]; /* 此處省略N行代碼 */ if ((this.sqlKind != 1) && (this.sqlKind != 4)) { for (i = 0; i < arrayOfInt.length; i++) { arrayOfInt[i] = -2; // 關(guān)鍵看這行 } } this.connection.registerHeartbeat(); return arrayOfInt; } }
根據(jù)StackOverflow
上的說(shuō)法,Oracle 11g
之前的版本,executeBatch
方法返回的均是-2,eg.
解決方案
Hibernate對(duì)于這個(gè)問(wèn)題有自己的處理辦法,就是設(shè)置一個(gè)jdbc和數(shù)據(jù)庫(kù)的連接屬性hibernate.jdbc.use_scrollable_resultset =true
。
如果你想讓你的JDBC驅(qū)動(dòng)從executeBatch()
返回正確的行計(jì)數(shù) , 那么將此屬性設(shè)為true(開(kāi)啟這個(gè)選項(xiàng)通常是安全的). 同時(shí),Hibernate將為自動(dòng)版本化的數(shù)據(jù)使用批量DML. 默認(rèn)值為false. eg. true | false
以上就是Spring Data JPA開(kāi)啟批量更新時(shí)樂(lè)觀鎖失效問(wèn)題的解決方法的詳細(xì)內(nèi)容,更多關(guān)于Spring Data JPA樂(lè)觀鎖失效的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
maven創(chuàng)建spark項(xiàng)目的pom.xml文件配置demo
這篇文章主要為大家介紹了maven創(chuàng)建spark項(xiàng)目的pom.xml文件配置demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05使用注解@Recover優(yōu)化丑陋的循環(huán)詳解
我們知道在實(shí)現(xiàn)一個(gè)功能的時(shí)候是可以使用不同的代碼來(lái)實(shí)現(xiàn)的,那么相應(yīng)的不同實(shí)現(xiàn)方法的性能肯定也是有差別的,下面這篇文章主要給大家介紹了關(guān)于使用注解@Recover優(yōu)化丑陋的循環(huán)的相關(guān)資料,需要的朋友可以參考下2022-04-04SpringBoot Data JPA 關(guān)聯(lián)表查詢的方法
這篇文章主要介紹了SpringBoot Data JPA 關(guān)聯(lián)表查詢的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07詳解SpringBoot中@PostMapping注解的用法
在SpringBoot中,我們經(jīng)常需要編寫RESTful Web服務(wù),以便于客戶端與服務(wù)器之間的通信,@PostMapping注解可以讓我們更方便地編寫POST請(qǐng)求處理方法,在本文中,我們將介紹@PostMapping注解的作用、原理,以及如何在SpringBoot應(yīng)用程序中使用它2023-06-06Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建(圖文教程)
這篇文章主要介紹了Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建(圖文教程),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09解決@ConfigurationProperties注解的使用及亂碼問(wèn)題
這篇文章主要介紹了解決@ConfigurationProperties注解的使用及亂碼問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10