Seata?AT獲取數(shù)據(jù)表元數(shù)據(jù)源碼詳解
前言
我們都知道Seata AT是基于前后鏡像來實(shí)現(xiàn)事務(wù)的成功回滾的,前后鏡像的生成依賴于數(shù)據(jù)表的元數(shù)據(jù),Seata是如何生成前后鏡像的可以看這篇博客:你知道Seata AT模式中前后鏡像是如何生成的嘛?。
起初我以為數(shù)據(jù)庫Driver提供了現(xiàn)成的API給開發(fā)人員獲取指定數(shù)據(jù)表的元數(shù)據(jù),今天看了源碼才知道,并沒有想象中那么簡單。下面我們就來一起看看到底是怎么一回事兒。
一探究竟
我們直接展開關(guān)鍵性的seata源碼,進(jìn)入DataSourceProxy.init()方法中:
// 是否允許開啟定時任務(wù)檢查更新元數(shù)據(jù)
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
// 開啟定時任務(wù),默認(rèn)一分鐘更新檢查一下
tableMetaExecutor.scheduleAtFixedRate(() -> {
// 獲取數(shù)據(jù)庫鏈接
try (Connection connection = dataSource.getConnection()) {
// 更新緩存中的數(shù)據(jù)表元數(shù)據(jù)
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
Seata AT在創(chuàng)建了DataSourceProxy對象后,馬上會啟動一個定時任務(wù),一分鐘檢查一次緩存中的元數(shù)據(jù)。
跟著關(guān)鍵代碼,我們可以追蹤到AbstractTableMetaCache類,這個抽象類其實(shí)就提供了兩個方法:
public abstract class AbstractTableMetaCache implements TableMetaCache {
@Override
public TableMeta getTableMeta(final Connection connection, final String tableName, String resourceId) {
// 如果緩存中有對應(yīng)數(shù)據(jù)就返回,否則就去查詢元數(shù)據(jù)并放在緩存中。
}
@Override
public void refresh(final Connection connection, String resourceId) {
// 更新緩存
}
}
最后我們發(fā)現(xiàn)獲取數(shù)據(jù)表元數(shù)據(jù)的代碼實(shí)現(xiàn)在fetchSchema()方法中,但是這個方法是一個抽象方法,有多個實(shí)現(xiàn):

我們就挑一個MysqlTableMetaCache來看一下里面是如何實(shí)現(xiàn)的。
@Override
protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
// 執(zhí)行SQL語句:SELECT * FROM [tableName] LIMIT 1;
ResultSet rs = stmt.executeQuery(sql)) {
// 根據(jù)執(zhí)行結(jié)果獲取元數(shù)據(jù)
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
}
}
根據(jù)上面源碼,我們發(fā)現(xiàn)Seata獲取Mysql數(shù)據(jù)表的元數(shù)據(jù)竟然是通過SELECT * FROM [tableName] LIMIT 1來的,但是事實(shí)并不是我們想象的這么簡單,繼續(xù)深入resultSetMetaToSchema()方法:
private TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd)
throws SQLException {
//always "" for mysql
String schemaName = rsmd.getSchemaName(1);
String catalogName = rsmd.getCatalogName(1);
/*
* 通過ResultSetMetaData獲取tableName可以避免以下情況
*
* select * from account_tbl
* select * from account_TBL
* select * from `account_tbl`
* select * from account.account_tbl
*/
String tableName = rsmd.getTableName(1);
TableMeta tm = new TableMeta();
tm.setTableName(tableName);
/*
* here has two different type to get the data
* make sure the table name was right
* 1. show full columns from xxx from xxx(normal)
* 2. select xxx from xxx where catalog_name like ? and table_name like ?(informationSchema=true)
*/
// 通過dbmd發(fā)送查詢語句獲取指定表中的所有列信息
try (ResultSet rsColumns = dbmd.getColumns(catalogName, schemaName, tableName, "%");
// 發(fā)送查詢語句獲取表中索引信息
ResultSet rsIndex = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true);
// 查詢更新行中的任何值時自動更新的列的信息
ResultSet onUpdateColumns = dbmd.getVersionColumns(catalogName, schemaName, tableName)) {
// 收集列信息
while (rsColumns.next()) {
ColumnMeta col = new ColumnMeta();
col.setTableCat(rsColumns.getString("TABLE_CAT"));
col.setTableSchemaName(rsColumns.getString("TABLE_SCHEM"));
col.setTableName(rsColumns.getString("TABLE_NAME"));
col.setColumnName(rsColumns.getString("COLUMN_NAME"));
col.setDataType(rsColumns.getInt("DATA_TYPE"));
col.setDataTypeName(rsColumns.getString("TYPE_NAME"));
col.setColumnSize(rsColumns.getInt("COLUMN_SIZE"));
col.setDecimalDigits(rsColumns.getInt("DECIMAL_DIGITS"));
col.setNumPrecRadix(rsColumns.getInt("NUM_PREC_RADIX"));
col.setNullAble(rsColumns.getInt("NULLABLE"));
col.setRemarks(rsColumns.getString("REMARKS"));
col.setColumnDef(rsColumns.getString("COLUMN_DEF"));
col.setSqlDataType(rsColumns.getInt("SQL_DATA_TYPE"));
col.setSqlDatetimeSub(rsColumns.getInt("SQL_DATETIME_SUB"));
col.setCharOctetLength(rsColumns.getInt("CHAR_OCTET_LENGTH"));
col.setOrdinalPosition(rsColumns.getInt("ORDINAL_POSITION"));
col.setIsNullAble(rsColumns.getString("IS_NULLABLE"));
col.setIsAutoincrement(rsColumns.getString("IS_AUTOINCREMENT"));
if (tm.getAllColumns().containsKey(col.getColumnName())) {
throw new NotSupportYetException("Not support the table has the same column name with different case yet");
}
tm.getAllColumns().put(col.getColumnName(), col);
}
while (onUpdateColumns.next()) {
tm.getAllColumns().get(onUpdateColumns.getString("COLUMN_NAME")).setOnUpdate(true);
}
// 收集索引信息
while (rsIndex.next()) {
String indexName = rsIndex.getString("INDEX_NAME");
String colName = rsIndex.getString("COLUMN_NAME");
ColumnMeta col = tm.getAllColumns().get(colName);
if (tm.getAllIndexes().containsKey(indexName)) {
IndexMeta index = tm.getAllIndexes().get(indexName);
index.getValues().add(col);
} else {
IndexMeta index = new IndexMeta();
index.setIndexName(indexName);
index.setNonUnique(rsIndex.getBoolean("NON_UNIQUE"));
index.setIndexQualifier(rsIndex.getString("INDEX_QUALIFIER"));
index.setIndexName(rsIndex.getString("INDEX_NAME"));
index.setType(rsIndex.getShort("TYPE"));
index.setOrdinalPosition(rsIndex.getShort("ORDINAL_POSITION"));
index.setAscOrDesc(rsIndex.getString("ASC_OR_DESC"));
index.setCardinality(rsIndex.getInt("CARDINALITY"));
index.getValues().add(col);
if ("PRIMARY".equalsIgnoreCase(indexName)) {
index.setIndextype(IndexType.PRIMARY);
} else if (!index.isNonUnique()) {
index.setIndextype(IndexType.UNIQUE);
} else {
index.setIndextype(IndexType.NORMAL);
}
tm.getAllIndexes().put(indexName, index);
}
}
if (tm.getAllIndexes().isEmpty()) {
throw new ShouldNeverHappenException("Could not found any index in the table: " + tableName);
}
}
return tm;
}
可以發(fā)現(xiàn),在mysql的實(shí)現(xiàn)中,我們查詢一個表的元數(shù)據(jù),需要執(zhí)行四條SQL語句,另外Oracle和Postgresql實(shí)現(xiàn)中,也是要執(zhí)行三條查詢語句的。在數(shù)據(jù)表變動不是很頻繁的情況下,seata遵循讀多寫少用緩存的原則,并通過定時任務(wù)的方式來保持拿到的數(shù)據(jù)表元數(shù)據(jù)是最新的。
小結(jié)
在seata獲取數(shù)據(jù)表元數(shù)據(jù)的實(shí)現(xiàn)中,我們通過閱讀源碼的方式,大致收獲了以下幾點(diǎn):
1.seata AT模式默認(rèn)會開啟定時任務(wù)每分鐘更新數(shù)據(jù)表元數(shù)據(jù),這是一個配置項(xiàng),在確認(rèn)運(yùn)行時數(shù)據(jù)表不會變更的情況下,開發(fā)人員可以不開啟該定時任務(wù)關(guān)閉。client.rm.tableMetaCheckEnable=false即可關(guān)閉該定時任務(wù)。
2.seata獲取數(shù)據(jù)表元數(shù)據(jù)至少需要進(jìn)行三次以上的查詢,這屬于一個比較重的操作。為了避免獲取元數(shù)據(jù)影響業(yè)務(wù)的吞吐量,seata遵循了讀多寫少用緩存的原則,來盡可能地降低該操作帶來的影響。
以上就是Seata AT獲取數(shù)據(jù)表元數(shù)據(jù)源碼詳解的詳細(xì)內(nèi)容,更多關(guān)于Seata AT獲取數(shù)據(jù)表元數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Boot文件上傳原理與實(shí)現(xiàn)詳解
這篇文章主要介紹了Spring Boot 文件上傳原理與實(shí)現(xiàn)詳解,前端文件上傳是面向多用戶的,多用戶之間可能存在上傳同一個名稱、類型的文件;為了避免文件沖突導(dǎo)致的覆蓋問題這些應(yīng)該在后臺進(jìn)行解決,需要的朋友可以參考下2024-01-01
SpringBoot整合mybatisplus和druid的示例詳解
這篇文章主要介紹了SpringBoot整合mybatisplus和druid的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08
快速解決VS Code報錯:Java 11 or more recent is required to run. Ple
這篇文章主要介紹了快速解決VS Code報錯:Java 11 or more recent is required to run. Please download and install a recent JDK的相關(guān)資料,需要的朋友可以參考下2020-09-09
Spring AOP與AspectJ的對比及應(yīng)用詳解
這篇文章主要為大家介紹了Spring AOP與AspectJ的對比及應(yīng)用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02
Java 使用IO流實(shí)現(xiàn)大文件的分割與合并實(shí)例詳解
這篇文章主要介紹了Java 使用IO流實(shí)現(xiàn)大文件的分割與合并實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2016-12-12

