The Way of Spark Mastery (Advanced) - Reading the Spark Source: Section 9, Handling the Result When a Task Executes Successfully
Handling the Result When a Task Executes Successfully
The previous section showed the code that a Task executes on the Executor, and we saw that the execution ultimately goes through the TaskRunner's run method:
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable {

  override def run(): Unit = {
    // ...
    // Tell the Driver that the task has started running
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    try {
      // ... execute the task and serialize its result ...
      // Report the FINISHED state and the serialized result back to the Driver
      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
      // On failure, the corresponding FAILED / KILLED state and the serialized failure reason
      // are reported back to the Driver in the same way
      // ...
    }
  }
}
The status update first goes through the statusUpdate method of CoarseGrainedExecutorBackend:
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    // Send the StatusUpdate message to the Driver endpoint
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}
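To make the flow concrete, here is a deliberately simplified, in-process sketch of the idea behind execBackend.statusUpdate and driverRef.send (not the actual Spark code; StatusUpdateModel, driverInbox and the message type are made-up stand-ins, and the real send goes over RPC rather than a local queue): the executor side only wraps the task state into a message and hands it to the driver endpoint, which processes it on its own thread.

import java.nio.ByteBuffer
import java.util.concurrent.LinkedBlockingQueue

// Simplified, illustrative model of the executor -> driver status-update path.
object StatusUpdateModel {
  sealed trait TaskState
  case object RUNNING extends TaskState
  case object FINISHED extends TaskState

  // Stand-in for the StatusUpdate message sent by CoarseGrainedExecutorBackend
  case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)

  // Stand-in for the DriverEndpoint's inbox that driverRef.send(msg) feeds
  private val driverInbox = new LinkedBlockingQueue[StatusUpdate]()

  // Executor side: wrap the state change into a message and send it, nothing more
  def statusUpdate(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): Unit =
    driverInbox.put(StatusUpdate(executorId, taskId, state, data))

  def main(args: Array[String]): Unit = {
    statusUpdate("executor-1", 42L, RUNNING, ByteBuffer.allocate(0))
    statusUpdate("executor-1", 42L, FINISHED, ByteBuffer.wrap("result".getBytes("UTF-8")))
    // Driver side: drain the inbox, which is what DriverEndpoint.receive reacts to
    while (!driverInbox.isEmpty) println(driverInbox.take())
  }
}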
The DriverEndpoint's receive method picks up the StatusUpdate message and handles it. The source code is as follows:
override def receive: PartialFunction[Any, Unit] = {
// A StatusUpdate message arrived from an executor backend
case StatusUpdate(executorId, taskId, state, data) =>
// Delegate the state change to TaskSchedulerImpl's statusUpdate method
scheduler.statusUpdate(taskId, state, data.value)
// If the task reached a terminal state, return its cores to the executor and make new offers
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
case ReviveOffers =>
makeOffers()
case KillTask(taskId, executorId, interruptThread) =>
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
case None =>
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}
}
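The bookkeeping for a finished task is small: its scheduler.CPUS_PER_TASK cores are returned to the executor, and makeOffers(executorId) tries to schedule more work with the freed cores. A tiny, hypothetical sketch of that accounting (CoreAccountingModel, ExecutorData and onTaskFinished are made-up names for illustration, not Spark's types):

import scala.collection.mutable

// Illustrative sketch of the core accounting done in DriverEndpoint.receive for a finished task.
object CoreAccountingModel {
  val CPUS_PER_TASK = 1                                  // stands in for scheduler.CPUS_PER_TASK
  final case class ExecutorData(var freeCores: Int)      // simplified executorDataMap entry
  val executorDataMap = mutable.Map("executor-1" -> ExecutorData(freeCores = 1))

  // Stand-in for makeOffers(executorId): with the freed cores, new tasks can be offered
  def makeOffers(executorId: String): Unit = {
    val slots = executorDataMap(executorId).freeCores / CPUS_PER_TASK
    println(s"$executorId now has $slots free task slot(s) to offer")
  }

  def onTaskFinished(executorId: String): Unit = {
    executorDataMap(executorId).freeCores += CPUS_PER_TASK   // give the cores back
    makeOffers(executorId)                                   // and try to schedule more work
  }

  def main(args: Array[String]): Unit = onTaskFinished("executor-1")
}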
The source of TaskSchedulerImpl's statusUpdate method is as follows:
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (activeExecutorIds.contains(execId)) {
removeExecutor(execId)
failedExecutor = Some(execId)
}
}
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid)
}
// The task finished successfully
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
// taskResultGetter fetches and deserializes the result asynchronously on its own thread pool
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
// The task failed, was killed, or was lost
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskSet.removeRunningTask(tid)
// Hand the failed task to taskResultGetter for failure handling
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates)")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
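One detail worth noting is the comment near the end: statusUpdate only records a possibly lost executor while holding the TaskSchedulerImpl lock, and calls dagScheduler.executorLost / backend.reviveOffers after releasing it. A minimal sketch of that "decide under the lock, notify outside the lock" pattern (LockThenNotify and its fields are illustrative names, not Spark's):

// Illustrative sketch of the locking pattern used by TaskSchedulerImpl.statusUpdate.
class LockThenNotify {
  private var activeExecutors = Set("executor-1", "executor-2")

  def onExecutorLost(execId: String): Unit = {
    var failedExecutor: Option[String] = None
    synchronized {
      // state changes happen under this object's lock
      if (activeExecutors.contains(execId)) {
        activeExecutors -= execId
        failedExecutor = Some(execId)
      }
    }
    // The callout to the other component happens outside the lock, so that component can never
    // deadlock by calling back into this one while the lock is still held.
    failedExecutor.foreach(id => println(s"telling the DAGScheduler that $id was lost"))
  }
}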
When the Task has executed successfully, TaskResultGetter's enqueueSuccessfulTask method is called to process the result:
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
// The result was small enough to be sent back directly inside the status update
case directResult: DirectTaskResult[_] =>
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
// deserialize "value" without holding any lock so that it won't block other threads.
// We should call it here, so that when it's called again in
// "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
directResult.value()
(directResult, serializedData.limit())
// The result is stored in the Worker-side BlockManager; only a reference (blockId, size) was sent back
case IndirectTaskResult(blockId, size) =>
if (!taskSetManager.canFetchMoreResults(size)) {
// dropped by executor if size is larger than maxResultSize
sparkEnv.blockManager.master.removeBlock(blockId)
return
}
logDebug("Fetching indirect task result for TID %s".format(tid))
scheduler.handleTaskGettingResult(taskSetManager, tid)
// Fetch the serialized result remotely from the Worker's BlockManager
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
if (!serializedTaskResult.isDefined) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
// The result could not be fetched, e.g. the Executor's machine failed after the task ended or the block was evicted, so treat the task as failed
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
return
}
// Deserialize the fetched bytes into a DirectTaskResult
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
// The fetched result block is no longer needed remotely, so remove it
sparkEnv.blockManager.master.removeBlock(blockId)
(deserializedResult, size)
}
result.metrics.setResultSize(size)
// Hand the result to TaskSchedulerImpl's handleSuccessfulTask
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
// Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
case NonFatal(ex) =>
logError("Exception while getting task result", ex)
taskSetManager.abort("Exception while getting task result: %s".format(ex))
}
}
})
}
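The two cases above, DirectTaskResult and IndirectTaskResult, mirror a decision made on the executor side: a small result is shipped inline in the status-update message, while a result too large for the RPC frame is written to the executor's BlockManager and only a reference is sent, which is what the driver then fetches remotely. A simplified, hypothetical sketch of that choice (ResultShapeModel, packageResult and maxDirectResultBytes are made-up names; the real threshold logic also involves spark.driver.maxResultSize, above which a result is dropped entirely):

import java.nio.ByteBuffer

// Simplified model of how an executor decides which result shape to send back to the driver.
object ResultShapeModel {
  sealed trait TaskResult
  case class DirectTaskResult(valueBytes: ByteBuffer) extends TaskResult
  case class IndirectTaskResult(blockId: String, size: Int) extends TaskResult

  // maxDirectResultBytes stands in for the RPC frame-size limit
  def packageResult(taskId: Long, valueBytes: ByteBuffer, maxDirectResultBytes: Int,
                    putBlock: (String, ByteBuffer) => Unit): TaskResult = {
    val size = valueBytes.remaining()
    if (size < maxDirectResultBytes) {
      DirectTaskResult(valueBytes)           // small result: travels inline with the status update
    } else {
      val blockId = s"taskresult_$taskId"
      putBlock(blockId, valueBytes)          // store the bytes in the executor's BlockManager
      IndirectTaskResult(blockId, size)      // ship only the reference; the driver fetches it later
    }
  }

  def main(args: Array[String]): Unit = {
    val small = packageResult(1L, ByteBuffer.allocate(16), 1024, (_, _) => ())
    val large = packageResult(2L, ByteBuffer.allocate(4096), 1024, (_, _) => ())
    println(small)
    println(large)
  }
}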
TaskSchedulerImpl's handleSuccessfulTask method then processes the computed result; its source is as follows:
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  // Delegate to TaskSetManager.handleSuccessfulTask
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
The source of TaskSetManager.handleSuccessfulTask is as follows:
/**
* Marks the task as successful and notifies the DAGScheduler that a task has ended.
*/
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
info.markSuccessful()
removeRunningTask(tid)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
// Notify the DAGScheduler that the task has ended via its taskEnded method
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
isZombie = true
}
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
failedExecutors.remove(index)
maybeFinishTaskSet()
}
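The long comment in the middle of this method is the SPARK-7655 point: result.value() was already called in TaskResultGetter.enqueueSuccessfulTask, off the scheduler lock, so the call made while holding the TaskSchedulerImpl lock only returns a cached value. A minimal sketch of that deserialize-once behaviour (OnceDeserialized is an illustrative name, not Spark's DirectTaskResult):

// Illustrative sketch: the first call to value() pays the deserialization cost, later calls
// just return the cached object, so calling it again while holding a lock is cheap.
class OnceDeserialized[T](bytes: Array[Byte], deserialize: Array[Byte] => T) {
  private var cached: Option[T] = None

  def value(): T = cached match {
    case Some(v) => v                  // already deserialized: no work, no blocking
    case None =>
      val v = deserialize(bytes)       // the expensive work happens only once
      cached = Some(v)
      v
  }
}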
This leads into DAGScheduler's taskEnded method:
// DAGScheduler's taskEnded method
/**
* Called by the TaskSetManager to report task completions or failures.
*/
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics): Unit = {
  // Post a CompletionEvent to the DAGSchedulerEventProcessLoop; its eventThread will pick the
  // event up and dispatch it to onReceive
eventProcessLoop.post(
CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}
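taskEnded itself does no processing; it only posts an event. DAGSchedulerEventProcessLoop is built on an internal event loop whose daemon thread drains a queue and hands each event to onReceive. A simplified, self-contained sketch of that mechanism (SimpleEventLoop is an illustrative stand-in, not Spark's EventLoop class):

import java.util.concurrent.LinkedBlockingDeque

// Simplified model of the event loop behind DAGSchedulerEventProcessLoop: post() only
// enqueues the event; a dedicated daemon thread drains the queue and calls onReceive,
// so the DAGScheduler handles all events single-threaded on its own event thread.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) {
          onReceive(eventQueue.take())   // block until an event arrives, then dispatch it
        }
      } catch {
        case _: InterruptedException =>  // stop() interrupts the thread to end the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event)   // what taskEnded effectively does
  def onReceive(event: E): Unit                      // what doOnReceive pattern-matches on
}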
Inside the event loop, the posted event is dispatched to the onReceive method:
// DAGSchedulerEventProcessLoop's onReceive method
/**
 * The main event loop of the DAG scheduler.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
Stepping into the doOnReceive method, we can see the following:
// DAGSchedulerEventProcessLoop's doOnReceive method
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
// The CompletionEvent posted by taskEnded arrives here
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
// Delegate to DAGScheduler.handleTaskCompletion to process the completed task
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
DAGScheduler.handleTaskCompletion finishes the processing of the computed result:
/**
 * Responds to a task finishing. This is called inside the event loop so it assumes that it can
 * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
 */
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
event.taskInfo.attempt, event.reason)
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
event.taskInfo, event.taskMetrics))
}
if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
}
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
task match {
// The completed task is a ResultTask
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
// If every partition of the job has finished, the whole job is done: mark the stage finished and clean up
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
// we are resilient against that.
// Notify the JobWaiter (job.listener) that this task's result is ready
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
// TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
}
// The completed task is a ShuffleMapTask
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// Register this partition's map output location with the ShuffleMapStage
shuffleStage.addOutputLoc(smt.partitionId, status)
}
if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
// epoch incremented to refetch them.
// TODO: Only increment the epoch number if this is not the first time
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head),
changeEpoch = true)
clearCacheLocs()
// Some partitions have no map output location, i.e. some Tasks failed
if (shuffleStage.outputLocs.contains(Nil)) {
// Some tasks had failed; let's resubmit this shuffleStage
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
.map(_._2).mkString(", "))
// Resubmit the shuffleStage to recompute the missing map outputs
submitStage(shuffleStage)
} else {
// Otherwise look for waiting Stages whose parent Stages are now all available and submit them
val newlyRunnable = new ArrayBuffer[Stage]
for (shuffleStage <- waitingStages) {
logInfo("Missing parents for " + shuffleStage + ": " +
getMissingParentStages(shuffleStage))
}
for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
{
newlyRunnable += shuffleStage
}
waitingStages --= newlyRunnable
runningStages ++= newlyRunnable
for {
shuffleStage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(shuffleStage)
} {
logInfo("Submitting " + shuffleStage + " (" +
shuffleStage.rdd + "), which is now runnable")
submitMissingTasks(shuffleStage, jobId)
}
}
}
}
// Handling of the non-Success cases (Resubmitted, FetchFailed, ExceptionFailure, etc.) is omitted here
}
}
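For a ResultTask, the job.listener.taskSucceeded(rt.outputId, event.result) call above is what finally hands the value back to the user program: the listener is a JobWaiter on which the action (for example collect()) is blocked. A simplified, hypothetical sketch of that last hop (SimpleJobWaiter is an illustrative name, not Spark's JobWaiter):

// Illustrative sketch of the waiter that connects handleTaskCompletion back to the blocked action.
class SimpleJobWaiter[T](numPartitions: Int, resultHandler: (Int, T) => Unit) {
  private var finished = 0

  def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    resultHandler(index, result.asInstanceOf[T])   // e.g. copy this partition's result into place
    finished += 1
    if (finished == numPartitions) notifyAll()     // every partition reported in: the job is done
  }

  def awaitResult(): Unit = synchronized {
    while (finished < numPartitions) wait()        // the action blocks here on the driver side
  }
}

Roughly speaking, for collect() the resultHandler writes each partition's array into a slot of the final result, and runJob returns to user code once awaitResult unblocks.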
Execution flow:
1. The run method of org.apache.spark.executor.Executor.TaskRunner (which calls execBackend.statusUpdate)
2. The statusUpdate method of org.apache.spark.executor.CoarseGrainedExecutorBackend
3. The receive method of org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#DriverEndpoint (DriverEndpoint is an inner class)
4. The statusUpdate method of org.apache.spark.scheduler.TaskSchedulerImpl
5. The enqueueSuccessfulTask method of org.apache.spark.scheduler.TaskResultGetter
6. The handleTaskCompletion method of org.apache.spark.scheduler.DAGScheduler
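To connect this chain back to user code: any action triggers it. In the local-mode example below (an illustrative snippet, not taken from the Spark source), collect() blocks on the driver while every task's result travels the statusUpdate -> TaskSchedulerImpl -> TaskResultGetter -> DAGScheduler -> JobWaiter path traced above:

import org.apache.spark.{SparkConf, SparkContext}

object ResultPathDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("result-path-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // collect() is an action: the job finishes only after every task's result has been
    // reported back through the status-update path traced in this section
    val squares = sc.parallelize(1 to 10, numSlices = 4).map(x => x * x).collect()
    println(squares.mkString(", "))
    sc.stop()
  }
}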