Spark Architecture Internals - Worker Source Code Analysis
The Worker is Spark's work node. It is mainly responsible for receiving commands from the Master and launching or killing Executors and Drivers, reporting Driver and Executor state to the Master, sending heartbeats to the Master, and so on.
1. Important Fields
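For reference, here is a sketch of the Worker fields that the methods below read and write (collected from their usage in section 2; types are inferred from that usage and the Spark 2.x source, and may differ slightly from the actual code):

// Sketch of the Worker state referenced throughout section 2 (types inferred, not exhaustive)
private var master: Option[RpcEndpointRef] = None      // reference to the active master
private var activeMasterUrl: String = ""               // url of the active master
private var registered = false                         // has registration succeeded?
private var connected = false                          // is the worker connected to a master?
private var connectionAttemptCount = 0                 // registration retry counter
private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
private var registerMasterFutures: Array[JFuture[_]] = null
val executors = new HashMap[String, ExecutorRunner]    // running executors, keyed by appId/execId
val drivers = new HashMap[String, DriverRunner]        // running drivers, keyed by driverId
val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]  // local dirs created per application
val finishedApps = new HashSet[String]                 // ids of applications that have finished
var coresUsed = 0                                      // cpu cores currently allocated
var memoryUsed = 0                                     // memory (MB) currently allocated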
2. Important Methods
2.1 The main method
def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  // Parse the command-line arguments (host, port, cores, memory, master URLs, work dir, ...)
  val args = new WorkerArguments(argStrings, conf)
  // Create the RpcEnv and register the Worker endpoint on it
  val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  rpcEnv.awaitTermination()
}
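main delegates the actual setup to startRpcEnvAndEndpoint. As a rough sketch of what that method does (modeled on the Spark 2.x source; signatures may differ between versions), it creates the RpcEnv and registers the Worker as an endpoint on it, which is what triggers onStart below:

// Simplified sketch of startRpcEnvAndEndpoint, based on the Spark 2.x source
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  // Create the RpcEnv that will host the Worker endpoint
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  // Registering the endpoint causes Worker.onStart() (section 2.2) to run
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}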
2.2 onStart: starting the Worker
override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  // Create the worker's working directory
  createWorkDir()
  // Start the ExternalShuffleService if it is enabled
  shuffleService.startIfEnabled()
  // Create and bind the worker web UI
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  // Register this worker with the Master
  registerWithMaster()
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
2.3 createWorkDir: creating the work directory
/**
 * Creates the worker's working directory. It ends up holding one
 * subdirectory per application, for example:
 *   app-20170613113959-0000
 *   app-20170613114457-0001
 *   app-20170613114710-0002
 */
private def createWorkDir() {
  // Use the configured directory if set, otherwise default to SPARK_HOME/work
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
  try {
    // Create the directory, including any missing parents
    workDir.mkdirs()
    // Exit if the directory could not be created or is not actually a directory
    if (!workDir.exists() || !workDir.isDirectory) {
      logError("Failed to create work directory " + workDir)
      System.exit(1)
    }
    assert(workDir.isDirectory)
  } catch {
    case e: Exception =>
      logError("Failed to create work directory " + workDir, e)
      System.exit(1)
  }
}
2.4 registerWithMaster(): registering with the Master
private def registerWithMaster() {
  registrationRetryTimer match {
    // No retry timer exists yet, so this is the first registration attempt
    case None =>
      // Mark the worker as not registered yet
      registered = false
      // Try to register with every master in the cluster
      registerMasterFutures = tryRegisterAllMasters()
      // Reset the connection attempt counter
      connectionAttemptCount = 0
      // Schedule a timer that periodically sends ReregisterWithMaster to ourselves,
      // so registration keeps being retried until it succeeds or the limit is reached
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    // A registrationRetryTimer already exists: an attempt is already scheduled, do nothing
    case Some(_) =>
  }
}
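The retry interval constants used here are not plain numbers. In the Spark 2.x source they are derived from a random fuzz multiplier, roughly as sketched below, so that workers re-registering after a master failover do not all retry in lockstep (the exact values are an assumption based on that source):

// Sketch of how the retry intervals are derived in the Spark 2.x source:
// a random fuzz factor in [0.5, 1.5) spreads out worker registrations
private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
  val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
  randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
}
// Roughly 5-15 seconds for the initial attempts, 30-90 seconds afterwards
private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
  REGISTRATION_RETRY_FUZZ_MULTIPLIER))
private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60 *
  REGISTRATION_RETRY_FUZZ_MULTIPLIER))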
2.5 tryRegisterAllMasters: attempting to register with every Master in the cluster
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain an RpcEndpointRef for the master so we can talk to it
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Send the registration request to this master
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
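The futures returned here run on a dedicated thread pool. In the Spark 2.x source it is sized to the number of masters, roughly as follows, so that all masters can be contacted in parallel:

// Sketch of the thread pool backing these registration futures (Spark 2.x source)
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
  "worker-register-master-threadpool",
  masterRpcAddresses.length // Make sure we can register with all masters at the same time
)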
2.6 registerWithMaster(masterEndpoint: RpcEndpointRef)
// Register with a single master endpoint
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Ask the master to register this worker by sending it a RegisterWorker message
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // On success, process the master's response in handleRegisterResponse
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      // On failure, log the error and exit the worker
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
2.7 handleRegisterResponse: handling the registration response
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    // RegisteredWorker means registration succeeded
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      registered = true // mark the worker as registered
      changeMaster(masterRef, masterWebUiUrl)
      // Periodically send SendHeartbeat to ourselves; each one is forwarded to the master
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // If cleanup is enabled, periodically send WorkDirCleanup to ourselves
      // so that old application directories get deleted
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Build an ExecutorDescription for every executor running on this worker
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      // Report the worker's latest state (executors and drivers) to the master
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    // RegisterWorkerFailed means registration was rejected
    case RegisterWorkerFailed(message) =>
      // If we never managed to register at all, log the error and exit
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    // MasterInStandby means this master is in standby mode
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}
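The three cases above correspond to the three variants of RegisterWorkerResponse. A sketch of their definitions, again assuming the Spark 2.x DeployMessages:

// Sketch of the possible registration responses, assuming Spark 2.x DeployMessages
sealed trait RegisterWorkerResponse

case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String)
  extends DeployMessage with RegisterWorkerResponse

case class RegisterWorkerFailed(message: String)
  extends DeployMessage with RegisterWorkerResponse

case object MasterInStandby extends DeployMessage with RegisterWorkerResponse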
2.8 receive: handling messages that require no reply
override def receive: PartialFunction[Any, Unit] = synchronized {
  // SendHeartbeat: send a heartbeat to the master
  case SendHeartbeat =>
    if (connected) { sendToMaster(Heartbeat(workerId, self)) }
  // WorkDirCleanup: remove directories of applications that are no longer running
  case WorkDirCleanup =>
    // Collect the app ids of all executors still running on this worker
    val appIds = executors.values.map(_.appId).toSet
    // Do the cleanup asynchronously in a Future so the message loop is not blocked
    val cleanupFuture = concurrent.Future {
      // List all application directories under the work dir
      val appDirs = workDir.listFiles()
      if (appDirs == null) {
        throw new IOException("ERROR: Failed to list files in " + appDirs)
      }
      // Keep only directories of finished applications with no recent files, then delete them
      appDirs.filter { dir =>
        val appIdFromDir = dir.getName // the directory name is the application id
        val isAppStillRunning = appIds.contains(appIdFromDir) // is the app still running?
        dir.isDirectory && !isAppStillRunning &&
          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
      }.foreach { dir =>
        logInfo(s"Removing directory: ${dir.getPath}")
        Utils.deleteRecursively(dir)
      }
    }(cleanupThreadExecutor)
    cleanupFuture.onFailure {
      case e: Throwable =>
        logError("App dir cleanup failed: " + e.getMessage, e)
    }(cleanupThreadExecutor)
  // MasterChanged: a new master has taken over
  case MasterChanged(masterRef, masterWebUiUrl) =>
    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    // Record the new master's url and reference and mark the worker as connected
    changeMaster(masterRef, masterWebUiUrl)
    // Build an ExecutorDescription for every executor on this worker
    val execs = executors.values.
      map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
    // Report the worker's scheduler state to the new master
    masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
  // ReconnectWorker: the master asks this worker to reconnect, so re-register
  case ReconnectWorker(masterUrl) =>
    logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
    registerWithMaster()
  // LaunchExecutor: the master asks this worker to launch an executor
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    // Ignore the request if it does not come from the active master
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
      try {
        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
        // Create the executor's working directory: workDir/appId/execId
        val executorDir = new File(workDir, appId + "/" + execId)
        if (!executorDir.mkdirs()) {
          throw new IOException("Failed to create directory " + executorDir)
        }
        // Create (or reuse) the application's local directories; they are
        // cleaned up when the application finishes
        val appLocalDirs = appDirectories.getOrElse(appId,
          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            appDir.getAbsolutePath()
          }.toSeq)
        appDirectories(appId) = appLocalDirs
        // Create an ExecutorRunner, which manages the executor process
        val manager = new ExecutorRunner(
          appId,
          execId,
          appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
          cores_,
          memory_,
          self,
          workerId,
          host,
          webUi.boundPort,
          publicAddress,
          sparkHome,
          executorDir,
          workerUri,
          conf,
          appLocalDirs, ExecutorState.RUNNING)
        // Track the ExecutorRunner under the key appId/execId
        executors(appId + "/" + execId) = manager
        // Start the ExecutorRunner, which launches the executor process
        manager.start()
        // Account for the cpu cores and memory now in use
        coresUsed += cores_
        memoryUsed += memory_
        // Tell the master that the executor's state has changed
        sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
      } catch {
        case e: Exception =>
          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
          if (executors.contains(appId + "/" + execId)) {
            executors(appId + "/" + execId).kill()
            executors -= appId + "/" + execId
          }
          sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
            Some(e.toString), None))
      }
    }
  // ExecutorStateChanged: an executor's state has changed
  case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    handleExecutorStateChanged(executorStateChanged)
  // KillExecutor: the master asks this worker to kill an executor
  case KillExecutor(masterUrl, appId, execId) =>
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
    } else {
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }
    }
  // LaunchDriver: the master asks this worker to launch a driver
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    // Create a DriverRunner, which manages the driver process
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    // Track the DriverRunner by driver id
    drivers(driverId) = driver
    // Start the DriverRunner, which launches the driver process
    driver.start()
    // Account for the cpu cores and memory now in use
    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
  // KillDriver: the master asks this worker to kill a driver
  case KillDriver(driverId) =>
    logInfo(s"Asked to kill driver $driverId")
    drivers.get(driverId) match {
      case Some(runner) =>
        runner.kill()
      case None =>
        logError(s"Asked to kill unknown driver $driverId")
    }
  // DriverStateChanged: a driver's state has changed
  case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
    handleDriverStateChanged(driverStateChanged)
  // ReregisterWithMaster: retry registration with the master
  case ReregisterWithMaster =>
    reregisterWithMaster()
  // ApplicationFinished: an application has finished, so clean up its directories
  case ApplicationFinished(id) =>
    finishedApps += id
    maybeCleanupApplication(id)
}
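Several of these cases rely on sendToMaster, which only delivers a message once a master reference exists; otherwise the message is dropped with a warning. A sketch of it, modeled on the Spark 2.x source:

// Sketch of sendToMaster, based on the Spark 2.x source
private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}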
2.9 receiveAndReply: handling messages that require a reply
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RequestWorkerState: reply with a snapshot of this worker's current state
  case RequestWorkerState =>
    context.reply(WorkerStateResponse(host, port, workerId, executors.values.toList,
      finishedExecutors.values.toList, drivers.values.toList,
      finishedDrivers.values.toList, activeMasterUrl, cores, memory,
      coresUsed, memoryUsed, activeMasterWebUiUrl))
}
2.10 changeMaster
Records the new master's url and reference, and cancels any earlier re-registration attempts, since a new master has been found.
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
  // Record the new master's url and reference
  activeMasterUrl = masterRef.address.toSparkURL
  activeMasterWebUiUrl = uiUrl
  master = Some(masterRef)
  connected = true // mark the worker as connected
  if (conf.getBoolean("spark.ui.reverseProxy", false)) {
    logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
  }
  // Cancel any outstanding registration attempts, since we are now connected to a master
  cancelLastRegistrationRetry()
}
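cancelLastRegistrationRetry, called at the end, cancels both the in-flight registration futures and the retry timer, so no further ReregisterWithMaster messages are sent. A sketch of it, modeled on the Spark 2.x source:

// Sketch of cancelLastRegistrationRetry, based on the Spark 2.x source
private def cancelLastRegistrationRetry(): Unit = {
  if (registerMasterFutures != null) {
    registerMasterFutures.foreach(_.cancel(true))
    registerMasterFutures = null
  }
  registrationRetryTimer.foreach(_.cancel(true))
  registrationRetryTimer = None
}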
2.11 handleExecutorStateChanged: handling executor state changes
private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
  Unit = {
  // Forward the ExecutorStateChanged message to the master
  sendToMaster(executorStateChanged)
  // The executor's new state
  val state = executorStateChanged.state
  // Only finished states need further handling
  if (ExecutorState.isFinished(state)) {
    // The id of the application the executor belongs to
    val appId = executorStateChanged.appId
    // The executor's full id has the form appId/execId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    // Look up the executor's ExecutorRunner by its full id
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        // Remove the ExecutorRunner from the running executors
        executors -= fullId
        // Move it to the finished executors
        finishedExecutors(fullId) = executor
        // Trim the finished executors map if it has grown too large
        trimFinishedExecutorsIfNecessary()
        // Release the executor's cpu cores and memory
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    // Clean up the application's directories if it has finished
    maybeCleanupApplication(appId)
  }
}
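maybeCleanupApplication, called at the end, removes the application's local directories only once the application has finished and no executor for it remains on this worker. A simplified sketch, modeled on the Spark 2.x source (the actual version performs the deletion asynchronously on the cleanup thread pool):

// Simplified sketch of maybeCleanupApplication, based on the Spark 2.x source
private def maybeCleanupApplication(id: String): Unit = {
  // Clean up only if the app is finished and no executor for it is still running here
  val shouldCleanup = finishedApps.contains(id) && !executors.values.map(_.appId).contains(id)
  if (shouldCleanup) {
    finishedApps -= id
    // Delete the application's local directories
    appDirectories.remove(id).foreach { dirList =>
      logInfo(s"Cleaning up local directories for application $id")
      dirList.foreach { dir => Utils.deleteRecursively(new File(dir)) }
    }
    // Tell the external shuffle service that the application is gone
    shuffleService.applicationRemoved(id)
  }
}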
2.12 handleDriverStateChanged: handling driver state changes
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  // The id of the driver whose state changed
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  // The driver's new state
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Forward the DriverStateChanged message to the master
  sendToMaster(driverStateChanged)
  // Remove the driver from the running drivers and move it to the finished drivers
  val driver = drivers.remove(driverId).get
  finishedDrivers(driverId) = driver
  // Trim the finished drivers map if it has grown too large
  trimFinishedDriversIfNecessary()
  // Release the driver's cpu cores and memory
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
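trimFinishedDriversIfNecessary caps how many finished drivers are retained for display in the web UI. A sketch of it, modeled on the Spark 2.x source, where retainedDrivers comes from the spark.worker.ui.retainedDrivers setting (trimFinishedExecutorsIfNecessary in section 2.11 follows the same pattern for executors):

// Sketch of trimFinishedDriversIfNecessary, based on the Spark 2.x source
private def trimFinishedDriversIfNecessary(): Unit = {
  if (finishedDrivers.size > retainedDrivers) {
    // Evict the oldest tenth (at least one) once the retention limit is exceeded
    val toRemove = math.max(retainedDrivers / 10, 1)
    finishedDrivers.take(toRemove).foreach { case (driverId, _) =>
      finishedDrivers.remove(driverId)
    }
  }
}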
2.13 reregisterWithMaster: re-registering with the Master
When the network fails or the master goes down, the worker must re-register with the master. If the number of attempts exceeds the configured limit, the worker exits.
private def reregisterWithMaster(): Unit = {
  Utils.tryOrExit {
    // Increment the connection attempt counter
    connectionAttemptCount += 1
    // If we are already registered, cancel any outstanding retry attempts
    if (registered) {
      cancelLastRegistrationRetry()
    }
    // Otherwise keep retrying while we are under the attempt limit
    else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
      logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
      // If we still hold a reference to a master, re-register with that master only;
      // otherwise try all masters again
      master match {
        // We have a master reference but registered is false: the connection was lost,
        // so re-register with that master's RpcEndpoint
        case Some(masterRef) =>
          // Cancel the previous registration attempts before retrying
          if (registerMasterFutures != null) {
            registerMasterFutures.foreach(_.cancel(true))
          }
          val masterAddress = masterRef.address
          registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
              try {
                logInfo("Connecting to master " + masterAddress + "...")
                // Obtain an RpcEndpointRef for the master
                val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                // Send the registration request to this master
                registerWithMaster(masterEndpoint)
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            }
          }))
        // We have no master reference, so try to register with all masters again
        case None =>
          if (registerMasterFutures != null) {
            registerMasterFutures.foreach(_.cancel(true))
          }
          registerMasterFutures = tryRegisterAllMasters()
      }
      // After the initial retries, switch to the longer retry interval
      if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
        registrationRetryTimer.foreach(_.cancel(true))
        registrationRetryTimer = Some(
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(ReregisterWithMaster)
            }
          }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
          PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      }
    } else {
      logError("All masters are unresponsive! Giving up.")
      System.exit(1)
    }
  }
}
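The attempt limits used above are fixed constants in the Spark 2.x source, roughly as follows (the exact values are an assumption based on that source):

// Sketch of the retry limits, based on the Spark 2.x source:
// 6 attempts at the short interval, then 10 more at the prolonged interval
private val INITIAL_REGISTRATION_RETRIES = 6
private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10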