
Enhancing ThreadPoolExecutor with Metrics for Better Monitoring and Operability

Published: 2023-11-25 14:39:40

I. Background

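Recently, while working on recommendation-based order dispatching, some requirements called for rule-driven real-time queries that had to make remote calls in parallel to process data, so I used a thread pool to run them asynchronously. The native thread pool, however, has no monitoring. Without monitoring there are no statistics, and without statistics there is no basis for tuning parameters, so I needed a thread pool that could collect task statistics.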

Constraints: limited time, stay within the scope of the thread pool itself, and avoid over-engineering.

II. Approach

This approach implements three features: task monitoring, alert notification, and runtime parameter inspection.

1) Task monitoring

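Implementation: subclass ThreadPoolExecutor and override the two hook methods it provides, beforeExecute(Thread t, Runnable r) and afterExecute(Runnable r, Throwable t). Metrics times each task's execution and records it in a Timer, which gives task-level monitoring. The collected metrics include task counts and rates, p95/p99 latency, maximum task execution time, mean task execution time, and so on.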
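The hook pattern can be tried without any dependency. This minimal sketch (TimedThreadPoolExecutor is a hypothetical name) swaps the Metrics Timer for plain JDK counters that track only count, mean and max; percentiles such as p95/p99 are what the Metrics Timer's reservoir adds on top. The start time is carried in a ThreadLocal, since for a given task beforeExecute and afterExecute run on the same worker thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;

/** Hypothetical stdlib-only stand-in for the Metrics Timer: count, mean and max task time. */
public class TimedThreadPoolExecutor extends ThreadPoolExecutor {
    // Both hooks run on the worker thread that executes the task,
    // so a ThreadLocal can hand the start timestamp from one hook to the other
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();
    private final LongAccumulator maxNanos = new LongAccumulator(Long::max, 0L);

    public TimedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
        super(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // task is about to run on this thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Long start = startNanos.get();
        if (start != null) {
            long elapsed = System.nanoTime() - start;
            count.incrementAndGet();
            totalNanos.addAndGet(elapsed);
            maxNanos.accumulate(elapsed);
            startNanos.remove(); // do not leak the value into the next task on this thread
        }
    }

    public long count() { return count.get(); }

    public double meanMillis() {
        long c = count.get();
        return c == 0 ? 0.0 : totalNanos.get() / 1e6 / c;
    }

    public double maxMillis() { return maxNanos.get() / 1e6; }

    public static void main(String[] args) throws InterruptedException {
        TimedThreadPoolExecutor pool = new TimedThreadPoolExecutor(4, 4);
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                try { Thread.sleep(20); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.printf("count=%d mean=%.1fms max=%.1fms%n", pool.count(), pool.meanMillis(), pool.maxMillis());
    }
}
```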

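2) Alert notification
Implementation: define the thread pool's activeness as activeness = activeCount / maximumPoolSize. In afterExecute(r, t), the pool computes this ratio and, when it exceeds 0.8, sends a notification through a callback.
The callback is optional (pass null to disable notifications), because reading the active count acquires the pool's global lock mainLock, which costs performance on every completed task.
One possible optimization is to start a separate thread that checks the statistics collected by Metrics instead, but that is heavier; depending on how the current version performs, it may be added later.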
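A minimal sketch of the background-thread idea, with one deliberate difference: it samples getActiveCount() on a fixed schedule instead of reading Metrics statistics, so mainLock is taken once per sampling period rather than once per task. ActivenessMonitor, the threshold and the period are hypothetical; only standard JDK classes are used.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical background checker that samples pool activeness off the task path. */
public class ActivenessMonitor implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "activeness-monitor");
        t.setDaemon(true); // monitoring alone should not keep the JVM alive
        return t;
    });

    // activeness = activeCount / maximumPoolSize, the same definition as in the text
    public static double activeness(ThreadPoolExecutor pool) {
        return (double) pool.getActiveCount() / pool.getMaximumPoolSize();
    }

    /** Every periodMillis, runs onWarning if activeness is above threshold. */
    public ActivenessMonitor(ThreadPoolExecutor pool, double threshold, long periodMillis, Runnable onWarning) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                if (activeness(pool) > threshold) {
                    onWarning.run();
                }
            } catch (RuntimeException e) {
                // an exception escaping here would cancel all later runs of this periodic task
                e.printStackTrace();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try (ActivenessMonitor monitor = new ActivenessMonitor(pool, 0.8, 50,
                () -> System.out.println("warning: activeness " + activeness(pool)))) {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try { Thread.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                });
            }
            Thread.sleep(200); // let the monitor sample while all four workers are busy
        }
        pool.shutdown();
    }
}
```

When picking the 0.8 threshold, note that ThreadPoolExecutor only starts threads beyond corePoolSize once the work queue refuses a task. If the LinkedBlockingQueue is created without a capacity (effectively unbounded), activeness can never exceed corePoolSize / maximumPoolSize; with the sample configuration shown later (corePoolSize 4, maximumPoolSize 16) that ceiling is 0.25, so the alert would never fire.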

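3) Runtime parameter inspection
Implementation: ThreadPoolExecutor's getXxx() methods expose the pool's parameters so they can be monitored. Several of these getters (getActiveCount(), getPoolSize(), getLargestPoolSize(), getTaskCount(), getCompletedTaskCount()) acquire mainLock, so collecting them has a performance cost.
ThreadPoolExecutor's setXxx() methods can also be used to adjust the pool's parameters dynamically.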
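For the dynamic adjustment mentioned above, a minimal sketch using setCorePoolSize and setMaximumPoolSize; PoolResizer and resize are hypothetical names. The call order matters: since JDK 9, setCorePoolSize throws IllegalArgumentException when the new core size exceeds the current maximum, and setMaximumPoolSize throws when the new maximum is below the current core size, so a naive "core first" update fails when growing past the old maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class PoolResizer {
    private PoolResizer() {}

    /** Hypothetical helper: changes core and maximum pool size at runtime in a safe order. */
    public static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newMax < newCore) {
            throw new IllegalArgumentException("need 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the floor first
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 16, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        resize(pool, 8, 32); // grow
        resize(pool, 2, 4);  // shrink
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints "2/4"
        pool.shutdown();
    }
}
```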

III. Statistics and Code

Sample output of MetricThreadPoolExecutor.runtimeInfo():

{
    "preorderdispatch": {
        "activeCount": 1,
        "taskCount": 114,
        "queueSize": 0,
        "largestPoolSize": 4,
        "poolSize": 4,
        "corePoolSize": 4,
        "queueType": "LinkedBlockingQueue",
        "threshold": 0.0625,
        "completedTaskCount": 113,
        "maximumPoolSize": 16,
        "poolName": "preorderdispatch"
    }
}

Sample output of MetricService.metrics() (each value is the Timer serialized to a JSON string; the "values" array is truncated):

"com.jielin.service.helper.MetricThreadPoolExecutor.preorderdispatch": "{
    "count": 113,
    "max": 6369707.927201,
    "mean": 63661.37753799848,
    "min": 2.0927439999999997,
    "p50": 48144.104597,
    "p75": 108121.226983,
    "p95": 108121.226983,
    "p98": 108121.226983,
    "p99": 108121.226983,
    "p999": 108121.226983,
    "values": [2.0927439999999997, 2.763193, 1512.84979, 4146.670182, 4251.080376, 2614438.16841, 6369707.927201, ...],  // omitted
    "stddev": 42183.84282547084,
    "m15_rate": 0.006633354226373456,
    "m1_rate": 0.003916834328484738,
    "m5_rate": 0.011232882660355912,
    "mean_rate": 0.0021251915446803046,
    "duration_units": "milliseconds",
    "rate_units": "calls/second"
}"
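As a quick consistency check on the sample above: the threshold field is the activeness defined in the alert section, and taskCount roughly equals finished plus running plus queued tasks (the JDK documents getTaskCount() as an approximation):

```latex
\begin{aligned}
\text{activeness} &= \frac{\text{activeCount}}{\text{maximumPoolSize}} = \frac{1}{16} = 0.0625 \\
\text{taskCount} &\approx \text{completedTaskCount} + \text{activeCount} + \text{queueSize} = 113 + 1 + 0 = 114
\end{aligned}
```

The Timer's count of 113 matches completedTaskCount, which fits the design: the Timer is only updated in afterExecute, after a task has finished.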

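For monitoring I used the io.dropwizard.metrics metrics-core package, a fairly lightweight tool; the JSON serialization also uses MetricsModule from its metrics-json module. Monitoring code: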

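import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.json.MetricsModule;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class MetricService {
    // metric name -> Timer, so metrics() can serialize every Timer created through newTimer()
    private static final ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap<>();
    private static final MetricRegistry metricRegistry = new MetricRegistry();
    // rates in calls/second, durations in milliseconds, include the sampled values
    private static final ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, true, MetricFilter.ALL));

    public static Timer newTimer(Class<?> klass, String name) {
        String metricName = MetricRegistry.name(klass, name);
        // returns the already registered Timer if this name exists
        Timer timer = metricRegistry.timer(metricName);
        timerMap.put(metricName, timer);
        return timer;
    }

    // Serializes each Timer to a JSON string, keyed by metric name
    public static Map<String, String> metrics() {
        Map<String, String> retMap = new HashMap<>();
        timerMap.forEach((s, t) -> {
            try {
                retMap.put(s, objectMapper.writeValueAsString(t));
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        });
        return retMap;
    }
}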

Thread pool code:

import com.alibaba.fastjson.JSONObject;               // assumption: fastjson's JSONObject
import com.codahale.metrics.Timer;
import io.netty.util.concurrent.DefaultThreadFactory; // assumption: Netty's DefaultThreadFactory(String poolName)

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MetricThreadPoolExecutor extends ThreadPoolExecutor {
    // All pools by name, read by runtimeInfo(); pools may be created from different threads
    private static final Map<String, MetricThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap<>();
    // beforeExecute/afterExecute run on the same worker thread,
    // so a ThreadLocal carries the Timer.Context between them without any shared map
    private final ThreadLocal<Timer.Context> timerContext = new ThreadLocal<>();
    private final String name;
    private final Timer timer;
    private final Notification notification;

    public MetricThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                    TimeUnit unit, BlockingQueue<Runnable> workQueue, Notification notification) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new DefaultThreadFactory(name));
        this.timer = MetricService.newTimer(this.getClass(), name);
        this.name = name;
        this.notification = notification;
        threadPoolExecutorMap.put(name, this);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        timerContext.set(timer.time()); // start timing this task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        try {
            Timer.Context context = timerContext.get();
            if (context != null) {
                context.stop(); // records the task's elapsed time in the Timer
                timerContext.remove();
            }
            // activeness = activeCount / maximumPoolSize; getActiveCount() acquires mainLock
            if (notification != null && (double) getActiveCount() / getMaximumPoolSize() > 0.8) {
                notification.warningNotify();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private String getPoolName() {
        return this.name;
    }

    // Snapshot of every pool's parameters; most of these getters acquire mainLock
    public static Map<String, JSONObject> runtimeInfo() {
        Map<String, JSONObject> retMap = new HashMap<>();
        threadPoolExecutorMap.forEach((s, t) -> {
            JSONObject j = new JSONObject();
            int activeCount = t.getActiveCount();
            int maximumPoolSize = t.getMaximumPoolSize();
            j.put("poolName", t.getPoolName());
            j.put("corePoolSize", t.getCorePoolSize());
            j.put("maximumPoolSize", maximumPoolSize);
            j.put("poolSize", t.getPoolSize());
            j.put("activeCount", activeCount);
            j.put("threshold", (double) activeCount / maximumPoolSize); // activeness
            j.put("queueType", t.getQueue().getClass().getSimpleName());
            j.put("queueSize", t.getQueue().size());
            j.put("completedTaskCount", t.getCompletedTaskCount());
            j.put("taskCount", t.getTaskCount());
            j.put("largestPoolSize", t.getLargestPoolSize());
            retMap.put(s, j);
        });
        return retMap;
    }

    public interface Notification {
        void warningNotify();
    }
}
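One detail of the afterExecute(Runnable r, Throwable t) hook matters when recording how tasks ended: tasks passed to submit() are wrapped in a FutureTask that captures their exceptions, so t is null for them and the failure has to be read back from the Future. A minimal sketch that counts failed tasks this way; FailureCountingExecutor is a hypothetical name, and the unwrapping follows the pattern shown in the ThreadPoolExecutor.afterExecute javadoc.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical extension: counts failed tasks, whether sent via execute() or submit(). */
public class FailureCountingExecutor extends ThreadPoolExecutor {
    private final AtomicLong failures = new AtomicLong();

    public FailureCountingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submit(), r is a FutureTask that swallowed the exception, so t is null here
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.incrementAndGet();
        }
    }

    public long failures() { return failures.get(); }

    public static void main(String[] args) throws InterruptedException {
        FailureCountingExecutor pool = new FailureCountingExecutor(2);
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        pool.execute(failing); // worker thread dies; the JDK prints the uncaught exception to stderr
        pool.submit(failing);  // exception captured by the FutureTask
        pool.submit(() -> { });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("failures=" + pool.failures()); // prints "failures=2"
    }
}
```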

