3-Azkaban 작업 수행 과정

Executor 정보 분석 수행


웹 서버가 ExecutorManager의 dispatch 메서드를 통해 flow 제출 정보를 Executor 서버로 전달하면,
Executor 측은 azkaban-exec-server에 있는 ExecutorServlet의 doGet 메서드로 요청을 수신하고, handleAjaxExecute를 거쳐 FlowRunnerManager의 submitFlow 메서드에서 이를 처리합니다.
/**
 * Loads the executable flow for the given execution id, prepares it and
 * submits it to the executor service.
 *
 * <p>Steps, in order: reject duplicates, fetch the flow through
 * {@code executorLoader}, call {@code setupFlow}, choose an optional
 * {@code FlowWatcher} for pipelined execution, resolve the number of job
 * threads, build and configure the {@code FlowRunner}, register it in
 * {@code runningFlows} and hand it to {@code executorService}.
 *
 * @param execId id of the execution to load and run
 * @throws ExecutorManagerException if the execution is already registered as
 *         running, the flow cannot be loaded, the job-thread parameter cannot
 *         be applied, or the executor service rejects the runner
 */
public void submitFlow(int execId) throws ExecutorManagerException {
  // Refuse to submit an execution that is already tracked as running.
  if (runningFlows.containsKey(execId)) {
    throw new ExecutorManagerException("Execution " + execId
        + " is already running.");
  }

  // Load the executable flow for this execution id.
  ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);

  // Validate BEFORE touching the flow: logging flow.getFlowId() first would
  // throw a NullPointerException and hide the intended error below.
  if (flow == null) {
    throw new ExecutorManagerException("Error loading flow with exec "
        + execId);
  }

  logger.info("get flow : " + flow.getFlowId());

  // Sets up the project files and execution directory.
  setupFlow(flow);

  ExecutionOptions options = flow.getExecutionOptions();

  // Pipelined execution: when a pipeline execution id is set, watch that
  // execution. Use a local watcher if its runner is registered in this
  // manager's runningFlows, otherwise a remote watcher backed by executorLoader.
  FlowWatcher watcher = null;
  if (options.getPipelineExecutionId() != null) {
    Integer pipelineExecId = options.getPipelineExecutionId();
    FlowRunner pipelineRunner = runningFlows.get(pipelineExecId);

    if (pipelineRunner != null) {
      watcher = new LocalFlowWatcher(pipelineRunner);
    } else {
      watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
    }
  }

  // Number of job threads: start from the configured per-flow default and
  // accept a flow-level override only when it is positive and either does not
  // exceed the default or the project is whitelisted for NumJobPerFlow.
  int numJobThreads = numJobThreadPerFlow;

  if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
    try {
      int numJobs =
          Integer.parseInt(options.getFlowParameters().get(
              FLOW_NUM_JOB_THREADS));
      if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
              .isProjectWhitelisted(flow.getProjectId(),
                  WhitelistType.NumJobPerFlow))) {
        numJobThreads = numJobs;
      }
    } catch (Exception e) {
      throw new ExecutorManagerException(
          "Failed to set the number of job threads "
              + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
              + " for flow " + execId, e);
    }
  }

  FlowRunner runner =
      new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);

  // Apply watcher, log settings, proxy-user validation and thread count, and
  // register this manager as a listener of the runner.
  runner.setFlowWatcher(watcher)
      .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
      .setValidateProxyUser(validateProxyUser)
      .setNumJobThreads(numJobThreads).addListener(this);

  configureFlowLevelMetrics(runner);

  // Check again: setup above takes time, so the same execution may have been
  // registered in the meantime.
  if (runningFlows.containsKey(execId)) {
    throw new ExecutorManagerException("Execution " + execId
        + " is already running.");
  }

  // Finally, register the runner and queue it.
  runningFlows.put(execId, runner);

  try {
    // The executorService already has a queue.
    // The submit method below actually returns an instance of FutureTask,
    // which implements interface RunnableFuture, which extends both
    // Runnable and Future interfaces
    Future<?> future = executorService.submit(runner);
    // keep track of this future
    submittedFlows.put(future, runner.getExecutionId());
    // update the last submitted time.
    this.lastFlowSubmittedDate = System.currentTimeMillis();
  } catch (RejectedExecutionException re) {
    // The runner was never accepted, so undo the registration above;
    // otherwise this execution id would stay marked as running forever.
    runningFlows.remove(execId);
    throw new ExecutorManagerException(
        "Azkaban server can't execute any more flows. "
            + "The number of running flows has reached the system configured limit."
            + "Please notify Azkaban administrators", re);
  }
}

(다음 글에서 계속)

좋은 웹페이지 즐겨찾기