[Chapter 6] Source analysis: notifying the Master after Driver and Executor state changes
The following code handles a Driver state change:
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the Driver ended in error, finished, was killed, or failed, remove it
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
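The match above treats four states as terminal. As a minimal sketch of that idea outside of Spark, the snippet below uses a hypothetical stand-in for `DriverState` (only the four value names come from the excerpt; `SUBMITTED` and `RUNNING` are assumptions added for illustration). Unlike the handler above, which throws on any other state, this sketch simply returns `false`.

```scala
object DriverStateSketch {
  // Hypothetical stand-in for Spark's DriverState; not the real definition.
  object DriverState extends Enumeration {
    val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
  }
  import DriverState._

  // True for the states on which the handler above calls removeDriver.
  def isTerminal(state: DriverState.Value): Boolean = state match {
    case ERROR | FINISHED | KILLED | FAILED => true
    case _                                  => false
  }
}
```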
def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
  // Look up the driver by driverId
  drivers.find(d => d.id == driverId) match {
    // A matching driver was found
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory collection of drivers
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to completedDrivers
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Record the driver's final state and exception
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the worker it was running on
      driver.worker.foreach(w => w.removeDriver(driver))
      // Call schedule() again
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
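The trimming of `completedDrivers` in removeDriver is easy to miss: once the buffer reaches the cap, the oldest tenth (at least one entry) is dropped before the new entry is appended. A minimal standalone sketch of that policy follows; the class name `CompletedDrivers`, the use of plain `String` ids, and the cap of 20 in the usage note are assumptions for illustration, not Spark's actual types or configuration.

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedDriversSketch {
  // Hypothetical holder for finished driver ids, capped at roughly `retained` entries.
  final class CompletedDrivers(retained: Int) {
    val entries: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def record(driverId: String): Unit = {
      if (entries.size >= retained) {
        // Drop the oldest 10% (at least one), as trimStart(toRemove) does in the excerpt.
        val toRemove = math.max(retained / 10, 1)
        entries.remove(0, toRemove)
      }
      entries += driverId
    }
  }
}
```

With a cap of 20, recording 25 ids trims two entries on each of three occasions, so the buffer never grows past the cap and always keeps the most recent ids.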
Does the removeDriver method look familiar? It is essentially the reverse of the registration steps with the Master covered earlier: the driver is removed from memory, removed from the persistence engine, detached from its worker, and schedule() is called again.
The following is the source code that runs when the Executor state changes. The code is simple and I have annotated it; it is likewise roughly the reverse of the earlier steps for registering an Application with the Master.
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Look up the executor by its ID among the application's executors
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      exec.state = state
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      // Send the executor's new state to the Driver of the application it belongs to
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
      // If the executor has finished
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // Remove the executor from the application
        appInfo.removeExecutor(exec)
        // Remove the executor from the worker that ran it
        exec.worker.removeExecutor(exec)
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // The executor exited abnormally: reschedule, up to 10 retries
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            // After 10 retries, check whether any executor is still running
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              // No executor is left to run tasks, so remove the application
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    // ... (the rest of the handler is not shown in this excerpt)
  }
}
To summarize the ExecutorStateChanged handler: when an executor exits abnormally, the Master increments the application's retry count and calls schedule() again. Once the retry count reaches the limit of 10, and no executor of that application is still in the RUNNING state, the Master removes the application and marks it as FAILED, since nothing is left to run it.
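The retry decision can be isolated from the Master as a small pure function. The sketch below is a hedged illustration, not Spark's code: `AppRetryState`, `Decision` and its cases, and `onExecutorFinished` are hypothetical names, and `MaxNumRetry = 10` is assumed from the limit of 10 mentioned in the text.

```scala
object ExecutorRetrySketch {
  // Assumed limit, matching the 10 retries described in the text.
  val MaxNumRetry = 10

  sealed trait Decision
  case object NoAction extends Decision          // executor exited normally
  case object Reschedule extends Decision        // corresponds to schedule()
  case object RemoveApplication extends Decision // corresponds to removeApplication(appInfo, FAILED)
  case object KeepApplication extends Decision   // limit reached, but another executor still runs

  // Hypothetical stand-in for the application's retry counter.
  final class AppRetryState {
    private var retryCount = 0
    def incrementRetryCount(): Int = { retryCount += 1; retryCount }
    def resetRetryCount(): Unit = { retryCount = 0 }
  }

  // Mirrors the branch structure of the handler for a finished executor.
  def onExecutorFinished(app: AppRetryState,
                         exitStatus: Option[Int],
                         anyExecutorRunning: Boolean): Decision = {
    val normalExit = exitStatus == Some(0)
    if (normalExit) NoAction
    else if (app.incrementRetryCount() < MaxNumRetry) Reschedule
    else if (!anyExecutorRunning) RemoveApplication
    else KeepApplication
  }
}
```

Under these assumptions, the first nine abnormal exits of an application lead to rescheduling, and the tenth removes the application unless one of its executors is still running. Calling `resetRetryCount()` when an executor reaches RUNNING, as the handler does, starts the count over.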