SpringBoot 多數據源及事務解決方案小結
1. 背景
一個主庫和N個應用庫的數據源,并且會同時操作主庫和應用庫的數據,需要解決以下兩個問題:
如何動態(tài)管理多個數據源以及切換?
如何保證多數據源場景下的數據一致性(事務)?
本文主要探討這兩個問題的解決方案,希望能對讀者有一定的啟發(fā)。
2. 數據源切換原理
通過擴展Spring提供的抽象類AbstractRoutingDataSource
,可以實現切換數據源。其類結構如下圖所示:
targetDataSources&defaultTargetDataSource
項目上需要使用的所有數據源和默認數據源。
resolvedDataSources&resolvedDefaultDataSource
當Spring容器創(chuàng)建AbstractRoutingDataSource
對象時,通過調用afterPropertiesSet
復制上述目標數據源。由此可見,一旦數據源實例對象創(chuàng)建完畢,業(yè)務無法再添加新的數據源。
determineCurrentLookupKey
此方法為抽象方法,通過擴展這個方法來實現數據源的切換。目標數據源的結構為:Map<Object, DataSource>
其key為lookup key
。
我們來看官方對這個方法的注釋:
lookup key通常是綁定在線程上下文中,根據這個key去resolvedDataSources
中取出DataSource。
根據目標數據源的管理方式不同,可以使用基于配置文件和數據庫表兩種方式?;谂渲梦募芾矸桨笩o法后續(xù)添加新的數據源,而基于數據庫表方案管理,則更加靈活。關注公z號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優(yōu)手冊!
3. 配置文件解決方案
根據上面的分析,我們可以按照下面的步驟去實現:
定義
DynamicDataSource
類繼承AbstractRoutingDataSource
,重寫determineCurrentLookupKey()
方法。配置多個數據源注入
targetDataSources
和defaultTargetDataSource
,通過afterPropertiesSet()
方法將數據源寫入resolvedDataSources
和resolvedDefaultDataSource
。調用
AbstractRoutingDataSource
的getConnection()
方法時,determineTargetDataSource()
方法返回DataSource
執(zhí)行底層的getConnection()
。
其流程如下圖所示:
3.1 創(chuàng)建數據源
DynamicDataSource
數據源的注入,目前業(yè)界主流實現步驟如下:
在配置文件中定義數據源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driverClassName=com.mysql.jdbc.Driver # 主數據源 spring.datasource.druid.master.url=jdbcUrl spring.datasource.druid.master.username=*** spring.datasource.druid.master.password=*** # 其他數據源 spring.datasource.druid.second.url=jdbcUrl spring.datasource.druid.second.username=*** spring.datasource.druid.second.password=***
在代碼中配置Bean
@Configuration public class DynamicDataSourceConfig { @Bean @ConfigurationProperties("spring.datasource.druid.master") public DataSource firstDataSource(){ return DruidDataSourceBuilder.create().build(); } @Bean @ConfigurationProperties("spring.datasource.druid.second") public DataSource secondDataSource(){ return DruidDataSourceBuilder.create().build(); } @Bean @Primary public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(5); targetDataSources.put(DataSourceNames.FIRST, firstDataSource); targetDataSources.put(DataSourceNames.SECOND, secondDataSource); return new DynamicDataSource(firstDataSource, targetDataSources); } }
3.2 AOP處理
通過DataSourceAspect
切面技術來簡化業(yè)務上的使用,只需要在業(yè)務方法添加@SwitchDataSource
注解即可完成動態(tài)切換:
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface SwitchDataSource { String value(); }
DataSourceAspect
攔截業(yè)務方法,更新當前線程上下文DataSourceContextHolder
中存儲的key,即可實現數據源切換。
3.3 方案不足
基于AbstractRoutingDataSource
的多數據源動態(tài)切換,有個明顯的缺點,無法動態(tài)添加和刪除數據源。在我們的產品中,不能把應用數據源寫死在配置文件。接下來分享一下基于數據庫表的實現方案。
4. 數據庫表解決方案
我們需要實現可視化的數據源管理,并實時查看數據源的運行狀態(tài)。所以我們不能把數據源全部配置在文件中,應該將數據源定義保存到數據庫表。參考AbstractRoutingDataSource
的設計思路,實現自定義數據源管理。
4.1 設計數據源表
主庫的數據源信息仍然配置在項目配置文件中,應用庫數據源配置參數,則設計對應的數據表。表結構如下所示:
這個表主要就是DataSource
的相關配置參數,其相應的ORM操作代碼在此不再贅述,主要是實現數據源的增刪改查操作。
4.2 自定義數據源管理
4.2.1 定義管理接口
通過繼承AbstractDataSource
即可實現DynamicDataSource
。為了方便對數據源進行操作,我們定義一個接口DataSourceManager
,為業(yè)務提供操作數據源的統(tǒng)一接口。
public interface DataSourceManager { void put(String var1, DataSource var2); DataSource get(String var1); Boolean hasDataSource(String var1); void remove(String var1); void closeDataSource(String var1); Collection<DataSource> all(); }
該接口主要是對數據表中定義的數據源,提供基礎管理功能。
4.2.2 自定義數據源
DynamicDataSource
的實現如下圖所示:
根據前面的分析,AbstractRoutingDataSource
是在容器啟動的時候,執(zhí)行afterPropertiesSet
注入數據源對象,完成之后無法對數據源進行修改。DynamicDataSource
則實現DataSourceManager
接口,可以將數據表中的數據源加載到dataSources。
4.2.3 切面處理
這一塊的處理跟配置文件數據源方案處理方式相同,都是通過AOP技術切換lookup key。
public DataSource determineTargetDataSource() { String lookupKey = DataSourceContextHolder.getKey(); DataSource dataSource = Optional.ofNullable(lookupKey) .map(dataSources::get) .orElse(defaultDataSource); if (dataSource == null) { throw new IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]"); } return dataSource; }
4.2.4 管理數據源狀態(tài)
在項目啟動的時候,加載數據表中的所有數據源,并執(zhí)行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder
類,根據數據源表的定義創(chuàng)建DataSource。在項目運行過程中,可以使用定時任務對數據源進行?;睿瑸榱颂嵘阅茉偬砑右粚泳彺?。
AbstractRoutingDataSource
只支持單庫事務,切換數據源是在開啟事務之前執(zhí)行。Spring使用 DataSourceTransactionManager
進行事務管理。開啟事務,會將數據源緩存到DataSourceTransactionObject
對象中,后續(xù)的commit和 rollback事務操作實際上是使用的同一個數據源。
如何解決切庫事務問題?借助Spring的聲明式事務處理,我們可以在多次切庫操作時強制開啟新的事務:
@SwitchDataSource @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
這樣的話,執(zhí)行切庫操作的時候強制啟動新事務,便可實現多次切庫而且事務能夠生效。但是這種事務方式,存在數據一致性問題:
假若ServiceB正常執(zhí)行提交事務,接著返回ServiceA執(zhí)行并且發(fā)生異常。因為兩次處理是不同的事務,ServiceA這個事務執(zhí)行回滾,而ServiceA事務已經提交。這樣的話,數據就不一致了。接下來,我們主要討論如何解決多庫的事務問題。
6. 多庫事務處理
6.1 關于事務的理解
首先有必要理解事務的本質。
1.提到Spring事務,就離不開事務的四大特性和隔離級別、七大傳播特性。
事務特性和離級別是屬于數據庫范疇。Spring事務的七大傳播特性是什么呢?它是Spring在當前線程內,處理多個事務操作時的事務應用策略,數據庫事務本身并不存在傳播特性。
2.Spring事務的定義包括:begin、commit、rollback、close、suspend、resume等動作。
begin(事務開始): 可以認為存在于數據庫的命令中,比如Mysql的
start transaction
命令,但是在JDBC編程方式中不存在。close(事務關閉):Spring事務的close()方法,是把
Connection
對象歸還給數據庫連接池,與事務無關。suspend(事務掛起):Spring中事務掛起的語義是:需要新事務時,將現有的
Connection
保存起來(還有尚未提交的事務),然后創(chuàng)建新的Connection2
,Connection2
提交、回滾、關閉完畢后,再把Connection1
取出來繼續(xù)執(zhí)行。resume(事務恢復): 嵌套事務執(zhí)行完畢,返回上層事務重新綁定連接對象到事務管理器的過程。
實際上,只有commit、rollback、close是在JDBC真實存在的,而其他動作都是應用的語意,而非JDBC事務的真實命令。因此,事務真實存在的方法是:setAutoCommit()
、commit()
、rollback()
。
close()語義為:
關閉一個數據庫連接,這已經不再是事務的方法了。
使用DataSource并不會執(zhí)行物理關閉,只是歸還給連接池。
6.2 自定義管理事務
為了保證在多個數據源中事務的一致性,我們可以手動管理Connetion
的事務提交和回滾??紤]到不同ORM框架的事務管理實現差異,要求實現自定義事務管理不影響框架層的事務。
這可以通過使用裝飾器設計模式,對Connection
進行包裝重寫commit和rolllback屏蔽其默認行為,這樣就不會影響到原生Connection
和ORM框架的默認事務行為。其整體思路如下圖所示:
這里并沒有使用前面提到的@SwitchDataSource
,這是因為我們在TransactionAop
中已經執(zhí)行了lookupKey的切換。
6.2.1 定義多事務注解
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MultiTransaction { String transactionManager() default "multiTransactionManager"; // 默認數據隔離級別,隨數據庫本身默認值 IsolationLevel isolationLevel() default IsolationLevel.DEFAULT; // 默認為主庫數據源 String datasourceId() default "default"; // 只讀事務,若有更新操作會拋出異常 boolean readOnly() default false;
業(yè)務方法只需使用該注解即可開啟事務,datasourceId
指定事務用到的數據源,不指定默認為主庫。
6.2.3 包裝Connection
自定義事務我們使用包裝過的Connection
,屏蔽其中的commit&rollback
方法。這樣我們就可以在主事務里進行統(tǒng)一的事務提交和回滾操作。
public class ConnectionProxy implements Connection { private final Connection connection; public ConnectionProxy(Connection connection) { this.connection = connection; } @Override public void commit() throws SQLException { // connection.commit(); } public void realCommit() throws SQLException { connection.commit(); } @Override public void close() throws SQLException { //connection.close(); } public void realClose() throws SQLException { if (!connection.getAutoCommit()) { connection.setAutoCommit(true); } connection.close(); } @Override public void rollback() throws SQLException { if(!connection.isClosed()) connection.rollback(); } ... }
這里commit&close
方法不執(zhí)行操作,rollback執(zhí)行的前提是連接執(zhí)行close才生效。這樣不管是使用哪個ORM框架,其自身事務管理都將失效。事務的控制就交由MultiTransaction
控制了。
6.2.4 事務上下文管理
public class TransactionHolder { // 是否開啟了一個MultiTransaction private boolean isOpen; // 是否只讀事務 private boolean readOnly; // 事務隔離級別 private IsolationLevel isolationLevel; // 維護當前線程事務ID和連接關系 private ConcurrentHashMap<String, ConnectionProxy> connectionMap; // 事務執(zhí)行棧 private Stack<String> executeStack; // 數據源切換棧 private Stack<String> datasourceKeyStack; // 主事務ID private String mainTransactionId; // 執(zhí)行次數 private AtomicInteger transCount; // 事務和數據源key關系 private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap; }
每開啟一個事物,生成一個事務ID并綁定一個ConnectionProxy
。事務嵌套調用,保存事務ID和lookupKey至棧中,當內層事務執(zhí)行完畢執(zhí)行pop。這樣的話,外層事務只需在棧中執(zhí)行peek即可獲取事務ID和lookupKey。
6.2.5 數據源兼容處理
為了不影響原生事務的使用,需要重寫getConnection
方法。當前線程沒有啟動自定義事務,則直接從數據源中返回連接。
@Override public Connection getConnection() throws SQLException { TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get(); if (Objects.isNull(transactionHolder)) { return determineTargetDataSource().getConnection(); } ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap() .get(transactionHolder.getExecuteStack().peek()); if (ConnectionProxy == null) { // 沒開跨庫事務,直接返回 return determineTargetDataSource().getConnection(); } else { transactionHolder.addCount(); // 開了跨庫事務,從當前線程中拿包裝過的Connection return ConnectionProxy; } }
6.2.6 切面處理
切面處理的核心邏輯是:維護一個嵌套事務棧,當業(yè)務方法執(zhí)行結束,或者發(fā)生異常時,判斷當前棧頂事務ID是否為主事務ID。如果是的話這時候已經到了最外層事務,這時才執(zhí)行提交和回滾。詳細流程如下圖所示:
package com.github.mtxn.transaction.aop; @Aspect @Component @Slf4j @Order(99999) public class MultiTransactionAop { @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)") public void pointcut() { if (log.isDebugEnabled()) { log.debug("start in transaction pointcut..."); } } @Around("pointcut()") public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); // 從切面中獲取當前方法 Method method = signature.getMethod(); MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class); if (multiTransaction == null) { return point.proceed(); } IsolationLevel isolationLevel = multiTransaction.isolationLevel(); boolean readOnly = multiTransaction.readOnly(); String prevKey = DataSourceContextHolder.getKey(); MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager()); // 切數據源,如果失敗使用默認庫 if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed(); // 開啟事務棧 TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager); Object proceed; try { proceed = point.proceed(); multiTransactionManager.commit(); } catch (Throwable ex) { log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex); multiTransactionManager.rollback(); throw ExceptionUtils.api(ex, "系統(tǒng)異常:%s", ex.getMessage()); } finally { // 當前事務結束出棧 String transId = multiTransactionManager.getTrans().getExecuteStack().pop(); transactionHolder.getDatasourceKeyStack().pop(); // 恢復上一層事務 DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek()); // 最后回到主事務,關閉此次事務 multiTransactionManager.close(transId); } return proceed; } }
7.總結
本文主要介紹了多數據源管理的解決方案(應用層事務,而非XA二段提交保證),以及對多個庫同時操作的事務管理。
需要注意的是,這種方式只適用于單體架構的應用。因為多個庫的事務參與者都是運行在同一個JVM進行。如果是在微服務架構的應用中,則需要使用分布式事務管理(譬如:Seata)。
到此這篇關于SpringBoot 多數據源及事務解決方案小結的文章就介紹到這了,更多相關SpringBoot 多數據源及事務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
spring boot 3.3.0和mybatis plus 3.5.6版本沖突
這篇文章主要介紹了spring boot 3.3.0和mybatis plus 3.5.6版本沖突的問題解決,文中介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2024-07-07如何利用IDEA搭建SpringBoot項目整合mybatis實現簡單的登錄功能
這篇文章主要介紹了如何利用IDEA搭建SpringBoot項目整合mybatis實現簡單的登錄功能,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08Spring?Boot整合Kafka+SSE實現實時數據展示
本文主要介紹了Spring?Boot整合Kafka+SSE實現實時數據展示2024-06-06