Seata?AT獲取數(shù)據(jù)表元數(shù)據(jù)源碼詳解
前言
我們都知道Seata AT是基于前后鏡像來實(shí)現(xiàn)事務(wù)的成功回滾的,前后鏡像的生成依賴于數(shù)據(jù)表的元數(shù)據(jù),Seata是如何生成前后鏡像的可以看這篇博客:你知道Seata AT模式中前后鏡像是如何生成的嘛?。
起初我以為數(shù)據(jù)庫(kù)Driver提供了現(xiàn)成的API給開發(fā)人員獲取指定數(shù)據(jù)表的元數(shù)據(jù),今天看了源碼才知道,并沒有想象中那么簡(jiǎn)單。下面我們就來一起看看到底是怎么一回事兒。
一探究竟
我們直接展開關(guān)鍵性的seata源碼,進(jìn)入DataSourceProxy.init()
方法中:
// 是否允許開啟定時(shí)任務(wù)檢查更新元數(shù)據(jù) if (ENABLE_TABLE_META_CHECKER_ENABLE) { // 開啟定時(shí)任務(wù),默認(rèn)一分鐘更新檢查一下 tableMetaExecutor.scheduleAtFixedRate(() -> { // 獲取數(shù)據(jù)庫(kù)鏈接 try (Connection connection = dataSource.getConnection()) { // 更新緩存中的數(shù)據(jù)表元數(shù)據(jù) TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()) .refresh(connection, DataSourceProxy.this.getResourceId()); } catch (Exception ignore) {} }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS); }
Seata AT在創(chuàng)建了DataSourceProxy
對(duì)象后,馬上會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),一分鐘檢查一次緩存中的元數(shù)據(jù)。
跟著關(guān)鍵代碼,我們可以追蹤到AbstractTableMetaCache
類,這個(gè)抽象類其實(shí)就提供了兩個(gè)方法:
public abstract class AbstractTableMetaCache implements TableMetaCache { @Override public TableMeta getTableMeta(final Connection connection, final String tableName, String resourceId) { // 如果緩存中有對(duì)應(yīng)數(shù)據(jù)就返回,否則就去查詢?cè)獢?shù)據(jù)并放在緩存中。 } @Override public void refresh(final Connection connection, String resourceId) { // 更新緩存 } }
最后我們發(fā)現(xiàn)獲取數(shù)據(jù)表元數(shù)據(jù)的代碼實(shí)現(xiàn)在fetchSchema()
方法中,但是這個(gè)方法是一個(gè)抽象方法,有多個(gè)實(shí)現(xiàn):
我們就挑一個(gè)MysqlTableMetaCache
來看一下里面是如何實(shí)現(xiàn)的。
@Override protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException { String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1"; try (Statement stmt = connection.createStatement(); // 執(zhí)行SQL語(yǔ)句:SELECT * FROM [tableName] LIMIT 1; ResultSet rs = stmt.executeQuery(sql)) { // 根據(jù)執(zhí)行結(jié)果獲取元數(shù)據(jù) return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData()); } catch (SQLException sqlEx) { throw sqlEx; } catch (Exception e) { throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e); } }
根據(jù)上面源碼,我們發(fā)現(xiàn)Seata獲取Mysql數(shù)據(jù)表的元數(shù)據(jù)竟然是通過SELECT * FROM [tableName] LIMIT 1
來的,但是事實(shí)并不是我們想象的這么簡(jiǎn)單,繼續(xù)深入resultSetMetaToSchema()
方法:
private TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd) throws SQLException { //always "" for mysql String schemaName = rsmd.getSchemaName(1); String catalogName = rsmd.getCatalogName(1); /* * 通過ResultSetMetaData獲取tableName可以避免以下情況 * * select * from account_tbl * select * from account_TBL * select * from `account_tbl` * select * from account.account_tbl */ String tableName = rsmd.getTableName(1); TableMeta tm = new TableMeta(); tm.setTableName(tableName); /* * here has two different type to get the data * make sure the table name was right * 1. show full columns from xxx from xxx(normal) * 2. select xxx from xxx where catalog_name like ? and table_name like ?(informationSchema=true) */ // 通過dbmd發(fā)送查詢語(yǔ)句獲取指定表中的所有列信息 try (ResultSet rsColumns = dbmd.getColumns(catalogName, schemaName, tableName, "%"); // 發(fā)送查詢語(yǔ)句獲取表中索引信息 ResultSet rsIndex = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true); // 查詢更新行中的任何值時(shí)自動(dòng)更新的列的信息 ResultSet onUpdateColumns = dbmd.getVersionColumns(catalogName, schemaName, tableName)) { // 收集列信息 while (rsColumns.next()) { ColumnMeta col = new ColumnMeta(); col.setTableCat(rsColumns.getString("TABLE_CAT")); col.setTableSchemaName(rsColumns.getString("TABLE_SCHEM")); col.setTableName(rsColumns.getString("TABLE_NAME")); col.setColumnName(rsColumns.getString("COLUMN_NAME")); col.setDataType(rsColumns.getInt("DATA_TYPE")); col.setDataTypeName(rsColumns.getString("TYPE_NAME")); col.setColumnSize(rsColumns.getInt("COLUMN_SIZE")); col.setDecimalDigits(rsColumns.getInt("DECIMAL_DIGITS")); col.setNumPrecRadix(rsColumns.getInt("NUM_PREC_RADIX")); col.setNullAble(rsColumns.getInt("NULLABLE")); col.setRemarks(rsColumns.getString("REMARKS")); col.setColumnDef(rsColumns.getString("COLUMN_DEF")); col.setSqlDataType(rsColumns.getInt("SQL_DATA_TYPE")); col.setSqlDatetimeSub(rsColumns.getInt("SQL_DATETIME_SUB")); col.setCharOctetLength(rsColumns.getInt("CHAR_OCTET_LENGTH")); col.setOrdinalPosition(rsColumns.getInt("ORDINAL_POSITION")); col.setIsNullAble(rsColumns.getString("IS_NULLABLE")); col.setIsAutoincrement(rsColumns.getString("IS_AUTOINCREMENT")); if (tm.getAllColumns().containsKey(col.getColumnName())) { throw new NotSupportYetException("Not support the table has the same column name with different case yet"); } tm.getAllColumns().put(col.getColumnName(), col); } while (onUpdateColumns.next()) { tm.getAllColumns().get(onUpdateColumns.getString("COLUMN_NAME")).setOnUpdate(true); } // 收集索引信息 while (rsIndex.next()) { String indexName = rsIndex.getString("INDEX_NAME"); String colName = rsIndex.getString("COLUMN_NAME"); ColumnMeta col = tm.getAllColumns().get(colName); if (tm.getAllIndexes().containsKey(indexName)) { IndexMeta index = tm.getAllIndexes().get(indexName); index.getValues().add(col); } else { IndexMeta index = new IndexMeta(); index.setIndexName(indexName); index.setNonUnique(rsIndex.getBoolean("NON_UNIQUE")); index.setIndexQualifier(rsIndex.getString("INDEX_QUALIFIER")); index.setIndexName(rsIndex.getString("INDEX_NAME")); index.setType(rsIndex.getShort("TYPE")); index.setOrdinalPosition(rsIndex.getShort("ORDINAL_POSITION")); index.setAscOrDesc(rsIndex.getString("ASC_OR_DESC")); index.setCardinality(rsIndex.getInt("CARDINALITY")); index.getValues().add(col); if ("PRIMARY".equalsIgnoreCase(indexName)) { index.setIndextype(IndexType.PRIMARY); } else if (!index.isNonUnique()) { index.setIndextype(IndexType.UNIQUE); } else { index.setIndextype(IndexType.NORMAL); } tm.getAllIndexes().put(indexName, index); } } if (tm.getAllIndexes().isEmpty()) { throw new ShouldNeverHappenException("Could not found any index in the table: " + tableName); } } return tm; }
可以發(fā)現(xiàn),在mysql的實(shí)現(xiàn)中,我們查詢一個(gè)表的元數(shù)據(jù),需要執(zhí)行四條SQL語(yǔ)句,另外Oracle和Postgresql實(shí)現(xiàn)中,也是要執(zhí)行三條查詢語(yǔ)句的。在數(shù)據(jù)表變動(dòng)不是很頻繁的情況下,seata遵循讀多寫少用緩存的原則,并通過定時(shí)任務(wù)的方式來保持拿到的數(shù)據(jù)表元數(shù)據(jù)是最新的。
小結(jié)
在seata獲取數(shù)據(jù)表元數(shù)據(jù)的實(shí)現(xiàn)中,我們通過閱讀源碼的方式,大致收獲了以下幾點(diǎn):
1.seata AT模式默認(rèn)會(huì)開啟定時(shí)任務(wù)每分鐘更新數(shù)據(jù)表元數(shù)據(jù),這是一個(gè)配置項(xiàng),在確認(rèn)運(yùn)行時(shí)數(shù)據(jù)表不會(huì)變更的情況下,開發(fā)人員可以不開啟該定時(shí)任務(wù)關(guān)閉。client.rm.tableMetaCheckEnable=false
即可關(guān)閉該定時(shí)任務(wù)。
2.seata獲取數(shù)據(jù)表元數(shù)據(jù)至少需要進(jìn)行三次以上的查詢,這屬于一個(gè)比較重的操作。為了避免獲取元數(shù)據(jù)影響業(yè)務(wù)的吞吐量,seata遵循了讀多寫少用緩存的原則,來盡可能地降低該操作帶來的影響。
以上就是Seata AT獲取數(shù)據(jù)表元數(shù)據(jù)源碼詳解的詳細(xì)內(nèi)容,更多關(guān)于Seata AT獲取數(shù)據(jù)表元數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Boot文件上傳原理與實(shí)現(xiàn)詳解
這篇文章主要介紹了Spring Boot 文件上傳原理與實(shí)現(xiàn)詳解,前端文件上傳是面向多用戶的,多用戶之間可能存在上傳同一個(gè)名稱、類型的文件;為了避免文件沖突導(dǎo)致的覆蓋問題這些應(yīng)該在后臺(tái)進(jìn)行解決,需要的朋友可以參考下2024-01-01SpringBoot整合mybatisplus和druid的示例詳解
這篇文章主要介紹了SpringBoot整合mybatisplus和druid的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08快速解決VS Code報(bào)錯(cuò):Java 11 or more recent is required to run. Ple
這篇文章主要介紹了快速解決VS Code報(bào)錯(cuò):Java 11 or more recent is required to run. Please download and install a recent JDK的相關(guān)資料,需要的朋友可以參考下2020-09-09Spring AOP與AspectJ的對(duì)比及應(yīng)用詳解
這篇文章主要為大家介紹了Spring AOP與AspectJ的對(duì)比及應(yīng)用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02Java 使用IO流實(shí)現(xiàn)大文件的分割與合并實(shí)例詳解
這篇文章主要介紹了Java 使用IO流實(shí)現(xiàn)大文件的分割與合并實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2016-12-12