Spark 구조 원리 - master 상태 변화 처리 메커니즘 원리 분석 및 소스 코드 분석

원본 주소:https://blog.csdn.net/zhanglh046/article/details/78485818
1. Master 고장 으로 지연 되 어 새로운 Master 선 거 를 촉발 할 수 있 습 니 다.
Leader 를 다시 선택 하면 클 러 스 터 복구 가 이 뤄 지고 복구 과정 에서 Worker 와 AppClient 에 MasterChanged 메 시 지 를 보 냅 니 다.
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  //         application,   application,    MasterChanged  
  for (app  logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  //         driver,   master    driver  
  for (driver  logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}

2. Worker 와 AppClient 는 Master 에서 온 Master Changed 메 시 지 를 받 습 니 다.
2.1 Worker 가 MasterChanged 메 시 지 를 받 고 있 습 니 다.
  • 새로운 master 의 url 과 master 를 가 져 옵 니 다. 연결 상 태 는 true 로 설정 되 어 있 으 며, 이전 시 도 를 취소 하고 다시 등록 합 니 다.
  • 새로운 master 에 Worker Scheduler State Response 메 시 지 를 보 내 고 작업 을 합 니 다.
  • case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      //     master url master,      true,           
      changeMaster(masterRef, masterWebUiUrl)
      //       executors       ExecutorDescription
      val execs = executors.values.
        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
      //    master  WorkerSchedulerStateResponse  ,        
      masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
    

    2.2 AppClient 에서 MasterChanged 메 시 지 를 받 고 있 습 니 다.
  • master 업데이트
  • 새로운 master 에 MasterChange Acknowledged 메 시 지 를 보 냅 니 다
  • case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      //   master
      master = Some(masterRef)
      alreadyDisconnected = false
      //    master  MasterChangeAcknowledged  
      masterRef.send(MasterChangeAcknowledged(appId.get))
    

    3. Master 는 Worker 와 AppClient 에서 온 소식 을 받 아들 일 것 입 니 다.
    3.1 Master 가 Worker 의 Worker Scheduler State Response 메 시 지 를 받 고 있 습 니 다.
    이것 은 새로운 master 이기 때문에 worker 는 다시 등록 한 다음 에 새로운 master 는 이전에 관련 된 프로그램 을 worker 에서 다시 복원 해 야 합 니 다.
    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
      //   workerId  worker
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          // worker    alive
          worker.state = WorkerState.ALIVE
          //     executor          executor
          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          //      executors
          for (exec 
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.drivers(driverId) = driver
            }
          }
        case None =>
          logWarning("Scheduler state from unknown worker: " + workerId)
      }
      //           completeRecovery  ,      completeRecovery  
      if (canCompleteRecovery) { completeRecovery() }
    

    3.2 Master 는 AppClient 의 MasterChange Acknowledged 소식 을 받 았 다.
  • 애플 리 케 이 션 상 태 를 WAITTING 으로 업데이트 합 니 다.
  • 현재 complete Recovery 작업 을 할 수 있 는 지 판단 하고 complete Recovery 작업 을 할 수 있다 면.
  • case MasterChangeAcknowledged(appId) =>
      idToApp.get(appId) match {
        case Some(app) =>
          logInfo("Application has been re-registered: " + appId)
          app.state = ApplicationState.WAITING
        case None =>
          logWarning("Master change ack from unknown app: " + appId)
      }
        //           completeRecovery  ,      completeRecovery  
      if (canCompleteRecovery) { completeRecovery() }

    좋은 웹페이지 즐겨찾기