自定义业务指标开发
Flink自身具有一套完整的监控系统,用于监控Flink任务的运行状况,具有如Failover、输入输出QPS、CheckPoint成功率、延迟等一系列指标,但是对于业务而言,在一些情况下,仅有任务本身的监控是不够的,Flink任务运行正常也不能意味着业务场景没有问题,用户需要将Flink任务计算的某些业务指标(如统计金额异常的订单数)进行上报,并且监控异常,发送报警。基于此需求,我们在提供了上报业务指标并进行监控报警的方案。
本文将从三个方面:1、业务指标的上报;2、监控展示;3、异常报警三方面指导用户完成业务监控报警需求。
1、业务指标的上报
Flink提供了4种类型的Metrics:Counters, Gauges, Histograms, Meters,每种metrics的各自特点可参考 官方文档。
在绝大部分的场景中,Counters和Gauges就可以满足需求,Counters主要用于累加的Metircs,比如计算1分钟内的金额,Gauges主要用于设置当前值,比如计算当前这条数据的延迟,为了方便用户使用,我们将这两种类型的Metrics进行了封装,用户只需要引入pom依赖,然后调用相应的接口即可。
1.1 添加mvn仓库(在Flink 任务的pom文件中添加仓库)
<repositories>
<repository>
<id>libs-releases</id>
<url>http://mvn.hz.netease.com/artifactory/libs-releases</url>
</repository>
<repository>
<id>libs-snapshots</id>
<url>http://mvn.hz.netease.com/artifactory/libs-snapshots</url>
</repository>
</repositories>
1.2 添加依赖
<dependency>
<groupId>com.netease.sloth</groupId>
<artifactId>sloth-udmetric-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
1.3 选择指标类型
1.3.1 MetricGauge
MetricGauge主要用于上报当前值,其中setValue(long value)方法会将设置当前值,每次getValue时会将value上报到指标系统中并将value赋为0
/** the current value. */
private AtomicLong value = new AtomicLong(0);
@Override
public Long getValue() {
return value.getAndSet(0);
}
public void setValue(Long value) {
this.value.set(value);
}
1.3.1 MetricCounter
MetricGauge主要用于上报累加值,其中包含一个累加器counter,inc()方法会将counter加1,inc(long n)方法会将counter加n,每次getValue时会将value上报到指标系统中并将value赋为0。
/** the current value. */
private AtomicLong count = new AtomicLong(0);
@Override
public void inc() {
count.incrementAndGet();
}
@Override
public void inc(long n) {
count.addAndGet(n);
}
@Override
public void dec() {
count.decrementAndGet();
}
@Override
public void dec(long n) {
count.set(count.get() - n);
}
@Override
public long getCount() {
return count.getAndSet(0);
}
1.4 指标上报
Metric指标的上报包含3个过程:
1.4.1 在静态域中创建Metric对象
private static MetricCounter count = new MetricCounter();
private static MetricGague gauge = new MetricGague();
1.4.2 在一个RichFunction中的open()函数中注册Metric
注册Metric需要调用getRuntimeContext方法,该方法需要在RichFunction的open()方法中才能获得。
stream.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters){
getRuntimeContext().getMetricGroup().addGroup("MetricName", "IpCount").counter("UDMetric", count);
getRuntimeContext().getMetricGroup().addGroup("MetricName", "IpGauge").gauge("UDMetric", gauge);
}
}
这里需要注意的是,getRuntimeContext().getMetricGroup().addGroup("MetricName", "IpCount").counter("UDMetric", count)中,加粗部分MetricName及UDMetric是固定的,是用于上报指标时监控系统进行过滤识别业务指标的,因此这两个字段不能改动,而IpCount是用户定义的指标名称,可以根据需要设置,count是静态域中定义的Metric对象。
1.4.3 上报指标
在完成好Metric的创建和注册后,用户就可以在代码中上报自己的指标了
@Override
public String map(String s) throws Exception {
gauge.setValue(1L);
count.inc(1);
return s;
}
1.4.4 添加Falcon Reporter类
在完成指标上报后,需要在任务依赖中添加Open falcon Reporter类,因此,需要将下面的jar包添加到任务的依赖中: ic-falcon-1.10.jar
注意:依赖Jar包根据不同引擎版本有所区别,可咨询对应技术支持进行获取与详细确认。 |
2 监控指标查看
指标上报完成后,可以登陆Grafana监控页面,查看业务监控指标栏查看指标上报情况,在panel的下方,可以看到对应的指标名。其中,对于Gauge类型指标,展示max值,代表指定周期内的最大值;对于Counter类型指标,展示sum值,代表指定周期内sum的求和值,sum的周期可以在界面上方进行选择,目前可供选择的周期有10s/1min/5min/1h。