Atomikos + MybatisPlus解決多數(shù)據(jù)源事務(wù)一致性問題解決
多數(shù)據(jù)源事務(wù)
在實(shí)際項(xiàng)目的開發(fā)過程中,我們經(jīng)常會(huì)遇到在同一個(gè)項(xiàng)目或微服務(wù)中牽涉到使用兩個(gè)或多個(gè)數(shù)據(jù)源的,由于每個(gè)數(shù)據(jù)源需要使用不同的事務(wù)管理器,而每個(gè)事務(wù)管理器管理不同的數(shù)據(jù)源每個(gè)數(shù)據(jù)源只能保證單個(gè)數(shù)據(jù)源內(nèi)的事物一致性.所以在使用多個(gè)數(shù)據(jù)源的同時(shí)帶來的常見問題就是多數(shù)據(jù)源的事務(wù)一致性問題.本文通過利用一種常見的分布式事物管理器atomikos來解決此類問題.
Atomikos
Atomikos 是一個(gè)Java事務(wù)管理解決方案,用于處理分布式事務(wù)。它提供了一個(gè)可靠和可擴(kuò)展的事務(wù)管理器,可以協(xié)調(diào)多個(gè)資源(如數(shù)據(jù)庫(kù)、消息隊(duì)列等)之間的事務(wù)操作,以保證分布式系統(tǒng)的數(shù)據(jù)一致性與隔離性。
Atomikos 提供了以下主要功能和特點(diǎn):
分布式事務(wù)協(xié)調(diào):Atomikos 使用兩階段提交(Two-Phase Commit)協(xié)議來確保分布式事務(wù)的一致性。它充當(dāng)協(xié)調(diào)者角色,與參與者(各個(gè)資源管理器)進(jìn)行協(xié)作并決定是否提交或回滾事務(wù)。
事務(wù)原子性:Atomikos 確保在分布式環(huán)境中進(jìn)行的事務(wù)操作以原子方式執(zhí)行。如果其中任何一個(gè)資源的操作失敗,Atomikos 將自動(dòng)回滾整個(gè)事務(wù),確保數(shù)據(jù)的一致性。
分布式數(shù)據(jù)源和連接池:Atomikos 提供了分布式數(shù)據(jù)源和連接池,用于管理多個(gè)數(shù)據(jù)庫(kù)連接和資源。它能夠有效地管理連接和提供高性能和可伸縮性。
事務(wù)隔離級(jí)別:Atomikos 支持不同的事務(wù)隔離級(jí)別,包括讀未提交(Read Uncommitted)、讀已提交(Read Committed)、可重復(fù)讀(Repeatable Read)和串行化(Serializable)。
高可靠性和擴(kuò)展性:Atomikos 可以在分布式環(huán)境中進(jìn)行集群部署,提供高可靠性和擴(kuò)展性。多個(gè) Atomikos 事務(wù)管理器可以一起工作,以保證負(fù)載均衡和容錯(cuò)。
引入相關(guān)依賴
<dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- atomikos 依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- mybatis plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.3.1</version> </dependency> <!-- mysql 驅(qū)動(dòng)包 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.26</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
application.yml配置
# 服務(wù)端口號(hào) server: port: 8069 # 多數(shù)據(jù)源配置 spring: datasource: properties: user_db: url: jdbc:mysql://192.168.1.18:3306/user_db user: root password: xxxxxxx data_db: url: jdbc:mysql://192.168.1.18:3306/data_db user: root password: xxxxxxx
上述配置中使用spring.datasource.properties配置多個(gè)數(shù)據(jù)源,在配置類中會(huì)使用一個(gè)Map<String,Map<String,String>>讀取多數(shù)據(jù)源的每個(gè)配置項(xiàng).
多數(shù)據(jù)源配置類:DataSourceConfiguration
給配置類負(fù)責(zé)創(chuàng)建多個(gè)數(shù)據(jù)源以及對(duì)應(yīng)的SqlSessionFactory.代碼如下:
package personal.gltm.demo.config; import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import com.mysql.cj.jdbc.MysqlXADataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Primary; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; import java.util.LinkedHashMap; import java.util.Map; @Configuration @ConfigurationProperties("spring.datasource") public class DataSourceConfiguration { private Map<String,Map<String,String>> properties; // spring.datasource.properties private Map<String,DataSource> sourceMap; // 用來保存數(shù)據(jù)源信息 public Map<String, Map<String,String>> getProperties() { return properties; } public void setProperties(Map<String, Map<String,String>> properties) { this.properties = properties; } /** * 添加user_db數(shù)據(jù)源 * @return */ @Bean("user_db") @DependsOn({"datasourceMap"}) public DataSource userDatasource(){ return sourceMap.get("user_db"); } /** * 添加user_db SqlSessionFactory * @return * @throws Exception */ @Bean("user_db_sql_session_factory") @DependsOn("user_db") public SqlSessionFactory userDbSqlSessionFactory() throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(this.sourceMap.get("user_db")); // 設(shè)置mapper位置 bean.setTypeAliasesPackage("personal.gltm.demo.mapper.user"); // 設(shè)置mapper.xml文件的路徑 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:user/mapper/*.xml")); return bean.getObject(); } /** * 添加 data_db 數(shù)據(jù)源 * @return */ @Bean("data_db") @DependsOn({"datasourceMap"}) public DataSource dataDatasource(){ return sourceMap.get("data_db"); } /** * 添加data_db SqlSessionFactory * @return * @throws Exception */ @Bean("data_db_sql_session_factory") @DependsOn("data_db") public SqlSessionFactory dataDbSqlSessionFactory() throws Exception { MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); bean.setDataSource(this.sourceMap.get("data_db")); // 設(shè)置mapper位置 bean.setTypeAliasesPackage("personal.gltm.demo.mapper.data"); // 設(shè)置mapper.xml文件的路徑 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:data/mapper/*.xml")); return bean.getObject(); } /** * 創(chuàng)建XA 數(shù)據(jù)源并添加到sourceMap中 * @param dataSourceConfiguration * @return */ @Bean("datasourceMap") @Primary public Map<String, DataSource> datasourceMap(DataSourceConfiguration dataSourceConfiguration){ Map<String, DataSource> map = new LinkedHashMap<>(); for(Map.Entry<String,Map<String, String>> entry:dataSourceConfiguration.properties.entrySet()){ // 讀取每個(gè)數(shù)據(jù)源的配置創(chuàng)建datasource MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setUrl(entry.getValue().get("url")); dataSource.setUser(entry.getValue().get("user")); dataSource.setPassword(entry.getValue().get("password")); // 創(chuàng)建atomikosDataSource數(shù)據(jù)源 AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean(); atomikosDataSource.setMaxPoolSize(10); atomikosDataSource.setMinPoolSize(5); atomikosDataSource.setBeanName(entry.getKey()); // 設(shè)置bean的名稱 atomikosDataSource.setXaDataSource(dataSource); // 設(shè)置Xa數(shù)據(jù)源 atomikosDataSource.setTestQuery("select now()"); map.put(entry.getKey(), atomikosDataSource); } this.sourceMap = map; return map; } }
上述配置和代碼中我們創(chuàng)建了兩個(gè)mysql數(shù)據(jù)源分別是user_db和data_db.并且在兩個(gè)數(shù)據(jù)源中分別創(chuàng)建了兩個(gè)不同的數(shù)據(jù)表:user_db.t_user{id:bigint,user_name:varchar}和data_db. t_data{id:bigint,context:varchar}
多數(shù)據(jù)源事務(wù)管理器配置:AtomikosConfig
package personal.gltm.demo.config; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import lombok.SneakyThrows; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScans; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.jta.JtaTransactionManager; import javax.transaction.SystemException; import javax.transaction.TransactionManager; import javax.transaction.UserTransaction; @Configuration @EnableTransactionManagement @MapperScans({@MapperScan(basePackages={"personal.gltm.demo.mapper.user"},sqlSessionFactoryRef = "user_db_sql_session_factory"), @MapperScan(basePackages = {"personal.gltm.demo.mapper.data"},sqlSessionFactoryRef = "data_db_sql_session_factory")}) public class AtomikosConfig { // 用于在應(yīng)用程序中執(zhí)行事務(wù)的控制操作。 @Bean(name = "userTransaction") @SneakyThrows(Exception.class) public UserTransaction userTransaction() throws SystemException { final UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(1000); return userTransactionImp; } // 用于管理和控制分布式事務(wù)的整個(gè)生命周期。 @Bean(name = "atomikosTransactionManager") @SneakyThrows(Exception.class) public TransactionManager atomikosTransactionManager() { final UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } // JtaTransactionManager 的主要作用是管理和協(xié)調(diào)分布式事務(wù),它支持使用 JTA 來處理分布式事務(wù),與 JTA 兼容的事務(wù)管理器進(jìn)行交互。 @Bean(name = "transactionManager") @SneakyThrows(Throwable.class) public PlatformTransactionManager transactionManager( @Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) throws SystemException { return new JtaTransactionManager(userTransaction(), atomikosTransactionManager()); } }
上述代碼需要使用@MapperScans注解綁定sqlsessionfactory的掃描包.
驗(yàn)證測(cè)試
添加測(cè)試類:
package personal.gltm.demo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import personal.gltm.demo.model.Data; import personal.gltm.demo.model.User; import personal.gltm.demo.service.DataService; import personal.gltm.demo.service.UserService; import java.math.BigDecimal; @RequestMapping("/test") @RestController public class TestController { @Autowired private UserService userService; @Autowired private DataService dataService; @GetMapping("/test/{u-id}/{d-id}") @Transactional(rollbackFor = Throwable.class) public String Test(@PathVariable("u-id")BigDecimal uId,@PathVariable("d-id") BigDecimal dId){ User user = new User(); user.setId(uId); user.setUserName("sihong" + Math.floor(Math.random() * 20)); userService.insert(user); Data data = new Data(); data.setId(dId); data.setContext("mine mine mine ..... " + System.nanoTime()); dataService.insert(data); return "success"; } }
測(cè)試類中,我們添加了一個(gè)Test方法,接收兩個(gè)參數(shù):userId和dataId,然后把這兩個(gè)參數(shù)作為主鍵分別插入到user_db.t_user和data_db.t_data中.如果主鍵沖突則其中一個(gè)會(huì)報(bào)錯(cuò).如果要保證多數(shù)據(jù)源事物一致性另一個(gè)事物也必須回滾.
在瀏覽器中先調(diào)用:http://localhost/test/test/1/1 分別在user_db.t_user 和data_db.t_data中插入主鍵為1 的數(shù)據(jù).再調(diào)用http://localhost/test/test/1/2 時(shí)會(huì)向數(shù)據(jù)庫(kù)插入主鍵分別為1和2的數(shù)據(jù),但是t_user中會(huì)存在主鍵沖突.整個(gè)事務(wù)回滾t_data 中也不會(huì)插入數(shù)據(jù).
到此這篇關(guān)于Atomikos + MybatisPlus解決多數(shù)據(jù)源事務(wù)一致性問題解決的文章就介紹到這了,更多相關(guān)Atomikos MybatisPlus多數(shù)據(jù)源事務(wù)一致性內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringCloud如何引用xxjob定時(shí)任務(wù)
Spring?Cloud?本身不直接支持?XXL-JOB?這樣的定時(shí)任務(wù)框架,如果你想在?Spring?Cloud?應(yīng)用中集成?XXL-JOB,你需要手動(dòng)進(jìn)行配置,本文給大家介紹SpringCloud如何引用xxjob定時(shí)任務(wù),感興趣的朋友一起看看吧2024-04-04詳細(xì)談?wù)凧ava中l(wèi)ong和double的原子性
原子性是指一個(gè)操作或多個(gè)操作要么全部執(zhí)行,且執(zhí)行的過程不會(huì)被任何因素打斷,要么就都不執(zhí)行,下面這篇文章主要給大家介紹了關(guān)于Java中l(wèi)ong和double原子性的相關(guān)資料,需要的朋友可以參考下2021-08-08Java設(shè)計(jì)模式之監(jiān)聽器模式實(shí)例詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之監(jiān)聽器模式,結(jié)合實(shí)例形式較為詳細(xì)的分析了java設(shè)計(jì)模式中監(jiān)聽器模式的概念、原理及相關(guān)實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2018-02-02詳解Java ScheduledThreadPoolExecutor的踩坑與解決方法
最近項(xiàng)目上反饋某個(gè)重要的定時(shí)任務(wù)突然不執(zhí)行了,很頭疼,開發(fā)環(huán)境和測(cè)試環(huán)境都沒有出現(xiàn)過這個(gè)問題。定時(shí)任務(wù)采用的是ScheduledThreadPoolExecutor,后來一看代碼發(fā)現(xiàn)踩了一個(gè)大坑。本文就來和大家聊聊這次的踩坑記錄與解決方法,需要的可以參考一下2022-10-10Java實(shí)現(xiàn)短信驗(yàn)證碼和國(guó)際短信群發(fā)功能的示例
本篇文章主要介紹了Java實(shí)現(xiàn)短信驗(yàn)證碼和國(guó)際短信群發(fā)功能的示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-02-02SpringBoot工程下Lombok的應(yīng)用教程詳解
這篇文章主要給大家介紹了關(guān)于SpringBoot工程下Lombok應(yīng)用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11