Java多線程事務(wù)回滾@Transactional失效處理方案
背景介紹
1,最近有一個大數(shù)據(jù)量插入的操作入庫的業(yè)務(wù)場景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會很多,用到多線程去拆分?jǐn)?shù)據(jù)并行處理來提高響應(yīng)時間,如果有一個線程執(zhí)行失敗,則全部回滾。
2,在spring中可以使用@Transactional注解去控制事務(wù),使出現(xiàn)異常時會進(jìn)行回滾,在多線程中,這個注解則不會生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫的操作,當(dāng)子線程在進(jìn)行處理出現(xiàn)異常時,主線程修改的數(shù)據(jù)則不會回滾,導(dǎo)致數(shù)據(jù)錯誤。
3,下面用一個簡單示例演示多線程事務(wù)。
公用的類和方法
示例事務(wù)不成功操作
/** * 測試多線程事務(wù). * @param employeeDOList */ @Override @Transactional public void saveThread(List<EmployeeDO> employeeDOList) { try { //先做刪除操作,如果子線程出現(xiàn)異常,此操作不會回滾 this.getBaseMapper().delete(null); //獲取線程池 ExecutorService service = ExecutorConfig.getThreadPool(); //拆分?jǐn)?shù)據(jù),拆分5份 List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5); //執(zhí)行的線程 Thread []threadArray = new Thread[lists.size()]; //監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會導(dǎo)致主線程關(guān)閉,子線程也會隨著關(guān)閉 CountDownLatch countDownLatch = new CountDownLatch(lists.size()); AtomicBoolean atomicBoolean = new AtomicBoolean(true); for (int i =0;i<lists.size();i++){ if (i==lists.size()-1){ atomicBoolean.set(false); } List<EmployeeDO> list = lists.get(i); threadArray[i] = new Thread(() -> { try { //最后一個線程拋出異常 if (!atomicBoolean.get()){ throw new ServiceException("001","出現(xiàn)異常"); } //批量添加,mybatisPlus中自帶的batch方法 this.saveBatch(list); }finally { countDownLatch.countDown(); } }); } for (int i = 0; i <lists.size(); i++){ service.execute(threadArray[i]); } //當(dāng)子線程執(zhí)行完畢時,主線程再往下執(zhí)行 countDownLatch.await(); System.out.println("添加完畢"); }catch (Exception e){ log.info("error",e); throw new ServiceException("002","出現(xiàn)異常"); }finally { connection.close(); } }
數(shù)據(jù)庫中存在一條數(shù)據(jù):
//測試用例 @RunWith(SpringRunner.class) @SpringBootTest(classes = { ThreadTest01.class, MainApplication.class}) public class ThreadTest01 { @Resource private EmployeeBO employeeBO; /** * 測試多線程事務(wù). * @throws InterruptedException */ @Test public void MoreThreadTest2() throws InterruptedException { int size = 10; List<EmployeeDO> employeeDOList = new ArrayList<>(size); for (int i = 0; i<size;i++){ EmployeeDO employeeDO = new EmployeeDO(); employeeDO.setEmployeeName("lol"+i); employeeDO.setAge(18); employeeDO.setGender(1); employeeDO.setIdNumber(i+"XX"); employeeDO.setCreatTime(Calendar.getInstance().getTime()); employeeDOList.add(employeeDO); } try { employeeBO.saveThread(employeeDOList); System.out.println("添加成功"); }catch (Exception e){ e.printStackTrace(); } } }
測試結(jié)果:
可以發(fā)現(xiàn)子線程組執(zhí)行時,有一個線程執(zhí)行失敗,其他線程也會拋出異常,但是主線程中執(zhí)行的刪除操作,沒有回滾,@Transactional注解沒有生效。
使用sqlSession控制手動提交事務(wù)
@Resource SqlContext sqlContext; /** * 測試多線程事務(wù). * @param employeeDOList */ @Override public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException { // 獲取數(shù)據(jù)庫連接,獲取會話(內(nèi)部自有事務(wù)) SqlSession sqlSession = sqlContext.getSqlSession(); Connection connection = sqlSession.getConnection(); try { // 設(shè)置手動提交 connection.setAutoCommit(false); //獲取mapper EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class); //先做刪除操作 employeeMapper.delete(null); //獲取執(zhí)行器 ExecutorService service = ExecutorConfig.getThreadPool(); List<Callable<Integer>> callableList = new ArrayList<>(); //拆分list List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5); AtomicBoolean atomicBoolean = new AtomicBoolean(true); for (int i =0;i<lists.size();i++){ if (i==lists.size()-1){ atomicBoolean.set(false); } List<EmployeeDO> list = lists.get(i); //使用返回結(jié)果的callable去執(zhí)行, Callable<Integer> callable = () -> { //讓最后一個線程拋出異常 if (!atomicBoolean.get()){ throw new ServiceException("001","出現(xiàn)異常"); } return employeeMapper.saveBatch(list); }; callableList.add(callable); } //執(zhí)行子線程 List<Future<Integer>> futures = service.invokeAll(callableList); for (Future<Integer> future:futures) { //如果有一個執(zhí)行不成功,則全部回滾 if (future.get()<=0){ connection.rollback(); return; } } connection.commit(); System.out.println("添加完畢"); }catch (Exception e){ connection.rollback(); log.info("error",e); throw new ServiceException("002","出現(xiàn)異常"); }finally { connection.close(); } } // sql <insert id="saveBatch" parameterType="List"> INSERT INTO employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status) values <foreach collection="list" item="item" index="index" separator=","> ( #{item.employeeId}, #{item.age}, #{item.employeeName}, #{item.birthDate}, #{item.gender}, #{item.idNumber}, #{item.creatTime}, #{item.updateTime}, #{item.status} ) </foreach> </insert>
數(shù)據(jù)庫中一條數(shù)據(jù):
測試結(jié)果:拋出異常,
刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫中的數(shù)據(jù)依舊存在,說明事務(wù)成功了。
成功操作示例:
@Resource SqlContext sqlContext; /** * 測試多線程事務(wù). * @param employeeDOList */ @Override public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException { // 獲取數(shù)據(jù)庫連接,獲取會話(內(nèi)部自有事務(wù)) SqlSession sqlSession = sqlContext.getSqlSession(); Connection connection = sqlSession.getConnection(); try { // 設(shè)置手動提交 connection.setAutoCommit(false); EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class); //先做刪除操作 employeeMapper.delete(null); ExecutorService service = ExecutorConfig.getThreadPool(); List<Callable<Integer>> callableList = new ArrayList<>(); List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5); for (int i =0;i<lists.size();i++){ List<EmployeeDO> list = lists.get(i); Callable<Integer> callable = () -> employeeMapper.saveBatch(list); callableList.add(callable); } //執(zhí)行子線程 List<Future<Integer>> futures = service.invokeAll(callableList); for (Future<Integer> future:futures) { if (future.get()<=0){ connection.rollback(); return; } } connection.commit(); System.out.println("添加完畢"); }catch (Exception e){ connection.rollback(); log.info("error",e); throw new ServiceException("002","出現(xiàn)異常"); // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR); } }
測試結(jié)果:
數(shù)據(jù)庫中數(shù)據(jù):
刪除的刪除了,添加的添加成功了,測試成功。
到此這篇關(guān)于Java多線程事務(wù)回滾@Transactional失效處理方案的文章就介紹到這了,更多相關(guān)Java Transactional失效內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java根據(jù)模板實現(xiàn)excel導(dǎo)出標(biāo)準(zhǔn)化
這篇文章主要為大家詳細(xì)介紹了Java如何根據(jù)模板實現(xiàn)excel導(dǎo)出標(biāo)準(zhǔn)化,文中的示例代碼講解詳細(xì),具有一定的借鑒價值,有需要的小伙伴可以參考下2024-03-03java中的Serializable、transient關(guān)鍵字詳解
這篇文章主要介紹了java中的Serializable、transient關(guān)鍵字詳解,序列化只會保存屬性值,不會保存方法,通過反序列化可以把序列化后的內(nèi)容恢復(fù)成對象,需要的朋友可以參考下2023-09-09詳解springboot接口如何優(yōu)雅的接收時間類型參數(shù)
這篇文章主要為大家詳細(xì)介紹了springboot的接口如何優(yōu)雅的接收時間類型參數(shù),文中為大家整理了三種常見的方法,希望對大家有一定的幫助2023-09-09Java并發(fā)編程Lock?Condition和ReentrantLock基本原理
這篇文章主要介紹了Java并發(fā)編程Lock?Condition和ReentrantLock基本原理,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09Struts2實現(xiàn)文件上傳時顯示進(jìn)度條功能
這篇文章主要為大家詳細(xì)介紹了Struts2實現(xiàn)文件上傳時顯示進(jìn)度條功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05一篇文章帶你了解Maven的坐標(biāo)概念以及依賴管理
這篇文章主要為大家介紹了Maven的坐標(biāo)概念以及依賴管理,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-01-01