Spark 소스 코드 판독 의 Executor 및 Task 작업 원리 분석
13370 단어 Spark빅 데이터Spark 소스 코드 분석 및 변조
먼저 executor 의 작업 절 차 를 살 펴 보 겠 습 니 다.
executor 는 백 엔 드 프로 세 스 CoarseGrained Executor Backend 를 시작 합 니 다. 먼저 driver 에 RegisterExecutor 메 시 지 를 보 내 executor 를 등록 한 후 driver 는 RegisterExecutor 메 시 지 를 되 돌려 줍 니 다. CoarseGrained Executor Backend 는 메 시 지 를 받 은 후에 executor 대상 을 만 든 다음 executor 의 launchTask 방법 으로 task 를 시작 합 니 다.
원본 코드 는 다음 과 같 습 니 다.
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
extends Actor with ActorLogReceive with ExecutorBackend with Logging {
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
var driver: ActorSelection = null
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
// Driver executor
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
......
override def receiveWithLogging = {
// RegisterExecutor executor
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
val (hostname, _) = Utils.parseHostPort(hostPort)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
//
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val ser = env.closureSerializer.newInstance()
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
.......
}
CoarseGrained Executor Backend 의 LaunchTask 에서 실제 executor 의 launchTask 방법 을 호출 하 였 습 니 다. 이 방법 에서 실제 적 으로 Task Runner (실제 자바 Runnable 인 터 페 이 스 를 실현 하 는 스 레 드) 를 만 든 다음 에 이 taskRunner 를 자바 스 레 드 탱크 에 넣 고 예약 하 였 습 니 다. 그 소스 코드 는 다음 과 같 습 니 다.
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer) {
// TaskRunner
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
//
runningTasks.put(taskId, tr)
//
threadPool.execute(tr)
}
따라서 executor 에서 모든 task 에 대해 task Runner 를 만 들 고 스 레 드 탱크 에 넣 어 사용 합 니 다. 그러면 task Runner 에서 task 가 어떻게 작 동 하 는 지 볼 수 있 습 니 다.
TaskRunner 는 Executor 의 내부 클래스 로 실제 자바 의 스 레 드 클래스 입 니 다. 그러면 run 방법 에서 주로 task 의 운행 원리 입 니 다. 소스 코드 는 다음 과 같 습 니 다.
override def run() {
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
startGCTime = gcTime
try {
// jar,
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException
}
attemptedTask = Some(task)
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
// mapOutputTracker stage map
env.mapOutputTracker.updateEpoch(task.epoch)
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
val taskFinish = System.currentTimeMillis()
......
run 방법 에서 task 류 의 run 방법 을 호출 했 습 니 다. 이 run 방법 은 현재 task 에 대한 실행 입 니 다.
/**
* Called by Executor to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task
*/
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
// TaskContext,task
// task , task ,task stage,task rdd partition
context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
TaskContextHelper.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
try {
//
runTask(context)
} finally {
context.markTaskCompleted()
TaskContextHelper.unset()
}
}
/**
* ,
*
* ,
* Task ,shuffleMapTask,ResultTask
* @param context
* @return
*/
def runTask(context: TaskContext): T
앞의 글 에서 stage 구분 알고리즘 분석 에서 우 리 는 중간 stage 에 대해 shuffleMapTask 를 만 들 고 마지막 stage 만 ResultTask 를 만 들 수 있다 는 것 을 알 게 되 었 습 니 다. 그러면 위의 소스 코드 에서 추상 적 인 방법 을 호출 했 기 때문에 task 의 이 두 가지 유형의 runTask 방법 을 각각 호출 할 것 입 니 다.shuffle MapTask 는 Shuffle Dependency 가 지정 한 partitioner 에 따라 rdd 를 여러 개의 bucket 으로 나 눕 니 다.
실제로 shuffleMapTask 의 run 방법 에 서 는 shuffleManager 의 shuffleWriter 를 사용 하여 데 이 터 를 파 티 션 한 후 해당 하 는 파 티 션 에 기록 합 니 다. 모든 작업 이 끝 난 후에 MapStatus 를 DAGScheduler 에 게 되 돌려 줍 니 다.그 소스 코드 는 다음 과 같다.
override def runTask(context: TaskContext): MapStatus = {
// RDD
// Deserialize the RDD using the broadcast variable.
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
// ShuffleManager, ShuffleManager ShuffleWriter
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// rdd iterator , task partition,
// ShuffleWriter, HashPartitioner , bucket
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: product2="" any="" class="hljs-comment">// ,MapStatus
//MapStatus ShffleMapTask , , BlockManager
//BlockManager spark , ,
return writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
한편, Shuffle MapTask 에 비해 ResultTask 는 비교적 간단 합 니 다. 주로 Shuffle MapTask 의 중간 출력 결과 에 대해 shuffle 작업 과 우리 가 정의 한 연산 자 와 함 수 를 실행 하기 때문에 MapOutputTracker 에서 출력 한 중간 데 이 터 를 끌 어 올 릴 수 있 습 니 다. 소스 코드 는 다음 과 같 습 니 다.
override def runTask(context: TaskContext): U = {
// RDD
// Deserialize the RDD and the func using the broadcast variables.
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
그리고 shuffle 작업 을 통 해 task 의 실행 결 과 를 캐 시 에 넣 거나 디스크 에 기록 합 니 다. 다음 글 에 서 는 주로 shuffle 의 원 리 를 소개 합 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.