← 返回首页
🌐

分布式调度:Quartz/XXL-Job/ElasticJob

📂 architecture ⏱ 2 min 362 words

分布式调度:Quartz/XXL-Job/ElasticJob

分布式调度核心挑战

分布式调度需要解决任务分片、故障转移、负载均衡、幂等执行等问题。不同框架采用不同的架构策略,但核心目标一致:保证任务准确、高效、可靠地执行。

// 任务调度器核心接口
public interface TaskScheduler {
    void schedule(String taskId, ScheduleConfig config, TaskExecutor executor);
    void unschedule(String taskId);
    void pause(String taskId);
    void resume(String taskId);
    List<ScheduledTask> getScheduledTasks();
}

// 任务执行器
@FunctionalInterface
public interface TaskExecutor {
    TaskResult execute(TaskContext context);
}

// 任务执行结果
public class TaskResult {
    private final boolean success;
    private final String message;
    private final Map<String, Object> output;
}

Quartz集群调度

Quartz通过数据库锁实现集群调度,多个节点竞争执行任务。JobStore使用悲观锁保证同一任务只有一个节点执行。

// Quartz集群配置
@Configuration
public class QuartzConfig {
    @Bean
    public Scheduler scheduler() throws SchedulerException {
        SchedulerFactory factory = new StdSchedulerFactory();
        
        // 集群配置
        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        props.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.setProperty("org.quartz.jobStore.driverDelegateClass", 
            "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
        props.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        props.setProperty("org.quartz.jobStore.isClustered", "true");
        props.setProperty("org.quartz.jobStore.clusterCheckinInterval", "15000");
        props.setProperty("org.quartz.jobStoreMisfireThreshold", "60000");
        
        factory.initialize(props);
        return factory.getScheduler();
    }
}

// 定义任务
@DisallowConcurrentExecution
public class DataSyncJob implements Job {
    @Override
    public void execute(JobExecutionContext context) {
        JobDataMap dataMap = context.getMergedJobDataMap();
        String syncType = dataMap.getString("syncType");
        
        // 执行数据同步
        dataSyncService.sync(syncType);
    }
}

// 调度任务
JobDetail job = JobBuilder.newJob(DataSyncJob.class)
    .withIdentity("dataSync", "etl")
    .usingJobData("syncType", "full")
    .build();

Trigger trigger = TriggerBuilder.newTrigger()
    .withIdentity("dataSyncTrigger", "etl")
    .withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
    .build();

scheduler.scheduleJob(job, trigger);

XXL-Job调度中心

XXL-Job采用调度中心和执行器分离的架构,支持可视化管理、分片广播、故障转移。

// XXL-Job执行器配置
@Component
public class XxlJobConfig {
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAppname("my-executor");
        executor.setAdminAddresses("http://admin1:8080,http://admin2:8080");
        executor.setPort(9999);
        executor.setLogPath("/data/applogs/xxl-job/jobhandler");
        executor.setLogRetentionDays(30);
        return executor;
    }
}

// 分片广播任务
@XxlJob("shardingJobHandler")
public void shardingJobHandler() {
    // 获取分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();
    
    // 根据分片处理数据
    List<Data> dataList = dataMapper.findByShard(shardIndex, shardTotal);
    
    for (Data data : dataList) {
        processData(data);
    }
    
    XxlJobHelper.handleSuccess("Shard " + shardIndex + "/" + shardTotal + " completed");
}

ElasticJob分片与故障转移

ElasticJob基于ZooKeeper实现分布式协调,支持弹性扩容、故障转移和作业分片。

// ElasticJob配置
@Configuration
public class ElasticJobConfig {
    @Bean
    public CoordinatorRegistryCenter coordinatorRegistryCenter() {
        return new ZookeeperRegistryCenter("localhost:2181", "/elastic-job");
    }
    
    @Bean
    public SpringScheduler springScheduler(CoordinatorRegistryCenter regCenter) {
        return new SpringScheduler(regCenter, 
            JobConfiguration.newBuilder("dataSyncJob", 3)  // 3个分片
                .cron("0 0/5 * * * ?")
                .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
                .failover(true)
                .jobParameter("dataSource=mysql")
                .build());
    }
}

// 分片处理
@Component
public class DataSyncJob implements SimpleJob {
    @Autowired
    private DataService dataService;
    
    @Override
    public void execute(ShardingContext context) {
        String shardParam = context.getShardingParameter();
        int shardIndex = context.getShardingItem();
        
        // 根据分片参数处理对应区域的数据
        List<Record> records = dataService.getRecordsByRegion(shardParam);
        
        for (Record record : records) {
            try {
                dataService.process(record);
            } catch (Exception e) {
                // 故障转移:将失败任务重新分配
                log.error("Processing failed for record: {}", record.getId(), e);
                throw e;
            }
        }
    }
}

调度框架对比

特性 Quartz XXL-Job ElasticJob
架构 数据库集群 调度中心+执行器 ZooKeeper协调
分片 不支持 广播分片 弹性分片
故障转移 有限 支持 完善
可视化 完善 简单
学习成本 中等