找回密码
 立即注册
首页 业界区 业界 大疆不同任务类型执行逻辑,上云API源码分析 ...

大疆不同任务类型执行逻辑,上云API源码分析

迭婵椟 昨天 21:15
大疆不同任务类型执行逻辑,上云API源码分析

大疆司空2中有不同的任务类型:立即任务、定时任务、条件任务。
最初我们实现时,选择的是用Quartz创建定时任务,调用API中executeFlightTask接口实现任务下发。
在功能实现之后,随着对API的深入了解,发现大疆API中有相关的任务下发逻辑。
当时只看了实现逻辑,因为活比较多,加上懒,就一直拖到现在。
本文是对源码的学习总结。
所有任务下发的逻辑都从publishFlightTask()方法开始。
com.dji.sample.wayline.service.impl.FlightTaskServiceImpl#publishFlightTask
立即任务


  • fillImmediateTime(param)方法,如果是 “立即任务(IMMEDIATE)”,就把任务的执行日期taskDays与执行时间段taskPeriods强制设置为当前服务器时间。
  • 由于taskDays = [now],taskPeriods = [ [now] ],所以只会进入循环一次,并且beginTime = 当前毫秒时间点,endTime = beginTime。
  • 创建一条飞行任务记录存到数据库中---- waylineJobOpt。
  • 调用publishOneFlight()方法发布任务
[1]publishFlightTask
  1.     /**
  2.      * 发布飞行任务的核心入口方法。
  3.      * 功能:
  4.      * 1. 处理立即任务(IMMEDIATE),强制以服务器当前时间作为任务执行时间;
  5.      * 2. 根据用户提交的任务日期(taskDays)与时间段(taskPeriods),
  6.      *    生成多个 beginTime/endTime 的任务实例;
  7.      * 3. 为每个时间段创建一条 WaylineJob(航线任务记录);
  8.      * 4. 若为条件任务,写入对应的触发条件;
  9.      * 5. 立即调用 publishOneFlightTask 将任务下发给飞行设备(Dock);
  10.      * 6. 如果任何一次下发失败,则返回失败并终止流程;
  11.      * 7. 所有任务下发成功后返回成功响应。
  12.      */
  13.     @Override
  14.     public HttpResultResponse publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
  15.         fillImmediateTime(param);
  16.         //立即任务只会进入循环一次
  17.         for (Long taskDay : param.getTaskDays()) {
  18.             LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
  19.             for (List<Long> taskPeriod : param.getTaskPeriods()) {
  20.                 //立即任务的beginTime = endTime = 当前毫秒时间
  21.                 long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
  22.                         .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
  23.                 long endTime = taskPeriod.size() > 1 ?
  24.                         LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
  25.                                 .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
  26.                 //立即任务直接跳过这个判断
  27.                 if (TaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
  28.                     continue;
  29.                 }
  30.                 //创建一条飞行任务记录
  31.                 Optional<WaylineJobDTO> waylineJobOpt = waylineJobService.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
  32.                 if (waylineJobOpt.isEmpty()) {
  33.                     throw new SQLException("Failed to create wayline job.");
  34.                 }
  35.                 WaylineJobDTO waylineJob = waylineJobOpt.get();
  36.                
  37.                 //立即任务直接跳过这个方法
  38.                 // If it is a conditional task type, add conditions to the job parameters.
  39.                 addConditions(waylineJob, param, beginTime, endTime);
  40.                 //发布任务
  41.                 HttpResultResponse response = this.publishOneFlightTask(waylineJob);
  42.                 if (HttpResultResponse.CODE_SUCCESS != response.getCode()) {
  43.                     return response;
  44.                 }
  45.             }
  46.         }
  47.         return HttpResultResponse.success();
  48.     }
  49.    
  50.     /**
  51.      * 如果任务类型为 IMMEDIATE(立即任务),则忽略用户传入的任何日期和时间段,
  52.      * 将任务的执行日期(taskDays)与执行时间段(taskPeriods)强制设置为当前服务器时间。
  53.      * 立即任务必须由服务器当前时间触发,不允许由客户端自定义时间。
  54.      */
  55.     private void fillImmediateTime(CreateJobParam param) {
  56.         if (TaskTypeEnum.IMMEDIATE != param.getTaskType()) {
  57.             return;
  58.         }
  59.         long now = System.currentTimeMillis() / 1000;
  60.         param.setTaskDays(List.of(now));
  61.         param.setTaskPeriods(List.of(List.of(now)));
  62.     }
复制代码

  • deviceRedisService.checkDeviceOnline判断机场是否在线
  • 调用prepareFlightTask方法,下发飞行准备指令
  • 调用executeFlightTask方法,下发飞行任务执行指令
[2]publishOneFlight
  1.     public HttpResultResponse publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {
  2.         //判断机场是否在线
  3.         boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
  4.         if (!isOnline) {
  5.             throw new RuntimeException("Dock is offline.");
  6.         }
  7.         //下发飞行准备指令
  8.         boolean isSuccess = this.prepareFlightTask(waylineJob);
  9.         if (!isSuccess) {
  10.             return HttpResultResponse.error("Failed to prepare job.");
  11.         }
  12.         //下发立即任务执行指令
  13.         // Issue an immediate task execution command.
  14.         if (TaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) {
  15.             if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) {
  16.                 return HttpResultResponse.error("Failed to execute job.");
  17.             }
  18.         }
  19.         ..............(省略部分代码)
  20.         return HttpResultResponse.success();
  21.     }
复制代码
定时任务


  • 遍历定时任务设置中每一天的每一个时间段
  • 忽略已经过期的时间段
  • 创建waylineJob并存到数据库中。
  • 调用publishOneFlightTask()发布任务
[1]publishFlightTask
  1.     @Override
  2.     public HttpResultResponse publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
  3.         //定时任务在这个方法中直接返回,不会进行任何处理
  4.         fillImmediateTime(param);
  5.         //遍历每一天每一个时间段
  6.         for (Long taskDay : param.getTaskDays()) {
  7.             LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
  8.             for (List<Long> taskPeriod : param.getTaskPeriods()) {
  9.                 long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
  10.                         .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
  11.                 long endTime = taskPeriod.size() > 1 ?
  12.                         LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
  13.                                 .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
  14.                
  15.                 //忽略已经过期的时间段
  16.                 if (TaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
  17.                     continue;
  18.                 }
  19.                 //创建waylineJob
  20.                 Optional<WaylineJobDTO> waylineJobOpt = waylineJobService.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
  21.                 if (waylineJobOpt.isEmpty()) {
  22.                     throw new SQLException("Failed to create wayline job.");
  23.                 }
  24.                 WaylineJobDTO waylineJob = waylineJobOpt.get();
  25.                
  26.                 //不是条件任务,进入这个方法会直接返回。
  27.                 // If it is a conditional task type, add conditions to the job parameters.
  28.                 addConditions(waylineJob, param, beginTime, endTime);
  29.                 //发布任务
  30.                 HttpResultResponse response = this.publishOneFlightTask(waylineJob);
  31.                 if (HttpResultResponse.CODE_SUCCESS != response.getCode()) {
  32.                     return response;
  33.                 }
  34.             }
  35.         }
  36.         return HttpResultResponse.success();
  37.     }
复制代码

  • deviceRedisService.checkDeviceOnline检查机场是否在线。
  • 调用prepareFlightTask方法,下发飞行准备指令.
  • 把定时任务添加到Redis的一个Sorted Set(有序集合)里,用任务的开始时间作为排序依据。
[2]publishOneFlightTask
  1.     public HttpResultResponse publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {
  2.         //检查机场是否在线
  3.         boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
  4.         if (!isOnline) {
  5.             throw new RuntimeException("Dock is offline.");
  6.         }
  7.         //下发飞行准备指令
  8.         boolean isSuccess = this.prepareFlightTask(waylineJob);
  9.         if (!isSuccess) {
  10.             return HttpResultResponse.error("Failed to prepare job.");
  11.         }
  12.         
  13.         ..........(省略部分代码)
  14.         
  15.         //把定时任务添加到Redis有序集合里,用开始时间排序
  16.         if (TaskTypeEnum.TIMED == waylineJob.getTaskType()) {
  17.             // key: wayline_job_timed, value: {workspace_id}:{dock_sn}:{job_id}
  18.             boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE,
  19.                     waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(),
  20.                     waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
  21.             if (!isAdd) {
  22.                 return HttpResultResponse.error("Failed to create scheduled job.");
  23.             }
  24.         }
  25.         return HttpResultResponse.success();
  26.     }
复制代码

  • checkScheduledJob定时任务,每5s对 Redis 中的定时任务集合扫描一次。
  • 获取最早的定时任务,拆解任务信息。
  • 如果任务信息开始比现在早30s (offset)以上,认为任务过期,删除 Redis 队列中的任务,更新任务状态为失败。
  • 如果任务执行时间在now + offset这个时间区间内,调用executeFlightTask下发执行任务命令,且无论成功或失败,都从 Redis 队列中删除任务信息。
[3]checkScheduledJob定时任务
[code]    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)    public void checkScheduledJob() {                //获取最早的定时任务,并拆解任务信息        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);        if (Objects.isNull(jobIdValue)) {            return;        }        log.info("Check the timed tasks of the wayline. {}", jobIdValue);        // format: {workspace_id}:{dock_sn}:{job_id}        String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);        long now = System.currentTimeMillis();        int offset = 30_000;                //任务信息开始比现在早30s以上,认为任务过期,删除Redis队列中的任务,更新任务状态为失败。        // Expired tasks are deleted directly.        if (time < now - offset) {            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);            waylineJobService.updateJob(WaylineJobDTO.builder()                    .jobId(jobArr[2])                    .status(WaylineJobStatusEnum.FAILED.getVal())                    .executeTime(LocalDateTime.now())                    .completedTime(LocalDateTime.now())                    .code(HttpStatus.SC_REQUEST_TIMEOUT).build());            return;        }        //判断任务执行时间在now + offset这个时间区间内,下发执行任务命令。        if (now

相关推荐

您需要登录后才可以回帖 登录 | 立即注册