随着业务量的增长,单机版 Quartz 已经难以应对分布式任务管理的需求,此时集群版 Quartz 就迎刃而解了。集群版 Quartz 的优点是可以将多台机器作为工作节点来执行任务,这个方案可以提高任务的可靠性、性能和吞吐量。接下来,将从多个方面详细介绍集群版 Quartz 的实现原理和使用方法。
一、配置文件
Quartz 集群的配置可以放在属性文件中,集群中每个节点都需要使用相同的配置文件。下面是一个简单的示例:
org.quartz.scheduler.instanceName = MyClusteredScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 25
org.quartz.threadPool.threadPriority = 5
其中,org.quartz.scheduler.instanceName
和 org.quartz.scheduler.instanceId
是必须的。
二、消息通讯
Quartz 集群依靠 JDBC 或者 RMI 方式进行消息通讯,以实现不同节点之间的任务调度信息共享。在消息通讯的过程中,需要注意下面的事项:
- 消息通讯组件不能够出现重复 id 的节点;
- 消息通讯组件的状态一定要准确。 同时,集群中所有节点的数据库连接信息必须相同。
三、任务执行原则
在 Quartz 集群模型中,所有的任务都必须实现 Job
接口,并且该接口应该是 StatefulJob
的子接口。StatefulJob
接口要求任务必须是有状态的并且支持并发执行。这个要求是 Quartz 集群正常运行必须遵守的规则,并且可以通过实现 StatefulJob
接口来实现。
为了确保任务被正确分配到集群中各个节点,我们需要在任务类的添加注解 @PersistJobDataAfterExecution
。这个注解会在执行任务之前保存任务当前状态,并在任务执行完成后更新状态到数据库。这种方式可以确保任务能够正确的在不同节点之间传递。
四、任务负载均衡
任务负载均衡是 Quartz 集群中特别重要的一个方面。Quartz 使用平衡负载策略以确保在集群中任务的平衡的分配。 四种负载均衡策略如下:
- 默认的负载均衡策略:按照最近的一次执行时间靠前的优先被执行;
- Round Robin 策略:所有节点轮流执行任务,每个节点会轮流执行一次;
- 随机策略:随机挑选一个节点去执行任务;
- 基于权重的策略:按照节点配置的权重来分配任务,权重大的节点分配到的任务数量也会比较多。
下面是一个
TaskDispatchSelector
的示例,可以用来选择节点:
public class TaskDispatchSelector implements LoadBalanceTaskDispatcher {
public void dispatchTask(List<JobExecutionContext> ctxs) {
JobExecutionContext selectedContext = null;
try {
//......
//根据权重选择节点
selectedContext = new WeightedLoadBalancedScheduler().instanceIdSchedulerMap.entrySet().stream().max(Comparator.comparingInt((entry) -> entry.getValue().getThreadPool().getBusyThreadCount())).get().getValue().getContext();
//......
selectedContext.getScheduler().triggerJob(selectedContext.getJobDetail().getKey());
} catch (Exception e) {
throw new MyException("Dispatch Quartz Task Fail.", e);
}
}
}
五、运行模式
可以在两种运行模式之间切换:
- 只读模式:节点向集群中获取任务并执行;
- 读写模式:节点不仅获取任务,还会去分配或者删除任务。
可以使用
JobStoreCMT
来实现读写模式。下面是一个JobFactory
实现的示例:
public class JobFactory extends AdaptableJobFactory {
@Autowired
ApplicationContext applicationContext;
/**
* 实例化任务类
* @param bundle
* @param method
* @return
* @throws Exception
*/
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
JobDetail jobDetail = bundle.getJobDetail();
Class<?> jobClass = jobDetail.getJobClass();
return applicationContext.getBean(jobClass);
}
}
六、集群初始化及关闭
对于集群的初始化,需要确保所有节点都按照相同的配置启动 JobScheduler
。关闭集群时需要调用 shutdown()
方法,不要在代码中手动关闭 Scheduler
。
下面是一个启动和关闭 Quartz 集群的示例:
public class QuartzClusterRunner {
public void start() {
try {
// 初始化Quartz
SchedulerFactory sf = new StdSchedulerFactory("quartz.properties");
Scheduler scheduler = sf.getScheduler();
//监听Quartz scheduler的启动与关闭事件,和job调度事件
scheduler.getListenerManager().addSchedulerListener(new MySchedulerListenerAdapter());
scheduler.start();
LOGGER.info("Quartz Scheduler started...");
} catch (Throwable e) {
LOGGER.error("Quartz Scheduler start error: {}", e.getMessage(), e);
}
}
public void stop() {
try {
LOGGER.info("Shutting Quartz Scheduler down...");
SchedulerFactory sf = new StdSchedulerFactory(); //注意这里没有指定属性文件
Scheduler scheduler = sf.getScheduler();
scheduler.shutdown();
LOGGER.info("Quartz Scheduler shut down.");
} catch (Throwable e) {
LOGGER.error("Quartz Scheduler stop error: {}", e.getMessage(), e);
}
}
}
致谢
本文档参考了官方文档和网上的一些文章,特此致谢: