找回密码
 立即注册
首页 业界区 安全 DolphinScheduler心脏:Quartz的定时任务调度框架深度解 ...

DolphinScheduler心脏:Quartz的定时任务调度框架深度解析

搁胱 2025-6-10 19:11:43
Quartz是一个开源的Java作业调度框架,它提供了强大的定时任务调度功能。在DolphinScheduler中,Quartz用于实现定时任务的调度和管理。DolphinScheduler通过QuartzExecutorImpl类与Quartz集成,将工作流及其定时管理操作与Quartz调度框架相结合,实现任务的调度执行。
本文将详细剖析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。
Quartz ER图

1.webp


  • QRTZ_JOB_DETAILS 和 QRTZ_TRIGGERS 是中心表,定义了任务与触发器之间的关系;
  • QRTZ_TRIGGERS 表通过外键关联了多个触发器类型表,如 QRTZ_SIMPLE_TRIGGERS 和 QRTZ_CRON_TRIGGERS,用于实现不同类型的触发方式;
  • QRTZ_FIRED_TRIGGERS 用于记录每次任务执行的历史,与任务和触发器表都有关联;
  • QRTZ_CALENDARS 用于定义触发器的日历排除规则,QRTZ_PAUSED_TRIGGER_GRPS 用于管理触发器组的暂停状态;
  • QRTZ_SCHEDULER_STATE 和 QRTZ_LOCKS 主要用于集群环境中的任务调度协调,确保高可用性。
Dolphinscheduler Quartz使用

新建SHELL任务

2.webp

流程定义上线并配置调度

3.webp

4.webp

定时上线

5.webp

流程实例运行结果

6.webp

原理剖析

创建调度
  1. org.apache.dolphinscheduler.api.controller.SchedulerController#createSchedule
  2. --org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#insertSchedule
  3. ....
  4. Schedule scheduleObj = new Schedule();
  5. Date now = new Date();
  6. scheduleObj.setTenantCode(tenantCode);
  7. scheduleObj.setProjectName(project.getName());
  8. scheduleObj.setProcessDefinitionCode(processDefineCode);
  9. scheduleObj.setProcessDefinitionName(processDefinition.getName());
  10. ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
  11. scheduleObj.setCrontab(scheduleParam.getCrontab());
  12. scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
  13. scheduleObj.setWarningType(warningType);
  14. scheduleObj.setWarningGroupId(warningGroupId);
  15. scheduleObj.setFailureStrategy(failureStrategy);
  16. scheduleObj.setCreateTime(now);
  17. scheduleObj.setUpdateTime(now);
  18. scheduleObj.setUserId(loginUser.getId());
  19. scheduleObj.setUserName(loginUser.getUserName());
  20. scheduleObj.setReleaseState(ReleaseState.OFFLINE);
  21. scheduleObj.setProcessInstancePriority(processInstancePriority);
  22. scheduleObj.setWorkerGroup(workerGroup);
  23. scheduleObj.setEnvironmentCode(environmentCode);
  24. scheduleMapper.insert(scheduleObj);
  25. ....
复制代码
核心其实就是向 schedule 表中插入了一条数据而已,如下 :
7.webp

调度上线
  1. org.apache.dolphinscheduler.api.controller.SchedulerController#publishScheduleOnline
  2. --org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#onlineScheduler
  3. ----org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#doOnlineScheduler
  4. ------org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler#insertOrUpdateScheduleTask
  5. 精简代码如下 :
  6. // TODO 使用schedule id和projectId封装 JobKey,比如jobName=job_25(schedulerId),jobGroup=jobgroup_1(projectId)
  7. JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);
  8. // TODO 使用projectId和schedule封装jobDataMap,里面封装的是projectId、scheduleId和schedule(JSON存储)
  9. Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);
  10. // TODO 获取cron表达式
  11. String cronExpression = schedule.getCrontab();
  12. // TODO 获取时区
  13. String timezoneId = schedule.getTimezoneId();
  14. // TODO 定时调度的开启时间
  15. Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
  16. // TODO 定时调度的结束时间
  17. Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
  18. jobDetail jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();
  19. jobDetail.getJobDataMap().putAll(jobDataMap);
  20. // TODO 创建一个Job
  21. scheduler.addJob(jobDetail, false, true);
  22. // TODO 封装Trigger
  23. TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
  24. CronTrigger cronTrigger = newTrigger()
  25.                     .withIdentity(triggerKey)
  26.                     .startAt(startDate)
  27.                     .endAt(endDate)
  28.                     .withSchedule(
  29.                             cronSchedule(cronExpression)
  30.                                     .withMisfireHandlingInstructionIgnoreMisfires()
  31.                                     .inTimeZone(DateUtils.getTimezone(timezoneId)))
  32.                     .forJob(jobDetail).build();
  33. // TODO 开始调度
  34. scheduler.scheduleJob(cronTrigger);
复制代码
对应的表

存储每个任务的详细信息
8.webp

存储触发器的基本信息,是所有触发器类型的父表
9.webp

存储 Cron 表达式触发器(Cron Trigger)的信息
10.webp

调度执行
  1. org.apache.dolphinscheduler.scheduler.quartz.ProcessScheduleTask,这个类是 qrtz_job_details 中的 JOB_CLASS_NAME 字段
  2. protected void executeInternal(JobExecutionContext context) {
  3.     JobDataMap dataMap = context.getJobDetail().getJobDataMap();
  4.     int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);
  5.     int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
  6.     Date scheduledFireTime = context.getScheduledFireTime();
  7.     Date fireTime = context.getFireTime();
  8.     Command command = new Command();
  9.     command.setCommandType(CommandType.SCHEDULER);
  10.     command.setExecutorId(schedule.getUserId());
  11.     command.setFailureStrategy(schedule.getFailureStrategy());
  12.     command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
  13.     command.setScheduleTime(scheduledFireTime);
  14.     command.setStartTime(fireTime);
  15.     command.setWarningGroupId(schedule.getWarningGroupId());
  16.     String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP
  17.             : schedule.getWorkerGroup();
  18.     command.setWorkerGroup(workerGroup);
  19.     command.setTenantCode(schedule.getTenantCode());
  20.     command.setEnvironmentCode(schedule.getEnvironmentCode());
  21.     command.setWarningType(schedule.getWarningType());
  22.     command.setProcessInstancePriority(schedule.getProcessInstancePriority());
  23.     command.setProcessDefinitionVersion(processDefinition.getVersion());
  24.     commandService.createCommand(command);
  25. }
复制代码
说白了,这个就是quartz的一个回调函数,最终生成Command。
转载自Journey
原文链接:https://segmentfault.com/a/1190000045471756
本文由 白鲸开源 提供发布支持!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册