PowerJob Alarmable工作流程源碼剖析
序
本文主要研究一下PowerJob的Alarmable
Alarmable
tech/powerjob/server/extension/Alarmable.java
public interface Alarmable { void onFailed(Alarm alarm, List<UserInfoDO> targetUserList); }
Alarmable接口定義了onFailed方法,其入?yún)閍larm及targetUserList
Alarm
public interface Alarm extends PowerSerializable { String fetchTitle(); default String fetchContent() { StringBuilder sb = new StringBuilder(); JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this)); content.forEach((key, originWord) -> { sb.append(key).append(": "); String word = String.valueOf(originWord); if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) { try { if (originWord instanceof Long) { word = CommonUtils.formatTime((Long) originWord); } }catch (Exception ignore) { } } sb.append(word).append(OmsConstant.LINE_SEPARATOR); }); return sb.toString(); } }
Alarm定義了fetchTitle方法,提供了fetchContent默認(rèn)方法,它有兩個(gè)實(shí)現(xiàn)類(lèi)分別是JobInstanceAlarm、WorkflowInstanceAlarm
DingTalkAlarmService
tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java
@Slf4j @Service @RequiredArgsConstructor public class DingTalkAlarmService implements Alarmable { private final Environment environment; private Long agentId; private DingTalkUtils dingTalkUtils; private Cache<String, String> mobile2UserIdCache; private static final int CACHE_SIZE = 8192; /** * 防止緩存擊穿 */ private static final String EMPTY_TAG = "EMPTY"; @Override public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) { if (dingTalkUtils == null) { return; } Set<String> userIds = Sets.newHashSet(); targetUserList.forEach(user -> { String phone = user.getPhone(); if (StringUtils.isEmpty(phone)) { return; } try { String userId = mobile2UserIdCache.get(phone, () -> { try { return dingTalkUtils.fetchUserIdByMobile(phone); } catch (PowerJobException ignore) { return EMPTY_TAG; } catch (Exception ignore) { return null; } }); if (!EMPTY_TAG.equals(userId)) { userIds .add(userId); } }catch (Exception ignore) { } }); userIds.remove(null); if (!userIds.isEmpty()) { String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds); List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList(); markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost())); String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA); markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content)); try { dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId); }catch (Exception e) { log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage()); } } } @PostConstruct public void init() { String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID); String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY); String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET); log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId); if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) { log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable"); return; } if (!StringUtils.isNumeric(agentId)) { log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId); return; } this.agentId = Long.valueOf(agentId); dingTalkUtils = new DingTalkUtils(appKey, appSecret); mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build(); log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!"); } }
DingTalkAlarmService實(shí)現(xiàn)了Alarmable接口,其onFailed遍歷targetUserList獲取userId,最后通過(guò)dingTalkUtils.sendMarkdownAsync發(fā)送
MailAlarmService
tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java
@Slf4j @Service public class MailAlarmService implements Alarmable { @Resource private Environment environment; private JavaMailSender javaMailSender; @Value("${spring.mail.username:''}") private String from; @Override public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) { if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) { return; } SimpleMailMessage sm = new SimpleMailMessage(); try { sm.setFrom(from); sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new)); sm.setSubject(alarm.fetchTitle()); sm.setText(alarm.fetchContent()); javaMailSender.send(sm); }catch (Exception e) { log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage()); } } @Autowired(required = false) public void setJavaMailSender(JavaMailSender javaMailSender) { this.javaMailSender = javaMailSender; } }
MailAlarmService實(shí)現(xiàn)了Alarmable接口,其onFailed方法構(gòu)建SimpleMailMessage,然后通過(guò)spring的javaMailSender.send發(fā)送
WebHookAlarmService
tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java
@Slf4j @Service public class WebHookAlarmService implements Alarmable { private static final String HTTP_PROTOCOL_PREFIX = "http://"; private static final String HTTPS_PROTOCOL_PREFIX = "https://"; @Override public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) { if (CollectionUtils.isEmpty(targetUserList)) { return; } targetUserList.forEach(user -> { String webHook = user.getWebHook(); if (StringUtils.isEmpty(webHook)) { return; } // 自動(dòng)添加協(xié)議頭 if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) { webHook = HTTP_PROTOCOL_PREFIX + webHook; } MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE); RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm)); try { String response = HttpUtils.post(webHook, requestBody); log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response); }catch (Exception e) { log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e); } }); } }
WebHookAlarmService實(shí)現(xiàn)了Alarmable接口,其onFailed方法遍歷targetUserList,挨個(gè)執(zhí)行HttpUtils.post(webHook, requestBody),用的是okhttp3來(lái)實(shí)現(xiàn)http請(qǐng)求回調(diào)
小結(jié)
PowerJob的Alarmable接口定義了onFailed方法,其入?yún)閍larm及targetUserList;它有三個(gè)實(shí)現(xiàn)類(lèi),分別是DingTalkAlarmService(用的是DingTalkClient
)、MailAlarmService(用的是spring的JavaMailSender
)、WebHookAlarmService(用的是okhttp3的OkHttpClient
)。
以上就是PowerJob Alarmable工作流程源碼剖析的詳細(xì)內(nèi)容,更多關(guān)于PowerJob Alarmable的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實(shí)現(xiàn)多設(shè)備同時(shí)登錄或強(qiáng)制下線
本文主要介紹了java實(shí)現(xiàn)多設(shè)備同時(shí)登錄或強(qiáng)制下線,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07springboot上傳zip包并解壓至服務(wù)器nginx目錄方式
這篇文章主要介紹了springboot上傳zip包并解壓至服務(wù)器nginx目錄方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04java實(shí)現(xiàn)KFC點(diǎn)餐系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)KFC點(diǎn)餐系統(tǒng),模擬肯德基快餐店的收銀系統(tǒng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01解決創(chuàng)建springboot后啟動(dòng)報(bào)錯(cuò):Failed?to?bind?properties?under‘spri
在Spring?Boot項(xiàng)目中,application.properties和application.yml是用于配置參數(shù)的兩種文件格式,properties格式簡(jiǎn)潔但不支持層次結(jié)構(gòu),而yml格式支持層次性,可讀性更好,在yml文件中,要注意細(xì)節(jié),比如冒號(hào)后面需要空格2024-10-10Java使用正則表達(dá)式判斷獨(dú)立字符的存在(代碼示例)
通過(guò)使用正則表達(dá)式,我們可以更加靈活地判斷字符串中是否包含特定的字符,并且可以控制匹配的條件,如獨(dú)立的字符,這為我們處理字符串提供了更多的選擇和功能,這篇文章主要介紹了Java使用正則表達(dá)式判斷獨(dú)立字符的存在,需要的朋友可以參考下2023-10-10輕松掌握J(rèn)ava注解,讓編程更智能、更優(yōu)雅
輕松掌握J(rèn)ava注解?沒(méi)問(wèn)題!想要讓你的Java代碼更具可讀性、維護(hù)性,同時(shí)提升開(kāi)發(fā)效率?本指南將帶你快速入門(mén)Java注解的世界,只需短短幾分鐘,你就能揭秘這個(gè)強(qiáng)大的編程工具,讓編寫(xiě)有聲明性邏輯的代碼變得輕而易舉,趕快一起來(lái)探索吧!2024-01-01JDBC連接MySql數(shù)據(jù)庫(kù)步驟 以及查詢、插入、刪除、更新等
這篇文章主要介紹了JDBC連接MySql數(shù)據(jù)庫(kù)步驟,以及查詢、插入、刪除、更新等十一個(gè)處理數(shù)據(jù)庫(kù)信息的功能,需要的朋友可以參考下2018-05-05