一、ScheduledThreadPoolExecutor是什么
ScheduledThreadPoolExecutor是一个多线程任务调度器,可以在指定的时间(延迟)后执行任务或定期执行任务。使用ScheduledThreadPoolExecutor可以帮助我们实现复杂的任务调度需求,如定时任务、延迟任务、周期性任务等。
ScheduledThreadPoolExecutor的主要属性:
- corePoolSize: 线程池核心线程数,即同时执行的任务数。
- maximumPoolSize: 线程池最大线程数。
- keepAliveTime: 空闲线程的存活时间。
- ThreadFactory: 线程工厂,用于创建线程对象。
- RejectedExecutionHandler: 拒绝策略,用于决定当任务队列已满时如何处理新任务。
import java.util.concurrent.*; public class ScheduledThreadPoolExample { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); Runnable task = () -> { System.out.println("Task executed at " + System.currentTimeMillis()); }; System.out.println("Scheduled to run after 2 seconds"); executor.schedule(task, 2, TimeUnit.SECONDS); System.out.println("Scheduled to run after every 3 seconds, with initial delay of 1 second"); executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS); System.out.println("Shutdown executor"); executor.shutdown(); } }
二、ScheduledThreadPoolExecutor的使用
1、延迟任务
我们可以使用schedule()方法在指定的时间后执行任务,这个时间可以用时间和时间单位(TimeUnit)表示。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); Runnable task = () -> { System.out.println("Task executed at " + System.currentTimeMillis()); }; System.out.println("Scheduled to run after 2 seconds"); executor.schedule(task, 2, TimeUnit.SECONDS);
上面的代码中,我们创建了一个ScheduledExecutorService,然后通过schedule()方法延迟2秒执行任务。在任务执行时,我们会输出任务执行时间的信息。
2、周期性任务
我们也可以使用scheduleAtFixedRate()方法定期执行任务,参数包括初始延迟时间、执行周期,以及时间单位。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); Runnable task = () -> { System.out.println("Task executed at " + System.currentTimeMillis()); }; System.out.println("Scheduled to run after every 3 seconds, with initial delay of 1 second"); executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);
上面的代码中,我们执行了一个周期性任务,并且设置了初始延迟时间为1秒,执行周期为3秒。在任务执行时,我们同样会输出任务执行时间的信息。
3、使用线程池
ScheduledThreadPoolExecutor本质上仍然是一个线程池,我们可以像使用ThreadPoolExecutor一样使用ScheduledThreadPoolExecutor。
首先,我们需要创建一个ScheduledExecutorService:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
然后,我们可以像ThreadPoolExecutor一样提交任务:
Runnable task = () -> { System.out.println("Task executed at " + System.currentTimeMillis()); }; executor.submit(task);
当然,我们也可以使用schedule()或者scheduleAtFixedRate()方法提交任务,只需要把任务封装为Runnable或者Callable接口即可。
4、异常处理
在任务执行过程中,如果任务抛出了异常,ScheduledThreadPoolExecutor将并不会打印异常堆栈信息,也不会将异常抛到调用方。因此,我们需要在任务中显式地处理异常。
Runnable task = () -> { try { // ... } catch (Exception e) { e.printStackTrace(); } };
三、ScheduledThreadPoolExecutor的拓展
1、线程池监控
使用ScheduledThreadPoolExecutor可以实现线程池的监控,比如通过定期打印线程池的状态来观察线程池的健康状态。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; System.out.println("Pool Size: " + threadPoolExecutor.getPoolSize()); System.out.println("Active Count: " + threadPoolExecutor.getActiveCount()); System.out.println("Completed Task Count: " + threadPoolExecutor.getCompletedTaskCount()); System.out.println("Task Count: " + threadPoolExecutor.getTaskCount()); }, 0, 1, TimeUnit.SECONDS); // 在程序结束时停止监控线程池 Runtime.getRuntime().addShutdownHook(new Thread(() -> { future.cancel(true); executor.shutdown(); }));
上面的代码中,我们通过修改线程池的拒绝策略来打印线程池的状态信息。在程序结束时,我们需要使用ShutdownHook来关闭线程池,确保程序结束时不会留下线程池中的未完成任务。
2、拒绝策略
当任务队列已满时,如果再提交新的任务,ScheduledThreadPoolExecutor默认的拒绝策略是抛出RejectedExecutionException异常。为了避免出现这种情况,我们可以自定义拒绝策略。
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (executor.isShutdown()) { // 如果线程池已经关闭,则直接拒绝任务 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString()); } else { // 否则,将任务放回任务队列,尝试继续执行。 executor.getQueue().offer(r); } } } ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new CustomRejectedExecutionHandler());
上面的代码中,我们实现了一个自定义的拒绝策略CustomRejectedExecutionHandler。当任务队列已满时,我们会首先检测线程池是否已经关闭。如果线程池已经关闭,则直接抛出拒绝执行异常,否则将任务放回任务队列,等待线程池的新线程来执行。
3、定时任务比较工具
在一些需要进行定期任务处理的项目中,需要精确地掌握每个任务的执行情况。定时任务比较工具可以对比实际执行时间与定时时间之间的误差,这样就可以帮助我们调整任务的定时精度,保证任务的准确执行。
public class ScheduleTimeComparator implements Comparable{ private final Long scheduleTime; private final Long actualTime; public ScheduleTimeComparator(Long scheduleTime, Long actualTime) { this.scheduleTime = scheduleTime; this.actualTime = actualTime; } public boolean isDelay() { return actualTime - scheduleTime > 0; } public Long getDelay() { return actualTime - scheduleTime; } @Override public int compareTo(ScheduleTimeComparator o) { return this.actualTime.compareTo(o.actualTime); } } ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); List list = new ArrayList<>(); Runnable task = () -> { list.add(new ScheduleTimeComparator(System.currentTimeMillis() - 2000, System.currentTimeMillis())); }; for (int i = 0; i < 10; i++) { executor.schedule(task, i * 100, TimeUnit.MILLISECONDS); } Thread.sleep(5000); list.sort(Comparator.comparing(ScheduleTimeComparator::getDelay).reversed()); for (int i = 0; i < list.size(); i++) { ScheduleTimeComparator element = list.get(i); System.out.println(String.format("Index=%d, Scheduled Time=%d, Actual Time=%d, Delay=%d, Is Delay=%b", i, element.scheduleTime, element.actualTime, element.getDelay(), element.isDelay())); }
上面的代码中,我们通过创建一个ScheduleTimeComparator类来记录定时时间和实际执行时间之间的误差。当所有任务执行完成后,我们对误差进行排序并输出。
四、总结
本文介绍了ScheduledThreadPoolExecutor的使用方法和拓展,包括定时任务、周期性任务、线程池监控、拒绝策略、定时任务比较工具等。在实现复杂任务调度的过程中,ScheduledThreadPoolExecutor是一个非常实用的工具。