分布式调度:Quartz/XXL-Job/ElasticJob
分布式调度:Quartz/XXL-Job/ElasticJob
分布式调度核心挑战
分布式调度需要解决任务分片、故障转移、负载均衡、幂等执行等问题。不同框架采用不同的架构策略,但核心目标一致:保证任务准确、高效、可靠地执行。
// 任务调度器核心接口
public interface TaskScheduler {
void schedule(String taskId, ScheduleConfig config, TaskExecutor executor);
void unschedule(String taskId);
void pause(String taskId);
void resume(String taskId);
List<ScheduledTask> getScheduledTasks();
}
// 任务执行器
@FunctionalInterface
public interface TaskExecutor {
TaskResult execute(TaskContext context);
}
// 任务执行结果
public class TaskResult {
private final boolean success;
private final String message;
private final Map<String, Object> output;
}
Quartz集群调度
Quartz通过数据库锁实现集群调度,多个节点竞争执行任务。JobStore使用悲观锁保证同一任务只有一个节点执行。
// Quartz集群配置
@Configuration
public class QuartzConfig {
@Bean
public Scheduler scheduler() throws SchedulerException {
SchedulerFactory factory = new StdSchedulerFactory();
// 集群配置
Properties props = new Properties();
props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
props.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
props.setProperty("org.quartz.jobStore.driverDelegateClass",
"org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
props.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
props.setProperty("org.quartz.jobStore.isClustered", "true");
props.setProperty("org.quartz.jobStore.clusterCheckinInterval", "15000");
props.setProperty("org.quartz.jobStoreMisfireThreshold", "60000");
factory.initialize(props);
return factory.getScheduler();
}
}
// 定义任务
@DisallowConcurrentExecution
public class DataSyncJob implements Job {
@Override
public void execute(JobExecutionContext context) {
JobDataMap dataMap = context.getMergedJobDataMap();
String syncType = dataMap.getString("syncType");
// 执行数据同步
dataSyncService.sync(syncType);
}
}
// 调度任务
JobDetail job = JobBuilder.newJob(DataSyncJob.class)
.withIdentity("dataSync", "etl")
.usingJobData("syncType", "full")
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("dataSyncTrigger", "etl")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
.build();
scheduler.scheduleJob(job, trigger);
XXL-Job调度中心
XXL-Job采用调度中心和执行器分离的架构,支持可视化管理、分片广播、故障转移。
// XXL-Job执行器配置
@Component
public class XxlJobConfig {
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
executor.setAppname("my-executor");
executor.setAdminAddresses("http://admin1:8080,http://admin2:8080");
executor.setPort(9999);
executor.setLogPath("/data/applogs/xxl-job/jobhandler");
executor.setLogRetentionDays(30);
return executor;
}
}
// 分片广播任务
@XxlJob("shardingJobHandler")
public void shardingJobHandler() {
// 获取分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
// 根据分片处理数据
List<Data> dataList = dataMapper.findByShard(shardIndex, shardTotal);
for (Data data : dataList) {
processData(data);
}
XxlJobHelper.handleSuccess("Shard " + shardIndex + "/" + shardTotal + " completed");
}
ElasticJob分片与故障转移
ElasticJob基于ZooKeeper实现分布式协调,支持弹性扩容、故障转移和作业分片。
// ElasticJob配置
@Configuration
public class ElasticJobConfig {
@Bean
public CoordinatorRegistryCenter coordinatorRegistryCenter() {
return new ZookeeperRegistryCenter("localhost:2181", "/elastic-job");
}
@Bean
public SpringScheduler springScheduler(CoordinatorRegistryCenter regCenter) {
return new SpringScheduler(regCenter,
JobConfiguration.newBuilder("dataSyncJob", 3) // 3个分片
.cron("0 0/5 * * * ?")
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
.failover(true)
.jobParameter("dataSource=mysql")
.build());
}
}
// 分片处理
@Component
public class DataSyncJob implements SimpleJob {
@Autowired
private DataService dataService;
@Override
public void execute(ShardingContext context) {
String shardParam = context.getShardingParameter();
int shardIndex = context.getShardingItem();
// 根据分片参数处理对应区域的数据
List<Record> records = dataService.getRecordsByRegion(shardParam);
for (Record record : records) {
try {
dataService.process(record);
} catch (Exception e) {
// 故障转移:将失败任务重新分配
log.error("Processing failed for record: {}", record.getId(), e);
throw e;
}
}
}
}
调度框架对比
| 特性 | Quartz | XXL-Job | ElasticJob |
|---|---|---|---|
| 架构 | 数据库集群 | 调度中心+执行器 | ZooKeeper协调 |
| 分片 | 不支持 | 广播分片 | 弹性分片 |
| 故障转移 | 有限 | 支持 | 完善 |
| 可视化 | 无 | 完善 | 简单 |
| 学习成本 | 低 | 低 | 中等 |