(starters)springboot-starter整合阿里云datahub方式
DataHub 類似于傳統(tǒng)大數據解決方案中 Kafka 的角色,提供了一個數據隊列功能。
DataHub 除了供了一個緩沖的隊列作用。同時由于 DataHub 提供了各種與其他阿里云
上下游產品的對接功能,所以 DataHub 又扮演了一個數據的分發(fā)樞紐工作。
datahub提供了開發(fā)者生產和消費的sdk,在平時的開發(fā)中往往會寫很多重復的代碼,我們可以利用springboot為我們提供的自定義starter的方式,模仿springboot官方的starter組件實現方式,來封裝一個更高效簡單易用的starter組件,實現開箱即用。
本文僅提供核心思路實現供學習使用,應根據自己所在公司開發(fā)習慣做定制開發(fā)
1. 功能介紹
1.無需關心DataHub底層如何操作,安心編寫業(yè)務代碼即可進行數據的獲取和上傳,
2.類似RabbitMQ的starter,通過注解方式,Listener和Handler方式進行隊列消費
3.支持游標的上次記憶功能
<dependency> <artifactId>cry-starters-projects</artifactId> <groupId>cn.com.cry.starters</groupId> <version>2022-1.0.0</version> </dependency>
2.快速開始
2.1 啟動客戶端
配置阿里云DataHub的endpoint以及AK信息
aliyun: datahub: # 開啟功能 havingValue: true #是否為私有云 isPrivate: false accessId: xxx accessKey: xxx endpoint: xxx #連接DataHub客戶端超時時間 conn-timeout: 10000
啟動SpringBoot,你會發(fā)現datahub客戶端已經啟動完畢
2.2 獲取DataHub客戶端
DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
2.3 寫數據
public int write(@RequestParam("id") Integer shardId) { List<Student> datas = new ArrayList<>(); for (int i = 0; i < 10; i++) { Student s = new Student(); s.setAge(i); s.setName("name-" + i); s.setAddress("address-" + i); datas.add(s); } int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId); return successNumbers; }
以上示例代碼表示往 projectName為my_test, topicName為student, shardId 為N的hub里寫數據,并且返回插入成功的條數
2.4 讀數據
讀數據開發(fā)的邏輯類似RabbitMq的starter,使用@DataHubListener和@DataHubHandler處理器注解進行使用
@Component @DataHubListener(projectName = "my_test") public class ReadServiceImpl { @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST) public void handler(Message message) { System.out.println("讀取到shardId=0的消息"); System.out.println(message.getData()); System.out.println(message.getCreateTsime()); System.out.println(message.getSize()); System.out.println(message.getConfig()); System.out.println(message.getMessageId()); } }
以上代碼說明: 通過LATEST游標的方式,監(jiān)聽 project=my_test ,topicName=student,shardId=0 ,最終通過Message的包裝類拿到dataHub實時寫入的數據。
這邊可以設置多種游標類型,例如根據最新的系統(tǒng)時間、最早錄入的序號等
3. 核心代碼
首先需要一個DataHubClient增強類,在SpringBoot啟動時開啟一個線程來監(jiān)聽對應的project-topic-shardingId,根據游標規(guī)則來讀取當前的cursor進行數據的讀取。
public class DataHubClientWrapper implements InitializingBean, DisposableBean { @Autowired private AliyunAccountProperties properties; @Autowired private ApplicationContext context; private DatahubClient datahubClient; public DataHubClientWrapper() { } /** * 執(zhí)行銷毀方法 * * @throws Exception */ @Override public void destroy() throws Exception { WorkerResourceExecutor.shutdown(); } @Override public void afterPropertiesSet() throws Exception { /** * 創(chuàng)建DataHubClient */ this.datahubClient = DataHubClientFactory.create(properties); /** * 打印Banner */ BannerUtil.printBanner(); /** * 賦值Template的靜態(tài)對象dataHubClient */ DataHubTemplate.setDataHubClient(datahubClient); /** * 初始化Worker線程 */ WorkerResourceExecutor.initWorkerResource(context); /** * 啟動Worker線程 */ WorkerResourceExecutor.start(); } }
寫數據,構建了一個類似RedisDataTemplate的模板類,封裝了write的邏輯,調用時只需要用DataHubTemplate.write調用
public class DataHubTemplate { private static DatahubClient dataHubClient; private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class); /** * 默認不開啟重試機制 * * @param projectName * @param topicName * @param datas * @param shardId * @return */ public static int write(String projectName, String topicName, List<?> datas, Integer shardId) { return write(projectName, topicName, datas, shardId, false); } /** * 往指定的projectName以及topic和shard下面寫數據 * * @param projectName * @param topicName * @param datas * @param shardId * @param retry * @return */ private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) { RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema(); List<RecordEntry> recordEntries = new ArrayList<>(); for (Object o : datas) { RecordEntry entry = new RecordEntry(); Map<String, Object> data = BeanUtil.beanToMap(o); TupleRecordData tupleRecordData = new TupleRecordData(recordSchema); for (String key : data.keySet()) { tupleRecordData.setField(key, data.get(key)); } entry.setRecordData(tupleRecordData); entry.setShardId(String.valueOf(shardId)); recordEntries.add(entry); } PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries); int failedRecordCount = result.getFailedRecordCount(); if (failedRecordCount > 0 && retry) { retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName); } return datas.size() - failedRecordCount; } /** * @param client * @param records * @param retryTimes * @param project * @param topic */ private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) { boolean suc = false; List<RecordEntry> failedRecords = records; while (retryTimes != 0) { logger.info("the time to send message has [{}] records failed, is starting retry", records.size()); retryTimes = retryTimes - 1; PutRecordsResult result = client.putRecords(project, topic, failedRecords); int failedNum = result.getFailedRecordCount(); if (failedNum > 0) { failedRecords = result.getFailedRecords(); continue; } suc = true; break; } if (!suc) { logger.error("DataHub send message retry failure"); } } public static DatahubClient getDataHubClient() { return dataHubClient; } public static void setDataHubClient(DatahubClient dataHubClient) { DataHubTemplate.dataHubClient = dataHubClient; } }
讀數據,需要在Spring啟動時開啟一個監(jiān)聽線程DataListenerWorkerThread,執(zhí)行一個死循環(huán)不停輪詢DataHub下的對應通道。
public class DataListenerWorkerThread extends Thread { private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class); private volatile boolean init = false; private DatahubConfig config; private String workerKey; private int recordLimits; private int sleep; private RecordSchema recordSchema; private RecordHandler recordHandler; private CursorHandler cursorHandler; public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) { this.config = new DatahubConfig(projectName, topicName, shardId); this.workerKey = projectName + "-" + topicName + "-" + shardId; this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey); this.recordLimits = recordLimits; this.sleep = sleep; this.setName("DataHub-Worker"); this.setDaemon(true); } @Override public void run() { initRecordSchema(); String cursor = cursorHandler.positioningCursor(config); for (; ; ) { try { GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits); if (result.getRecordCount() <= 0) { // 無數據,sleep后讀取 Thread.sleep(sleep); continue; } List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords()); logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId()); // 拿到下一個游標 cursor = cursorHandler.nextCursor(result); //執(zhí)行方法 WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor); } catch (InvalidParameterException ex) { //非法游標或游標已過期,建議重新定位后開始消費 cursor = cursorHandler.resetCursor(config); logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage()); } catch (DatahubClientException e) { logger.error("DataHubException:{}", e.getErrorMessage()); this.interrupt(); } catch (InterruptedException e) { logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId()); } catch (Exception e) { this.interrupt(); logger.error("receive DataHub records cry.exception:{}", e, e); } } } /** * 終止 */ public void shutdown() { if (!interrupted()) { interrupt(); } } /** * 初始化topic字段以及recordSchema */ private void initRecordSchema() { try { if (!init) { recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema(); List<Field> fields = recordSchema.getFields(); this.recordHandler = new RecordHandler(fields); init = true; } } catch (Exception e) { logger.error("initRecordSchema error:{}", e, e); } } }
read的時候結合了注解開發(fā),通過定義類注解DataHubListener和方法注解DataHubHandler內置屬性,來動態(tài)的控制需要在哪些方法中處理監(jiān)聽到的數據的邏輯:
DataHubHandler
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DataHubHandler { /** * 話題名稱 * * @return */ String topicName(); /** * shardId * * @return */ int shardId(); /** * 最大數據量限制 * * @return */ int recordLimit() default 1000; /** * 游標類型 * * @return */ CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST; /** * 若未監(jiān)聽到數據添加,休眠時間 ms * * @return */ int sleep() default 10000; /** * 使用CursorType.SYSTEM_TIME的時候配置 時間偏移量 * * @return */ String startTime() default ""; /** * 使用使用CursorType.SEQUENCE的時候配置,偏移量,必須是正整數 * * @return */ int sequenceOffset() default 0; }
DataHubListener
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DataHubListener { String projectName(); }
最后我們需要啟動SpringBootStarter的EnableConfigurationProperties 功能,通過配置文件來控制default-bean的開啟或者關閉。
啟動類:
@Configuration @EnableConfigurationProperties(value = {AliyunAccountProperties.class}) public class DataHubClientAutoConfiguration { /** * 初始化dataHub裝飾bean * * @return */ @Bean public DataHubClientWrapper dataHubWrapper() { return new DataHubClientWrapper(); } }
屬性配置類
@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true") @Data public class AliyunAccountProperties implements Properties{ /** * http://xxx.aliyuncs.com */ private String endpoint; /** * account */ private String accessId; /** * password */ private String accessKey; /** * private cloud || public cloud */ private boolean isPrivate; /** * unit: ms */ private Integer connTimeout = 10000; }
最后記得要做成一個starter,在resources下新建一個META-INF文件夾,新建一個spring.factories文件,
org.springframework.boot.autoconfigure.EnableAutoConfiguration= \ ? cry.starter.datahub.DataHubClientAutoConfiguration
大體邏輯就是這樣了,你學會了嗎? hhhhhhhhh~
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Eclipse中使用Maven創(chuàng)建Java Web工程的實現方式
這篇文章主要介紹了Eclipse中使用Maven創(chuàng)建Java Web工程的實現方式的相關資料,希望通過本文能幫助到大家,讓大家實現這樣的方式,需要的朋友可以參考下2017-10-10springboot讀取application.yml報錯問題及解決
這篇文章主要介紹了springboot讀取application.yml報錯問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06