基于Java實現(xiàn)一個簡單的數(shù)據(jù)同步組件
目前關(guān)于數(shù)據(jù)同步的組件開源社區(qū)有很多,如:Flink CDC, DataX, seaTunel,Kattle 等等,大體上可以分為兩種:基于日志的和基于 JDBC 的。這些同步組件在進行整庫同步或者 schema 差異性不大的情況下通過可視化界面或者配置文件映射的方式可以直接達到我們庫-庫的同步訴求,但是對于一些定制化較大的場景處理起來還比較麻煩。為了滿足這樣的一個場景,筆者寫了一個小的同步組件。
PS: 我們的業(yè)務(wù)場景比較特殊,源的種類比較多,有 Oracle、Mysql、文件以及 API 接口等。另外一點是需要同步的數(shù)據(jù)量不是很大,引入額外的數(shù)據(jù)同步組件對于我們來說會有額外的運維成本和學(xué)習(xí)成本,因此基于上述兩個點,決定自己寫一個小的組件。
業(yè)務(wù)框架
兩個基本需求
- 支持雙寫,即外部源經(jīng)過數(shù)據(jù)同步組件,一方面是根據(jù)自定義的標準化模型轉(zhuǎn)換成需要的專題數(shù)據(jù);另一方面是需要將原始數(shù)據(jù)原封不動的重新落到我們自己的庫中;
- 能夠支持將專題庫數(shù)據(jù)通過逆向轉(zhuǎn)換,寫成不同的原始庫的數(shù)據(jù)格式
組件模型
以 Mysql 為例,同步組件的主要模塊如下:
- 1、綠色背景是業(yè)務(wù)插件,每個業(yè)務(wù)都會對應(yīng)一個業(yè)務(wù)插件,編寫同步器 Syncer 和 轉(zhuǎn)換器 Convertor
- 2、點虛線是一條單獨的通道,用于將目標庫數(shù)據(jù)轉(zhuǎn)換成不同的原始庫數(shù)據(jù)
- 3、借助了一些隊列實現(xiàn)了簡單的生產(chǎn)者消費者模型,隊列作為一個緩沖區(qū),以便于后續(xù)實現(xiàn)對于讀取原始庫和寫目標庫的控制。
項目模塊劃分如下:
- api-web 對外提供 api 服務(wù),能夠發(fā)起數(shù)據(jù)同步任務(wù)、取消數(shù)據(jù)同步任務(wù)、暫停數(shù)據(jù)同步任務(wù)和取消數(shù)據(jù)同步任務(wù)等接口
- common 是一些公共的工具、模型、枚舉。
- connectors 連接器的具體實現(xiàn),比如 MysqlConnector 就是基于 JDBC API 實現(xiàn)對原始庫的數(shù)據(jù)讀。
- core 核心包,主要包括一些接口的定義和邏輯處理的標準化流程
- executors 執(zhí)行器層,包括任務(wù)管理、線程池資源管理等
- plugins 具體業(yè)務(wù)插件,主要使用 Syncer 同步器和 Convertor 模型轉(zhuǎn)換器
核心問題
針對數(shù)據(jù)同步組件,解決的核心問題可以抽象成如下模型:A_DB -> A -> B -> B_DB
;即將 A 數(shù)據(jù)庫中的數(shù)據(jù)讀取出來之后轉(zhuǎn)換成 A class instance,然后將 A class instance 轉(zhuǎn)換成 B class instance,再將 B class instance 寫到 B 數(shù)據(jù)庫。
解決 A_DB 到 A
從 A_DB -> A 或者 B -> B_DB 這個過程,就是我們所熟知的 ORM 解決的問題;不管是 hibernate、mybatis 還是 SpringBoot JPA 都是圍繞著這個問題展開的。
在本篇的組件中,因沒有引入 ORM,所以將數(shù)據(jù)庫行映射成一個 java 對象也需要自己實現(xiàn)。DataX 中是通過配置文件來描述的,在本篇中沒有才采用這種描述方式,而是通過語言耦合性更高的注解的方式來實現(xiàn)的(由業(yè)務(wù)屬性決定);
如下是一個描述具體業(yè)務(wù)的 Java 對象的定義,@Table
注解用來描述 JmltModel 和哪個表是關(guān)聯(lián)的, @Colum
注解用來描述屬性是和哪個字段關(guān)聯(lián)的.
@Data // @Table 注解用來描述 JmltModel 和哪個表是關(guān)聯(lián) 的 @Table(name = "user_info") public class JmltModel implements Serializable { // @Colum 注解用來描述屬性是和哪個字段關(guān)聯(lián)的 @Colum(name = "id") private Long id; @Colum(name = "email") private String email; @Colum(name = "name") private String name; @Colum(name = "create_time") private Date create_time; }
有了這個描述關(guān)系,即可以在 runtime 時通過泛型 + 反射來實現(xiàn) A_DB -> A
過程的模板設(shè)計。
以 MysqlConnector
的實現(xiàn)來進行說明,下面抽取了 MysqlConnector
組件中的部分代碼(做了一些刪減);下面這段代碼中有 1-6 6 的步驟,這部分屬于生產(chǎn)端,即從原始 Mysql 表中分頁讀取數(shù)據(jù),并將讀取到的數(shù)據(jù)映射成實際的對象,再通過業(yè)務(wù)定義的 convertor 轉(zhuǎn)換成目標的對象,最后丟到隊列中去等待消費。
// 1、originClass 是原始庫對象,這里通過反射獲取 Table 注解,從而拿到表名 Table table = (Table) originClass.getDeclaredAnnotation(Table.class); String tableName = table.name(); // 2、計算所有的條數(shù),然后按照分頁的方式進行 fetch SqlTemplate sqlTemplate = new SqlTemplate(this.originDataSource); int totalCount = sqlTemplate.count(); RowBounds rowBounds = new RowBounds(totalCount); int totalPage = rowBounds.getTotalPage(); // 3、這里是按分頁批量拉取 for (int i = 1; i <= totalPage; i++) { int offset = rowBounds.getOffset(i); String condition = " limit " + offset + "," + rowBounds.getPageSize(); ResultSet resultSet = sqlTemplate.select(condition); // 4、將 ResultSet 轉(zhuǎn)成 A 這里就是從 A_DB 到 A 的過程 ResultSetExtractor<R> extractor = (ResultSetExtractor<R>) new ResultSetExtractor<>(originClass); List<R> result = extractor.extractData(resultSet); // 5、將 A 轉(zhuǎn)成 B List<T> targetResult = convertor.batchConvertFrom(result); // 6、丟到隊列中去等待消費 this.rowObjectManager.pushToQueue(targetResult); }
上述代碼片段中的 4 ,就是從 A_DB 到 A
的過程,實際上這部分是 ResultSet 到 Java 對象的過程。一般情況下,我們基于 JDBC API 編程時, ResultSet 到 Java,對于業(yè)務(wù)來說是非常明確的。大體是這樣:
String selectSql = "SELECT * FROM employees"; try (ResultSet resultSet = stmt.executeQuery(selectSql)) { List<Employee> employees = new ArrayList<>(); while (resultSet.next()) { Employee emp = new Employee(); emp.setId(resultSet.getInt("emp_id")); emp.setName(resultSet.getString("name")); emp.setPosition(resultSet.getString("position")); emp.setSalary(resultSet.getDouble("salary")); employees.add(emp); }
這種從對于明確 Java 對象的情況下是可以的;但是對于一個通用組件來說肯定無法滿足。那么就需要解決如何將 ResultSet 轉(zhuǎn)換成 Java 對象變得的更普適一些。思路是:ResultSet -> Map -> Java Object
,通過 ResultSet 的 getMetaData 可以取到所有的列名(K)和值(V),并將其存儲到 Map 中,代碼如下:
/** * 將 resultSet 轉(zhuǎn)成 Map * * @param resultSet * @return * @throws SQLException */ private Map<String, Object> resultSetToMap(ResultSet resultSet) throws Exception { Map<String, Object> resultMap = new HashMap<>(); // 獲取 ResultSet 的元數(shù)據(jù) int columnCount = resultSet.getMetaData().getColumnCount(); // 遍歷每一列,將列名和值存儲到 Map 中 for (int i = 1; i <= columnCount; i++) { String columnName = resultSet.getMetaData().getColumnName(i); Object value = resultSet.getObject(i); resultMap.put(columnName, value); } return resultMap; }
接著是將 Map 轉(zhuǎn)換成 Java 對象,當(dāng)然在框架層面的實現(xiàn)上,這里通過泛型機制來實現(xiàn)更加通用的場景:
private T mapResultSetToObject(Map<String, Object> resultMap, Class<T> objectType) throws Exception { // 通過目標對象類型構(gòu)建一個對象 T object = objectType.newInstance(); // 將 map 的 key 作為 field 的名字,map 的 value 作為 field 的值 for (Map.Entry<String, Object> entry : resultMap.entrySet()) { String fieldName = entry.getKey(); Object value = entry.getValue(); try { Field declaredField = objectType.getDeclaredField(fieldName); declaredField.setAccessible(true); declaredField.set(object, value); } catch (NoSuchFieldException e) { LOGGER.error("ignore exception, fieldName: " + fieldName + ", objectType: " + objectType); } } // 完成對象的填充并返回 return object; }
從 A 到 B 屬于業(yè)務(wù)自己定義的,即 Convertor 部分,下面是 Convertor 的接口定義,業(yè)務(wù)實現(xiàn)此接口用于實現(xiàn)對象到對象的轉(zhuǎn)換(包括批量轉(zhuǎn)換)
// T 是目標對象類型,R 是原始對象類型 public interface Convertor<T, R> { /** * 將 T 轉(zhuǎn)換成 R * * @param origin * @return */ T convertFrom(R origin) throws SQLException; /** * 將 R 轉(zhuǎn)換成 T * * @param target * @return */ R convertTo(T target); /** * 批量轉(zhuǎn)換 * @param origin * @return * @throws SQLException */ List<T> batchConvertFrom(List<R> origin) throws SQLException; /** * 批量轉(zhuǎn)換將 R 轉(zhuǎn)換成 T * * @param target * @return */ List<R> batchConvertTo(List<T> target);
從 B 到 B_DB
前面是 A_DB 到 A 再到 B 的過程,那么接下來就是 B 到 B_DB 的過程。從前面的流程圖中可以看出,組件中借助了隊列。本篇文章的實現(xiàn)是基于 Disruptor 實現(xiàn)的。在 Convertor 中 A-> B 的邏輯完成之后,就會將 B 的 List 丟到 Disruptor 的 ringBuffer 中等待消費。消費邏輯如下:
public void onEvent(RowObjectEvent rowObjectEvent, long sequence, boolean endOfBatch) { // targetResult List<T> targetResult = (List) rowObjectEvent.getRowObject(); // 將 T 寫到 目標庫 Connection tc = null; PreparedStatement pstm = null; try { // 下面即為獲取連接,創(chuàng)建 prepareStatement 和執(zhí)行 tc = this.targetDataSource.getConnection(); SqlTemplate<T> sqlTemplate = new SqlTemplate<>(this.targetDataSource); sqlTemplate.setObj(targetResult.get(0)); String sql = sqlTemplate.createBaseSql(); pstm = tc.prepareStatement(sql); for (T item : targetResult) { Object[] objects = sqlTemplate.createInsertSql(item); for (int i = 1; i <= objects.length; i++) { if (objects[i - 1] instanceof Long) { objects[i - 1] = (Long) objects[i - 1] + 1; } pstm.setObject(i, objects[i - 1]); } // 這里執(zhí)行時批量操作的 pstm.addBatch(); } pstm.executeUpdate(); } catch (Exception e) { // ignore some code... } }
通過上述幾個片段,大體闡述了數(shù)據(jù)同步的過程,以及本篇所實現(xiàn)的組件的部分核心邏輯。
總結(jié)
本篇中的代碼片段是比較零碎的,其中有相關(guān)部分的邏輯并沒有在本篇中體現(xiàn);本篇的主要目的是為了闡述基于 JDBC API 如何實現(xiàn)同步(實際上不僅僅是基于 JDBC 和面向關(guān)系型數(shù)據(jù)庫),并且為圍繞 DB -> A -> B -> DB
這樣的思路給出了每一步實現(xiàn)的代碼,有興趣的同學(xué)可以嘗試自己實現(xiàn)一個數(shù)據(jù)同步組件。
以上就是基于Java實現(xiàn)一個簡單的數(shù)據(jù)同步組件的詳細內(nèi)容,更多關(guān)于Java數(shù)據(jù)同步組件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中xxl-job實現(xiàn)分片廣播任務(wù)的示例
本文主要介紹了Java中xxl-job實現(xiàn)分片廣播任務(wù)的示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03springboot?vue測試列表遞歸查詢子節(jié)點下的接口功能實現(xiàn)
這篇文章主要為大家介紹了springboot?vue測試列表遞歸查詢子節(jié)點下的接口功能實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05cmd中javac命令無法運行(java指令能運行)解決步驟
這篇文章主要介紹了在安裝JDK后,執(zhí)行javac命令沒有返回值的問題,可能是由于命令提示符窗口緩存問題、系統(tǒng)路徑優(yōu)先級問題、文件權(quán)限問題或命令行輸入問題,文中通過代碼將解決的步驟介紹的非常詳細,需要的朋友可以參考下2025-02-02SpringBoot優(yōu)雅捕捉異常的兩種方法小結(jié)
SpringBoot框架對異常的處理提供了幾種很強大的方法,我們可以通過@ControllerAdvice和@ExceptionHandler注解實現(xiàn)全局異常的處理,下面就來介紹一下這兩種方法的實現(xiàn),感興趣的可以了解一下2024-08-08