您的位置:

深入浅出ScheduledThreadPoolExecutor

一、ScheduledThreadPoolExecutor是什么

ScheduledThreadPoolExecutor是一个多线程任务调度器,可以在指定的时间(延迟)后执行任务或定期执行任务。使用ScheduledThreadPoolExecutor可以帮助我们实现复杂的任务调度需求,如定时任务、延迟任务、周期性任务等。

ScheduledThreadPoolExecutor的主要属性:

  • corePoolSize: 线程池核心线程数,即同时执行的任务数。
  • maximumPoolSize: 线程池最大线程数。
  • keepAliveTime: 空闲线程的存活时间。
  • ThreadFactory: 线程工厂,用于创建线程对象。
  • RejectedExecutionHandler: 拒绝策略,用于决定当任务队列已满时如何处理新任务。
import java.util.concurrent.*;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        Runnable task = () -> {
            System.out.println("Task executed at " + System.currentTimeMillis());
        };

        System.out.println("Scheduled to run after 2 seconds");
        executor.schedule(task, 2, TimeUnit.SECONDS);

        System.out.println("Scheduled to run after every 3 seconds, with initial delay of 1 second");
        executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);

        System.out.println("Shutdown executor");
        executor.shutdown();
    }
}

二、ScheduledThreadPoolExecutor的使用

1、延迟任务

我们可以使用schedule()方法在指定的时间后执行任务,这个时间可以用时间和时间单位(TimeUnit)表示。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

Runnable task = () -> {
    System.out.println("Task executed at " + System.currentTimeMillis());
};

System.out.println("Scheduled to run after 2 seconds");
executor.schedule(task, 2, TimeUnit.SECONDS);

上面的代码中,我们创建了一个ScheduledExecutorService,然后通过schedule()方法延迟2秒执行任务。在任务执行时,我们会输出任务执行时间的信息。

2、周期性任务

我们也可以使用scheduleAtFixedRate()方法定期执行任务,参数包括初始延迟时间、执行周期,以及时间单位。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

Runnable task = () -> {
    System.out.println("Task executed at " + System.currentTimeMillis());
};

System.out.println("Scheduled to run after every 3 seconds, with initial delay of 1 second");
executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);

上面的代码中,我们执行了一个周期性任务,并且设置了初始延迟时间为1秒,执行周期为3秒。在任务执行时,我们同样会输出任务执行时间的信息。

3、使用线程池

ScheduledThreadPoolExecutor本质上仍然是一个线程池,我们可以像使用ThreadPoolExecutor一样使用ScheduledThreadPoolExecutor。

首先,我们需要创建一个ScheduledExecutorService:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

然后,我们可以像ThreadPoolExecutor一样提交任务:

Runnable task = () -> {
    System.out.println("Task executed at " + System.currentTimeMillis());
};

executor.submit(task);

当然,我们也可以使用schedule()或者scheduleAtFixedRate()方法提交任务,只需要把任务封装为Runnable或者Callable接口即可。

4、异常处理

在任务执行过程中,如果任务抛出了异常,ScheduledThreadPoolExecutor将并不会打印异常堆栈信息,也不会将异常抛到调用方。因此,我们需要在任务中显式地处理异常。

Runnable task = () -> {
    try {
        // ...
    } catch (Exception e) {
        e.printStackTrace();
    }
};

三、ScheduledThreadPoolExecutor的拓展

1、线程池监控

使用ScheduledThreadPoolExecutor可以实现线程池的监控,比如通过定期打印线程池的状态来观察线程池的健康状态。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
ScheduledFuture future = executor.scheduleAtFixedRate(() -> {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
        System.out.println("Pool Size: " + threadPoolExecutor.getPoolSize());
        System.out.println("Active Count: " + threadPoolExecutor.getActiveCount());
        System.out.println("Completed Task Count: " + threadPoolExecutor.getCompletedTaskCount());
        System.out.println("Task Count: " + threadPoolExecutor.getTaskCount());
    }, 0, 1, TimeUnit.SECONDS);

// 在程序结束时停止监控线程池
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        future.cancel(true);
        executor.shutdown();
    }));

上面的代码中,我们通过修改线程池的拒绝策略来打印线程池的状态信息。在程序结束时,我们需要使用ShutdownHook来关闭线程池,确保程序结束时不会留下线程池中的未完成任务。

2、拒绝策略

当任务队列已满时,如果再提交新的任务,ScheduledThreadPoolExecutor默认的拒绝策略是抛出RejectedExecutionException异常。为了避免出现这种情况,我们可以自定义拒绝策略。

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            // 如果线程池已经关闭,则直接拒绝任务
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
        } else {
            // 否则,将任务放回任务队列,尝试继续执行。
            executor.getQueue().offer(r);
        }
    }
}

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new CustomRejectedExecutionHandler());

上面的代码中,我们实现了一个自定义的拒绝策略CustomRejectedExecutionHandler。当任务队列已满时,我们会首先检测线程池是否已经关闭。如果线程池已经关闭,则直接抛出拒绝执行异常,否则将任务放回任务队列,等待线程池的新线程来执行。

3、定时任务比较工具

在一些需要进行定期任务处理的项目中,需要精确地掌握每个任务的执行情况。定时任务比较工具可以对比实际执行时间与定时时间之间的误差,这样就可以帮助我们调整任务的定时精度,保证任务的准确执行。

public class ScheduleTimeComparator implements Comparable {
    private final Long scheduleTime;
    private final Long actualTime;

    public ScheduleTimeComparator(Long scheduleTime, Long actualTime) {
        this.scheduleTime = scheduleTime;
        this.actualTime = actualTime;
    }

    public boolean isDelay() {
        return actualTime - scheduleTime > 0;
    }

    public Long getDelay() {
        return actualTime - scheduleTime;
    }

    @Override
    public int compareTo(ScheduleTimeComparator o) {
        return this.actualTime.compareTo(o.actualTime);
    }
}

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
List
    list = new ArrayList<>();

Runnable task = () -> {
    list.add(new ScheduleTimeComparator(System.currentTimeMillis() - 2000, System.currentTimeMillis()));
};

for (int i = 0; i < 10; i++) {
    executor.schedule(task, i * 100, TimeUnit.MILLISECONDS);
}

Thread.sleep(5000);

list.sort(Comparator.comparing(ScheduleTimeComparator::getDelay).reversed());

for (int i = 0; i < list.size(); i++) {
    ScheduleTimeComparator element = list.get(i);
    System.out.println(String.format("Index=%d, Scheduled Time=%d, Actual Time=%d, Delay=%d, Is Delay=%b", i, element.scheduleTime, element.actualTime, element.getDelay(), element.isDelay()));
}

   
  

上面的代码中,我们通过创建一个ScheduleTimeComparator类来记录定时时间和实际执行时间之间的误差。当所有任务执行完成后,我们对误差进行排序并输出。

四、总结

本文介绍了ScheduledThreadPoolExecutor的使用方法和拓展,包括定时任务、周期性任务、线程池监控、拒绝策略、定时任务比较工具等。在实现复杂任务调度的过程中,ScheduledThreadPoolExecutor是一个非常实用的工具。