Spark 소스 코드 판독 의 Executor 및 Task 작업 원리 분석

이전 글 에 서 는 Task Scheduler 가 Task Set 에 있 는 task 를 executor 에 보 내 실행 하 는 것 을 다 루 었 다. 이 글 은 이 어 executor 의 작업 원리 와 task 가 어떻게 실행 되 는 지 설명 했다.
먼저 executor 의 작업 절 차 를 살 펴 보 겠 습 니 다.
executor 는 백 엔 드 프로 세 스 CoarseGrained Executor Backend 를 시작 합 니 다. 먼저 driver 에 RegisterExecutor 메 시 지 를 보 내 executor 를 등록 한 후 driver 는 RegisterExecutor 메 시 지 를 되 돌려 줍 니 다. CoarseGrained Executor Backend 는 메 시 지 를 받 은 후에 executor 대상 을 만 든 다음 executor 의 launchTask 방법 으로 task 를 시작 합 니 다.
원본 코드 는 다음 과 같 습 니 다.
private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends Actor with ActorLogReceive with ExecutorBackend with Logging {

  Utils.checkHostPort(hostPort, "Expected hostport")

  var executor: Executor = null
  var driver: ActorSelection = null

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    // Driver  executor
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }
    ......

  override def receiveWithLogging = {
    //  RegisterExecutor      executor
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

      //    
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }
    .......
  }

CoarseGrained Executor Backend 의 LaunchTask 에서 실제 executor 의 launchTask 방법 을 호출 하 였 습 니 다. 이 방법 에서 실제 적 으로 Task Runner (실제 자바 Runnable 인 터 페 이 스 를 실현 하 는 스 레 드) 를 만 든 다음 에 이 taskRunner 를 자바 스 레 드 탱크 에 넣 고 예약 하 였 습 니 다. 그 소스 코드 는 다음 과 같 습 니 다.
 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    //  TaskRunner
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    //          
    runningTasks.put(taskId, tr)
    //     
    threadPool.execute(tr)
  }

따라서 executor 에서 모든 task 에 대해 task Runner 를 만 들 고 스 레 드 탱크 에 넣 어 사용 합 니 다. 그러면 task Runner 에서 task 가 어떻게 작 동 하 는 지 볼 수 있 습 니 다.
TaskRunner 는 Executor 의 내부 클래스 로 실제 자바 의 스 레 드 클래스 입 니 다. 그러면 run 방법 에서 주로 task 의 운행 원리 입 니 다. 소스 코드 는 다음 과 같 습 니 다.
override def run() {
      val deserializeStartTime = System.currentTimeMillis()
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStart: Long = 0
      startGCTime = gcTime

      try {
        //              jar,    
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        if (killed) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException
        }

        attemptedTask = Some(task)
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        //  mapOutputTracker     stage map         
        env.mapOutputTracker.updateEpoch(task.epoch)

        // Run the actual task and measure its runtime.
        taskStart = System.currentTimeMillis()
        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
        val taskFinish = System.currentTimeMillis()
        ......

run 방법 에서 task 류 의 run 방법 을 호출 했 습 니 다. 이 run 방법 은 현재 task 에 대한 실행 입 니 다.
 /**
   * Called by Executor to run this task.
   *
   * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
   * @return the result of the task
   */
  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    //  TaskContext,task    
    //     task        ,  task     ,task    stage,task       rdd   partition  
    context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
      taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
    TaskContextHelper.setTaskContext(context)
    context.taskMetrics.setHostname(Utils.localHostName())
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    try {
      //      
      runTask(context)
    } finally {
      context.markTaskCompleted()
      TaskContextHelper.unset()
    }
  }
   /**
    *         ,                   
    *                 
    *       ,          
    * Task   ,shuffleMapTask,ResultTask
    * @param context
    * @return
    */
  def runTask(context: TaskContext): T

앞의 글 에서 stage 구분 알고리즘 분석 에서 우 리 는 중간 stage 에 대해 shuffleMapTask 를 만 들 고 마지막 stage 만 ResultTask 를 만 들 수 있다 는 것 을 알 게 되 었 습 니 다. 그러면 위의 소스 코드 에서 추상 적 인 방법 을 호출 했 기 때문에 task 의 이 두 가지 유형의 runTask 방법 을 각각 호출 할 것 입 니 다.shuffle MapTask 는 Shuffle Dependency 가 지정 한 partitioner 에 따라 rdd 를 여러 개의 bucket 으로 나 눕 니 다.
실제로 shuffleMapTask 의 run 방법 에 서 는 shuffleManager 의 shuffleWriter 를 사용 하여 데 이 터 를 파 티 션 한 후 해당 하 는 파 티 션 에 기록 합 니 다. 모든 작업 이 끝 난 후에 MapStatus 를 DAGScheduler 에 게 되 돌려 줍 니 다.그 소스 코드 는 다음 과 같다.
 override def runTask(context: TaskContext): MapStatus = {
    //          RDD
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      //  ShuffleManager, ShuffleManager   ShuffleWriter
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //    rdd iterator  ,       task     partition,           
      //           ShuffleWriter,  HashPartitioner      ,        bucket
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: product2="" any="" class="hljs-comment">//      ,MapStatus
      //MapStatus     ShffleMapTask      ,     ,    BlockManager   
      //BlockManager spark    ,  ,         
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

한편, Shuffle MapTask 에 비해 ResultTask 는 비교적 간단 합 니 다. 주로 Shuffle MapTask 의 중간 출력 결과 에 대해 shuffle 작업 과 우리 가 정의 한 연산 자 와 함 수 를 실행 하기 때문에 MapOutputTracker 에서 출력 한 중간 데 이 터 를 끌 어 올 릴 수 있 습 니 다. 소스 코드 는 다음 과 같 습 니 다.
  override def runTask(context: TaskContext): U = {
    //          RDD
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

그리고 shuffle 작업 을 통 해 task 의 실행 결 과 를 캐 시 에 넣 거나 디스크 에 기록 합 니 다. 다음 글 에 서 는 주로 shuffle 의 원 리 를 소개 합 니 다.

좋은 웹페이지 즐겨찾기