Spark 구조 원리 - Worker 소스 코드 분석

원본 주소:https://blog.csdn.net/zhanglh046/article/details/78485663
Worker 는 spark 의 작업 노드 로 주로 Master 명령 을 받 고 Executor, Driver 등 을 시작 하거나 죽 이 는 것 을 책임 집 니 다.Driver 나 Executor 상 태 를 Master 에 보고 하고 심장 박동 요청 을 Master 에 보 내 는 등.
1. 중요 속성
  • RpcEnv rpcEnv: RpcEndpoint 와 RpcEndpoint Ref 를 등록 하고 유지 하 는 데 사 용 됩 니 다.
  • Int webUiport: web ui 포트.
  • Int cores: 이 worker 에 분 배 된 CPU 핵 수 입 니 다.
  • Int coresUsed: 이 worker 가 사용 하 는 CPU 핵 수 입 니 다.
  • Int coresFree = cores - coresUsed 에 남 은 CPU 핵 수.
  • Int memory: 이 worker 에 분 배 된 메모리 용량.
  • Int memory Used: 이 worker 가 사용 하 는 메모리 용량.
  • Int memory Free = memory - memory Used 에 남 은 메모리 용량.
  • Array [RpcAddress] masterRpcAddresses: master RpcAddress 배열.
  • String endpoint Name: worker 의 rpc 터미널 이름 입 니 다.
  • String workDirPath: 작업 디 렉 터 리.
  • forwordmessage Scheduler: 배경 스 레 드 를 예약 하여 지정 한 시간 에 메 시 지 를 보 냅 니 다.
  • cleanupThreadExecutor: 작업 디 렉 터 리 를 배경 으로 청소 하 는 스 레 드 입 니 다.
  • Option [RpcEndpoint Ref] master: master 터미널.
  • String activeMasterUrl: 현재 유효한 master url.
  • String activeMasterWebUiUrl: 현재 유효한 master web ui url.
  • String worker WebUiUrl: worker 의 웹 ui url.
  • String workerUri: worker 의 url.
  • boolean registered: 이 worker 가 이미 등록 되 었 는 지 여부 입 니 다.
  • boolean connected: 이 worker 가 master 에 연결 되 었 는 지 여부 입 니 다.
  • String workerId: worker 의 id.
  • HashMap [String, DriverRunner] drivers: worker 가 유지 하 는 모든 driver id - > DriverRunner 의 맵 입 니 다.
  • HashMap [String, Executor Runner] executors: worker 가 유지 하 는 모든 executor id - > Executor Runner 의 맵 입 니 다.
  • LinkedHashMap [String, DriverRunner] finished Drivers: Worker 가 유지 하 는 작업 이 완 료 된 driver id - > DriverRunner 의 맵 입 니 다.
  • HashMap [String, Seq [String]] app Directories: worker 가 유지 하 는 application id - > app 디 렉 터 리 의 맵 입 니 다.
  • HashSet [String] finished Apps: 이 worker 가 작업 을 마 친 application.
  • HEARTBEAT_MILLIS: Master 에 게 심장 박동 수 를 보 냅 니 다.
  • INITIAL_REGISTRATION_RETRIES: master 에 초기 재 시도 횟수 를 등록 합 니 다. 기본 값 은 6 회 입 니 다.
  • TOTAL_REGISTRATION_RETRIES: master 에 총 시도 횟수 를 등록 합 니 다.
  • INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS: 초기 화 된 등록 재 시도 간격.
  • PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS: 연 장 된 등록 재 시도 간격.
  • CLEANUP_ENABLED: cleanup 기능 을 사용 할 지 여부 입 니 다.
  • CLEANUP_INTERVAL_MILLIS: cleanup 시간 간격.
  • APP_DATA_RETENTION_SECONDS: app 데이터 저장 시간 입 니 다.

  • 중요 한 방법
    2.1 main 방법
    def main(argStrings:Array[String]) {
      Utils.initDaemon(log)
      val conf = new SparkConf
      //         
      val args= new WorkerArguments(argStrings,conf)
      //   Rpc         
      val rpcEnv= startRpcEnvAndEndpoint(args.host,args.port,args.webUiPort,args.cores,
        args.memory,args.masters,args.workDir,conf = conf)
      rpcEnv.awaitTermination()
    }
    

    2.2 온 스타 트 시작 워 커
  • 작업 디 렉 터 리 만 들 기
  • 웹 UI 를 만 들 고 웹 UI 를 연결 합 니 다
  • Master 에 등록
  • override def onStart() {
      assert(!registered)
      logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
        host, port, cores, Utils.megabytesToString(memory)))
      logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
      logInfo("Spark home: " + sparkHome)
      //       
      createWorkDir()
      //   ExternalShuffleService    ,     start  
      shuffleService.startIfEnabled()
      //    worker web ui
      webUi = new WorkerWebUI(this, workDir, webUiPort)
      webUi.bind()
    
      workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
      //  Master  
      registerWithMaster()
    
      metricsSystem.registerSource(workerSource)
      metricsSystem.start()
      // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
      metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    }
    

    2.3 createWorkDir 작업 디 렉 터 리 만 들 기
    /**
     *   worker          
     * app-20170613113959-0000
     * app-20170613114457-0001
     * app-20170613114710-0002
     */
    private def createWorkDir() {
      //       
      workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
      try {
        //     
        workDir.mkdirs()
        //              ,   
        if ( !workDir.exists() || !workDir.isDirectory) {
          logError("Failed to create work directory " + workDir)
          System.exit(1)
        }
        assert (workDir.isDirectory)
      } catch {
        case e: Exception =>
          logError("Failed to create work directory " + workDir, e)
          System.exit(1)
      }
    }
    

    2.4 register With Master (): master 에 등록
    private def registerWithMaster() {
      registrationRetryTimer match {
        //    ,       ,        
        case None =>
          //        false
          registered = false
          //      master  
          registerMasterFutures = tryRegisterAllMasters()
          //         0
          connectionAttemptCount = 0
          //         ,  ReregisterWithMaster  ,          ,       ,     
          registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
            new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                Option(self).foreach(_.send(ReregisterWithMaster))
              }
            },
            INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
            INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
            TimeUnit.SECONDS))
        //       registrationRetryTimer,     
        case Some(_) =>
    
      }
    }
    

    2.5 try Register AllMasters 는 모든 그룹 내 모든 master 에 등록 하려 고 시도 합 니 다.
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      masterRpcAddresses.map { masterAddress =>
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = {
            try {
              logInfo("Connecting to master " + masterAddress + "...")
              //   master RpcEndpoint,   master        
              val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
              //     master  
              registerWithMaster(masterEndpoint)
            } catch {
              case ie: InterruptedException => // Cancelled
              case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
            }
          }
        })
      }
    }
    

    2.6 registerWithMaster(masterEndpoint: RpcEndpointRef)
    //  master  
    private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
      //  master  RegisterWorker  
      masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
        workerId, host, port, self, cores, memory, workerWebUiUrl))
        .onComplete {
          //     ,   handleRegisterResponse
          case Success(msg) =>
            Utils.tryLogNonFatalError {
              handleRegisterResponse(msg)
            }
          //     ,   
          case Failure(e) =>
            logError(s"Cannot register with master: ${masterEndpoint.address}", e)
            System.exit(1)
        }(ThreadUtils.sameThread)
    }
    

    2.7 handle 레지스터 Response 처리 반전 함수 의 결과
    private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
      msg match {
        //    RegisteredWorker  ,        
        case RegisteredWorker(masterRef, masterWebUiUrl) =>
          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
          registered = true //   registered  
          changeMaster(masterRef, masterWebUiUrl)
          //            master       
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(SendHeartbeat)
            }
          }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
          //      cleanup  ,            WorkDirCleanup  ,    
          if (CLEANUP_ENABLED) {
            logInfo(
              s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
            forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(WorkDirCleanup)
              }
            }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
          }
          //   worker    executor  ExecutorDescription  ,   executor
          val execs = executors.values.map { e =>
            new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
          }
          //  master  WorkerLatestState  ,  worker    
          masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
        //    RegisterWorkerFailed  ,      
        case RegisterWorkerFailed(message) =>
          //          ,   
          if (!registered) {
            logError("Worker registration failed: " + message)
            System.exit(1)
          }
        //    MasterInStandby  ,     
        case MasterInStandby =>
          // Ignore. Master not yet ready.
      }
    }
    

    2.8 receive 는 메 시 지 를 받 지만 결 과 를 되 돌 릴 필요 가 없다.
    override def receive: PartialFunction[Any, Unit] = synchronized {
      //       SendHeartbeat  ,     master      
      case SendHeartbeat =>
        if (connected) { sendToMaster(Heartbeat(workerId, self)) }
      //       WorkDirCleanup  ,          
      case WorkDirCleanup =>
        //     executors       app id   
        val appIds = executors.values.map(_.appId).toSet
        //          application  ,       ,        Future   
        val cleanupFuture = concurrent.Future {
          //           
          val appDirs = workDir.listFiles()
          if (appDirs == null) {
            throw new IOException("ERROR: Failed to list files in " + appDirs)
          }
          //
          appDirs.filter { dir =>
            val appIdFromDir = dir.getName //       
            val isAppStillRunning = appIds.contains(appIdFromDir) //          application      
            //      ,          ,        
            dir.isDirectory && !isAppStillRunning &&
            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
          }.foreach { dir =>
            logInfo(s"Removing directory: ${dir.getPath}")
            Utils.deleteRecursively(dir)
          }
        }(cleanupThreadExecutor)
    
        cleanupFuture.onFailure {
          case e: Throwable =>
            logError("App dir cleanup failed: " + e.getMessage, e)
        }(cleanupThreadExecutor)
      //     MasterChanged  ,  master       
      case MasterChanged(masterRef, masterWebUiUrl) =>
        logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
        //     master url master,      true,           
        changeMaster(masterRef, masterWebUiUrl)
        //       executors       ExecutorDescription
        val execs = executors.values.
          map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
        //    master  WorkerSchedulerStateResponse  ,        
        masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
      //      ReconnectWorker  ,    worker  ,      
      case ReconnectWorker(masterUrl) =>
        logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
        //     ,     master  
        registerWithMaster()
      //      LaunchExecutor  ,      executor
      case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        //   master    
        if (masterUrl != activeMasterUrl) {
          logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
        } else {
          try {
            logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
    
            //   executor  ,appId/execId
            val executorDir = new File(workDir, appId + "/" + execId)
            if (!executorDir.mkdirs()) {
              throw new IOException("Failed to create directory " + executorDir)
            }
    
            //   application    ,       ,                    
            val appLocalDirs = appDirectories.getOrElse(appId,
              Utils.getOrCreateLocalRootDirs(conf).map { dir =>
                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                Utils.chmod700(appDir)
                appDir.getAbsolutePath()
              }.toSeq)
            appDirectories(appId) = appLocalDirs
            //   ExecutorRunner  ,      executor     
            val manager = new ExecutorRunner(
              appId,
              execId,
              appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
              cores_,
              memory_,
              self,
              workerId,
              host,
              webUi.boundPort,
              publicAddress,
              sparkHome,
              executorDir,
              workerUri,
              conf,
              appLocalDirs, ExecutorState.RUNNING)
            // worker   executor id->ExecutorRunner           ExecutorRunner
            executors(appId + "/" + execId) = manager
            //     ExecutorRunner
            manager.start()
            //          cpu       
            coresUsed += cores_
            memoryUsed += memory_
            //  master  ExecutorStateChanged  
            sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
          } catch {
            case e: Exception =>
              logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
              if (executors.contains(appId + "/" + execId)) {
                executors(appId + "/" + execId).kill()
                executors -= appId + "/" + execId
              }
              sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
                Some(e.toString), None))
          }
        }
      //     ExecutorStateChanged  ,  executor      
      case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
        handleExecutorStateChanged(executorStateChanged)
      //      KillExecutor  ,        executor  
      case KillExecutor(masterUrl, appId, execId) =>
        if (masterUrl != activeMasterUrl) {
          logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
        } else {
          val fullId = appId + "/" + execId
          executors.get(fullId) match {
            case Some(executor) =>
              logInfo("Asked to kill executor " + fullId)
              executor.kill()
            case None =>
              logInfo("Asked to kill unknown executor " + fullId)
          }
        }
      //      LaunchDriver  ,    Driver
      case LaunchDriver(driverId, driverDesc) =>
        logInfo(s"Asked to launch driver $driverId")
        //   DriverRunner,    
        val driver = new DriverRunner(
          conf,
          driverId,
          workDir,
          sparkHome,
          driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
          self,
          workerUri,
          securityMgr)
        //   drivers
        drivers(driverId) = driver
        //     driver
        driver.start()
        //       worker      cpu
        coresUsed += driverDesc.cores
        memoryUsed += driverDesc.mem
      //      KillDriver  ,       driver
      case KillDriver(driverId) =>
        logInfo(s"Asked to kill driver $driverId")
        drivers.get(driverId) match {
          case Some(runner) =>
            runner.kill()
          case None =>
            logError(s"Asked to kill unknown driver $driverId")
        }
      //      DriverStateChanged  ,  driver    
      case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
        handleDriverStateChanged(driverStateChanged)
      //      ReregisterWithMaster  ,       master  
      case ReregisterWithMaster =>
        reregisterWithMaster()
      //      ApplicationFinished  ,  application      
      case ApplicationFinished(id) =>
        finishedApps += id
        //          application   
        maybeCleanupApplication(id)
    }
    

    2.9 receiveAndReply 메 시 지 를 받 고 결 과 를 되 돌려 줍 니 다.
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      //      RequestWorkerState  ,     worker     
      case RequestWorkerState =>
        context.reply(WorkerStateResponse(host, port, workerId, executors.values.toList,
          finishedExecutors.values.toList, drivers.values.toList,
          finishedDrivers.values.toList, activeMasterUrl, cores, memory,
          coresUsed, memoryUsed, activeMasterWebUiUrl))
    }
    

    2.10 changeMaster
    새로운 master 의 url 과 master 를 가 져 옵 니 다. 이전 재 등록 시 도 를 취소 합 니 다. 새로운 master 가 발견 되 었 기 때 문 입 니 다.
    private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
      //     master url master
      activeMasterUrl = masterRef.address.toSparkURL
      activeMasterWebUiUrl = uiUrl
      master = Some(masterRef)
      connected = true //       true
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
      }
      //             ,        master
      cancelLastRegistrationRetry()
    }
    

    2.11 handleExecutor 상태 변경 처리 실행 기 상태 변경
    private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
      Unit = {
      //    master  ExecutorStateChanged  
      sendToMaster(executorStateChanged)
      //   Executor  
      val state = executorStateChanged.state
      //        
      if (ExecutorState.isFinished(state)) {
        //    Executor   application id
        val appId = executorStateChanged.appId
        // appId/execId
        val fullId = appId + "/" + executorStateChanged.execId
        val message = executorStateChanged.message
        val exitStatus = executorStateChanged.exitStatus
        //  worker    executor id ExecuteRunner   ExecuteRunner
        executors.get(fullId) match {
          case Some(executor) =>
            logInfo("Executor " + fullId + " finished with state " + state +
              message.map(" message " + _).getOrElse("") +
              exitStatus.map(" exitStatus " + _).getOrElse(""))
            //      ExecuteRunner  executors    
            executors -= fullId
            //                 finishedExecutors
            finishedExecutors(fullId) = executor
            //     ,        executors
            trimFinishedExecutorsIfNecessary()
            //   CPU   
            coresUsed -= executor.cores
            memoryUsed -= executor.memory
          case None =>
            logInfo("Unknown Executor " + fullId + " finished with state " + state +
              message.map(" message " + _).getOrElse("") +
              exitStatus.map(" exitStatus " + _).getOrElse(""))
        }
        //         application    
        maybeCleanupApplication(appId)
      }
    }
    

    2.12 handleDriverStateChanged 처리 드라이버 상태 변경
    private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
      //   driver id
      val driverId = driverStateChanged.driverId
      val exception = driverStateChanged.exception
      //   driver   
      val state = driverStateChanged.state
      state match {
        case DriverState.ERROR =>
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
      //  master  DriverStateChanged  
      sendToMaster(driverStateChanged)
      //  drivers    ,               finishedDrivers
      val driver = drivers.remove(driverId).get
      finishedDrivers(driverId) = driver
      //     ,        executors
      trimFinishedDriversIfNecessary()
      //   CPU   
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores
    }
    

    2.13 reregisterWithMaster 재 등록
    네트워크 이상 이나 master 에 실 패 했 을 때 master 에 다시 등록 해 야 합 니 다. 지정 한 횟수 를 초과 하면 worker 가 종료 합 니 다.
    private def reregisterWithMaster(): Unit = {
      Utils.tryOrExit {
        //           1
        connectionAttemptCount += 1
        //            ,            
        if (registered) {
          cancelLastRegistrationRetry()
        }
        //                   ,       ,    
        else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
          logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
          //     master    ,    ,     worker        ,    master    
          //       ,        master    
          master match {
            //   master  ,  registered  false,       master   ,          
            // Master RpcEndpoint
            case Some(masterRef) =>
              if (registerMasterFutures != null) {
                registerMasterFutures.foreach(_.cancel(true))
              }
              val masterAddress = masterRef.address
              registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
                override def run(): Unit = {
                  try {
                    logInfo("Connecting to master " + masterAddress + "...")
                    //     masterEndpoint
                    val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                    //       master         
                    registerWithMaster(masterEndpoint)
                  } catch {
                    case ie: InterruptedException => // Cancelled
                    case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                  }
                }
              }))
            //         master  ,         worker  
            case None =>
              if (registerMasterFutures != null) {
                registerMasterFutures.foreach(_.cancel(true))
              }
              registerMasterFutures = tryRegisterAllMasters()
          }
          //                ,               
          if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
            registrationRetryTimer.foreach(_.cancel(true))
            registrationRetryTimer = Some(
              forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                override def run(): Unit = Utils.tryLogNonFatalError {
                  self.send(ReregisterWithMaster)
                }
              }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
                PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
                TimeUnit.SECONDS))
          }
        } else {
          logError("All masters are unresponsive! Giving up.")
          System.exit(1)
        }
      }
    }

    좋은 웹페이지 즐겨찾기