MybatisPlus批量保存原理及失效原因排查全過程
問題描述
一般情況下,在MybatisPlus中使用saveBatch方法進行批量保存只需要:在數(shù)據(jù)庫連接串中添加&rewriteBatchedStatements=true,并將MySQL驅動保證在5.0.18以上即可。
但是在這里實際使用中批量保存并沒有生效,列表數(shù)據(jù)被分組成幾批數(shù)據(jù)保存,而不是一批數(shù)據(jù)保存,通過調試、查看數(shù)據(jù)庫日志等方式可以驗證。
所以現(xiàn)在是配置正確,驅動正確,批量保存的數(shù)據(jù)正確,但是批量保存沒有生效。
批量保存原理
框架是不會出問題的,這里來看下MybatisPlus實現(xiàn)批量保存的實現(xiàn)方式,調用saveBatch方法后發(fā)生了什么。
ServiceImpl.saveBatch()
@Transactional(rollbackFor = Exception.class) @Override public boolean saveBatch(Collection<T> entityList, int batchSize) { // ${className}.insert String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE); // 函數(shù)式編程 BiConsumer<T, U> return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity)); }
采用函數(shù)式編程實現(xiàn)BiConsumer接口,其用法相當于
@Override public boolean saveBatch(Collection entityList, int batchSize) { BiConsumer<SqlSession, Object> consumer = new EntityConsumer(); return executeBatch(entityList, batchSize, consumer); } class EntityConsumer implements BiConsumer<SqlSession, Object>{ @Override public void accept(SqlSession sqlSession, Object object) { sqlSession.insert("${className}.insert", object); } }
SqlHelper.executeBatch()
定義批量保存的模板方法
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) { Assert.isFalse(batchSize < 1, "batchSize must not be less than one"); return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> { int size = list.size(); int i = 1; for (E element : list) { // 調用sqlSession.insert("${className}.insert", object); // 數(shù)據(jù)最終保存到StatementImpl.batchedArgs中,用于后面做批量保存 consumer.accept(sqlSession, element); if ((i % batchSize == 0) || i == size) { // 批量保存StatementImpl.batchedArgs中的數(shù)據(jù) sqlSession.flushStatements(); } i++; } }); }
MybatisBatchExecutor.doUpdate()
將待執(zhí)行對象添加到對應的Statement中,可以理解為將批量數(shù)據(jù)分組,分組的依據(jù)包含兩個:
if (sql.equals(currentSql) && ms.equals(currentStatement))
? ● 數(shù)據(jù)的SQL語句必須完全一致,包括表名和列
? ● 使用的MappedStatement一致,即Mapper一致
@Override public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException { final Configuration configuration = ms.getConfiguration(); final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null); final BoundSql boundSql = handler.getBoundSql(); final String sql = boundSql.getSql(); final Statement stmt; // ** if (sql.equals(currentSql) && ms.equals(currentStatement)) { int last = statementList.size() - 1; stmt = statementList.get(last); applyTransactionTimeout(stmt); handler.parameterize(stmt);//fix Issues 322 BatchResult batchResult = batchResultList.get(last); batchResult.addParameterObject(parameterObject); } else { Connection connection = getConnection(ms.getStatementLog()); stmt = handler.prepare(connection, transaction.getTimeout()); if (stmt == null) { return 0; } handler.parameterize(stmt); //fix Issues 322 currentSql = sql; currentStatement = ms; statementList.add(stmt); batchResultList.add(new BatchResult(ms, sql, parameterObject)); } handler.batch(stmt); return BATCH_UPDATE_RETURN_VALUE; }
PreparedStatement.addBatch()
將數(shù)據(jù)添加到StatementImpl.batchedArgs中,至此第一階段完成
public void addBatch() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { if (this.batchedArgs == null) { this.batchedArgs = new ArrayList<Object>(); } for (int i = 0; i < this.parameterValues.length; i++) { checkAllParametersSet(this.parameterValues[i], this.parameterStreams[i], i); } this.batchedArgs.add(new BatchParams(this.parameterValues, this.parameterStreams, this.isStream, this.streamLengths, this.isNull)); } }
MybatisBatchExecutor.doFlushStatements()
遍歷Statemen,并執(zhí)行executeBatch()方法
@Override public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException { try { List<BatchResult> results = new ArrayList<>(); if (isRollback) { return Collections.emptyList(); } for (int i = 0, n = statementList.size(); i < n; i++) { Statement stmt = statementList.get(i); applyTransactionTimeout(stmt); BatchResult batchResult = batchResultList.get(i); try { batchResult.setUpdateCounts(stmt.executeBatch()); MappedStatement ms = batchResult.getMappedStatement(); List<Object> parameterObjects = batchResult.getParameterObjects(); KeyGenerator keyGenerator = ms.getKeyGenerator(); if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) { Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator; jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects); } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141 for (Object parameter : parameterObjects) { keyGenerator.processAfter(this, ms, stmt, parameter); } } // Close statement to close cursor #1109 closeStatement(stmt); } catch (BatchUpdateException e) { StringBuilder message = new StringBuilder(); message.append(batchResult.getMappedStatement().getId()) .append(" (batch index #") .append(i + 1) .append(")") .append(" failed."); if (i > 0) { message.append(" ") .append(i) .append(" prior sub executor(s) completed successfully, but will be rolled back."); } throw new BatchExecutorException(message.toString(), e, results, batchResult); } results.add(batchResult); } return results; } finally { for (Statement stmt : statementList) { closeStatement(stmt); } currentSql = null; statementList.clear(); batchResultList.clear(); } }
PreparedStatement.executeBatchInternal()
最終執(zhí)行批量操作的邏輯,這里會判斷rewriteBatchedStatements參數(shù)
@Override protected long[] executeBatchInternal() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { if (this.connection.isReadOnly()) { throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"), SQLError.SQL_STATE_ILLEGAL_ARGUMENT); } if (this.batchedArgs == null || this.batchedArgs.size() == 0) { return new long[0]; } // we timeout the entire batch, not individual statements int batchTimeout = this.timeoutInMillis; this.timeoutInMillis = 0; resetCancelledState(); try { statementBegins(); clearWarnings(); // 判斷rewriteBatchedStatements參數(shù) if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) { if (canRewriteAsMultiValueInsertAtSqlLevel()) { return executeBatchedInserts(batchTimeout); } if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) { return executePreparedBatchAsMultiStatement(batchTimeout); } } return executeBatchSerially(batchTimeout); } finally { this.statementExecuting.set(false); clearBatch(); } } }
問題排查
? 在流程中幾個關鍵節(jié)點為:
? ● MybatisBatchExecutor:1、執(zhí)行數(shù)據(jù)分組邏輯。2、遍歷Statement執(zhí)行批量保存邏輯。
? ● StatementImpl:保存batchedArgs
? ● PreparedStatement:執(zhí)行最終的批量保存邏輯
? 可以看出JDK層只執(zhí)行最終的保存操作,如果這里的數(shù)據(jù)batchedArgs沒有拿到批量的,那一定是MybatisPlus的分組邏輯出現(xiàn)問題。通過調試發(fā)現(xiàn)問題出現(xiàn)在?
if (sql.equals(currentSql) && ms.equals(currentStatement))
? 由于有些數(shù)據(jù)有些列沒有默認值導致SQL的列不同,數(shù)據(jù)被添加到不同的Statement執(zhí)行,導致最終的批量操作失效。
總結
整個批量過程可以分為兩個階段:
- 1、將批量數(shù)據(jù)添加到statementImpl.batchedArgs中保存。
- 2、調用statement.executeBatch方法完成批量。
來看下這兩步最基本的操作之上,MybatisPlus做了哪些事情:
- ? 1、定義批量操作的模板。
- ? 2、驗證集合中的數(shù)據(jù),將完全一致的SQL添加到同一個Statement中。
- ? 3、Jdbc3KeyGenerator?
值得注意的是,批量模板中的單條新增調用的是sqlSession.insert(),這個方法是沒有執(zhí)行execute的,只是將數(shù)據(jù)放到statementImpl.batchedArgs中。而常規(guī)的單條新增調用的是baseMapper.insert()方法,其是基于動態(tài)代理的方法來實現(xiàn)。
可以看出以上流程經(jīng)歷了已知的四層結構:MybatisPlus–>Mybatis–>MySQL connector–>JDK。其最底層還是通過preparedStatement來實現(xiàn)批量操作,和我們通過原始JDBC來實現(xiàn)批量操作的原理相同,上層都是框架實現(xiàn)的封裝。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Spring中使用自定義ThreadLocal存儲導致的坑及解決
這篇文章主要介紹了Spring中使用自定義ThreadLocal存儲導致的坑及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12Kafka多節(jié)點分布式集群搭建實現(xiàn)過程詳解
這篇文章主要介紹了Kafka多節(jié)點分布式集群搭建實現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-11-11淺談Spring Cloud中的API網(wǎng)關服務Zuul
這篇文章主要介紹了淺談Spring Cloud中的API網(wǎng)關服務Zuul,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-10-10Spring Boot實現(xiàn)Undertow服務器同時支持HTTP2、HTTPS的方法
這篇文章考慮如何讓Spring Boot應用程序同時支持HTTP和HTTPS兩種協(xié)議。小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12