1.背景
线程池在执行任务的过程中，往往会由于个别耗时非常长的任务导致任务积压，影响性能，甚至导致系统崩溃。可以通过监控每个任务的执行耗时来提前预警，进而优化代码，使系统更稳定。
2.实现代码
实现原理:继承ThreadPoolExecutor,重写beforeExecute,在开始的时候记录开始时间,然后重写afterExecute,在结束的时候计算耗时。
package com.xkzhangsan.thread.pool.monitor;
import com.xkzhangsan.thread.pool.monitor.constant.MonitorLevelEnum;
import java.util.List;
import java.util.concurrent.*;
/**
* 线程池监控器 <br>
* 支持监控基本情况和任务执行情况<br>
*
* @author xkzhangsan
*/
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    /** Name printed in every task timing line; {@code null} when no name was supplied. */
    private String poolName;

    /** Active monitoring level; task timing is only recorded for task-level settings. */
    private MonitorLevelEnum monitorLevel = MonitorLevelEnum.NONE;

    /**
     * Start time (from {@link System#nanoTime()}) of the task currently running on the calling
     * worker thread. {@code beforeExecute} and {@code afterExecute} are both invoked by the thread
     * that runs the task, so a thread-local slot pairs them without a shared map: there are no key
     * collisions and nothing has to be cleaned up when the pool shuts down.
     */
    private final ThreadLocal<Long> taskStartNanos = new ThreadLocal<>();

    /** Creates a pool with monitoring disabled ({@link MonitorLevelEnum#NONE}). */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Creates a pool with a custom thread factory and monitoring disabled. */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /** Creates a pool with a custom rejection handler and monitoring disabled. */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /** Creates a pool with a custom thread factory and rejection handler, monitoring disabled. */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    // Overloads that additionally accept the pool name and the monitoring level.

    /**
     * Creates a monitored pool.
     *
     * @param poolName     name printed in the timing output
     * @param monitorLevel monitoring level; {@code null} is treated as {@link MonitorLevelEnum#NONE}
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, String poolName,
                             MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        init(poolName, monitorLevel);
    }

    /**
     * Creates a monitored pool with a custom thread factory.
     *
     * @param poolName     name printed in the timing output
     * @param monitorLevel monitoring level; {@code null} is treated as {@link MonitorLevelEnum#NONE}
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                             String poolName, MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        init(poolName, monitorLevel);
    }

    /**
     * Creates a monitored pool with a custom rejection handler.
     *
     * @param poolName     name printed in the timing output
     * @param monitorLevel monitoring level; {@code null} is treated as {@link MonitorLevelEnum#NONE}
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler,
                             String poolName, MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        init(poolName, monitorLevel);
    }

    /**
     * Creates a monitored pool with a custom thread factory and rejection handler.
     *
     * @param poolName     name printed in the timing output
     * @param monitorLevel monitoring level; {@code null} is treated as {@link MonitorLevelEnum#NONE}
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                             RejectedExecutionHandler handler, String poolName,
                             MonitorLevelEnum monitorLevel) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        init(poolName, monitorLevel);
    }

    /**
     * Records the start time of the task on the worker thread that is about to run it.
     *
     * @param t the thread that will run the task
     * @param r the task about to be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (isTaskMonitor()) {
            // nanoTime is monotonic, so the measured duration is immune to wall-clock adjustments.
            taskStartNanos.set(System.nanoTime());
        }
        super.beforeExecute(t, r);
    }

    /**
     * Prints the elapsed time of the task that just finished on the calling worker thread.
     *
     * @param r the task that has completed
     * @param t the exception that caused termination, or {@code null} if execution completed normally
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (!isTaskMonitor()) {
            return;
        }
        Long startNanos = taskStartNanos.get();
        // Always clear the slot so a stale start time can never be attributed to a later task.
        taskStartNanos.remove();
        if (startNanos == null) {
            return;
        }
        long costMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println("poolName:" + poolName + " task:" + r.hashCode() + " cost:" + costMillis);
    }

    /** Stores the monitoring configuration supplied to the monitoring-aware constructors. */
    private void init(String poolName, MonitorLevelEnum monitorLevel) {
        this.poolName = poolName;
        this.monitorLevel = monitorLevel == null ? MonitorLevelEnum.NONE : monitorLevel;
    }

    /** Returns {@code true} when the configured level includes per-task timing. */
    private boolean isTaskMonitor() {
        return monitorLevel == MonitorLevelEnum.TASK
                || monitorLevel == MonitorLevelEnum.POOL_TASK
                || monitorLevel == MonitorLevelEnum.SUGGESTION;
    }
}
package com.xkzhangsan.thread.pool.monitor.constant; /** * 线程池监控级别 <br> * @author xkzhangsan */ public enum MonitorLevelEnum { /** * 不监控,默认值 */ NONE, /** * 监控基本信息 */ POOL, /** * 监控任务信息 */ TASK, /** * 同时监控基本信息和任务信息 */ POOL_TASK, /** * 同时监控基本信息和任务信息,并给出优化建议 */ SUGGESTION; }
3.测试运行
3.1 测试代码
package com.xkzhangsan.thread.pool.monitor;

import com.xkzhangsan.thread.pool.monitor.constant.MonitorLevelEnum;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Manual demo for {@link ThreadPoolMonitor}: submits a handful of slow tasks with task-level
 * monitoring enabled so the monitor's timing output can be inspected on the console.
 */
public class ThreadPoolMonitorTest {

    /** Entry point; runs the task-monitoring demo. */
    public static void main(String[] args) {
        taskMonitor();
    }

    /**
     * Submits five tasks that each sleep for three seconds and print their index, then requests
     * an orderly shutdown of the pool.
     */
    public static void taskMonitor() {
        ThreadPoolMonitor pool = new ThreadPoolMonitor(
                1, 3, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5),
                "test", MonitorLevelEnum.TASK);

        int submitted = 0;
        while (submitted < 5) {
            final int taskNo = submitted;
            pool.execute(() -> sleepThenPrint(taskNo));
            submitted++;
        }

        pool.shutdown();
    }

    /** Sleeps for three seconds and then prints the given task index. */
    private static void sleepThenPrint(int taskNo) {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(taskNo);
    }
}
3.2 测试结果
0 poolName:test task:338537012 cost:3012 1 poolName:test task:451561153 cost:3008 2 poolName:test task:517325124 cost:3001 3 poolName:test task:1540836602 cost:3004 4 poolName:test task:575812030 cost:3010
源代码地址:https://github.com/xkzhangsan/thread-pool-monitor
参考:https://cloud.tencent.com/developer/article/1491221
没有回复内容