Task 실행 성공 시 결과 처리

이전 절에서 Task가 Executor에서 실행되는 코드를 살펴보았으며, 코드의 최종 실행이 TaskRunner를 통해 이루어진다는 것을 알 수 있었습니다.
// Excerpt of TaskRunner, the Runnable that executes a task on the executor.
// NOTE(review): extraction collapsed the original multi-line snippet onto a single line, so
// everything after the first `//` below is comment text, not live code. It records the two
// status reports made through execBackend.statusUpdate: RUNNING, then FINISHED with the result.
class TaskRunner( execBackend: ExecutorBackend, val taskId: Long, val attemptNumber: Int, taskName: String, serializedTask: ByteBuffer) extends Runnable { // (elided) report to the Driver that the task is running: execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) ... (elided) when the task finishes, send the serialized result to the Driver: execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { (elided) on failure, report to the Driver }

상태를 업데이트할 때는 먼저 CoarseGrainedExecutorBackend의 statusUpdate 메서드를 호출합니다.
  // Wraps the task's id, state and payload (plus this executor's id) in a StatusUpdate message.
  // NOTE(review): extraction collapsed this snippet onto one line; the `driver match` cases after
  // the first `//` are comment text here. They describe sending the message through driverRef when
  // a driver reference exists, and logging a warning otherwise.
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { // send the StatusUpdate message to the Driver: case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") }

DriverEndpoint의 receive 메서드는 전송된 StatusUpdate 메시지를 수신하여 처리합니다. 구체적인 소스 코드는 다음과 같습니다.
 // Message handler of DriverEndpoint (excerpt: some case bodies and closing braces are elided).
 override def receive: PartialFunction[Any, Unit] = {
      // Handle a StatusUpdate message reporting a task's state
      case StatusUpdate(executorId, taskId, state, data) =>
        // Forward the update to the scheduler's statusUpdate (TaskSchedulerImpl, per the article)
        scheduler.statusUpdate(taskId, state, data.value)

        // Once the task is in a finished state, return its cores to the executor's free pool
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")

      // (handler body elided in this excerpt)
      case ReviveOffers =>

      // Relay the kill request to the endpoint of the executor that runs the task
      case KillTask(taskId, executorId, interruptThread) =>
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
          case None =>
            // Ignoring the task kill since the executor is not registered.
            logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")


TaskSchedulerImpl의 statusUpdate 메서드 소스는 다음과 같습니다.
 // Routes a task status update to the success or failure path of taskResultGetter.
 // Excerpt: several statements and closing braces are elided.
 //   tid            - id of the task the update refers to
 //   state          - the reported TaskState
 //   serializedData - serialized payload that accompanies the update
 def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    synchronized {
      try {
        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
          // We lost this entire executor, so remember that it's gone
          val execId = taskIdToExecutorId(tid)
          if (activeExecutorIds.contains(execId)) {
            failedExecutor = Some(execId)
        // Look up the TaskSetManager that owns this task
        taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            if (TaskState.isFinished(state)) {
            if (state == TaskState.FINISHED) {
             // Success: hand the serialized result to taskResultGetter for processing
             taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            // Failed, killed or lost: hand the update to taskResultGetter's failure path
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          case None =>
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates)")
                .format(state, tid))
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {

Task가 성공적으로 실행되면 TaskResultGetter의 enqueueSuccessfulTask 메서드를 호출하여 처리합니다.
 // Deserializes a successful task's result on the getTaskResultExecutor pool and passes it to
 // scheduler.handleSuccessfulTask. Excerpt: several statements and closing braces are elided.
 //   taskSetManager - manager of the task set the task belongs to
 //   tid            - id of the finished task
 //   serializedData - serialized TaskResult (direct or indirect)
 def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            // Direct result: the value travelled inside the status update itself
            case directResult: DirectTaskResult[_] =>
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              // deserialize "value" without holding any lock so that it won't block other threads.
              // We should call it here, so that when it's called again in
              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
              (directResult, serializedData.limit())
            // Indirect result: only a block id and size were sent; fetch the data via the BlockManager
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
              logDebug("Fetching indirect task result for TID %s".format(tid))
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              // Fetch the serialized result bytes for blockId from the remote block manager
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                /* We won't be able to get the task result if the machine that ran the task failed
                 * between when the task ended and when we tried to fetch the result, or if the
                 * block manager had to flush the result. */
                 // NOTE(review): original note garbled in extraction. The arguments below belong to
                 // an elided call that appears to report the result as lost (TaskResultLost).
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
             // NOTE(review): this call was swallowed into the comment by extraction:
             // sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)

          // Hand the deserialized result over to the scheduler
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
          case NonFatal(ex) =>
            logError("Exception while getting task result", ex)
            taskSetManager.abort("Exception while getting task result: %s".format(ex))

TaskSchedulerImpl의 handleSuccessfulTask 메서드가 계산 결과를 처리하며, 소스는 다음과 같습니다.
// Passes a successful task's result to its TaskSetManager; the whole body runs inside
// `synchronized` (closing brace elided in this excerpt).
def handleSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, taskResult: DirectTaskResult[_]): Unit = synchronized {
     // Delegate to TaskSetManager.handleSuccessfulTask
    taskSetManager.handleSuccessfulTask(tid, taskResult)

TaskSetManager.handleSuccessfulTask 메소드 소스는 다음과 같습니다.
   * Marks the task as successful and notifies the DAGScheduler that a task has ended.
  // Excerpt: several statements and closing braces are elided, and some format arguments
  // were lost in extraction.
  //   tid    - id of the task that succeeded
  //   result - the task's deserialized DirectTaskResult
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
    // Note: "result.value()" only deserializes the value when it's called at the first time, so
    // here "result.value()" just returns the value and won't block other threads.
    // Notify the DAGScheduler via taskEnded (the call's receiver line is elided; arguments follow)
      tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
    // Count the task only if this index has not already been marked successful
    if (!successful(index)) {
      tasksSuccessful += 1
      logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(,, info.taskId, info.duration,, tasksSuccessful, numTasks))
      // Mark successful and stop if all the tasks have succeeded.
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
    } else {
      logInfo("Ignoring task-finished event for " + + " in stage " + +
        " because task " + index + " has already completed successfully")

이어서 DAGScheduler의 taskEnded 메서드로 들어갑니다.
// DAGScheduler.taskEnded (excerpt: scaladoc delimiters, the posting call and closing braces are elided)
   * Called by the TaskSetManager to report task completions or failures.
  def taskEnded( task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any], taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit = {
      // Post a CompletionEvent to the DAGSchedulerEventProcessLoop (the post call itself is elided).
      // NOTE(review): presumably the loop's event thread then delivers it to onReceive — confirm.
      CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))

onReceive 메서드로 이동하면 그 안에서 doOnReceive가 호출되는 것을 볼 수 있습니다.
// DAGSchedulerEventProcessLoop.onReceive (excerpt: the try/finally bodies are elided)
/** The main event loop of the DAG scheduler. */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    // Start a timer context for this event
    val timerContext = timer.time()
    // NOTE(review): per the surrounding article the elided try body calls doOnReceive(event) — confirm
    try {
    } finally {

doOnReceive 메서드로 이동하면 다음과 같습니다.
// DAGSchedulerEventProcessLoop.doOnReceive: matches on the event type and calls the matching
// dagScheduler handler. Excerpt: several case bodies are elided.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case StageCancelled(stageId) =>

    case JobCancelled(jobId) =>

    case JobGroupCancelled(groupId) =>

    case AllJobsCancelled =>

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
    // Handle the CompletionEvent posted when a task ends
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    // The elided body calls DAGScheduler.handleTaskCompletion (per the original note)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>

DAGScheduler.handleTaskCompletion 메서드에서 계산 결과 처리가 완료됩니다.
/**
 * Responds to a task finishing. This is called inside the event loop so it assumes that it can
 * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
 *
 * NOTE(review): this is an excerpt; several statements and closing braces were lost in extraction.
 */
  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
    val task = event.task
    val stageId = task.stageId
    val taskType = Utils.getFormattedClassName(task)

    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
      event.taskInfo.attempt, event.reason)

    // The success case is dealt with separately below, since we need to compute accumulator
    // updates before posting.
    if (event.reason != Success) {
      val attemptId = task.stageAttemptId, attemptId, taskType, event.reason,
        event.taskInfo, event.taskMetrics))

    if (!stageIdToStage.contains(task.stageId)) {
      // Skip all the actions if the stage has been cancelled.

    val stage = stageIdToStage(task.stageId)
    event.reason match {
      case Success =>, stage.latestInfo.attemptId, taskType,
          event.reason, event.taskInfo, event.taskMetrics))
        stage.pendingTasks -= task
        task match {
           // Case 1: the finished task is a ResultTask
          case rt: ResultTask[_, _] =>
            // Cast to ResultStage here because it's part of the ResultTask
            // TODO Refactor this out to a function that accepts a ResultStage
            val resultStage = stage.asInstanceOf[ResultStage]
            resultStage.resultOfJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  // If the whole job has finished, remove it
                  // The job is complete once every partition has reported
                  // (numFinished == numPartitions); a SparkListenerJobEnd is then emitted
                  if (job.numFinished == job.numPartitions) {
                      SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))

                  // taskSucceeded runs some user code that might throw an exception. Make sure
                  // we are resilient against that.
                  // Notify the job's listener (a JobWaiter, per the original note) of this result
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)
                  } catch {
                    case e: Exception =>
                      // TODO: Perhaps we want to mark the resultStage as failed?
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
          // Case 2: the finished task is a ShuffleMapTask
          case smt: ShuffleMapTask =>
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
            } else {
               // Record this partition's map output status on the ShuffleMapStage
              shuffleStage.addOutputLoc(smt.partitionId, status)

            if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)

              // We supply true to increment the epoch number here in case this is a
              // recomputation of the map outputs. In that case, some nodes may have cached
              // locations with holes (from when we detected the error) and will need the
              // epoch incremented to refetch them.
              // TODO: Only increment the epoch number if this is not the first time
              // we registered these map outputs.
       => if (list.isEmpty) null else list.head),
                changeEpoch = true)

              // Check whether any partition still has no output location (i.e. some task failed)
              if (shuffleStage.outputLocs.contains(Nil)) {
                // Some tasks had failed; let's resubmit this shuffleStage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + shuffleStage + " (" + +
                  ") because some of its tasks had failed: " +
                      .map(_._2).mkString(", "))
              } else {
                // All outputs are present: move waiting stages with no missing parents to
                // running and submit their tasks
                val newlyRunnable = new ArrayBuffer[Stage]
                for (shuffleStage <- waitingStages) {
                  logInfo("Missing parents for " + shuffleStage + ": " +
                for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
                  newlyRunnable += shuffleStage
                waitingStages --= newlyRunnable
                runningStages ++= newlyRunnable
                for {
                  shuffleStage <- newlyRunnable.sortBy(
                  jobId <- activeJobForStage(shuffleStage)
                } {
                  logInfo("Submitting " + shuffleStage + " (" +
                    shuffleStage.rdd + "), which is now runnable")
                  submitMissingTasks(shuffleStage, jobId)


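실행 프로세스: TaskRunner → CoarseGrainedExecutorBackend.statusUpdate → DriverEndpoint.receive(DriverEndpoint는 내부 클래스) → TaskSchedulerImpl.statusUpdate → TaskResultGetter.enqueueSuccessfulTask → TaskSchedulerImpl.handleSuccessfulTask → TaskSetManager.handleSuccessfulTask → DAGScheduler.taskEnded → DAGSchedulerEventProcessLoop.onReceive → doOnReceive → DAGScheduler.handleTaskCompletion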

