非聚合
Router:
@Router(policyId=209995, fields={'denominator'}, tags={'method'}) select * from metric(source = 'application' and metricName = 'rec.bifrost.soa_consumer_success_rate' and (
tags('method') in ('ActivityService.query', 'BrandShopService.searchBrandShopList') )) as e
Worker:
insert into check_stream_209995_0 select metricName, timestamp, FunctionUtil.tags('method', tags('method')) as tags, FunctionUtil.fields('denominator', fields('denominator')) as fields from event(tags('_policyId') = '209995')
insert into check_stream_209995_1 select metricName, timestamp, tags, fields from check_stream_209995_0
@LinDB select metricName, timestamp, tags, fields from check_stream_209995_1
@Alert select metricName, timestamp, tags, fields from check_stream_209995_1.std:groupwin(metricName , tags('method')).win:length(6) where (cast(fields('denominator'), double) > 50000.00) group by `metricName` , tags('method') having count(1)>=2 (持续触发窗口:过去6次触发2次则报警)
非聚合很简单:Router 做好过滤,metric 只保留需要的 fields 和 tags;Worker 重新构造 tags 和 fields,最后由 @Alert 语句判断 convertor plugin 的条件是否满足。每个 groupByTags 组合对应一条通道(窗口),通道长度为 6,新数据进入时可能淘汰最老的数据;当通道内满足 where 条件的数据条数大于等于 2 时,输出此数据(即报警)。
聚合
Router:
@Router(policyId=209995, fields={'denominator', 'numerator'}, tags={'method'}) context pre_agg select metric_agg(e) from metric(source = 'application' and metricName = 'rec.bifrost.soa_consumer_success_rate' and ( tags('method') in ('ActivityService.query', 'BrandShopService.searchBrandShopList') )) as e group by `metricName`, DateTimeUtil.truncate(timestamp, 10000) , tags('method') output snapshot when terminated
Worker:
create constant variable long start_time_209995=0 (workerEngine获取到后,改为实际时间再运行)
create constant variable long period_209995=60000 (聚合时间1分钟)
create context context_209995_0 start @now end me.ele.arch.watchdog.engine.esper.FlushEvent(policyId=209995 and DateTimeUtil.mod(timestamp - start_time_209995, 2 * period_209995)=0) (长度为2倍聚合时间,在 mod=0 处结束:0-2,2-4)
context context_209995_0 insert into check_stream_209995_0 select metricName, (start_time_209995 + DateTimeUtil.truncate(timestamp - start_time_209995, period_209995)) as timestamp, FunctionUtil.tags('method', tags('method')) as tags, FunctionUtil.fields('denominator+numerator', sum(fields('denominator'))+sum(fields('numerator')), 'numerator', sum(fields('numerator'))) as fields from event(tags('_policyId') = '209995' and DateTimeUtil.mod(timestamp - start_time_209995, 2 * period_209995)<period_209995) group by `metricName`, DateTimeUtil.truncate(timestamp - start_time_209995, period_209995) , tags('method') output snapshot when terminated (聚合0-1,2-3)
create context context_209995_1 start @now end me.ele.arch.watchdog.engine.esper.FlushEvent(policyId=209995 and DateTimeUtil.mod(timestamp - start_time_209995, 2 * period_209995)=period_209995) (长度为2倍聚合时间,在 mod=period 处结束:0-1,1-3,3-5)
context context_209995_1 insert into check_stream_209995_0 select metricName, (start_time_209995 + DateTimeUtil.truncate(timestamp - start_time_209995, period_209995)) as timestamp, FunctionUtil.tags('method', tags('method')) as tags, FunctionUtil.fields('denominator+numerator', sum(fields('denominator'))+sum(fields('numerator')), 'numerator', sum(fields('numerator'))) as fields from event(tags('_policyId') = '209995' and DateTimeUtil.mod(timestamp - start_time_209995, 2 * period_209995)>=period_209995) group by `metricName`, DateTimeUtil.truncate(timestamp - start_time_209995, period_209995) , tags('method') output snapshot when terminated (聚合1-2,3-4)
insert into check_stream_209995_1 select metricName, timestamp, tags, fields from check_stream_209995_0.std:groupwin(metricName , tags('method')).dog:des(FunctionUtil.toList('numerator'), FunctionUtil.toList('method'), false) //TrendConvertPlugin
insert into check_stream_209995_2 select metricName, timestamp, tags, fields from check_stream_209995_1 //ThresholdPlugin
@LinDB select metricName, timestamp, tags, fields from check_stream_209995_2
@Alert select metricName, timestamp, tags, fields from check_stream_209995_2.std:groupwin(metricName , tags('method')).win:length(6) where ((cast(fields('numerator'), double) > cast(fields('numerator%forecast'), double) + 1.400000 * cast(fields('numerator%width'), double) or cast(fields('numerator'), double) < cast(fields('numerator%forecast'), double) - 1.000000 * cast(fields('numerator%width'), double) ) and Math.abs(cast(fields('numerator'), double) - cast(fields('numerator%lastNAvg'), double) ) > 0.1 * cast(fields('numerator%lastNAvg'), double) ) and (cast(fields('denominator+numerator'), double) > 50000.00) group by `metricName` , tags('method') having count(1)>=2