线程池监控1-监控任务执行时间

1.背景

  线程池在执行任务的过程中,往往会因为个别耗时很长的任务造成任务积压,影响性能,甚至导致系统崩溃。通过监控每个任务的执行耗时,可以提前预警并据此优化代码,使系统更稳定。

2.实现代码

实现原理:继承 ThreadPoolExecutor,重写 beforeExecute,在任务开始执行前记录开始时间;再重写 afterExecute,在任务执行结束后计算耗时。

package com.xkzhangsan.thread.pool.monitor;

import com.xkzhangsan.thread.pool.monitor.constant.MonitorLevelEnum;

import java.util.List;
import java.util.concurrent.*;

/**
 * Thread pool monitor. <br>
 * A {@link ThreadPoolExecutor} that can report how long each task took to run.<br>
 * Task timing is only active when the pool is created through one of the constructors that
 * take a {@code poolName} and a {@code monitorLevel}, and that level is
 * {@code TASK}, {@code POOL_TASK} or {@code SUGGESTION}; the plain constructors leave the
 * level at {@code NONE}.
 *
 * @author xkzhangsan
 */
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private String poolName;

    private MonitorLevelEnum monitorLevel = MonitorLevelEnum.NONE;

    /**
     * {@link System#nanoTime()} reading taken when the task currently running on the calling
     * worker thread started. {@code beforeExecute} and {@code afterExecute} are both invoked on
     * the worker thread that runs the task, so a thread-local needs no key: two tasks can never
     * overwrite each other's entry, and nothing has to be cleaned up on shutdown.
     */
    private final ThreadLocal<Long> taskStartNanos = new ThreadLocal<>();

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    // Overloads that additionally take the pool name and the monitoring level.
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName, MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        init(poolName, monitorLevel);
    }

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName, MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        init(poolName, monitorLevel);
    }

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, String poolName, MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        init(poolName, monitorLevel);
    }

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, String poolName, MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        init(poolName, monitorLevel);
    }

    /**
     * Records the start time of the task about to run on the calling worker thread.
     *
     * @param t the thread that will run the task
     * @param r the task that will be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (isTaskMonitor()) {
            // nanoTime is monotonic, so the measured cost cannot be skewed by wall-clock adjustments.
            taskStartNanos.set(System.nanoTime());
        }
        // ThreadPoolExecutor recommends chaining to super at the end of beforeExecute.
        super.beforeExecute(t, r);
    }

    /**
     * Prints the pool name, the task's hash code and the elapsed time in milliseconds for the
     * task that just finished on the calling worker thread.
     *
     * @param r the task that has completed
     * @param t the exception that caused termination, or {@code null} if it completed normally
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // ThreadPoolExecutor recommends chaining to super at the beginning of afterExecute.
        super.afterExecute(r, t);
        if (!isTaskMonitor()) {
            return;
        }
        Long startNanos = taskStartNanos.get();
        // Always drop the value so a reused worker thread never reports a stale start time.
        taskStartNanos.remove();
        if (startNanos == null) {
            return;
        }
        long costMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println("poolName:" + poolName + " task:" + r.hashCode() + " cost:" + costMillis);
    }

    @Override
    public void shutdown() {
        super.shutdown();
        // Already-submitted tasks keep running after shutdown(); their timing state is
        // per worker thread, so there is nothing to clean up here.
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Tasks that are still running when shutdownNow() is called go through afterExecute
        // afterwards, so no monitoring state may be torn down here. The per-thread start time
        // is removed in afterExecute, which leaves nothing to clean up.
        return super.shutdownNow();
    }

    /**
     * Stores the monitoring settings passed to the monitoring-aware constructors.
     */
    private void init(String poolName, MonitorLevelEnum monitorLevel) {
        this.poolName = poolName;
        this.monitorLevel = monitorLevel;
    }

    /**
     * Tells whether per-task timing is enabled for the configured monitoring level.
     */
    private boolean isTaskMonitor() {
        return monitorLevel == MonitorLevelEnum.TASK || monitorLevel == MonitorLevelEnum.POOL_TASK
                || monitorLevel == MonitorLevelEnum.SUGGESTION;
    }

}

 

package com.xkzhangsan.thread.pool.monitor.constant;

/**
 * Monitoring level of a thread pool. <br>
 * @author xkzhangsan
 */
public enum MonitorLevelEnum {
    /**
     * No monitoring; the default value.
     */
    NONE,
    /**
     * Monitor basic pool information.
     */
    POOL,
    /**
     * Monitor task information.
     */
    TASK,
    /**
     * Monitor both basic pool information and task information.
     */
    POOL_TASK,
    /**
     * Monitor both basic pool information and task information, and give tuning suggestions.
     */
    SUGGESTION;
}

 

3.测试运行

3.1 测试代码

package com.xkzhangsan.thread.pool.monitor;

import com.xkzhangsan.thread.pool.monitor.constant.MonitorLevelEnum;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Manual demo of task-level monitoring with {@link ThreadPoolMonitor}.
 */
public class ThreadPoolMonitorTest {

    public static void main(String[] args) {
        taskMonitor();
    }

    /**
     * Submits five tasks to a pool named "test" created with {@code MonitorLevelEnum.TASK}.
     * Each task sleeps for three seconds and then prints its index; the pool is shut down
     * after submission, which still lets the submitted tasks finish.
     */
    public static void taskMonitor() {
        ThreadPoolMonitor threadPoolMonitor = new ThreadPoolMonitor(1, 3, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), "test", MonitorLevelEnum.TASK);
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            threadPoolMonitor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the worker thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                System.out.println(finalI);
            });
        }
        threadPoolMonitor.shutdown();
    }
}

 

3.2 测试结果

0
poolName:test task:338537012 cost:3012
1
poolName:test task:451561153 cost:3008
2
poolName:test task:517325124 cost:3001
3
poolName:test task:1540836602 cost:3004
4
poolName:test task:575812030 cost:3010

 

源代码地址:https://github.com/xkzhangsan/thread-pool-monitor

参考:https://cloud.tencent.com/developer/article/1491221
请登录后发表评论

    没有回复内容