欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java Flink與kafka實(shí)現(xiàn)實(shí)時(shí)告警功能過(guò)程

 更新時(shí)間:2023年01月18日 09:45:29   作者:欽拆大仁  
這篇文章主要介紹了Java Flink與kafka實(shí)現(xiàn)實(shí)時(shí)告警功能,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

引出問(wèn)題

項(xiàng)目使用告警系統(tǒng)的邏輯是將實(shí)時(shí)數(shù)據(jù)保存到本地?cái)?shù)據(jù)庫(kù)再使用定時(shí)任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時(shí)實(shí)在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會(huì)存在時(shí)間差,按照保存數(shù)據(jù)定時(shí)5分鐘一次,定時(shí)任務(wù)5分鐘一次。最高會(huì)產(chǎn)生10分鐘的誤差,這種告警就沒(méi)什么意義了。

demo設(shè)計(jì)

為了簡(jiǎn)單的還原業(yè)務(wù)場(chǎng)景,做了簡(jiǎn)單的demo假設(shè)

實(shí)現(xiàn)一個(gè)對(duì)于學(xué)生成績(jī)?cè)u(píng)價(jià)的實(shí)時(shí)處理程序

數(shù)學(xué)成績(jī),基準(zhǔn)范圍是90-140,超出告警

物理成績(jī),基準(zhǔn)范圍是60-95,超出告警

環(huán)境搭建

使用windows環(huán)境演示

準(zhǔn)備工作

1、安裝jdk

2、安裝zookeeper

解壓壓縮包

zoo_sample.cfg將它重命名為zoo.cfg

修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置環(huán)境變量

3、安裝kafka

解壓壓縮包

修改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代碼

pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

主程序

public class StreamAlertDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);
		DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);
		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}
}
主程序,配置告警規(guī)則后期可以使用推送或者拉去方式獲取數(shù)據(jù)
public class RuleMap {
	private RuleMap(){}
	public final static Map<String,List<AlertRule>> initialRuleMap;
	private static List<AlertRule> ruleList = new ArrayList<>();
	private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));
	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}
}

AlertFlatMapper,處理告警邏輯

public class AlertFlatMapper implements FlatMapFunction<String, String> {
	@Override
	public void flatMap(String inVal, Collector<String> out) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
		List<AlertInfo> resList = new ArrayList<>();
		List<AlertRule> mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}
	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal < targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal > targetVal;
			default:
				return false;
		}
	}
}

三個(gè)實(shí)體類(lèi)

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private Integer mathVal;
    private Integer physicsVal;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private String descInfo;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
	private static final long serialVersionUID = -1L;
	private String target;
	//0小于 1等于 2大于
	private Integer type;
	private Integer criticalVal;
	private String descInfo;
}

項(xiàng)目演示

創(chuàng)建kafka生產(chǎn)者 test

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

創(chuàng)建kafka消費(fèi)者 demo

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

啟動(dòng)flink應(yīng)用

給topic test發(fā)送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

消費(fèi)topic demo

告警系統(tǒng)架構(gòu)

到此這篇關(guān)于Java Flink與kafka實(shí)現(xiàn)實(shí)時(shí)告警功能過(guò)程的文章就介紹到這了,更多相關(guān)Java Flink與kafka實(shí)時(shí)告警內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring中的循環(huán)依賴問(wèn)題

    Spring中的循環(huán)依賴問(wèn)題

    在Spring框架中,循環(huán)依賴是指兩個(gè)或多個(gè)Bean相互依賴,這導(dǎo)致在Bean的創(chuàng)建過(guò)程中出現(xiàn)依賴死鎖,為了解決這一問(wèn)題,Spring引入了三級(jí)緩存機(jī)制,包括singletonObjects、earlySingletonObjects和singletonFactories
    2024-09-09
  • Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息

    Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息

    這篇文章主要給大家介紹了關(guān)于Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • SpringBoot整個(gè)啟動(dòng)過(guò)程的分析

    SpringBoot整個(gè)啟動(dòng)過(guò)程的分析

    今天小編就為大家分享一篇關(guān)于SpringBoot整個(gè)啟動(dòng)過(guò)程的分析,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-03-03
  • Java數(shù)據(jù)結(jié)構(gòu)之LinkedList的用法詳解

    Java數(shù)據(jù)結(jié)構(gòu)之LinkedList的用法詳解

    鏈表(Linked?list)是一種常見(jiàn)的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),是一種線性表。Java的LinkedList(鏈表)?類(lèi)似于?ArrayList,是一種常用的數(shù)據(jù)容器,本文就來(lái)簡(jiǎn)單講講它的使用吧
    2023-05-05
  • Springboot?HTTP如何調(diào)用其他服務(wù)

    Springboot?HTTP如何調(diào)用其他服務(wù)

    這篇文章主要介紹了Springboot?HTTP如何調(diào)用其他服務(wù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • 使用Java的Lucene搜索工具對(duì)檢索結(jié)果進(jìn)行分組和分頁(yè)

    使用Java的Lucene搜索工具對(duì)檢索結(jié)果進(jìn)行分組和分頁(yè)

    這篇文章主要介紹了使用Java的搜索工具Lucene對(duì)檢索結(jié)果進(jìn)行分組和分頁(yè)的方法,Luence是Java環(huán)境中的一個(gè)全文檢索引擎工具包,需要的朋友可以參考下
    2016-03-03
  • Spring自定義注解配置簡(jiǎn)單日志示例

    Spring自定義注解配置簡(jiǎn)單日志示例

    這篇文章主要介紹了Spring自定義注解配置簡(jiǎn)單日志示例,注解可以增強(qiáng)我們的java代碼,同時(shí)利用反射技術(shù)可以擴(kuò)充實(shí)現(xiàn)很多功能,它們被廣泛應(yīng)用于三大框架底層,需要的朋友可以參考下
    2023-05-05
  • SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼

    SpringBoot定制JSON響應(yīng)數(shù)據(jù)返回的示例代碼

    @JsonView 是 Jackson 庫(kù)中的一個(gè)注解,它允許你定義哪些屬性應(yīng)該被序列化到 JSON 中,基于不同的“視圖”或“配置”,在本文中,通過(guò)了解@JsonView,你將能夠更好地掌握如何在Spring Boot應(yīng)用中定制JSON數(shù)據(jù)的輸出,需要的朋友可以參考下
    2024-05-05
  • 解決Spring?MVC中文亂碼的編碼配置

    解決Spring?MVC中文亂碼的編碼配置

    這篇文章主要為大家介紹了解決SpringMVC中文亂碼的編碼配置示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • Java OOP三大特征之封裝繼承與多態(tài)詳解

    Java OOP三大特征之封裝繼承與多態(tài)詳解

    本文主要講述的是面向?qū)ο蟮娜筇匦裕悍庋b,繼承,多態(tài),內(nèi)容含括從封裝到繼承再到多態(tài)的所有重點(diǎn)內(nèi)容以及使用細(xì)節(jié)和注意事項(xiàng),內(nèi)容有點(diǎn)長(zhǎng),請(qǐng)大家耐心看完
    2022-07-07

最新評(píng)論