Java Flink與kafka實(shí)現(xiàn)實(shí)時(shí)告警功能過(guò)程
引出問(wèn)題
項(xiàng)目使用告警系統(tǒng)的邏輯是將實(shí)時(shí)數(shù)據(jù)保存到本地?cái)?shù)據(jù)庫(kù)再使用定時(shí)任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時(shí)實(shí)在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會(huì)存在時(shí)間差,按照保存數(shù)據(jù)定時(shí)5分鐘一次,定時(shí)任務(wù)5分鐘一次。最高會(huì)產(chǎn)生10分鐘的誤差,這種告警就沒(méi)什么意義了。
demo設(shè)計(jì)
為了簡(jiǎn)單的還原業(yè)務(wù)場(chǎng)景,做了簡(jiǎn)單的demo假設(shè)
實(shí)現(xiàn)一個(gè)對(duì)于學(xué)生成績(jī)?cè)u(píng)價(jià)的實(shí)時(shí)處理程序
數(shù)學(xué)成績(jī),基準(zhǔn)范圍是90-140,超出告警
物理成績(jī),基準(zhǔn)范圍是60-95,超出告警
環(huán)境搭建
使用windows環(huán)境演示
準(zhǔn)備工作
1、安裝jdk
2、安裝zookeeper
解壓壓縮包
zoo_sample.cfg將它重命名為zoo.cfg
修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data
配置環(huán)境變量
3、安裝kafka
解壓壓縮包
修改config/server.properties
log.dirs=D://tools//kafka_2.11-2.1.0//log
flink程序代碼
pom
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.10.0</version> </dependency>
主程序
public class StreamAlertDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3); Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer); DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper()); resultStream.print().setParallelism(4); resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties)); env.execute(); } }
主程序,配置告警規(guī)則后期可以使用推送或者拉去方式獲取數(shù)據(jù)
public class RuleMap { private RuleMap(){} public final static Map<String,List<AlertRule>> initialRuleMap; private static List<AlertRule> ruleList = new ArrayList<>(); private static List<String> ruleStringList = new ArrayList<>(Arrays.asList( "{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}", "{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}", "{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}", "{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}")); static { for (String i : ruleStringList) { ruleList.add(JSON.parseObject(i, AlertRule.class)); } initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget)); } }
AlertFlatMapper,處理告警邏輯
public class AlertFlatMapper implements FlatMapFunction<String, String> { @Override public void flatMap(String inVal, Collector<String> out) throws Exception { Achievement user = JSON.parseObject(inVal, Achievement.class); Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap; List<AlertInfo> resList = new ArrayList<>(); List<AlertRule> mathRule = initialRuleMap.get("MathVal"); for (AlertRule rule : mathRule) { if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) { resList.add(new AlertInfo(user.getName(), rule.getDescInfo())); } } List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal"); for (AlertRule rule : physicsRule) { if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) { resList.add(new AlertInfo(user.getName(), rule.getDescInfo())); } } String result = JSON.toJSONString(resList); out.collect(result); } private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) { switch (type) { case 0: return actVal < targetVal; case 1: return actVal.equals(targetVal); case 2: return actVal > targetVal; default: return false; } } }
三個(gè)實(shí)體類(lèi)
@Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class Achievement implements Serializable { private static final long serialVersionUID = -1L; private String name; private Integer mathVal; private Integer physicsVal; } @Data @AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class AlertInfo implements Serializable { private static final long serialVersionUID = -1L; private String name; private String descInfo; } @Data @AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class AlertRule implements Serializable { private static final long serialVersionUID = -1L; private String target; //0小于 1等于 2大于 private Integer type; private Integer criticalVal; private String descInfo; }
項(xiàng)目演示
創(chuàng)建kafka生產(chǎn)者 test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
創(chuàng)建kafka消費(fèi)者 demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
啟動(dòng)flink應(yīng)用
給topic test發(fā)送消息
{"name":"liu","MathVal":45,"PhysicsVal":76}
消費(fèi)topic demo
告警系統(tǒng)架構(gòu)
到此這篇關(guān)于Java Flink與kafka實(shí)現(xiàn)實(shí)時(shí)告警功能過(guò)程的文章就介紹到這了,更多相關(guān)Java Flink與kafka實(shí)時(shí)告警內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息
這篇文章主要給大家介紹了關(guān)于Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11SpringBoot整個(gè)啟動(dòng)過(guò)程的分析
今天小編就為大家分享一篇關(guān)于SpringBoot整個(gè)啟動(dòng)過(guò)程的分析,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03Java數(shù)據(jù)結(jié)構(gòu)之LinkedList的用法詳解
鏈表(Linked?list)是一種常見(jiàn)的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),是一種線性表。Java的LinkedList(鏈表)?類(lèi)似于?ArrayList,是一種常用的數(shù)據(jù)容器,本文就來(lái)簡(jiǎn)單講講它的使用吧2023-05-05Springboot?HTTP如何調(diào)用其他服務(wù)
這篇文章主要介紹了Springboot?HTTP如何調(diào)用其他服務(wù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01使用Java的Lucene搜索工具對(duì)檢索結(jié)果進(jìn)行分組和分頁(yè)
這篇文章主要介紹了使用Java的搜索工具Lucene對(duì)檢索結(jié)果進(jìn)行分組和分頁(yè)的方法,Luence是Java環(huán)境中的一個(gè)全文檢索引擎工具包,需要的朋友可以參考下2016-03-03SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼
@JsonView 是 Jackson 庫(kù)中的一個(gè)注解,它允許你定義哪些屬性應(yīng)該被序列化到 JSON 中,基于不同的“視圖”或“配置”,在本文中,通過(guò)了解@JsonView,你將能夠更好地掌握如何在Spring Boot應(yīng)用中定制JSON數(shù)據(jù)的輸出,需要的朋友可以參考下2024-05-05