详解Quartz集群

发布时间:2023-05-21

随着业务量的增长,单机版 Quartz 已经难以应对分布式任务管理的需求,此时集群版 Quartz 就迎刃而解了。集群版 Quartz 的优点是可以将多台机器作为工作节点来执行任务,这个方案可以提高任务的可靠性、性能和吞吐量。接下来,将从多个方面详细介绍集群版 Quartz 的实现原理和使用方法。

一、配置文件

Quartz 集群的配置可以放在属性文件中,集群中每个节点都需要使用相同的配置文件。下面是一个简单的示例:

org.quartz.scheduler.instanceName = MyClusteredScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 25
org.quartz.threadPool.threadPriority = 5

其中,org.quartz.scheduler.instanceNameorg.quartz.scheduler.instanceId 是必须的。

二、消息通讯

Quartz 集群依靠 JDBC 或者 RMI 方式进行消息通讯,以实现不同节点之间的任务调度信息共享。在消息通讯的过程中,需要注意下面的事项:

  1. 消息通讯组件不能够出现重复 id 的节点;
  2. 消息通讯组件的状态一定要准确。 同时,集群中所有节点的数据库连接信息必须相同。

三、任务执行原则

在 Quartz 集群模型中,所有的任务都必须实现 Job 接口,并且该接口应该是 StatefulJob 的子接口。StatefulJob 接口要求任务必须是有状态的并且支持并发执行。这个要求是 Quartz 集群正常运行必须遵守的规则,并且可以通过实现 StatefulJob 接口来实现。 为了确保任务被正确分配到集群中各个节点,我们需要在任务类的添加注解 @PersistJobDataAfterExecution。这个注解会在执行任务之前保存任务当前状态,并在任务执行完成后更新状态到数据库。这种方式可以确保任务能够正确的在不同节点之间传递。

四、任务负载均衡

任务负载均衡是 Quartz 集群中特别重要的一个方面。Quartz 使用平衡负载策略以确保在集群中任务的平衡的分配。 四种负载均衡策略如下:

  1. 默认的负载均衡策略:按照最近的一次执行时间靠前的优先被执行;
  2. Round Robin 策略:所有节点轮流执行任务,每个节点会轮流执行一次;
  3. 随机策略:随机挑选一个节点去执行任务;
  4. 基于权重的策略:按照节点配置的权重来分配任务,权重大的节点分配到的任务数量也会比较多。 下面是一个 TaskDispatchSelector 的示例,可以用来选择节点:
public class TaskDispatchSelector implements LoadBalanceTaskDispatcher {
    public void dispatchTask(List<JobExecutionContext> ctxs) {
        JobExecutionContext selectedContext = null;
        try {
            //......
            //根据权重选择节点
            selectedContext = new WeightedLoadBalancedScheduler().instanceIdSchedulerMap.entrySet().stream().max(Comparator.comparingInt((entry) -> entry.getValue().getThreadPool().getBusyThreadCount())).get().getValue().getContext();
            //......
            selectedContext.getScheduler().triggerJob(selectedContext.getJobDetail().getKey());
        } catch (Exception e) {
            throw new MyException("Dispatch Quartz Task Fail.", e);
        }
    }
}

五、运行模式

可以在两种运行模式之间切换:

  1. 只读模式:节点向集群中获取任务并执行;
  2. 读写模式:节点不仅获取任务,还会去分配或者删除任务。 可以使用 JobStoreCMT 来实现读写模式。下面是一个 JobFactory 实现的示例:
public class JobFactory extends AdaptableJobFactory {
    @Autowired
    ApplicationContext applicationContext;
    /**
     * 实例化任务类
     * @param bundle
     * @param method
     * @return
     * @throws Exception
     */
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        JobDetail jobDetail = bundle.getJobDetail();
        Class<?> jobClass = jobDetail.getJobClass();
        return applicationContext.getBean(jobClass);
    }
}

六、集群初始化及关闭

对于集群的初始化,需要确保所有节点都按照相同的配置启动 JobScheduler。关闭集群时需要调用 shutdown() 方法,不要在代码中手动关闭 Scheduler。 下面是一个启动和关闭 Quartz 集群的示例:

public class QuartzClusterRunner {
    public void start() {
        try {
            // 初始化Quartz
            SchedulerFactory sf = new StdSchedulerFactory("quartz.properties");
            Scheduler scheduler = sf.getScheduler();
            //监听Quartz scheduler的启动与关闭事件,和job调度事件
            scheduler.getListenerManager().addSchedulerListener(new MySchedulerListenerAdapter());
            scheduler.start();
            LOGGER.info("Quartz Scheduler started...");
        } catch (Throwable e) {
            LOGGER.error("Quartz Scheduler start error: {}", e.getMessage(), e);
        }
    }
    public void stop() {
        try {
            LOGGER.info("Shutting Quartz Scheduler down...");
            SchedulerFactory sf = new StdSchedulerFactory();  //注意这里没有指定属性文件
            Scheduler scheduler = sf.getScheduler();
            scheduler.shutdown();
            LOGGER.info("Quartz Scheduler shut down.");
        } catch (Throwable e) {
            LOGGER.error("Quartz Scheduler stop error: {}", e.getMessage(), e);
        }
    }
}

致谢

本文档参考了官方文档和网上的一些文章,特此致谢:

  1. Quartz 官方文档
  2. 博客: