
[Hands-on] Shell scheduling made simple: a detailed DolphinScheduler + ProcessBuilder tutorial

This article describes how DolphinScheduler uses ProcessBuilder to execute shell commands. By default a BashShellInterceptorBuilder wraps the shell script and generates the execution command, supporting both normal mode and sudo mode. A Spring Boot example then shows how to configure the working directory, merge the error stream into standard output, monitor execution status, and capture log output, so that shell tasks can be managed and scheduled reliably.
1、How ProcessBuilder is used in DolphinScheduler

1.1、Command construction

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
public class ShellInterceptorBuilderFactory {

    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

    @SuppressWarnings("unchecked")
    public static IShellInterceptorBuilder newBuilder() {
      // By default, this is the branch that is taken
      if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
            return new BashShellInterceptorBuilder();
      }
      if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
            return new ShShellInterceptorBuilder();
      }
      if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
            return new CmdShellInterceptorBuilder();
      }
      throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
    }
}
By default, the BashShellInterceptorBuilder path is taken.
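The interceptor type is read from the shell.interceptor.type property via PropertyUtils, falling back to "bash" when nothing is configured. A minimal sketch of what a caller gets from the factory (how and where that property is set on a worker is deployment-specific, so treat the override mentioned in the comment as an assumption):
// With no shell.interceptor.type configured, newBuilder() falls back to "bash"
// and returns a BashShellInterceptorBuilder.
IShellInterceptorBuilder builder = ShellInterceptorBuilderFactory.newBuilder();

// Setting shell.interceptor.type=sh (or cmd, presumably for Windows workers) before the
// worker starts would make the factory return the Sh/Cmd interceptor builder instead.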
org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder
public class BashShellInterceptorBuilder
      extends
            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {

    @Override
    public BashShellInterceptorBuilder newBuilder() {
      return new BashShellInterceptorBuilder();
    }

    @Override
    public BashShellInterceptor build() throws FileOperateException, IOException {
      // This is where the shell script itself is generated
      generateShellScript();
      List<String> bootstrapCommand = generateBootstrapCommand();
      // Instantiate the BashShellInterceptor
      return new BashShellInterceptor(bootstrapCommand, shellDirectory);
    }

    // When sudo mode is not used, this is the interpreter prefix for running the command
    @Override
    protected String shellInterpreter() {
      return "bash";
    }

    @Override
    protected String shellExtension() {
      return ".sh";
    }

    @Override
    protected String shellHeader() {
      return "#!/bin/bash";
    }
}
org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand
protected List<String> generateBootstrapCommand() {
      if (sudoEnable) {
            // Default branch: effectively sudo -u <tenant> -i /opt/xx.sh
            return bootstrapCommandInSudoMode();
      }
      // Normal mode: bash /opt/xx.sh
      return bootstrapCommandInNormalMode();
    }
bootstrapCommandInSudoMode():
private List<String> bootstrapCommandInSudoMode() {
      if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
            return bootstrapCommandInResourceLimitMode();
      }
      List<String> bootstrapCommand = new ArrayList<>();
      bootstrapCommand.add("sudo");
      if (StringUtils.isNotBlank(runUser)) {
            bootstrapCommand.add("-u");
            bootstrapCommand.add(runUser);
      }
      bootstrapCommand.add("-i");
      bootstrapCommand.add(shellAbsolutePath().toString());
      return bootstrapCommand;
    }
bootstrapCommandInNormalMode():
private List<String> bootstrapCommandInNormalMode() {
      List<String> bootstrapCommand = new ArrayList<>();
      bootstrapCommand.add(shellInterpreter());
      bootstrapCommand.add(shellAbsolutePath().toString());
      return bootstrapCommand;
    }
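On a Linux worker, therefore, the interceptor ends up launching one of the following (the tenant name and script path are placeholders taken from the comments above, not values from a real deployment):
sudo mode:   sudo -u tenant -i /opt/xx.sh
normal mode: bash /opt/xx.sh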

1.2、Command execution

org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor
public abstract class BaseShellInterceptor implements IShellInterceptor {

    protected final String workingDirectory;
    protected final List<String> executeCommands;

    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
      this.executeCommands = executeCommands;
      this.workingDirectory = workingDirectory;
    }

    @Override
    public Process execute() throws IOException {
      // init process builder
      ProcessBuilder processBuilder = new ProcessBuilder();
      // setting up a working directory
      // Set the working directory so the script can load resources, e.g. jar files, relative to it
      processBuilder.directory(new File(workingDirectory));
      // merge error information to standard output stream
      processBuilder.redirectErrorStream(true);
      processBuilder.command(executeCommands);
      log.info("Executing shell command : {}", String.join(" ", executeCommands));
      return processBuilder.start();
    }
}
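One design choice worth calling out: redirectErrorStream(true) merges stderr into stdout, so a single reader can drain everything the child writes, and the child can never block on a full stderr pipe while only stdout is being consumed. The sketch below is my own illustration of what the non-merged alternative would require (it is not DolphinScheduler code): each stream has to be drained on its own thread.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class TwoStreamDemo {

    public static void main(String[] args) throws IOException, InterruptedException {
        // stderr is NOT merged here, so stdout and stderr must be drained independently
        Process process = new ProcessBuilder("bash", "-c", "echo out; echo err 1>&2").start();

        // drain stderr on a background thread so the child never blocks on a full pipe
        Thread stderrReader = new Thread(() -> {
            try (BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
                err.lines().forEach(System.err::println);
            } catch (IOException ignored) {
                // nothing to do in this sketch
            }
        });
        stderrReader.start();

        // drain stdout on the current thread
        try (BufferedReader out = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            out.lines().forEach(System.out::println);
        }

        stderrReader.join();
        System.out.println("exit code -> " + process.waitFor());
    }
}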

2、Best-practice example

2.1、pom.xml configuration

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>2.6.1</version>
</dependency>

2.2、Application code

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws Exception {
      SpringApplication.run(Application.class, args);

      List<String> executeCommands = new ArrayList<>();
      executeCommands.add("sudo");
      executeCommands.add("-u");
      executeCommands.add("qiaozhanwei");
      executeCommands.add("-i");
      executeCommands.add("/opt/test/my.sh");


      ProcessBuilder processBuilder = new ProcessBuilder();
      // setting up a working directory
      // Set the working directory so the script can load resources, e.g. jar files, relative to it
      processBuilder.directory(new File("/opt/test"));
      // merge error information to standard output stream
      processBuilder.redirectErrorStream(true);
      processBuilder.command(executeCommands);
      Process process = processBuilder.start();

      try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = inReader.readLine()) != null) {
                // print the child process output to the console
                System.out.println(line);
            }
      } catch (Exception e) {
            e.printStackTrace();
      }


      // Wait up to 10 minutes; if the process has not finished by then, waitFor returns false
      boolean status = process.waitFor(10, TimeUnit.MINUTES);

      System.out.println("status ->" + status);
    }
}

2.3、Log output

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2024-06-15 18:33:16.090  INFO 31834 --- [         main] com.journey.test.Application             : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in /Users/qiaozhanwei/IdeaProjects/springboot2)
2024-06-15 18:33:16.091  INFO 31834 --- [         main] com.journey.test.Application             : No active profile set, falling back to default profiles: default
2024-06-15 18:33:16.244  INFO 31834 --- [         main] com.journey.test.Application             : Started Application in 0.252 seconds (JVM running for 0.42)
Number of Maps= 1
Samples per Map = 100000
2024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Wrote input for Map #0
Starting Job
2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:8032
2024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_0931
2024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 1
2024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:1
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_0931
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found
2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_0931
2024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/
2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_0931
2024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false
2024-06-15 18:33:24,978 INFO mapreduce.Job:  map 0% reduce 0%
2024-06-15 18:33:29,153 INFO mapreduce.Job:  map 100% reduce 0%
2024-06-15 18:33:34,384 INFO mapreduce.Job:  map 100% reduce 100%
2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully
2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54
    File System Counters
      FILE: Number of bytes read=28
      FILE: Number of bytes written=548863
      FILE: Number of read operations=0
      FILE: Number of large read operations=0
      FILE: Number of write operations=0
      HDFS: Number of bytes read=278
      HDFS: Number of bytes written=215
      HDFS: Number of read operations=9
      HDFS: Number of large read operations=0
      HDFS: Number of write operations=3
      HDFS: Number of bytes read erasure-coded=0
    Job Counters
      Launched map tasks=1
      Launched reduce tasks=1
      Data-local map tasks=1
      Total time spent by all maps in occupied slots (ms)=37968
      Total time spent by all reduces in occupied slots (ms)=79360
      Total time spent by all map tasks (ms)=2373
      Total time spent by all reduce tasks (ms)=2480
      Total vcore-milliseconds taken by all map tasks=2373
      Total vcore-milliseconds taken by all reduce tasks=2480
      Total megabyte-milliseconds taken by all map tasks=4859904
      Total megabyte-milliseconds taken by all reduce tasks=10158080
    Map-Reduce Framework
      Map input records=1
      Map output records=2
      Map output bytes=18
      Map output materialized bytes=28
      Input split bytes=160
      Combine input records=0
      Combine output records=0
      Reduce input groups=2
      Reduce shuffle bytes=28
      Reduce input records=2
      Reduce output records=0
      Spilled Records=4
      Shuffled Maps =1
      Failed Shuffles=0
      Merged Map outputs=1
      GC time elapsed (ms)=87
      CPU time spent (ms)=1420
      Physical memory (bytes) snapshot=870387712
      Virtual memory (bytes) snapshot=9336647680
      Total committed heap usage (bytes)=2716860416
      Peak Map Physical memory (bytes)=457416704
      Peak Map Virtual memory (bytes)=3773362176
      Peak Reduce Physical memory (bytes)=412971008
      Peak Reduce Virtual memory (bytes)=5563285504
    Shuffle Errors
      BAD_ID=0
      CONNECTION=0
      IO_ERROR=0
      WRONG_LENGTH=0
      WRONG_MAP=0
      WRONG_REDUCE=0
    File Input Format Counters
      Bytes Read=118
    File Output Format Counters
      Bytes Written=97
Job Finished in 17.292 seconds
Estimated value of Pi is 3.14120000000000000000
status ->true

Process finished with exit code 0

Reposted from Journey.
Original article: https://segmentfault.com/a/1190000044966157
This article is published with support from 白鲸开源 (WhaleOps).
