flink cep 案例之机架温度监控报警
FlinkCEP是在Flink之上实现的复杂事件处理库。它提供了丰富的API,允许您在不停止的事件流中检测事件模式,并对复杂事件做相应处理。模式匹配是复杂事件处理的一个有力的保障,应用场景包括受一系列事件驱动的各种业务流程,例如在正常的网略行为中侦测异常行为;在金融应用中查找价格、交易量和其他行为的模式。
特点:
- 复杂性:多个流join,窗口聚合,事件序列或patterns检测
- 低延迟:秒或毫秒级别,比如做信用卡盗刷检测,或攻击检测
- 高吞吐:每秒上万条消息
在这篇博客中,我们将通过一个案例来讲解flink CEP的使用。 案例来源于官网博客:https://flink.apache.org/news/2016/04/06/cep-monitoring.html
输入事件流由来自一组机架的温度和功率事件组成。目标是检测 当机架过热时我们需要发出警告和报警。
我们通过自定义的source来模拟生成机架的温度,然后定义以下的规则来生成警告和报警
- 警告:某机架在10秒内连续两次上报的温度超过阈值;
- 报警:某机架在20秒内连续两次匹配警告;
首先我们定义一个监控事件
注意要重写里面的hashcode方法和equal方法
来自官网:The events in the DataStream to which you want to apply pattern matching must implement proper equals() and hashCode() methods because FlinkCEP uses them for comparing and matching events.
public abstract class MonitoringEvent {
private int rackID;
public MonitoringEvent(int rackID) {
this.rackID = rackID;
}
public int getRackID() {
return rackID;
}
public void setRackID(int rackID) {
this.rackID = rackID;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof MonitoringEvent) {
MonitoringEvent monitoringEvent = (MonitoringEvent) obj;
return monitoringEvent.canEquals(this) && rackID == monitoringEvent.rackID;
} else {
return false;
}
}
@Override
public int hashCode() {
return rackID;
}
public boolean canEquals(Object obj) {
return obj instanceof MonitoringEvent;
}
}
public class TemperatureEvent extends MonitoringEvent {
private double temperature;
...
}
public class PowerEvent extends MonitoringEvent {
private double voltage;
...
}
我们通过自定义的source来模拟生成MonitoringEvent数据。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Input stream of monitoring events
DataStream<MonitoringEvent> inputEventStream = env
.addSource(new MonitoringEventSource(
MAX_RACK_ID,
PAUSE,
TEMPERATURE_RATIO,
POWER_STD,
POWER_MEAN,
TEMP_STD,
TEMP_MEAN))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
接下来定义模式,在10秒钟之内连续两个event的温度超过阈值
// Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
// appearing within a time interval of 10 seconds
Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
.subtype(TemperatureEvent.class)
.where(new IterativeCondition<TemperatureEvent>() {
private static final long serialVersionUID = -6301755149429716724L;
@Override
public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
return value.getTemperature() >= TEMPERATURE_THRESHOLD;
}
})
.next("second") //紧接着上一个事件
.subtype(TemperatureEvent.class)
.where(new IterativeCondition<TemperatureEvent>() {
private static final long serialVersionUID = 2392863109523984059L;
@Override
public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
return value.getTemperature() >= TEMPERATURE_THRESHOLD;
}
})
.within(Time.seconds(10));
使用报警模式和输入流生成模式流
// Create a pattern stream from our warning pattern
PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
inputEventStream.keyBy("rackID"),
warningPattern);
使用select方法为每个匹配的报警模式生成相应的报警。其中返回值是一个map,key是我们定义的模式,value是匹配的事件列表。
// Generate temperature warnings for each matched warning pattern
DataStream<TemperatureWarning> warnings = tempPatternStream.select(
(Map<String, List<MonitoringEvent>> pattern) -> {
TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);
return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
}
);
以上我们最后生成了相应的用于警告的DataStream类型的数据流warnings,接下来我们使用这个警告流来生成我们的报警流,即在20秒内连续两次发生警告。
// Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
.next("second")
.within(Time.seconds(20));
然后通过上面的报警模式alertPattern和警告流warnings生成我们的报警流alertPatternStream。
// Create a pattern stream from our alert pattern
PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
warnings.keyBy("rackID"),
alertPattern);
最后当收集到的两次警告中,第一次警告的平均温度小于第二次的时候,生成报警,封装TemperatureAlert信息返回。
// Generate a temperature alert only if the second temperature warning's average temperature is higher than
// first warning's temperature
DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
(Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
TemperatureWarning first = pattern.get("first").get(0);
TemperatureWarning second = pattern.get("second").get(0);
if (first.getAverageTemperature() < second.getAverageTemperature()) {
out.collect(new TemperatureAlert(first.getRackID()));
}
},
TypeInformation.of(TemperatureAlert.class));
最后我们将报警流和警告流输出,当然我们也可以对这两个流做其他的操作,比如发到报警系统等。
// Print the warning and alert events to stdout
warnings.print();
alerts.print();
参考: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html [2] https://flink.apache.org/news/2016/04/06/cep-monitoring.html
- 带你零基础入门express
- 动态控制C4C UI元素的显示和隐藏
- 深度学习(deep learning)发展史
- 遗传算法简述
- Spark详解03Job 物理执行图Job 物理执行图
- 干货|Kotlin入门第一课:从对比Java开始
- Spark详解04Shuffle 过程Shuffle 过程
- Spark详解02Job 逻辑执行图Job 逻辑执行图
- Spark详解01概览|Spark部署|执行原理概览Job 例子
- Spark详解05架构Architecture架构
- SQL Server常用命令(平时不用别忘了)
- Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint
- SQL Server 学习笔记
- Collaborative Filtering(协同过滤)算法详解
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 2019 ICPC 银川网络赛 H. Fight Against Monsters
- 状态压缩DP(大佬写的很好,转来看)
- 2019 ICPC 银川网络赛 F-Moving On (卡Cache)
- 树形结构--二叉树的遍历算法应用(十九)
- POJ1088 滑雪题解+HDU 1078(记忆化搜索DP)
- 2019 ICPC 南京网络赛 F Greedy Sequence
- 补题Codeforces 1102E. Monotonic Renumeration
- 2019 ICPC 南京网络赛 H-Holy Grail
- 写代码?程序猿?你不能不懂的八大排序算法的Python实现
- Java开发编程规范:5.集合处理
- codeforce 272E Dima and Horses (假DFS)
- 网速慢?NO可能是路由器的原因?
- codeforce 272B Dima and Sequence
- 微软自家的.Net下的JavaScript引擎--- ClearScript
- HDU 1159.Common Subsequence【动态规划DP】