[Chapter 6] Analyzing how the Master is notified of Driver and Executor state changes

This section continues from the previous one. That section covered the Master's registration mechanism: how Drivers, Applications, and Workers register with the Master so that it knows their startup state. While jobs are running, the states of Drivers, Applications (through their Executors), and Workers keep changing. Each time they change, the Master updates its registration records accordingly.
The following code handles a Driver state change (the DriverStateChanged message):
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the Driver has reached a terminal state (error, finished, killed, failed), remove the Driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
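The dispatch above boils down to one question: is the new state terminal? Below is a minimal, self-contained sketch of that decision. It assumes a simplified stand-in for Spark's DriverState enumeration. Only the four terminal names come from the code above; SUBMITTED and RUNNING are illustrative non-terminal states. It also assumes a hypothetical `remove` callback in place of the Master's removeDriver.

```scala
// Simplified stand-in for Spark's DriverState enumeration (illustrative, not the real class).
object DriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
}

// True for the states that make the Master remove the driver.
def isTerminal(state: DriverState.Value): Boolean = state match {
  case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => true
  case _ => false
}

// Mirrors the handler: terminal states trigger removal, any other update is rejected.
// `remove` is a hypothetical callback standing in for removeDriver.
def onDriverStateChanged(driverId: String, state: DriverState.Value)(remove: String => Unit): Unit =
  if (isTerminal(state)) remove(driverId)
  else throw new Exception(s"Received unexpected state update for driver $driverId: $state")

// Example: a KILLED driver is removed; a RUNNING update would throw instead.
onDriverStateChanged("driver-1", DriverState.KILLED)(id => println(s"removing $id"))
```

Treating every non-terminal update as an error matches the handler above. Per the code, the Master only expects to hear about a driver's state when that driver ends.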


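def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
  // Look up the driver by driverId
  drivers.find(d => d.id == driverId) match {
    // Some: the driver is known to the Master
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the set of active drivers
      drivers -= driver
      // If the completed-driver history is full, drop the oldest entries
      // (10% of RETAINED_DRIVERS, but at least one)
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to completedDrivers
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Record the driver's final state and exception
      driver.state = finalState
      driver.exception = exception
      // If the driver ran on a worker, remove it from that worker as well
      driver.worker.foreach(w => w.removeDriver(driver))
      // Call schedule() again, since resources have been freed
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}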
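The retention rule in removeDriver can be tried out in isolation. It keeps at most RETAINED_DRIVERS finished drivers, and when the history is full it drops the oldest 10% (at least one) before appending. Below is a minimal sketch that assumes a hypothetical `CompletedHistory` class, where `retained` stands in for RETAINED_DRIVERS. It uses `ArrayBuffer.remove(0, n)`, which removes the first n elements just as `trimStart(n)` does in the Master code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper modeling the bounded history of finished drivers.
// `retained` plays the role of RETAINED_DRIVERS.
class CompletedHistory[T](retained: Int) {
  require(retained > 0, "retained must be positive")
  val items = ArrayBuffer.empty[T]

  def add(item: T): Unit = {
    // When full, drop the oldest 10% (at least one) before appending,
    // as completedDrivers.trimStart(toRemove) does in the Master.
    if (items.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      items.remove(0, toRemove)
    }
    items += item
  }
}

val history = new CompletedHistory[Int](retained = 5)
(1 to 7).foreach(history.add)
println(history.items.mkString(", ")) // prints 3, 4, 5, 6, 7
```

Dropping a batch rather than a single entry means that, once the buffer is full, the front-of-buffer removal (which shifts the remaining elements) happens only once every `toRemove` insertions. That is one plausible reason for the 10% rule, although the source does not state it.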
Does this method look familiar? It essentially reverses the steps performed when a Driver registers with the Master.
Next is the source code that runs when an Executor's state changes. The code is straightforward, and I have annotated it. Much of it reverses the steps an Application goes through when it registers with the Master.
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Look up the executor by execId among the executors of application appId
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      exec.state = state
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      // Notify the application's Driver that this executor's state has changed
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
      // If the executor has finished
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // Remove the executor from the application
        appInfo.removeExecutor(exec)
        // Remove the executor from the worker it ran on
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // Abnormal exit: reschedule while the retry count stays below MAX_NUM_RETRY (10)
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            // Retries exhausted: if none of the application's executors is still running,
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              // remove the application and mark it FAILED
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      // Unknown application or executor (handling not shown in this excerpt)
  }
}
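In short, whenever an executor exits abnormally, the Master increments the application's retry count and calls schedule() to launch a replacement. Once the retry count reaches MAX_NUM_RETRY (10) and none of the application's executors is still RUNNING, the Master gives up and removes the whole application with state FAILED. The retry count is reset whenever one of the application's executors reports RUNNING.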
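To make the retry logic easier to follow, here is a simplified, hypothetical model of the ExecutorStateChanged branch. It assumes a reduced ExecutorState enumeration and plain `AppModel`/`ExecModel` classes instead of Spark's own application and executor classes. It also assumes a `MaxNumRetry` constant standing in for ApplicationState.MAX_NUM_RETRY (10). Instead of sending messages or calling schedule() and removeApplication(), it returns an `Outcome` describing what the Master would do. Notifying the Driver and the Worker is left out.

```scala
import scala.collection.mutable

// Reduced, illustrative version of the executor states (not Spark's full enumeration).
object ExecutorState extends Enumeration {
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
  def isFinished(s: Value): Boolean = Set(KILLED, FAILED, LOST, EXITED).contains(s)
}

val MaxNumRetry = 10 // stands in for ApplicationState.MAX_NUM_RETRY

final class ExecModel(val id: Int, var state: ExecutorState.Value)
final class AppModel(val id: String) {
  val executors = mutable.Map.empty[Int, ExecModel]
  var retryCount = 0
}

// What the Master would do after processing the update.
sealed trait Outcome
case object Ignored extends Outcome     // unknown app or executor
case object Updated extends Outcome     // state recorded; nothing else to do
case object Rescheduled extends Outcome // abnormal exit with retries left: schedule()
case object AppFailed extends Outcome   // retries exhausted, nothing running: remove app as FAILED
case object NoAction extends Outcome    // retries exhausted, but another executor still runs

def onExecutorStateChanged(apps: Map[String, AppModel], appId: String, execId: Int,
                           state: ExecutorState.Value, exitStatus: Option[Int]): Outcome =
  apps.get(appId).flatMap(app => app.executors.get(execId).map(exec => (app, exec))) match {
    case None => Ignored
    case Some((app, exec)) =>
      exec.state = state
      if (state == ExecutorState.RUNNING) app.retryCount = 0 // resetRetryCount()
      if (!ExecutorState.isFinished(state)) Updated
      else {
        app.executors -= execId // remove the finished executor from the app
        if (exitStatus == Some(0)) Updated // normal exit: no retry bookkeeping
        else {
          app.retryCount += 1 // incrementRetryCount()
          if (app.retryCount < MaxNumRetry) Rescheduled
          else if (!app.executors.values.exists(_.state == ExecutorState.RUNNING)) AppFailed
          else NoAction
        }
      }
  }

val demo = new AppModel("app-demo")
demo.executors(0) = new ExecModel(0, ExecutorState.RUNNING)
println(onExecutorStateChanged(Map("app-demo" -> demo), "app-demo", 0, ExecutorState.FAILED, Some(1)))
// prints Rescheduled
```

Returning a value instead of performing side effects keeps the decision rules visible in one place. In the real Master those branches call schedule() or removeApplication() directly.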

