Spark 구조 원리 - master 상태 변화 처리 메커니즘 원리 분석 및 소스 코드 분석
3778 단어 빅 데이터/스파크/스파크Core
1. Master 고장 으로 지연 되 어 새로운 Master 선 거 를 촉발 할 수 있 습 니 다.
Leader 를 다시 선택 하면 클 러 스 터 복구 가 이 뤄 지고 복구 과정 에서 Worker 와 AppClient 에 MasterChanged 메 시 지 를 보 냅 니 다.
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]) {
// application, application, MasterChanged
for (app logInfo("App " + app.id + " had exception on reconnect")
}
}
// driver, master driver
for (driver logInfo("Worker " + worker.id + " had exception on reconnect")
}
}
}
2. Worker 와 AppClient 는 Master 에서 온 Master Changed 메 시 지 를 받 습 니 다.
2.1 Worker 가 MasterChanged 메 시 지 를 받 고 있 습 니 다.
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
// master url master, true,
changeMaster(masterRef, masterWebUiUrl)
// executors ExecutorDescription
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
// master WorkerSchedulerStateResponse ,
masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
2.2 AppClient 에서 MasterChanged 메 시 지 를 받 고 있 습 니 다.
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
// master
master = Some(masterRef)
alreadyDisconnected = false
// master MasterChangeAcknowledged
masterRef.send(MasterChangeAcknowledged(appId.get))
3. Master 는 Worker 와 AppClient 에서 온 소식 을 받 아들 일 것 입 니 다.
3.1 Master 가 Worker 의 Worker Scheduler State Response 메 시 지 를 받 고 있 습 니 다.
이것 은 새로운 master 이기 때문에 worker 는 다시 등록 한 다음 에 새로운 master 는 이전에 관련 된 프로그램 을 worker 에서 다시 복원 해 야 합 니 다.
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
// workerId worker
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
// worker alive
worker.state = WorkerState.ALIVE
// executor executor
val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
// executors
for (exec
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
}
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
// completeRecovery , completeRecovery
if (canCompleteRecovery) { completeRecovery() }
3.2 Master 는 AppClient 의 MasterChange Acknowledged 소식 을 받 았 다.
case MasterChangeAcknowledged(appId) =>
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}
// completeRecovery , completeRecovery
if (canCompleteRecovery) { completeRecovery() }
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming - OrdCount 프로그램텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.