(starters)springboot-starter整合阿里云datahub方式
DataHub 類(lèi)似于傳統(tǒng)大數(shù)據(jù)解決方案中 Kafka 的角色,提供了一個(gè)數(shù)據(jù)隊(duì)列功能。
DataHub 除了供了一個(gè)緩沖的隊(duì)列作用。同時(shí)由于 DataHub 提供了各種與其他阿里云
上下游產(chǎn)品的對(duì)接功能,所以 DataHub 又扮演了一個(gè)數(shù)據(jù)的分發(fā)樞紐工作。

datahub提供了開(kāi)發(fā)者生產(chǎn)和消費(fèi)的sdk,在平時(shí)的開(kāi)發(fā)中往往會(huì)寫(xiě)很多重復(fù)的代碼,我們可以利用springboot為我們提供的自定義starter的方式,模仿springboot官方的starter組件實(shí)現(xiàn)方式,來(lái)封裝一個(gè)更高效簡(jiǎn)單易用的starter組件,實(shí)現(xiàn)開(kāi)箱即用。
本文僅提供核心思路實(shí)現(xiàn)供學(xué)習(xí)使用,應(yīng)根據(jù)自己所在公司開(kāi)發(fā)習(xí)慣做定制開(kāi)發(fā)
1. 功能介紹
1.無(wú)需關(guān)心DataHub底層如何操作,安心編寫(xiě)業(yè)務(wù)代碼即可進(jìn)行數(shù)據(jù)的獲取和上傳,
2.類(lèi)似RabbitMQ的starter,通過(guò)注解方式,Listener和Handler方式進(jìn)行隊(duì)列消費(fèi)
3.支持游標(biāo)的上次記憶功能
<dependency>
<artifactId>cry-starters-projects</artifactId>
<groupId>cn.com.cry.starters</groupId>
<version>2022-1.0.0</version>
</dependency>2.快速開(kāi)始
2.1 啟動(dòng)客戶(hù)端
配置阿里云DataHub的endpoint以及AK信息
aliyun:
datahub:
# 開(kāi)啟功能
havingValue: true
#是否為私有云
isPrivate: false
accessId: xxx
accessKey: xxx
endpoint: xxx
#連接DataHub客戶(hù)端超時(shí)時(shí)間
conn-timeout: 10000啟動(dòng)SpringBoot,你會(huì)發(fā)現(xiàn)datahub客戶(hù)端已經(jīng)啟動(dòng)完畢
2.2 獲取DataHub客戶(hù)端
DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
2.3 寫(xiě)數(shù)據(jù)
public int write(@RequestParam("id") Integer shardId) {
List<Student> datas = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Student s = new Student();
s.setAge(i);
s.setName("name-" + i);
s.setAddress("address-" + i);
datas.add(s);
}
int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
return successNumbers;
}以上示例代碼表示往 projectName為my_test, topicName為student, shardId 為N的hub里寫(xiě)數(shù)據(jù),并且返回插入成功的條數(shù)
2.4 讀數(shù)據(jù)
讀數(shù)據(jù)開(kāi)發(fā)的邏輯類(lèi)似RabbitMq的starter,使用@DataHubListener和@DataHubHandler處理器注解進(jìn)行使用
@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl { @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
public void handler(Message message) {
System.out.println("讀取到shardId=0的消息");
System.out.println(message.getData());
System.out.println(message.getCreateTsime());
System.out.println(message.getSize());
System.out.println(message.getConfig());
System.out.println(message.getMessageId());
}
}以上代碼說(shuō)明: 通過(guò)LATEST游標(biāo)的方式,監(jiān)聽(tīng) project=my_test ,topicName=student,shardId=0 ,最終通過(guò)Message的包裝類(lèi)拿到dataHub實(shí)時(shí)寫(xiě)入的數(shù)據(jù)。
這邊可以設(shè)置多種游標(biāo)類(lèi)型,例如根據(jù)最新的系統(tǒng)時(shí)間、最早錄入的序號(hào)等
3. 核心代碼
首先需要一個(gè)DataHubClient增強(qiáng)類(lèi),在SpringBoot啟動(dòng)時(shí)開(kāi)啟一個(gè)線(xiàn)程來(lái)監(jiān)聽(tīng)對(duì)應(yīng)的project-topic-shardingId,根據(jù)游標(biāo)規(guī)則來(lái)讀取當(dāng)前的cursor進(jìn)行數(shù)據(jù)的讀取。
public class DataHubClientWrapper implements InitializingBean, DisposableBean { @Autowired
private AliyunAccountProperties properties; @Autowired
private ApplicationContext context; private DatahubClient datahubClient;
public DataHubClientWrapper() { } /**
* 執(zhí)行銷(xiāo)毀方法
*
* @throws Exception
*/
@Override
public void destroy() throws Exception {
WorkerResourceExecutor.shutdown();
} @Override
public void afterPropertiesSet() throws Exception { /**
* 創(chuàng)建DataHubClient
*/
this.datahubClient = DataHubClientFactory.create(properties); /**
* 打印Banner
*/
BannerUtil.printBanner(); /**
* 賦值Template的靜態(tài)對(duì)象dataHubClient
*/
DataHubTemplate.setDataHubClient(datahubClient); /**
* 初始化Worker線(xiàn)程
*/
WorkerResourceExecutor.initWorkerResource(context);
/**
* 啟動(dòng)Worker線(xiàn)程
*/
WorkerResourceExecutor.start();
}
}
寫(xiě)數(shù)據(jù),構(gòu)建了一個(gè)類(lèi)似RedisDataTemplate的模板類(lèi),封裝了write的邏輯,調(diào)用時(shí)只需要用DataHubTemplate.write調(diào)用
public class DataHubTemplate { private static DatahubClient dataHubClient; private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class); /**
* 默認(rèn)不開(kāi)啟重試機(jī)制
*
* @param projectName
* @param topicName
* @param datas
* @param shardId
* @return
*/
public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
return write(projectName, topicName, datas, shardId, false);
} /**
* 往指定的projectName以及topic和shard下面寫(xiě)數(shù)據(jù)
*
* @param projectName
* @param topicName
* @param datas
* @param shardId
* @param retry
* @return
*/
private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
List<RecordEntry> recordEntries = new ArrayList<>();
for (Object o : datas) {
RecordEntry entry = new RecordEntry();
Map<String, Object> data = BeanUtil.beanToMap(o);
TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
for (String key : data.keySet()) {
tupleRecordData.setField(key, data.get(key));
}
entry.setRecordData(tupleRecordData);
entry.setShardId(String.valueOf(shardId));
recordEntries.add(entry);
}
PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
int failedRecordCount = result.getFailedRecordCount();
if (failedRecordCount > 0 && retry) {
retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
}
return datas.size() - failedRecordCount;
} /**
* @param client
* @param records
* @param retryTimes
* @param project
* @param topic
*/
private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
List<RecordEntry> failedRecords = records;
while (retryTimes != 0) {
logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
retryTimes = retryTimes - 1;
PutRecordsResult result = client.putRecords(project, topic, failedRecords);
int failedNum = result.getFailedRecordCount();
if (failedNum > 0) {
failedRecords = result.getFailedRecords();
continue;
}
suc = true;
break;
}
if (!suc) {
logger.error("DataHub send message retry failure");
}
} public static DatahubClient getDataHubClient() {
return dataHubClient;
} public static void setDataHubClient(DatahubClient dataHubClient) {
DataHubTemplate.dataHubClient = dataHubClient;
}
}
讀數(shù)據(jù),需要在Spring啟動(dòng)時(shí)開(kāi)啟一個(gè)監(jiān)聽(tīng)線(xiàn)程DataListenerWorkerThread,執(zhí)行一個(gè)死循環(huán)不停輪詢(xún)DataHub下的對(duì)應(yīng)通道。
public class DataListenerWorkerThread extends Thread {
private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
private volatile boolean init = false;
private DatahubConfig config;
private String workerKey;
private int recordLimits;
private int sleep;
private RecordSchema recordSchema;
private RecordHandler recordHandler;
private CursorHandler cursorHandler; public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
this.config = new DatahubConfig(projectName, topicName, shardId);
this.workerKey = projectName + "-" + topicName + "-" + shardId;
this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
this.recordLimits = recordLimits;
this.sleep = sleep;
this.setName("DataHub-Worker");
this.setDaemon(true);
} @Override
public void run() {
initRecordSchema();
String cursor = cursorHandler.positioningCursor(config);
for (; ; ) {
try {
GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
if (result.getRecordCount() <= 0) {
// 無(wú)數(shù)據(jù),sleep后讀取
Thread.sleep(sleep);
continue;
}
List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
// 拿到下一個(gè)游標(biāo)
cursor = cursorHandler.nextCursor(result);
//執(zhí)行方法
WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
} catch (InvalidParameterException ex) {
//非法游標(biāo)或游標(biāo)已過(guò)期,建議重新定位后開(kāi)始消費(fèi)
cursor = cursorHandler.resetCursor(config);
logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
} catch (DatahubClientException e) {
logger.error("DataHubException:{}", e.getErrorMessage());
this.interrupt();
} catch (InterruptedException e) {
logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
} catch (Exception e) {
this.interrupt();
logger.error("receive DataHub records cry.exception:{}", e, e);
}
}
} /**
* 終止
*/
public void shutdown() {
if (!interrupted()) {
interrupt();
}
} /**
* 初始化topic字段以及recordSchema
*/
private void initRecordSchema() {
try {
if (!init) {
recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
List<Field> fields = recordSchema.getFields();
this.recordHandler = new RecordHandler(fields);
init = true;
}
} catch (Exception e) {
logger.error("initRecordSchema error:{}", e, e);
}
}
}
read的時(shí)候結(jié)合了注解開(kāi)發(fā),通過(guò)定義類(lèi)注解DataHubListener和方法注解DataHubHandler內(nèi)置屬性,來(lái)動(dòng)態(tài)的控制需要在哪些方法中處理監(jiān)聽(tīng)到的數(shù)據(jù)的邏輯:
DataHubHandler
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {
/**
* 話(huà)題名稱(chēng)
*
* @return
*/
String topicName(); /**
* shardId
*
* @return
*/
int shardId(); /**
* 最大數(shù)據(jù)量限制
*
* @return
*/
int recordLimit() default 1000; /**
* 游標(biāo)類(lèi)型
*
* @return
*/
CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST; /**
* 若未監(jiān)聽(tīng)到數(shù)據(jù)添加,休眠時(shí)間 ms
*
* @return
*/
int sleep() default 10000; /**
* 使用CursorType.SYSTEM_TIME的時(shí)候配置 時(shí)間偏移量
*
* @return
*/
String startTime() default ""; /**
* 使用使用CursorType.SEQUENCE的時(shí)候配置,偏移量,必須是正整數(shù)
*
* @return
*/
int sequenceOffset() default 0;
}
DataHubListener
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
String projectName();
}最后我們需要啟動(dòng)SpringBootStarter的EnableConfigurationProperties 功能,通過(guò)配置文件來(lái)控制default-bean的開(kāi)啟或者關(guān)閉。
啟動(dòng)類(lèi):
@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
/**
* 初始化dataHub裝飾bean
*
* @return
*/
@Bean
public DataHubClientWrapper dataHubWrapper() {
return new DataHubClientWrapper();
}
}屬性配置類(lèi)
@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{ /**
* http://xxx.aliyuncs.com
*/
private String endpoint; /**
* account
*/
private String accessId; /**
* password
*/
private String accessKey; /**
* private cloud || public cloud
*/
private boolean isPrivate; /**
* unit: ms
*/
private Integer connTimeout = 10000;
}最后記得要做成一個(gè)starter,在resources下新建一個(gè)META-INF文件夾,新建一個(gè)spring.factories文件,
org.springframework.boot.autoconfigure.EnableAutoConfiguration= \ ? cry.starter.datahub.DataHubClientAutoConfiguration
大體邏輯就是這樣了,你學(xué)會(huì)了嗎? hhhhhhhhh~
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot實(shí)現(xiàn)OneDrive文件上傳的詳細(xì)步驟
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)OneDrive文件上傳的詳細(xì)步驟,文中通過(guò)代碼示例和圖文講解的非常詳細(xì),對(duì)大家實(shí)現(xiàn)OneDrive文件上傳有一定的幫助,需要的朋友可以參考下2024-02-02
Eclipse中使用Maven創(chuàng)建Java Web工程的實(shí)現(xiàn)方式
這篇文章主要介紹了Eclipse中使用Maven創(chuàng)建Java Web工程的實(shí)現(xiàn)方式的相關(guān)資料,希望通過(guò)本文能幫助到大家,讓大家實(shí)現(xiàn)這樣的方式,需要的朋友可以參考下2017-10-10
java.util.ArrayDeque類(lèi)使用方法詳解
這篇文章主要介紹了java.util.ArrayDeque類(lèi)使用方法,java.util.ArrayDeque類(lèi)提供了可調(diào)整大小的陣列,并實(shí)現(xiàn)了Deque接口,感興趣的小伙伴們可以參考一下2016-03-03
Spring cloud Gateway簡(jiǎn)介及相關(guān)配置方法
這篇文章主要介紹了Spring cloud Gateway簡(jiǎn)介及相關(guān)配置方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04
Spring Security學(xué)習(xí)筆記(一)
這篇文章主要介紹了Spring Security的相關(guān)資料,幫助大家開(kāi)始學(xué)習(xí)Spring Security框架,感興趣的朋友可以了解下2020-09-09
使用Java對(duì)Hbase操作總結(jié)及示例代碼
這篇文章主要介紹了使用Java對(duì)Hbase進(jìn)行操作總結(jié),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07
springboot讀取application.yml報(bào)錯(cuò)問(wèn)題及解決
這篇文章主要介紹了springboot讀取application.yml報(bào)錯(cuò)問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06

