详解Java线程池监控

详解Java线程池监控

背景业务使用线程池的时候,出现了问题,影响线上业务,由于没有线程池监控,导致问题难以发现和排查。于是需要这么一个线程池监控组件,用来监控线程池执行状态,任务执行状态等。

实现方式ThreadPoolExecutor 提供了以下几个方法可以监控线程池的使用情况:

方法

含义

getActiveCount()

线程池中正在执行任务的线程数量

getCompletedTaskCount()

线程池已完成的任务数量,该值小于等于 taskCount

getCorePoolSize()

线程池的核心线程数量

getLargestPoolSize()

线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize

getMaximumPoolSize()

线程池的最大线程数量

getPoolSize()

线程池当前的线程数量

getTaskCount()

线程池已经执行的和未执行的任务总数

通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法, afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。极简示例如下

代码语言:javascript代码运行次数:0运行复制@Slf4j

public class ThreadPoolMonitor extends ThreadPoolExecutor {

public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,

TimeUnit unit, BlockingQueue workQueue,

ThreadFactory threadFactory) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

log.info("init");

}

@Override

public void shutdown() {

log.info("shutdown");

super.shutdown();

}

@Override

public List shutdownNow() {

log.info("shutdownNow");

return super.shutdownNow();

}

@Override

protected void beforeExecute(Thread t, Runnable r) {

log.info("beforeExecute");

}

@Override

protected void afterExecute(Runnable r, Throwable t) {

log.info("afterExecute");

}

}

实战应用上面是已经说明该组件的实现方式,但是在生产环境中,面对业务的复杂度高、变数大,我们应该如何实现一个高可拓展的线程池监控组件呢?这是这一小节的内容主题。

使用示例先来个 hello world 演示效果

代码语言:javascript代码运行次数:0运行复制@Test

public void helloWorld() throws InterruptedException {

//创建线程池对象

MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor("被监控的线程池1", 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new MonitorConfig().setQueueSlowTime(100).setTaskSlowTimeThreshold(100));

monitoredThreadPoolExecutor.execute(() -> {

log.info("任务1开始……");

try {

Thread.sleep(RandomUtils.nextInt(1000, 2000));

} catch (InterruptedException ignore) {

}

log.info("任务1完成……");

});

monitoredThreadPoolExecutor.execute(() -> {

log.info("任务2开始……");

try {

Thread.sleep(RandomUtils.nextInt(0, 100));

} catch (InterruptedException ignore) {

}

log.info("任务2完成……");

});

Thread.currentThread().join(5 * 1000);

}

hello world 程序运行日志如下:

代码语言:javascript代码运行次数:0运行复制Connected to the target VM, address: '127.0.0.1:62724', transport: 'socket'

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 提交任务数+1

[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 提交任务数+1

[被监控的线程池1_1] INFO MonitoredThreadPoolExecutorTest - 任务2开始……

[被监控的线程池1_0] INFO MonitoredThreadPoolExecutorTest - 任务1开始……

[被监控的线程池1_1] INFO MonitoredThreadPoolExecutorTest - 任务2完成……

[被监控的线程池1_1] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 任务排队时间 = 1, 任务执行时间 = 86

[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 1, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 2

[被监控的线程池1_0] INFO MonitoredThreadPoolExecutorTest - 任务1完成……

[被监控的线程池1_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 任务排队时间 = 2, 任务执行时间 = 1452

[被监控的线程池1_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 执行慢任务数+1

[ThreadPoolMonitor_1] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0

[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0

[ThreadPoolMonitor_2] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0

from the target VM, address: '127.0.0.1:62724', transport: 'socket'

详细使用示例「监控方式」

线程池的监控分为 2 种类型,一种是在执行任务前后全量统计任务排队时间和执行时间,另外一种是通过定时任务,定时获取活跃线程数,队列中的任务数,核心线程数,最大线程数等数据。

MonitoredThreadPoolExecutor 会同时统计这两种类型的数据。如果您不想统计全量任务执行和排队的监控数据,可以使用 ThreadPoolMonitor.monitor(String name, ThreadPoolExecutor threadPoolExecutor) 方法,该方法只使用定时任务来监控线程数据。其中,name 需要唯一,threadPoolExecutor 不能是 MonitoredThreadPoolExecutor 类型,否则会抛出异常。

「监控参数」

poolName :线程池名称。必须为每个线程池创建不同的名称,否则会抛出异常。可以将其作为监控平台的 id,通过名称找到对应的监控数据。monitorConfig :监控配置参数。其中可以设置两个参数,taskSlowTimeThreshold 和 queueSlowTimeThreshold。如果 taskSlowTime 指定为 100,则表示任务执行时间大于 100ms 的任务会统计为慢任务,在监控中可以看到慢任务的数量。同样的,queueSlowTime 指定为 100,表示排队时间大于 100ms 的任务统计为排队慢任务,可以在监控中看到排队慢任务的数量。其他参数和 JDK 中线程池参数意义相同。

示例代码

代码语言:javascript代码运行次数:0运行复制@Slf4j

public class MonitoredThreadPoolExecutorTest {

@Test

public void helloWorld() throws InterruptedException {

//创建线程池对象

MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor("被监控的线程池1", 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new MonitorConfig().setQueueSlowTimeThreshold(100).setTaskSlowTimeThreshold(100));

monitoredThreadPoolExecutor.execute(() -> {

log.info("任务1开始……");

try {

Thread.sleep(RandomUtils.nextInt(1000, 2000));

} catch (InterruptedException ignore) {

}

log.info("任务1完成……");

});

monitoredThreadPoolExecutor.execute(() -> {

log.info("任务2开始……");

try {

Thread.sleep(RandomUtils.nextInt(0, 100));

} catch (InterruptedException ignore) {

}

log.info("任务2完成……");

});

Thread.currentThread().join(5 * 1000);

}

@Test

public void ThreadPoolExecutorTest() throws InterruptedException {

//线程池需要指定唯一的线程池名称,否则会抛出异常

String uniqPoolName = "被监控的线程池2";

MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor(uniqPoolName, 1, 4, 60,

TimeUnit.SECONDS, new ArrayBlockingQueue<>(256), new MonitorConfig().setQueueSlowTimeThreshold(100).setTaskSlowTimeThreshold(100)) {

//如果想在任务执行开始或者执行结束时,执行一些操作,覆盖afterExecute0(Runnable r, Throwable t)和beforeExecute0(Thread t, Runnable r),注意方法名称后面有0

@Override

public void afterExecute0(Runnable r, Throwable t) {

log.info("增强afterExecute0");

}

@Override

public void beforeExecute0(Thread t, Runnable r) {

log.info("增强beforeExecute0");

}

};

//使用方式和ThreadPoolExecutor完全相同

for (int i = 0; i < 3; i++) {

int taskId = i;

monitoredThreadPoolExecutor.submit(() -> log.info("任务{}", taskId));

}

for (int i = 0; i < 3; i++) {

monitoredThreadPoolExecutor.submit(() -> {

int id = RandomUtils.nextInt();

log.info("生成id = {}", id);

return id;

});

}

//使用结束后,如果需要再创建相同名称的线程池,则需要调用remove方法移除定时任务。

ThreadPoolMonitor.remove(uniqPoolName);

//关闭线程池,在一段时间内会关闭所有监控的定时任务

monitoredThreadPoolExecutor.shutdown();

Thread.currentThread().join(2 * 1000);

}

}

运行日志

代码语言:javascript代码运行次数:0运行复制[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1

[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池2, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 1, 最大线程数 = 4, 执行的任务总数 = 0

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务1

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务2

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 488282372

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 3

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 1176017668

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 4, 任务执行时间 = 0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 463520743

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 4, 任务执行时间 = 0

[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

主要实现ThreadPoolMonitor 负责线程池与监控方法的管理;代码语言:javascript代码运行次数:0运行复制public class ThreadPoolMonitor {

private static final Map POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();

private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

public ThreadPoolMonitor() {

}

public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {

if (threadPoolExecutor instanceof MonitoredThreadPoolExecutor) {

throw new IllegalArgumentException("MonitoredThreadPoolExecutor is already monitored.");

} else {

monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);

}

}

public static void remove(String name) {

ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);

if (futureWrapper != null) {

futureWrapper.future.cancel(false);

}

}

public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {

ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);

if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {

POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);

futureWrapper.future.cancel(false);

}

}

static void monitor(MonitoredThreadPoolExecutor threadPoolExecutor) {

monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);

}

private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {

PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);

POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {

if (v == null) {

return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);

} else {

throw new IllegalStateException("duplicate pool name: " + name);

}

});

}

static {

Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));

}

static class FutureWrapper {

private final Future future;

private final ThreadPoolExecutor threadPoolExecutor;

public FutureWrapper(Future future, ThreadPoolExecutor threadPoolExecutor) {

this.future = future;

this.threadPoolExecutor = threadPoolExecutor;

}

}

}

给异步任务 Runnable 套一个壳,让他该任务可监控;代码语言:javascript代码运行次数:0运行复制public class MonitoredRunnable implements Runnable, Monitored {

private final Runnable runnable;

private final long inQueueNanoTime;

public MonitoredRunnable(Runnable runnable) {

this.runnable = runnable;

this.inQueueNanoTime = System.nanoTime();

}

@Override

public long inQueueNanoTime() {

return this.inQueueNanoTime;

}

@Override

public void run() {

this.runnable.run();

}

}

MonitoredThreadPoolExecutor 继承 ThreadPoolExecutor 覆盖其方法做监控统计增强;代码语言:javascript代码运行次数:0运行复制@Slf4j

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

private final ThreadLocal executeStartTimeThreadLocal;

protected String poolName;

private final int slowTaskThreshold;

private final int queueTimeThreshold;

private static final int DEFAULT_SLOW_TASK_TIME = 5000;

private static final int DEFAULT_QUEUE_TIME = 100;

@Override

public void execute(Runnable command) {

log.info("线程池名称 = {}, 提交任务数+1", this.poolName());

super.execute(new MonitoredRunnable(command));

}

public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {

this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), new AbortPolicy(), new MonitorConfig());

}

public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, new MonitorConfig());

}

public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, MonitorConfig monitorConfig) {

this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), new AbortPolicy(), monitorConfig);

}

public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, MonitorConfig monitorConfig) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new MonitorRejectedExecutionHandler(handler, poolName));

this.executeStartTimeThreadLocal = new ThreadLocal<>();

this.poolName = poolName;

this.slowTaskThreshold = monitorConfig.getTaskSlowTimeThreshold() > 0 ? monitorConfig.getTaskSlowTimeThreshold() : DEFAULT_SLOW_TASK_TIME;

this.queueTimeThreshold = monitorConfig.getQueueSlowTimeThreshold() > 0 ? monitorConfig.getQueueSlowTimeThreshold() : DEFAULT_QUEUE_TIME;

ThreadPoolMonitor.monitor(this);

}

@Override

protected final void beforeExecute(Thread t, Runnable r) {

try {

this.beforeExecute0(t, r);

} finally {

this.executeStartTimeThreadLocal.set(System.nanoTime());

}

}

@Override

protected final void afterExecute(Runnable r, Throwable t) {

this.afterExecuteMonitor(r, t);

this.afterExecute0(r, t);

}

private void afterExecuteMonitor(Runnable r, Throwable t) {

try {

long executeEndNano = System.nanoTime();

Long executeStartTime = this.executeStartTimeThreadLocal.get();

Monitored monitored = (Monitored) r;

long queueNanoTime = monitored.inQueueNanoTime();

int queueTime = (int) ((executeStartTime - queueNanoTime) / 1000000L);

int executeTime = (int) ((executeEndNano - executeStartTime) / 1000000L);

log.info("线程池名称 = {}, 任务排队时间 = {}, 任务执行时间 = {}",

this.poolName(), queueTime, executeTime);

if (executeTime > this.slowTaskThreshold) {

log.info("线程池名称 = {}, 执行慢任务数+1", this.poolName());

}

if (queueTime > this.queueTimeThreshold) {

log.info("线程池名称 = {}, 排队慢任务数+1", this.poolName());

}

if (t != null) {

log.info("线程池名称 = {}, 执行异常的任务数+1", this.poolName());

}

} catch (Exception ignore) {

} finally {

executeStartTimeThreadLocal.remove();

}

}

protected void beforeExecute0(Thread t, Runnable r) {

}

protected void afterExecute0(Runnable r, Throwable t) {

}

@Override

protected final void terminated() {

ThreadPoolMonitor.remove(this.poolName(), this);

}

public String poolName() {

return this.poolName;

}

}

PoolMonitorTask 定时收集线程池监控项的任务实现;代码语言:javascript代码运行次数:0运行复制@Slf4j

@Getter

public class PoolMonitorTask implements Runnable {

private final ThreadPoolExecutor monitoredThreadPool;

private final String poolName;

private volatile long lastTaskCount = 0L;

public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) {

this.monitoredThreadPool = monitoredThreadPool;

this.poolName = poolName;

}

@Override

public void run() {

int activeCount = this.monitoredThreadPool.getActiveCount();

int corePoolSize = this.monitoredThreadPool.getCorePoolSize();

int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize();

int queueTaskSize = this.monitoredThreadPool.getQueue().size();

long taskCount = this.monitoredThreadPool.getTaskCount();

int executedTask = (int) (taskCount - this.lastTaskCount);

log.info("线程池名称 = {}, 活跃线程数峰值 = {}, 队列任务数峰值 = {}, 核心线程数 = {}, 最大线程数 = {}, 执行的任务总数 = {}",

this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask);

this.lastTaskCount = taskCount;

if (this.monitoredThreadPool.isTerminated()) {

ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool);

}

}

}

ThreadPoolMonitor 线程池监控者,负责线程池与监控方法的管理,定时采集任务的执行者;代码语言:javascript代码运行次数:0运行复制public class ThreadPoolMonitor {

private static final Map POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();

private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

public ThreadPoolMonitor() {

}

public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {

if (threadPoolExecutor instanceof MonitoredThreadPoolExecutor) {

throw new IllegalArgumentException("MonitoredThreadPoolExecutor is already monitored.");

} else {

monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);

}

}

public static void remove(String name) {

ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);

if (futureWrapper != null) {

futureWrapper.future.cancel(false);

}

}

public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {

ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);

if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {

POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);

futureWrapper.future.cancel(false);

}

}

static void monitor(MonitoredThreadPoolExecutor threadPoolExecutor) {

monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);

}

private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {

PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);

POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {

if (v == null) {

return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);

} else {

throw new IllegalStateException("duplicate pool name: " + name);

}

});

}

static {

Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));

}

static class FutureWrapper {

private final Future future;

private final ThreadPoolExecutor threadPoolExecutor;

public FutureWrapper(Future future, ThreadPoolExecutor threadPoolExecutor) {

this.future = future;

this.threadPoolExecutor = threadPoolExecutor;

}

}

}

相关推荐

轩辕传奇哪个区人多(轩辕传奇哪个区人最多)
比分365网页版

轩辕传奇哪个区人多(轩辕传奇哪个区人最多)

📅 07-02 👁️ 7157
如何正确安装驱动程序的详细步骤与技巧
365bet官网网址

如何正确安装驱动程序的详细步骤与技巧

📅 07-29 👁️ 6325
DNF2021:法驱与力驱的全面对比,哪个更适合你?
365bet官网网址

DNF2021:法驱与力驱的全面对比,哪个更适合你?

📅 08-08 👁️ 6243