欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的數(shù)據(jù)同步組件

 更新時(shí)間:2023年06月19日 10:07:18   作者:磊叔的技術(shù)博客  
這篇文章主要為大家詳細(xì)介紹了如何基于Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的數(shù)據(jù)同步組件,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的小伙伴可以了解一下

目前關(guān)于數(shù)據(jù)同步的組件開源社區(qū)有很多,如:Flink CDC, DataX, seaTunel,Kattle 等等,大體上可以分為兩種:基于日志的和基于 JDBC 的。這些同步組件在進(jìn)行整庫(kù)同步或者 schema 差異性不大的情況下通過可視化界面或者配置文件映射的方式可以直接達(dá)到我們庫(kù)-庫(kù)的同步訴求,但是對(duì)于一些定制化較大的場(chǎng)景處理起來(lái)還比較麻煩。為了滿足這樣的一個(gè)場(chǎng)景,筆者寫了一個(gè)小的同步組件。

PS: 我們的業(yè)務(wù)場(chǎng)景比較特殊,源的種類比較多,有 Oracle、Mysql、文件以及 API 接口等。另外一點(diǎn)是需要同步的數(shù)據(jù)量不是很大,引入額外的數(shù)據(jù)同步組件對(duì)于我們來(lái)說(shuō)會(huì)有額外的運(yùn)維成本和學(xué)習(xí)成本,因此基于上述兩個(gè)點(diǎn),決定自己寫一個(gè)小的組件。

業(yè)務(wù)框架

兩個(gè)基本需求

  • 支持雙寫,即外部源經(jīng)過數(shù)據(jù)同步組件,一方面是根據(jù)自定義的標(biāo)準(zhǔn)化模型轉(zhuǎn)換成需要的專題數(shù)據(jù);另一方面是需要將原始數(shù)據(jù)原封不動(dòng)的重新落到我們自己的庫(kù)中;
  • 能夠支持將專題庫(kù)數(shù)據(jù)通過逆向轉(zhuǎn)換,寫成不同的原始庫(kù)的數(shù)據(jù)格式

組件模型

以 Mysql 為例,同步組件的主要模塊如下:

  • 1、綠色背景是業(yè)務(wù)插件,每個(gè)業(yè)務(wù)都會(huì)對(duì)應(yīng)一個(gè)業(yè)務(wù)插件,編寫同步器 Syncer 和 轉(zhuǎn)換器 Convertor
  • 2、點(diǎn)虛線是一條單獨(dú)的通道,用于將目標(biāo)庫(kù)數(shù)據(jù)轉(zhuǎn)換成不同的原始庫(kù)數(shù)據(jù)
  • 3、借助了一些隊(duì)列實(shí)現(xiàn)了簡(jiǎn)單的生產(chǎn)者消費(fèi)者模型,隊(duì)列作為一個(gè)緩沖區(qū),以便于后續(xù)實(shí)現(xiàn)對(duì)于讀取原始庫(kù)和寫目標(biāo)庫(kù)的控制。

項(xiàng)目模塊劃分如下:

  • api-web 對(duì)外提供 api 服務(wù),能夠發(fā)起數(shù)據(jù)同步任務(wù)、取消數(shù)據(jù)同步任務(wù)、暫停數(shù)據(jù)同步任務(wù)和取消數(shù)據(jù)同步任務(wù)等接口
  • common 是一些公共的工具、模型、枚舉。
  • connectors 連接器的具體實(shí)現(xiàn),比如 MysqlConnector 就是基于 JDBC API 實(shí)現(xiàn)對(duì)原始庫(kù)的數(shù)據(jù)讀。
  • core 核心包,主要包括一些接口的定義和邏輯處理的標(biāo)準(zhǔn)化流程
  • executors 執(zhí)行器層,包括任務(wù)管理、線程池資源管理等
  • plugins 具體業(yè)務(wù)插件,主要使用 Syncer 同步器和 Convertor 模型轉(zhuǎn)換器

核心問題

針對(duì)數(shù)據(jù)同步組件,解決的核心問題可以抽象成如下模型:A_DB -> A -> B -> B_DB;即將 A 數(shù)據(jù)庫(kù)中的數(shù)據(jù)讀取出來(lái)之后轉(zhuǎn)換成 A class instance,然后將 A class instance 轉(zhuǎn)換成 B class instance,再將 B class instance 寫到 B 數(shù)據(jù)庫(kù)。

解決 A_DB 到 A

從 A_DB -> A 或者 B -> B_DB 這個(gè)過程,就是我們所熟知的 ORM 解決的問題;不管是 hibernate、mybatis 還是 SpringBoot JPA 都是圍繞著這個(gè)問題展開的。

在本篇的組件中,因沒有引入 ORM,所以將數(shù)據(jù)庫(kù)行映射成一個(gè) java 對(duì)象也需要自己實(shí)現(xiàn)。DataX 中是通過配置文件來(lái)描述的,在本篇中沒有才采用這種描述方式,而是通過語(yǔ)言耦合性更高的注解的方式來(lái)實(shí)現(xiàn)的(由業(yè)務(wù)屬性決定);

如下是一個(gè)描述具體業(yè)務(wù)的 Java 對(duì)象的定義,@Table 注解用來(lái)描述 JmltModel 和哪個(gè)表是關(guān)聯(lián)的, @Colum 注解用來(lái)描述屬性是和哪個(gè)字段關(guān)聯(lián)的.

@Data
// @Table 注解用來(lái)描述 JmltModel 和哪個(gè)表是關(guān)聯(lián) 的
@Table(name = "user_info") 
public class JmltModel implements Serializable {
    // @Colum 注解用來(lái)描述屬性是和哪個(gè)字段關(guān)聯(lián)的
    @Colum(name = "id")
    private Long id;
    @Colum(name = "email")
    private String email;
    @Colum(name = "name")
    private String name;
    @Colum(name = "create_time")
    private Date create_time;
}

有了這個(gè)描述關(guān)系,即可以在 runtime 時(shí)通過泛型 + 反射來(lái)實(shí)現(xiàn) A_DB -> A 過程的模板設(shè)計(jì)。

MysqlConnector 的實(shí)現(xiàn)來(lái)進(jìn)行說(shuō)明,下面抽取了 MysqlConnector 組件中的部分代碼(做了一些刪減);下面這段代碼中有 1-6 6 的步驟,這部分屬于生產(chǎn)端,即從原始 Mysql 表中分頁(yè)讀取數(shù)據(jù),并將讀取到的數(shù)據(jù)映射成實(shí)際的對(duì)象,再通過業(yè)務(wù)定義的 convertor 轉(zhuǎn)換成目標(biāo)的對(duì)象,最后丟到隊(duì)列中去等待消費(fèi)。

// 1、originClass 是原始庫(kù)對(duì)象,這里通過反射獲取 Table 注解,從而拿到表名
Table table = (Table) originClass.getDeclaredAnnotation(Table.class);
String tableName = table.name();
// 2、計(jì)算所有的條數(shù),然后按照分頁(yè)的方式進(jìn)行 fetch
SqlTemplate sqlTemplate = new SqlTemplate(this.originDataSource);
int totalCount = sqlTemplate.count();
RowBounds rowBounds = new RowBounds(totalCount);
int totalPage = rowBounds.getTotalPage();
// 3、這里是按分頁(yè)批量拉取
for (int i = 1; i <= totalPage; i++) {
    int offset = rowBounds.getOffset(i);
    String condition = " limit " + offset + "," + rowBounds.getPageSize();
    ResultSet resultSet = sqlTemplate.select(condition);
    // 4、將 ResultSet 轉(zhuǎn)成 A 這里就是從 A_DB 到 A 的過程
    ResultSetExtractor<R> extractor = (ResultSetExtractor<R>) new ResultSetExtractor<>(originClass);
    List<R> result = extractor.extractData(resultSet);
    // 5、將 A 轉(zhuǎn)成 B
    List<T> targetResult = convertor.batchConvertFrom(result);
    // 6、丟到隊(duì)列中去等待消費(fèi)
    this.rowObjectManager.pushToQueue(targetResult);
}

上述代碼片段中的 4 ,就是從 A_DB 到 A 的過程,實(shí)際上這部分是 ResultSet 到 Java 對(duì)象的過程。一般情況下,我們基于 JDBC API 編程時(shí), ResultSet 到 Java,對(duì)于業(yè)務(wù)來(lái)說(shuō)是非常明確的。大體是這樣:

String selectSql = "SELECT * FROM employees"; 
try (ResultSet resultSet = stmt.executeQuery(selectSql)) { 
List<Employee> employees = new ArrayList<>(); 
while (resultSet.next()) 
{ 
    Employee emp = new Employee(); 
    emp.setId(resultSet.getInt("emp_id"));
    emp.setName(resultSet.getString("name"));
    emp.setPosition(resultSet.getString("position")); 
    emp.setSalary(resultSet.getDouble("salary")); 
    employees.add(emp); 
} 

這種從對(duì)于明確 Java 對(duì)象的情況下是可以的;但是對(duì)于一個(gè)通用組件來(lái)說(shuō)肯定無(wú)法滿足。那么就需要解決如何將 ResultSet 轉(zhuǎn)換成 Java 對(duì)象變得的更普適一些。思路是:ResultSet -> Map -> Java Object,通過 ResultSet 的 getMetaData 可以取到所有的列名(K)和值(V),并將其存儲(chǔ)到 Map 中,代碼如下:

/**
 * 將 resultSet 轉(zhuǎn)成 Map
 *
 * @param resultSet
 * @return
 * @throws SQLException
 */
private Map<String, Object> resultSetToMap(ResultSet resultSet) throws Exception {
    Map<String, Object> resultMap = new HashMap<>();
    // 獲取 ResultSet 的元數(shù)據(jù)
    int columnCount = resultSet.getMetaData().getColumnCount();
    // 遍歷每一列,將列名和值存儲(chǔ)到 Map 中
    for (int i = 1; i <= columnCount; i++) {
        String columnName = resultSet.getMetaData().getColumnName(i);
        Object value = resultSet.getObject(i);
        resultMap.put(columnName, value);
    }
    return resultMap;
}

接著是將 Map 轉(zhuǎn)換成 Java 對(duì)象,當(dāng)然在框架層面的實(shí)現(xiàn)上,這里通過泛型機(jī)制來(lái)實(shí)現(xiàn)更加通用的場(chǎng)景:

private T mapResultSetToObject(Map<String, Object> resultMap, Class<T> objectType) throws Exception {
    // 通過目標(biāo)對(duì)象類型構(gòu)建一個(gè)對(duì)象
    T object = objectType.newInstance();  
    // 將 map 的 key 作為 field 的名字,map 的 value 作為 field 的值
    for (Map.Entry<String, Object> entry : resultMap.entrySet()) {  
        String fieldName = entry.getKey();  
        Object value = entry.getValue();  
        try {  
            Field declaredField = objectType.getDeclaredField(fieldName);  
            declaredField.setAccessible(true);  
            declaredField.set(object, value);  
        } catch (NoSuchFieldException e) {  
            LOGGER.error("ignore exception, fieldName: " + fieldName + ", objectType: " + objectType);  
        }  
    }
    // 完成對(duì)象的填充并返回
    return object;  
}

從 A 到 B 屬于業(yè)務(wù)自己定義的,即 Convertor 部分,下面是 Convertor 的接口定義,業(yè)務(wù)實(shí)現(xiàn)此接口用于實(shí)現(xiàn)對(duì)象到對(duì)象的轉(zhuǎn)換(包括批量轉(zhuǎn)換)

// T 是目標(biāo)對(duì)象類型,R 是原始對(duì)象類型
public interface Convertor<T, R> {  
/**  
* 將 T 轉(zhuǎn)換成 R  
*  
* @param origin  
* @return  
*/  
T convertFrom(R origin) throws SQLException;  
/**  
* 將 R 轉(zhuǎn)換成 T  
*  
* @param target  
* @return  
*/  
R convertTo(T target);  
/**  
* 批量轉(zhuǎn)換  
* @param origin  
* @return  
* @throws SQLException  
*/  
List<T> batchConvertFrom(List<R> origin) throws SQLException;  
/**  
* 批量轉(zhuǎn)換將 R 轉(zhuǎn)換成 T  
*  
* @param target  
* @return  
*/ 
List<R> batchConvertTo(List<T> target);

從 B 到 B_DB

前面是 A_DB 到 A 再到 B 的過程,那么接下來(lái)就是 B 到 B_DB 的過程。從前面的流程圖中可以看出,組件中借助了隊(duì)列。本篇文章的實(shí)現(xiàn)是基于 Disruptor 實(shí)現(xiàn)的。在 Convertor 中 A-> B 的邏輯完成之后,就會(huì)將 B 的 List 丟到 Disruptor 的 ringBuffer 中等待消費(fèi)。消費(fèi)邏輯如下:

public void onEvent(RowObjectEvent rowObjectEvent, long sequence, boolean endOfBatch) {  
    // targetResult  
    List<T> targetResult = (List) rowObjectEvent.getRowObject();  
    // 將 T 寫到 目標(biāo)庫(kù)  
    Connection tc = null;  
    PreparedStatement pstm = null;  
    try {  
        // 下面即為獲取連接,創(chuàng)建 prepareStatement 和執(zhí)行
        tc = this.targetDataSource.getConnection();  
        SqlTemplate<T> sqlTemplate = new SqlTemplate<>(this.targetDataSource); 
        sqlTemplate.setObj(targetResult.get(0));  
        String sql = sqlTemplate.createBaseSql();  
        pstm = tc.prepareStatement(sql);  
        for (T item : targetResult) {  
            Object[] objects = sqlTemplate.createInsertSql(item);  
            for (int i = 1; i <= objects.length; i++) {  
                if (objects[i - 1] instanceof Long) {  
                    objects[i - 1] = (Long) objects[i - 1] + 1;  
                }  
                pstm.setObject(i, objects[i - 1]);  
            } 
            // 這里執(zhí)行時(shí)批量操作的
            pstm.addBatch();  
        }  
        pstm.executeUpdate();  
    } catch (Exception e) {  
        // ignore some code...
    }  
}

通過上述幾個(gè)片段,大體闡述了數(shù)據(jù)同步的過程,以及本篇所實(shí)現(xiàn)的組件的部分核心邏輯。

總結(jié)

本篇中的代碼片段是比較零碎的,其中有相關(guān)部分的邏輯并沒有在本篇中體現(xiàn);本篇的主要目的是為了闡述基于 JDBC API 如何實(shí)現(xiàn)同步(實(shí)際上不僅僅是基于 JDBC 和面向關(guān)系型數(shù)據(jù)庫(kù)),并且為圍繞 DB -> A -> B -> DB 這樣的思路給出了每一步實(shí)現(xiàn)的代碼,有興趣的同學(xué)可以嘗試自己實(shí)現(xiàn)一個(gè)數(shù)據(jù)同步組件。

以上就是基于Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的數(shù)據(jù)同步組件的詳細(xì)內(nèi)容,更多關(guān)于Java數(shù)據(jù)同步組件的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java中xxl-job實(shí)現(xiàn)分片廣播任務(wù)的示例

    Java中xxl-job實(shí)現(xiàn)分片廣播任務(wù)的示例

    本文主要介紹了Java中xxl-job實(shí)現(xiàn)分片廣播任務(wù)的示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 關(guān)于Java中常見的負(fù)載均衡算法

    關(guān)于Java中常見的負(fù)載均衡算法

    這篇文章主要介紹了關(guān)于Java中常見的負(fù)載均衡算法,負(fù)載平衡是一種電子計(jì)算機(jī)技術(shù),用來(lái)在多個(gè)計(jì)算機(jī)、網(wǎng)絡(luò)連接、CPU、磁盤驅(qū)動(dòng)器或其他資源中分配負(fù)載,以達(dá)到優(yōu)化資源使用、最大化吞吐率、最小化響應(yīng)時(shí)間、同時(shí)避免過載的目的,需要的朋友可以參考下
    2023-08-08
  • SpringBoot實(shí)現(xiàn)圖片上傳及本地訪問

    SpringBoot實(shí)現(xiàn)圖片上傳及本地訪問

    在SpringBoot項(xiàng)目中,處理靜態(tài)文件訪問尤其是實(shí)時(shí)更新的文件如商品圖片,可通過配置WebMvcConfig將本地文件映射到URL路徑上,以解決重啟項(xiàng)目才能訪問文件的問題,本文詳解如何保存和訪問這些文件,幫助開發(fā)者優(yōu)化項(xiàng)目文件管理
    2022-09-09
  • springboot?vue測(cè)試列表遞歸查詢子節(jié)點(diǎn)下的接口功能實(shí)現(xiàn)

    springboot?vue測(cè)試列表遞歸查詢子節(jié)點(diǎn)下的接口功能實(shí)現(xiàn)

    這篇文章主要為大家介紹了springboot?vue測(cè)試列表遞歸查詢子節(jié)點(diǎn)下的接口功能實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Java如何實(shí)現(xiàn)對(duì)稱加密

    Java如何實(shí)現(xiàn)對(duì)稱加密

    這篇文章主要介紹了Java如何實(shí)現(xiàn)對(duì)稱加密問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • java實(shí)現(xiàn)兩個(gè)對(duì)象之間傳值及簡(jiǎn)單的封裝

    java實(shí)現(xiàn)兩個(gè)對(duì)象之間傳值及簡(jiǎn)單的封裝

    這篇文章主要介紹了java實(shí)現(xiàn)兩個(gè)對(duì)象之間傳值及簡(jiǎn)單的封裝,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Spring AOP切點(diǎn)表達(dá)式使用及說(shuō)明

    Spring AOP切點(diǎn)表達(dá)式使用及說(shuō)明

    這篇文章主要介紹了Spring AOP切點(diǎn)表達(dá)式使用及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • cmd中javac命令無(wú)法運(yùn)行(java指令能運(yùn)行)解決步驟

    cmd中javac命令無(wú)法運(yùn)行(java指令能運(yùn)行)解決步驟

    這篇文章主要介紹了在安裝JDK后,執(zhí)行javac命令沒有返回值的問題,可能是由于命令提示符窗口緩存問題、系統(tǒng)路徑優(yōu)先級(jí)問題、文件權(quán)限問題或命令行輸入問題,文中通過代碼將解決的步驟介紹的非常詳細(xì),需要的朋友可以參考下
    2025-02-02
  • SpringBoot優(yōu)雅捕捉異常的兩種方法小結(jié)

    SpringBoot優(yōu)雅捕捉異常的兩種方法小結(jié)

    SpringBoot框架對(duì)異常的處理提供了幾種很強(qiáng)大的方法,我們可以通過@ControllerAdvice和@ExceptionHandler注解實(shí)現(xiàn)全局異常的處理,下面就來(lái)介紹一下這兩種方法的實(shí)現(xiàn),感興趣的可以了解一下
    2024-08-08
  • java實(shí)現(xiàn)時(shí)鐘效果

    java實(shí)現(xiàn)時(shí)鐘效果

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)時(shí)鐘效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-03-03

最新評(píng)論