欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

(starters)springboot-starter整合阿里云datahub方式

 更新時間:2022年11月18日 10:48:22   作者:Cry丶  
這篇文章主要介紹了(starters)springboot-starter整合阿里云datahub方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

DataHub 類似于傳統(tǒng)大數據解決方案中 Kafka 的角色,提供了一個數據隊列功能。

DataHub 除了供了一個緩沖的隊列作用。同時由于 DataHub 提供了各種與其他阿里云

上下游產品的對接功能,所以 DataHub 又扮演了一個數據的分發(fā)樞紐工作。

datahub提供了開發(fā)者生產和消費的sdk,在平時的開發(fā)中往往會寫很多重復的代碼,我們可以利用springboot為我們提供的自定義starter的方式,模仿springboot官方的starter組件實現方式,來封裝一個更高效簡單易用的starter組件,實現開箱即用。

本文僅提供核心思路實現供學習使用,應根據自己所在公司開發(fā)習慣做定制開發(fā)

1. 功能介紹

1.無需關心DataHub底層如何操作,安心編寫業(yè)務代碼即可進行數據的獲取和上傳,

2.類似RabbitMQ的starter,通過注解方式,Listener和Handler方式進行隊列消費

3.支持游標的上次記憶功能

<dependency>
      <artifactId>cry-starters-projects</artifactId>
      <groupId>cn.com.cry.starters</groupId>
      <version>2022-1.0.0</version>
</dependency>

2.快速開始

2.1 啟動客戶端

配置阿里云DataHub的endpoint以及AK信息

aliyun:
  datahub:
  	# 開啟功能
  	havingValue: true
    #是否為私有云
    isPrivate: false
    accessId: xxx
    accessKey: xxx
    endpoint: xxx
    #連接DataHub客戶端超時時間
    conn-timeout: 10000

啟動SpringBoot,你會發(fā)現datahub客戶端已經啟動完畢

2.2 獲取DataHub客戶端

DatahubClient datahubClient=DataHubTemplate.getDataHubClient();

2.3 寫數據

public int write(@RequestParam("id") Integer shardId) {
    List<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
    return successNumbers;
}

以上示例代碼表示往 projectName為my_test, topicName為student, shardId 為N的hub里寫數據,并且返回插入成功的條數

2.4 讀數據

讀數據開發(fā)的邏輯類似RabbitMq的starter,使用@DataHubListener和@DataHubHandler處理器注解進行使用

@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("讀取到shardId=0的消息");
        System.out.println(message.getData());
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}

以上代碼說明: 通過LATEST游標的方式,監(jiān)聽 project=my_test ,topicName=student,shardId=0 ,最終通過Message的包裝類拿到dataHub實時寫入的數據。

這邊可以設置多種游標類型,例如根據最新的系統(tǒng)時間、最早錄入的序號等

3. 核心代碼

首先需要一個DataHubClient增強類,在SpringBoot啟動時開啟一個線程來監(jiān)聽對應的project-topic-shardingId,根據游標規(guī)則來讀取當前的cursor進行數據的讀取。

public class DataHubClientWrapper implements InitializingBean, DisposableBean {    @Autowired
    private AliyunAccountProperties properties;    @Autowired
    private ApplicationContext context;    private DatahubClient datahubClient;
    public DataHubClientWrapper() {    }    /**
     * 執(zhí)行銷毀方法
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        WorkerResourceExecutor.shutdown();
    }    @Override
    public void afterPropertiesSet() throws Exception {        /**
         * 創(chuàng)建DataHubClient
         */
        this.datahubClient = DataHubClientFactory.create(properties);        /**
         * 打印Banner
         */
        BannerUtil.printBanner();        /**
         * 賦值Template的靜態(tài)對象dataHubClient
         */
        DataHubTemplate.setDataHubClient(datahubClient);        /**
         * 初始化Worker線程
         */
        WorkerResourceExecutor.initWorkerResource(context);
        /**
         * 啟動Worker線程
         */
        WorkerResourceExecutor.start();
    }
}

寫數據,構建了一個類似RedisDataTemplate的模板類,封裝了write的邏輯,調用時只需要用DataHubTemplate.write調用

public class DataHubTemplate {    private static DatahubClient dataHubClient;    private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);    /**
     * 默認不開啟重試機制
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @return
     */
    public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
        return write(projectName, topicName, datas, shardId, false);
    }    /**
     * 往指定的projectName以及topic和shard下面寫數據
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @param retry
     * @return
     */
    private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
        RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (Object o : datas) {
            RecordEntry entry = new RecordEntry();
            Map<String, Object> data = BeanUtil.beanToMap(o);
            TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
            for (String key : data.keySet()) {
                tupleRecordData.setField(key, data.get(key));
            }
            entry.setRecordData(tupleRecordData);
            entry.setShardId(String.valueOf(shardId));
            recordEntries.add(entry);
        }
        PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
        int failedRecordCount = result.getFailedRecordCount();
        if (failedRecordCount > 0 && retry) {
            retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
        }
        return datas.size() - failedRecordCount;
    }    /**
     * @param client
     * @param records
     * @param retryTimes
     * @param project
     * @param topic
     */
    private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        List<RecordEntry> failedRecords = records;
        while (retryTimes != 0) {
            logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
            retryTimes = retryTimes - 1;
            PutRecordsResult result = client.putRecords(project, topic, failedRecords);
            int failedNum = result.getFailedRecordCount();
            if (failedNum > 0) {
                failedRecords = result.getFailedRecords();
                continue;
            }
            suc = true;
            break;
        }
        if (!suc) {
            logger.error("DataHub send message retry failure");
        }
    }    public static DatahubClient getDataHubClient() {
        return dataHubClient;
    }    public static void setDataHubClient(DatahubClient dataHubClient) {
        DataHubTemplate.dataHubClient = dataHubClient;
    }
}

讀數據,需要在Spring啟動時開啟一個監(jiān)聽線程DataListenerWorkerThread,執(zhí)行一個死循環(huán)不停輪詢DataHub下的對應通道。

public class DataListenerWorkerThread extends Thread {
    private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
    private volatile boolean init = false;
    private DatahubConfig config;
    private String workerKey;
    private int recordLimits;
    private int sleep;
    private RecordSchema recordSchema;
    private RecordHandler recordHandler;
    private CursorHandler cursorHandler;    public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
        this.config = new DatahubConfig(projectName, topicName, shardId);
        this.workerKey = projectName + "-" + topicName + "-" + shardId;
        this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
        this.recordLimits = recordLimits;
        this.sleep = sleep;
        this.setName("DataHub-Worker");
        this.setDaemon(true);
    }    @Override
    public void run() {
        initRecordSchema();
        String cursor = cursorHandler.positioningCursor(config);
        for (; ; ) {
            try {
                GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
                if (result.getRecordCount() <= 0) {
                    // 無數據,sleep后讀取
                    Thread.sleep(sleep);
                    continue;
                }
                List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
                logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
                // 拿到下一個游標
                cursor = cursorHandler.nextCursor(result);
                //執(zhí)行方法
                WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
            } catch (InvalidParameterException ex) {
                //非法游標或游標已過期,建議重新定位后開始消費
                cursor = cursorHandler.resetCursor(config);
                logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
            } catch (DatahubClientException e) {
                logger.error("DataHubException:{}", e.getErrorMessage());
                this.interrupt();
            } catch (InterruptedException e) {
                logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
            } catch (Exception e) {
                this.interrupt();
                logger.error("receive DataHub records cry.exception:{}", e, e);
            }
        }
    }    /**
     * 終止
     */
    public void shutdown() {
        if (!interrupted()) {
            interrupt();
        }
    }    /**
     * 初始化topic字段以及recordSchema
     */
    private void initRecordSchema() {
        try {
            if (!init) {
                recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
                List<Field> fields = recordSchema.getFields();
                this.recordHandler = new RecordHandler(fields);
                init = true;
            }
        } catch (Exception e) {
            logger.error("initRecordSchema error:{}", e, e);
        }
    }
}

read的時候結合了注解開發(fā),通過定義類注解DataHubListener和方法注解DataHubHandler內置屬性,來動態(tài)的控制需要在哪些方法中處理監(jiān)聽到的數據的邏輯:

DataHubHandler

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {
    /**
     * 話題名稱
     *
     * @return
     */
    String topicName();    /**
     * shardId
     *
     * @return
     */
    int shardId();    /**
     * 最大數據量限制
     *
     * @return
     */
    int recordLimit() default 1000;    /**
     * 游標類型
     *
     * @return
     */
    CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;    /**
     * 若未監(jiān)聽到數據添加,休眠時間 ms
     *
     * @return
     */
    int sleep() default 10000;    /**
     * 使用CursorType.SYSTEM_TIME的時候配置 時間偏移量
     *
     * @return
     */
    String startTime() default "";    /**
     * 使用使用CursorType.SEQUENCE的時候配置,偏移量,必須是正整數
     *
     * @return
     */
    int sequenceOffset() default 0;
}

DataHubListener

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
    String projectName();
}

最后我們需要啟動SpringBootStarter的EnableConfigurationProperties 功能,通過配置文件來控制default-bean的開啟或者關閉。

啟動類:

@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
    /**
     * 初始化dataHub裝飾bean
     *
     * @return
     */
    @Bean
    public DataHubClientWrapper dataHubWrapper() {
        return new DataHubClientWrapper();
    }
}

屬性配置類

@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{    /**
     * http://xxx.aliyuncs.com
     */
    private String endpoint;    /**
     * account
     */
    private String accessId;    /**
     * password
     */
    private String accessKey;    /**
     * private cloud || public cloud
     */
    private boolean isPrivate;    /**
     * unit: ms
     */
    private Integer connTimeout = 10000;
}

最后記得要做成一個starter,在resources下新建一個META-INF文件夾,新建一個spring.factories文件,

org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
? cry.starter.datahub.DataHubClientAutoConfiguration

大體邏輯就是這樣了,你學會了嗎? hhhhhhhhh~

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • SpringBoot實現OneDrive文件上傳的詳細步驟

    SpringBoot實現OneDrive文件上傳的詳細步驟

    這篇文章主要介紹了SpringBoot實現OneDrive文件上傳的詳細步驟,文中通過代碼示例和圖文講解的非常詳細,對大家實現OneDrive文件上傳有一定的幫助,需要的朋友可以參考下
    2024-02-02
  • Eclipse中使用Maven創(chuàng)建Java Web工程的實現方式

    Eclipse中使用Maven創(chuàng)建Java Web工程的實現方式

    這篇文章主要介紹了Eclipse中使用Maven創(chuàng)建Java Web工程的實現方式的相關資料,希望通過本文能幫助到大家,讓大家實現這樣的方式,需要的朋友可以參考下
    2017-10-10
  • java.util.ArrayDeque類使用方法詳解

    java.util.ArrayDeque類使用方法詳解

    這篇文章主要介紹了java.util.ArrayDeque類使用方法,java.util.ArrayDeque類提供了可調整大小的陣列,并實現了Deque接口,感興趣的小伙伴們可以參考一下
    2016-03-03
  • Springboot自定義全局異常問題

    Springboot自定義全局異常問題

    這篇文章主要介紹了Springboot自定義全局異常問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Java?Spring的兩種事務你知道嗎

    Java?Spring的兩種事務你知道嗎

    這篇文章主要為大家詳細介紹了Java?Spring的兩種事務,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • 一篇文章帶你深入了解Java異常

    一篇文章帶你深入了解Java異常

    本篇文章主要介紹了java異常處理機制及應用,異常處理機制是Java語言的一大特色。從異常處理的機制、異常處理的方法、異常處理的原則等方面介紹Java語言的異常處理技術,有興趣的可以了解一下
    2021-08-08
  • Spring cloud Gateway簡介及相關配置方法

    Spring cloud Gateway簡介及相關配置方法

    這篇文章主要介紹了Spring cloud Gateway簡介及相關配置方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-04-04
  • Spring Security學習筆記(一)

    Spring Security學習筆記(一)

    這篇文章主要介紹了Spring Security的相關資料,幫助大家開始學習Spring Security框架,感興趣的朋友可以了解下
    2020-09-09
  • 使用Java對Hbase操作總結及示例代碼

    使用Java對Hbase操作總結及示例代碼

    這篇文章主要介紹了使用Java對Hbase進行操作總結,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-07-07
  • springboot讀取application.yml報錯問題及解決

    springboot讀取application.yml報錯問題及解決

    這篇文章主要介紹了springboot讀取application.yml報錯問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06

最新評論