Spark 에서 Task 의 제출 소스 코드 해독

저작권 성명: 본 고 는 오리지널 문장 으로 허락 없 이 전재 할 수 없습니다.복습 내용: Spark 중 Stage 제출http://www.jianshu.com/p/75751908329a
Spark 에서 Task 의 제출
1. 복습 내용 부분 에서 우 리 는 방법 on Stage Submitted 에서 Stage 의 제출 을 소개 했다. 그러면 이 방법 에 Task 의 제출 도 있다. 다음 과 같다.
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
//(1)Stage , -Spark Task
//(2)Task
//broadcasted task , tasks executors。
// : broadcast RDD task , task RDD
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
case stage: ResultStage =>
closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
}
// task
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e
${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
// stage tasks
val tasks: Seq[Task[]] = try {
stage match {
// ShuffleMapStages ShuffleMapTask
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
// partition, task,
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.internalAccumulators)
}
// ResultStage ResultTask
case stage: ResultStage =>
val job = stage.resultOfJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, stage.internalAccumulators)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e
${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
// tasks num 0
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(
.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
// taskScheduler TaskSet, 2
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// SparkListenerStageSubmitted, Stage completed
markStageAsFinished(stage, None)
// debugString
val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
}
}
2. Task 의 제출 은 task Scheduler 의 submitTasks 방법 으로 진행 된다. Task Scheduler 는 trait 이 고 그의 유일한 구체 적 인 실현 은 Task Scheduler Impl 이다. submitTasks 방법 은 다음 과 같다.
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// taskSet TaskSetManager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{
._2.taskSet.id}.mkString(",")}")
}
// taskSetManager taskSet tree ,FIFO or FAIR
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
//
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// ( )
backend.reviveOffers()
}
이렇게 해서 우 리 는 Task 의 제출 을 완 성 했 습 니 다. 그러면 서로 다른 모델 은 Task 의 자원 에 대해 어떻게 분 배 했 습 니까? 우 리 는 나중에 소개 합 니 다.

좋은 웹페이지 즐겨찾기