Java多線(xiàn)程事務(wù)回滾@Transactional失效處理方案
背景介紹
1,最近有一個(gè)大數(shù)據(jù)量插入的操作入庫(kù)的業(yè)務(wù)場(chǎng)景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會(huì)很多,用到多線(xiàn)程去拆分?jǐn)?shù)據(jù)并行處理來(lái)提高響應(yīng)時(shí)間,如果有一個(gè)線(xiàn)程執(zhí)行失敗,則全部回滾。
2,在spring中可以使用@Transactional注解去控制事務(wù),使出現(xiàn)異常時(shí)會(huì)進(jìn)行回滾,在多線(xiàn)程中,這個(gè)注解則不會(huì)生效,如果主線(xiàn)程需要先執(zhí)行一些修改數(shù)據(jù)庫(kù)的操作,當(dāng)子線(xiàn)程在進(jìn)行處理出現(xiàn)異常時(shí),主線(xiàn)程修改的數(shù)據(jù)則不會(huì)回滾,導(dǎo)致數(shù)據(jù)錯(cuò)誤。
3,下面用一個(gè)簡(jiǎn)單示例演示多線(xiàn)程事務(wù)。
公用的類(lèi)和方法
示例事務(wù)不成功操作
/** * 測(cè)試多線(xiàn)程事務(wù). * @param employeeDOList */ @Override @Transactional public void saveThread(List<EmployeeDO> employeeDOList) { try { //先做刪除操作,如果子線(xiàn)程出現(xiàn)異常,此操作不會(huì)回滾 this.getBaseMapper().delete(null); //獲取線(xiàn)程池 ExecutorService service = ExecutorConfig.getThreadPool(); //拆分?jǐn)?shù)據(jù),拆分5份 List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5); //執(zhí)行的線(xiàn)程 Thread []threadArray = new Thread[lists.size()]; //監(jiān)控子線(xiàn)程執(zhí)行完畢,再執(zhí)行主線(xiàn)程,要不然會(huì)導(dǎo)致主線(xiàn)程關(guān)閉,子線(xiàn)程也會(huì)隨著關(guān)閉 CountDownLatch countDownLatch = new CountDownLatch(lists.size()); AtomicBoolean atomicBoolean = new AtomicBoolean(true); for (int i =0;i<lists.size();i++){ if (i==lists.size()-1){ atomicBoolean.set(false); } List<EmployeeDO> list = lists.get(i); threadArray[i] = new Thread(() -> { try { //最后一個(gè)線(xiàn)程拋出異常 if (!atomicBoolean.get()){ throw new ServiceException("001","出現(xiàn)異常"); } //批量添加,mybatisPlus中自帶的batch方法 this.saveBatch(list); }finally { countDownLatch.countDown(); } }); } for (int i = 0; i <lists.size(); i++){ service.execute(threadArray[i]); } //當(dāng)子線(xiàn)程執(zhí)行完畢時(shí),主線(xiàn)程再往下執(zhí)行 countDownLatch.await(); System.out.println("添加完畢"); }catch (Exception e){ log.info("error",e); throw new ServiceException("002","出現(xiàn)異常"); }finally { connection.close(); } }
數(shù)據(jù)庫(kù)中存在一條數(shù)據(jù):
//測(cè)試用例 @RunWith(SpringRunner.class) @SpringBootTest(classes = { ThreadTest01.class, MainApplication.class}) public class ThreadTest01 { @Resource private EmployeeBO employeeBO; /** * 測(cè)試多線(xiàn)程事務(wù). * @throws InterruptedException */ @Test public void MoreThreadTest2() throws InterruptedException { int size = 10; List<EmployeeDO> employeeDOList = new ArrayList<>(size); for (int i = 0; i<size;i++){ EmployeeDO employeeDO = new EmployeeDO(); employeeDO.setEmployeeName("lol"+i); employeeDO.setAge(18); employeeDO.setGender(1); employeeDO.setIdNumber(i+"XX"); employeeDO.setCreatTime(Calendar.getInstance().getTime()); employeeDOList.add(employeeDO); } try { employeeBO.saveThread(employeeDOList); System.out.println("添加成功"); }catch (Exception e){ e.printStackTrace(); } } }
測(cè)試結(jié)果:
可以發(fā)現(xiàn)子線(xiàn)程組執(zhí)行時(shí),有一個(gè)線(xiàn)程執(zhí)行失敗,其他線(xiàn)程也會(huì)拋出異常,但是主線(xiàn)程中執(zhí)行的刪除操作,沒(méi)有回滾,@Transactional注解沒(méi)有生效。
使用sqlSession控制手動(dòng)提交事務(wù)
@Resource SqlContext sqlContext; /** * 測(cè)試多線(xiàn)程事務(wù). * @param employeeDOList */ @Override public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException { // 獲取數(shù)據(jù)庫(kù)連接,獲取會(huì)話(huà)(內(nèi)部自有事務(wù)) SqlSession sqlSession = sqlContext.getSqlSession(); Connection connection = sqlSession.getConnection(); try { // 設(shè)置手動(dòng)提交 connection.setAutoCommit(false); //獲取mapper EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class); //先做刪除操作 employeeMapper.delete(null); //獲取執(zhí)行器 ExecutorService service = ExecutorConfig.getThreadPool(); List<Callable<Integer>> callableList = new ArrayList<>(); //拆分list List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5); AtomicBoolean atomicBoolean = new AtomicBoolean(true); for (int i =0;i<lists.size();i++){ if (i==lists.size()-1){ atomicBoolean.set(false); } List<EmployeeDO> list = lists.get(i); //使用返回結(jié)果的callable去執(zhí)行, Callable<Integer> callable = () -> { //讓最后一個(gè)線(xiàn)程拋出異常 if (!atomicBoolean.get()){ throw new ServiceException("001","出現(xiàn)異常"); } return employeeMapper.saveBatch(list); }; callableList.add(callable); } //執(zhí)行子線(xiàn)程 List<Future<Integer>> futures = service.invokeAll(callableList); for (Future<Integer> future:futures) { //如果有一個(gè)執(zhí)行不成功,則全部回滾 if (future.get()<=0){ connection.rollback(); return; } } connection.commit(); System.out.println("添加完畢"); }catch (Exception e){ connection.rollback(); log.info("error",e); throw new ServiceException("002","出現(xiàn)異常"); }finally { connection.close(); } } // sql <insert id="saveBatch" parameterType="List"> INSERT INTO employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status) values <foreach collection="list" item="item" index="index" separator=","> ( #{item.employeeId}, #{item.age}, #{item.employeeName}, #{item.birthDate}, #{item.gender}, #{item.idNumber}, #{item.creatTime}, #{item.updateTime}, #{item.status} ) </foreach> </insert>
數(shù)據(jù)庫(kù)中一條數(shù)據(jù):
測(cè)試結(jié)果:拋出異常,
刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫(kù)中的數(shù)據(jù)依舊存在,說(shuō)明事務(wù)成功了。
成功操作示例:
@Resource SqlContext sqlContext; /** * 測(cè)試多線(xiàn)程事務(wù). * @param employeeDOList */ @Override public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException { // 獲取數(shù)據(jù)庫(kù)連接,獲取會(huì)話(huà)(內(nèi)部自有事務(wù)) SqlSession sqlSession = sqlContext.getSqlSession(); Connection connection = sqlSession.getConnection(); try { // 設(shè)置手動(dòng)提交 connection.setAutoCommit(false); EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class); //先做刪除操作 employeeMapper.delete(null); ExecutorService service = ExecutorConfig.getThreadPool(); List<Callable<Integer>> callableList = new ArrayList<>(); List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5); for (int i =0;i<lists.size();i++){ List<EmployeeDO> list = lists.get(i); Callable<Integer> callable = () -> employeeMapper.saveBatch(list); callableList.add(callable); } //執(zhí)行子線(xiàn)程 List<Future<Integer>> futures = service.invokeAll(callableList); for (Future<Integer> future:futures) { if (future.get()<=0){ connection.rollback(); return; } } connection.commit(); System.out.println("添加完畢"); }catch (Exception e){ connection.rollback(); log.info("error",e); throw new ServiceException("002","出現(xiàn)異常"); // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR); } }
測(cè)試結(jié)果:
數(shù)據(jù)庫(kù)中數(shù)據(jù):
刪除的刪除了,添加的添加成功了,測(cè)試成功。
到此這篇關(guān)于Java多線(xiàn)程事務(wù)回滾@Transactional失效處理方案的文章就介紹到這了,更多相關(guān)Java Transactional失效內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java根據(jù)模板實(shí)現(xiàn)excel導(dǎo)出標(biāo)準(zhǔn)化
這篇文章主要為大家詳細(xì)介紹了Java如何根據(jù)模板實(shí)現(xiàn)excel導(dǎo)出標(biāo)準(zhǔn)化,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,有需要的小伙伴可以參考下2024-03-03java中的Serializable、transient關(guān)鍵字詳解
這篇文章主要介紹了java中的Serializable、transient關(guān)鍵字詳解,序列化只會(huì)保存屬性值,不會(huì)保存方法,通過(guò)反序列化可以把序列化后的內(nèi)容恢復(fù)成對(duì)象,需要的朋友可以參考下2023-09-09詳解springboot接口如何優(yōu)雅的接收時(shí)間類(lèi)型參數(shù)
這篇文章主要為大家詳細(xì)介紹了springboot的接口如何優(yōu)雅的接收時(shí)間類(lèi)型參數(shù),文中為大家整理了三種常見(jiàn)的方法,希望對(duì)大家有一定的幫助2023-09-09Java并發(fā)編程Lock?Condition和ReentrantLock基本原理
這篇文章主要介紹了Java并發(fā)編程Lock?Condition和ReentrantLock基本原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09SpringBoot實(shí)現(xiàn)redis緩存菜單列表
本文主要介紹了SpringBoot實(shí)現(xiàn)redis緩存菜單列表,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01Struts2實(shí)現(xiàn)文件上傳時(shí)顯示進(jìn)度條功能
這篇文章主要為大家詳細(xì)介紹了Struts2實(shí)現(xiàn)文件上傳時(shí)顯示進(jìn)度條功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05一篇文章帶你了解Maven的坐標(biāo)概念以及依賴(lài)管理
這篇文章主要為大家介紹了Maven的坐標(biāo)概念以及依賴(lài)管理,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2022-01-01