多數(shù)據(jù)源如何實(shí)現(xiàn)事務(wù)管理
一. 原理-編程式事務(wù)管理
想直接看實(shí)現(xiàn)的朋友,這部分可以直接跳過(guò)
1.核心接口
Spring 中對(duì)事務(wù)的處理,涉及到三個(gè)核心接口:
- PlatformTransactionManager
- TransactionDefinition
- TransactionStatus
這三個(gè)核心類是Spring處理事務(wù)的核心類。
1.1 PlatformTransactionManager
public interface PlatformTransactionManager {
TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException;
void commit(TransactionStatus var1) throws TransactionException;
void rollback(TransactionStatus var1) throws TransactionException;
}可以看到 PlatformTransactionManager 中定義了基本的事務(wù)操作方法,這些事務(wù)操作方法都是平臺(tái)無(wú)關(guān)的,具體的實(shí)現(xiàn)都是由不同的子類來(lái)實(shí)現(xiàn)的。PlatformTransactionManager 中主要有如下三個(gè)方法:
getTransaction()- getTransaction() 是根據(jù)傳入的 TransactionDefinition 獲取一個(gè)事務(wù)對(duì)象,TransactionDefinition 中定義了一些事務(wù)的基本規(guī)則,例如傳播性、隔離級(jí)別等。
commit()- commit() 方法用來(lái)提交事務(wù)。
rollback()- rollback() 方法用來(lái)回滾事務(wù)。
1.2 TransactionDefinition

可以看到一共有五個(gè)方法:
- getIsolationLevel(),獲取事務(wù)的隔離級(jí)別
- getName(),獲取事務(wù)的名稱
- getPropagationBehavio(),獲取事務(wù)的傳播性
- getTimeout(),獲取事務(wù)的超時(shí)時(shí)間
- isReadOnly(),獲取事務(wù)是否是只讀事務(wù)
我們可以從這些個(gè)方法中, 很直觀的感受到這個(gè)類的作用:設(shè)置事務(wù)的屬性。接下來(lái),我重點(diǎn)結(jié)束幾個(gè)屬性的意義。
隔離級(jí)別 IsolationLevel
常用狀態(tài)分析:
ReadUncommitted
- 表示:未提交讀。當(dāng)事務(wù)A更新某條數(shù)據(jù)的時(shí)候,不容許其他事務(wù)來(lái)更新該數(shù)據(jù),但可以進(jìn)行讀取操作
ReadCommitted
- 表示:提交讀。當(dāng)事務(wù)A更新數(shù)據(jù)時(shí),不容許其他事務(wù)進(jìn)行任何的操作包括讀取,但事務(wù)A讀取時(shí),其他事務(wù)可以進(jìn)行讀取、更新。
- 在Read Committed隔離級(jí)別下,一個(gè)事務(wù)只能讀取到已經(jīng)提交的數(shù)據(jù)。當(dāng)多個(gè)事務(wù)同時(shí)讀取同一數(shù)據(jù)時(shí),如果有其他事務(wù)正在對(duì)該數(shù)據(jù)進(jìn)行修改但尚未提交,則讀取操作將等待,直到修改完成并提交后才能讀取到最新的數(shù)據(jù)。
- 這意味著在Read Committed級(jí)別下,事務(wù)讀取的數(shù)據(jù)是實(shí)時(shí)更新的,但可能會(huì)出現(xiàn)不一致的讀取結(jié)果。
RepeatableRead
- 表示:重復(fù)讀。當(dāng)事務(wù)A更新數(shù)據(jù)時(shí),不容許其他事務(wù)進(jìn)行任何的操作,但是當(dāng)事務(wù)A進(jìn)行讀取的時(shí)候,其他事務(wù)只能讀取,不能更新。
- 在Repeatable Read隔離級(jí)別下,一個(gè)事務(wù)在開(kāi)始讀取數(shù)據(jù)后,任何其他事務(wù)對(duì)該數(shù)據(jù)的修改都不會(huì)被讀取到,即使這些修改已經(jīng)提交。事務(wù)在整個(gè)過(guò)程中都能看到一致的數(shù)據(jù)快照。
- 這意味著在Repeatable Read級(jí)別下,事務(wù)讀取的數(shù)據(jù)是一致的,不會(huì)受到其他并發(fā)事務(wù)的修改的影響,確保了事務(wù)的獨(dú)立性和穩(wěn)定性。
Serializable
- 表示:序列化。
- 最嚴(yán)格的隔離級(jí)別,當(dāng)然并發(fā)性也是最差的,事務(wù)必須依次進(jìn)行。
讀取現(xiàn)象:
通過(guò)一些現(xiàn)象,可以反映出隔離級(jí)別的效果。這些現(xiàn)象有:
- 更新丟失(lost update):當(dāng)系統(tǒng)允許兩個(gè)事務(wù)同時(shí)更新同一數(shù)據(jù)時(shí),發(fā)生更新丟失。
臟讀(dirty read):當(dāng)一個(gè)事務(wù)讀取另一個(gè)事務(wù)尚未提交的修改時(shí),產(chǎn)生臟讀。
比如:事務(wù)B執(zhí)行過(guò)程中修改了數(shù)據(jù)X,在未提交前,事務(wù)A讀取了X,而事務(wù)B卻回滾了,這樣事務(wù)A就形成了臟讀。
不重復(fù)讀(nonrepeatable read):同一查詢?cè)谕皇聞?wù)中多次進(jìn)行,由于其他提交事務(wù)所做的修改或刪除,每次返回不同的結(jié)果集,此時(shí)發(fā)生非重復(fù)讀。(A transaction rereads data it has previously read and finds that another committed transaction has modified or deleted the data. )。
比如:事務(wù)A首先讀取了一條數(shù)據(jù),然后執(zhí)行邏輯的時(shí)候,事務(wù)B將這條數(shù)據(jù)改變了,然后事務(wù)A再次讀取的時(shí)候,發(fā)現(xiàn)數(shù)據(jù)不匹配了,就是所謂的不可重復(fù)讀了。
幻讀(phantom read):同一查詢?cè)谕皇聞?wù)中多次進(jìn)行,由于其他提交事務(wù)所做的插入操作,每次返回不同的結(jié)果集,此時(shí)發(fā)生幻像讀。(A transaction reexecutes a query returning a set of rows that satisfies a search condition and finds that another committed transaction has inserted additional rows that satisfy the condition. )。
比如:事務(wù)A首先根據(jù)條件索引得到N條數(shù)據(jù),然后事務(wù)B改變了這N條數(shù)據(jù)之外的M條或者增添了M條符合事務(wù)A搜索條件的數(shù)據(jù),導(dǎo)致事務(wù)A再次搜索發(fā)現(xiàn)有N+M條數(shù)據(jù)了,就產(chǎn)生了幻讀。
不可重復(fù)讀和幻讀比較:
兩者有些相似,但是前者針對(duì)的是update或delete,后者針對(duì)的insert。
- 為什么會(huì)出現(xiàn)“臟讀”?因?yàn)闆](méi)有“select”操作沒(méi)有規(guī)矩。
- 為什么會(huì)出現(xiàn)“不可重復(fù)讀”?因?yàn)?ldquo;update”操作沒(méi)有規(guī)矩。
- 為什么會(huì)出現(xiàn)“幻讀”?因?yàn)?ldquo;insert”和“delete”操作沒(méi)有規(guī)矩。
隔離級(jí)別與讀取現(xiàn)象:
| 隔離級(jí)別 | 臟讀 Dirty Read | 不可重復(fù)讀取 NonRepeatable Read | 幻讀 Phantom Read |
|---|---|---|---|
| 未授權(quán)讀取/未提交讀 read uncommitted | 可能發(fā)生 | 可能發(fā)生 | 可能發(fā)生 |
| 授權(quán)讀取/提交讀 read committed | - | 可能發(fā)生 | 可能發(fā)生 |
| 重復(fù)讀 read repeatable | - | - | 可能發(fā)生 |
| 序列化 serializable | - | - | - |
常見(jiàn)數(shù)據(jù)庫(kù)默認(rèn)隔離級(jí)別:
| 數(shù)據(jù)庫(kù) | 默認(rèn)隔離級(jí)別 |
|---|---|
| Oracle | read committed |
| SqlServer | read committed |
| MySQL(InnoDB) | Read-Repeatable |
傳播性 Propagation
- 1、PROPAGATION_REQUIRED:如果當(dāng)前沒(méi)有事務(wù),就創(chuàng)建一個(gè)新事務(wù),如果當(dāng)前存在事務(wù),就加入該事務(wù),該設(shè)置是最常用的設(shè)置。
- 2、PROPAGATION_NESTED:如果當(dāng)前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒(méi)有事務(wù),則執(zhí)行與PROPAGATION_REQUIRED類似的操作
- 3、PROPAGATION_SUPPORTS:支持當(dāng)前事務(wù),如果當(dāng)前存在事務(wù),就加入該事務(wù),如果當(dāng)前不存在事務(wù),就以非事務(wù)執(zhí)行。‘
- 4、PROPAGATION_MANDATORY:支持當(dāng)前事務(wù),如果當(dāng)前存在事務(wù),就加入該事務(wù),如果當(dāng)前不存在事務(wù),就拋出異常。
- 5、PROPAGATION_REQUIRES_NEW:支持當(dāng)前事務(wù),創(chuàng)建新事務(wù),無(wú)論當(dāng)前存不存在事務(wù),都創(chuàng)建新事務(wù)。
- 6、PROPAGATION_NOT_SUPPORTED:以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起。
- 7、PROPAGATION_NEVER:以非事務(wù)方式執(zhí)行,如果當(dāng)前存在事務(wù),則拋出異常。
1.3 TransactionStatus

- isNewTransaction() 方法獲取當(dāng)前事務(wù)是否是一個(gè)新事務(wù)。
- hasSavepoint() 方法判斷是否存在 savePoint(),即是否已創(chuàng)建為基于保存點(diǎn)的嵌套事務(wù)。此方法主要用于診斷目的,與isNewTransaction()一起使用。對(duì)于自定義保存點(diǎn)的編程處理,請(qǐng)使用SavepointManager提供的操作。
- setRollbackOnly() 方法設(shè)置事務(wù)必須回滾。
- isRollbackOnly() 方法獲取事務(wù)只能回滾。
- flush() 方法將底層會(huì)話中的修改刷新到數(shù)據(jù)庫(kù),一般用于 Hibernate/JPA 的會(huì)話,對(duì)如 JDBC 類型的事務(wù)無(wú)任何影響。
- isCompleted() 方法用來(lái)獲取是一個(gè)事務(wù)是否結(jié)束,即是否已提交或回滾。
表示事務(wù)的狀態(tài)。
事務(wù)代碼可以使用它來(lái)檢索狀態(tài)信息,并以編程方式請(qǐng)求回滾(而不是引發(fā)導(dǎo)致隱式回滾的異常)。
二、實(shí)現(xiàn)跨數(shù)據(jù)源事務(wù)
先說(shuō)一下兩階段提交:首先多個(gè)數(shù)據(jù)源的事務(wù)分別都開(kāi)起來(lái),然后各事務(wù)分別去執(zhí)行對(duì)應(yīng)的sql(此所謂第一階段提交),最后如果都成功就把事務(wù)全部提交,只要有一個(gè)失敗就把事務(wù)都回滾——此所謂第二階段提交。
Transactional注解只能指定一個(gè)數(shù)據(jù)源的事務(wù)管理器。我們重新定義一個(gè),讓它支持指定多個(gè)數(shù)據(jù)源的事務(wù)管理器,然后我們?cè)谑褂昧诉@個(gè)注解的方法前后進(jìn)行所謂的兩階段協(xié)議,而這可以通過(guò)AOP來(lái)完成。所以,代碼如下:
定義注解
/**
* 多數(shù)據(jù)源事務(wù)注解
*
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MultiDataSourceTransactional {
/**
* 事務(wù)管理器數(shù)組
*/
String[] transactionManagers();
}定義切面
我們使用Spring的Aspect來(lái)完成切面。
先來(lái)回顧一下它的切入點(diǎn)
- @Before: 標(biāo)識(shí)一個(gè)前置增強(qiáng)方法,相當(dāng)于BeforeAdvice的功能。
- @After: 后置增強(qiáng),不管是拋出異?;蛘哒M顺龆紩?huì)執(zhí)行。
- @AfterReturning: 后置增強(qiáng),似于AfterReturningAdvice, 方法正常退出時(shí)執(zhí)行。
- @AfterThrowing: 異常拋出增強(qiáng),相當(dāng)于ThrowsAdvice。
- @Around: 環(huán)繞增強(qiáng),相當(dāng)于MethodInterceptor。
咋一看,@Around是可以的:ProceedingJoinPoint的proceed方法是執(zhí)行目標(biāo)方法,在它前面聲明事務(wù),try…catch…一下如果有異常就回滾沒(méi)異常就提交。不過(guò),最開(kāi)始用這個(gè)的時(shí)候,好像發(fā)現(xiàn)有點(diǎn)問(wèn)題,具體記不住了,大家可以試一下。
因?yàn)楫?dāng)時(shí)工期緊沒(méi)仔細(xì)研究,就采用了下面這種
@Before + @AfterReturning + @AfterThrowing組合,看名字和功能簡(jiǎn)直是完美契合?。〉怯幸粋€(gè)問(wèn)題,不同方法怎么共享那個(gè)事務(wù)呢?成員變量?對(duì),沒(méi)錯(cuò)。但是又有線程安全問(wèn)題咋辦?ThreadLocal幫你解決(_)。
/**
* 編程式事務(wù) 基本過(guò)程:
* 1.TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
* 首先通過(guò)transactionManager.getTransaction方法獲取事務(wù)對(duì)象status,
* 2.transactionManager.commit(status);
* 然后在try塊中執(zhí)行數(shù)據(jù)庫(kù)操作,最后通過(guò)transactionManager.commit(status)提交事務(wù)。
* 3.transactionManager.rollback(status);
* 如果在數(shù)據(jù)庫(kù)操作中發(fā)生了異常,則會(huì)通過(guò)transactionManager.rollback(status)回滾事務(wù)。
*
*/
@Component
@Aspect
public class MultiDataSourceTransactionAspect implements ApplicationContextAware {
/**
* 線程本地變量:為什么使用棧?※為了達(dá)到后進(jìn)先出的效果※
*/
private static final ThreadLocal<Stack<Pair<DataSourceTransactionManager, TransactionStatus>>> THREAD_LOCAL = new ThreadLocal<>();
/**
* 事務(wù)聲明
*/
private DefaultTransactionDefinition def = new DefaultTransactionDefinition();
{
// 非只讀模式
def.setReadOnly(false);
//事務(wù)隔離級(jí)別 采用數(shù)據(jù)庫(kù)默認(rèn)的
def.setIsolationLevel(DefaultTransactionDefinition.ISOLATION_DEFAULT);
//事務(wù)傳播行為 - 創(chuàng)建一個(gè)新的事務(wù),并在該事務(wù)中執(zhí)行;如果當(dāng)前存在事務(wù),則將當(dāng)前事務(wù)掛起。
def.setPropagationBehavior(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
}
/**
* implements ApplicationContextAware ==> 獲取spring容器
*/
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 設(shè)置切入點(diǎn)
*/
@Pointcut("@annotation(com.example.mutidatasource.dataSource.multiDataSourceTransaction.MultiDataSourceTransactional)")
public void pointcut(){}
/**
* 聲明事務(wù)
*
* 冗余了,pointcut() 、 @annotation(transactional) 語(yǔ)義一致
* 但是 不加上這個(gè)@annotation(transactional) 報(bào)錯(cuò), before() 方法期望有一個(gè) MultiDataSourceTransactional 類型的參數(shù),
* 雖然pointcut() 確定了有這個(gè)注解,但是編輯器不知道啊
*/
@Before("pointcut() && @annotation(transactional)")
public void before(MultiDataSourceTransactional transactional){
//根據(jù)設(shè)置的事務(wù)名稱按順序聲明,并放到ThreadLocal里
String[] TransactionalNames = transactional.value();
Stack<Pair<DataSourceTransactionManager, TransactionStatus>> pairStack = new Stack<>();
for (String transactionalName : TransactionalNames) {
// 從容器中獲取 數(shù)據(jù)庫(kù)事務(wù)管理器
DataSourceTransactionManager manager = applicationContext.getBean(transactionalName, DataSourceTransactionManager.class);
TransactionStatus status = manager.getTransaction(def);
pairStack.push(new Pair<>(manager,status));
}
THREAD_LOCAL.set(pairStack);
}
/**
* 提交事務(wù)
*
* @AfterReturning 和 @After 都是 Spring AOP 框架提供的注解,用于在方法執(zhí)行后執(zhí)行某些操作。但是,它們之間有一些關(guān)鍵的區(qū)別:
*
* @AfterReturning 僅在方法正常返回時(shí)執(zhí)行,而 @After 總是在方法執(zhí)行后執(zhí)行,無(wú)論方法是否正常返回。
* @AfterReturning 可以訪問(wèn)方法的返回值,而 @After 則不能。
* @AfterReturning 可以通過(guò) returning 屬性指定要訪問(wèn)的返回值的名稱,而 @After 則不能。
*/
@AfterReturning("pointcut()")
public void afterReturning(){
// ※棧頂彈出(后進(jìn)先出)
Stack<Pair<DataSourceTransactionManager, TransactionStatus>> pairStack = THREAD_LOCAL.get();
while (!pairStack.empty()) {
Pair<DataSourceTransactionManager, TransactionStatus> pair = pairStack.pop();
// 提交事務(wù) transactionManager.commit(status);
pair.getKey().commit(pair.getValue());
}
THREAD_LOCAL.remove();
}
/**
* 回滾事務(wù)
*
*
*/
@AfterThrowing("pointcut()")
public void afterThrowing(){
// ※棧頂彈出(后進(jìn)先出)
Stack<Pair<DataSourceTransactionManager, TransactionStatus>> pairStack = THREAD_LOCAL.get();
while (!pairStack.empty()) {
Pair<DataSourceTransactionManager, TransactionStatus> pair = pairStack.pop();
// 提交事務(wù) transactionManager.commit(status);
pair.getKey().rollback(pair.getValue());
}
THREAD_LOCAL.remove();
}
}使用
/**
* 測(cè)試多數(shù)據(jù)源事務(wù)
*/
@MultiDataSourceTransactional(transactionManagers={"ATransactionManager","BTransactionManager"})
public void testTransaction() {
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring?Boot?整合RocketMq實(shí)現(xiàn)消息過(guò)濾功能
這篇文章主要介紹了Spring?Boot?整合RocketMq實(shí)現(xiàn)消息過(guò)濾,本文講解了RocketMQ實(shí)現(xiàn)消息過(guò)濾,針對(duì)不同的業(yè)務(wù)場(chǎng)景選擇合適的方案即可,需要的朋友可以參考下2022-06-06
Java servlet執(zhí)行流程代碼實(shí)例
這篇文章主要介紹了Java servlet執(zhí)行流程代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
Java中正則表達(dá)式的語(yǔ)法以及matches方法的使用方法
正則表達(dá)式(Regular Expression)是一門(mén)簡(jiǎn)單語(yǔ)言的語(yǔ)法規(guī)范,是強(qiáng)大、便捷、高效的文本處理工具,這篇文章主要給大家介紹了關(guān)于Java中正則表達(dá)式的語(yǔ)法以及matches方法的使用方法,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-05-05
spring @Lazy延遲注入的邏輯實(shí)現(xiàn)
有時(shí)候我們會(huì)在屬性注入的時(shí)候添加@Lazy注解實(shí)現(xiàn)延遲注入,今天咱們通過(guò)閱讀源碼來(lái)分析下原因,感興趣的可以了解一下2021-08-08
SpringCloud Ribbon負(fù)載均衡工具使用
Ribbon是Netflix的組件之一,負(fù)責(zé)注冊(cè)中心的負(fù)載均衡,有助于控制HTTP和TCP客戶端行為。Spring?Cloud?Netflix?Ribbon一般配合Ribbon進(jìn)行使用,利用在Eureka中讀取的服務(wù)信息,在調(diào)用服務(wù)節(jié)點(diǎn)時(shí)合理進(jìn)行負(fù)載2023-02-02
Java深入講解AWT實(shí)現(xiàn)事件處理流程
AWT的事件處理是一種委派式事件處理方式:普通組件(事件源)將整個(gè)事件處理委托給特定的對(duì)象(事件監(jiān)聽(tīng)器);當(dāng)該事件源發(fā)生指定的事件時(shí),就通知所委托的事件監(jiān)聽(tīng)器,由事件監(jiān)聽(tīng)器來(lái)處理這個(gè)事件2022-04-04
java.lang.UnsupportedOperationException分析及解決辦法
日常開(kāi)發(fā)中我遇到j(luò)ava.lang.UnsupportedOperationException:異常兩次了,下面這篇文章主要給對(duì)大家介紹了關(guān)于java.lang.UnsupportedOperationException分析及解決辦法,需要的朋友可以參考下2024-03-03
java中如何實(shí)現(xiàn) zip rar 7z 壓縮包解壓
這篇文章主要介紹了java中如何實(shí)現(xiàn) zip rar 7z 壓縮包解壓?jiǎn)栴},具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07

