Spark analysis: how the Master, Worker, and Application establish their connections
Master.preStart() {
  webUi.bind()
  // every WORKER_TIMEOUT ms, send CheckForWorkerTimeOut to self so the Master
  // can detect and remove dead (timed-out) Workers
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
}

// in Master.receive:
case CheckForWorkerTimeOut => {
  timeOutDeadWorkers()
}

/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
  ...
  // a Worker whose last heartbeat is too old is dropped from the workers set
  if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
    workers -= worker
  }
}
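For context, the full reaper in the Spark 1.x standalone Master looks roughly like the sketch below (reconstructed from the 1.x source, not the author's abridged snippet): a Worker that has missed heartbeats for WORKER_TIMEOUT ms is first removed via removeWorker(), and one already marked DEAD is finally culled from the workers set after REAPER_ITERATIONS further timeouts.

def timeOutDeadWorkers() {
  // copy into an array so we don't mutate the set while iterating over it
  val currentTime = System.currentTimeMillis()
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT / 1000))
      removeWorker(worker)   // marks the Worker DEAD and tells drivers their executors are lost
    } else {
      // the Worker has been shown as DEAD for long enough; drop it entirely
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
        workers -= worker
      }
    }
  }
}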
Worker.preStart() {
  webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
  webUi.bind()
  registerWithMaster()   // register this Worker with the Master(s)
}

// registerWithMaster() ends up calling:
def tryRegisterAllMasters() {
  for (masterUrl <- masterUrls) {
    logInfo("Connecting to master " + masterUrl + "...")
    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
    // send a RegisterWorker request to every configured Master
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
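registerWithMaster() does a bit more than tryRegisterAllMasters(): in the 1.x Worker it also installs a retry timer so registration is re-attempted until the Worker is registered or gives up. A sketch (constant names follow the 1.x source):

def registerWithMaster() {
  tryRegisterAllMasters()
  var retries = 0
  registrationRetryTimer = Some {
    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
      retries += 1
      if (registered) {
        registrationRetryTimer.foreach(_.cancel())   // registration succeeded: stop retrying
      } else if (retries >= REGISTRATION_RETRIES) {
        logError("All masters are unresponsive! Giving up.")
        System.exit(1)
      } else {
        tryRegisterAllMasters()                      // try again
      }
    }
  }
}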
Master.scala
case RegisterWorker(...) => {
  ...
  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl)   // tell the Worker its registration succeeded
  schedule()   // try to place waiting applications/drivers on the newly available Worker
}
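The handler above is abridged; the full RegisterWorker case in the 1.x Master also rejects duplicate worker IDs and ignores registrations while it is a standby. Approximately:

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
  if (state == RecoveryState.STANDBY) {
    // a standby Master ignores registrations; the Worker will hear from the elected leader
  } else if (idToWorker.contains(id)) {
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {   // record it in workers / idToWorker / addressToWorker
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    }
  }
}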
Worker.scala
case RegisteredWorker(...) => {
  registered = true
  // from now on, send SendHeartbeat to self every HEARTBEAT_MILLIS ms;
  // each one is forwarded to the Master as a heartbeat
  context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
}
case SendHeartbeat =>
  masterLock.synchronized {
    if (connected) { master ! Heartbeat(workerId) }
  }
Master.scala
case Heartbeat(workerId) => {
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      workerInfo.lastHeartbeat = System.currentTimeMillis()   // refresh the worker's last-heartbeat timestamp
    case None =>
      logWarning("Got heartbeat from unregistered worker " + workerId)
  }
}
================== The above steps complete the connection between Worker and Master ==================
When SparkContext starts:
SparkContext.createTaskScheduler()
  ==> new SparkDeploySchedulerBackend()
  ==> AppClient
  ==> ClientActor.preStart(): registerWithMaster() { actor ! RegisterApplication(appDescription) }   // send RegisterApplication to the Master
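On the application side, the ClientActor inside AppClient registers with the Master in much the same way the Worker does: it resolves each Master URL and sends a RegisterApplication message. A rough sketch of the 1.x AppClient.ClientActor:

override def preStart() {
  try {
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}

def tryRegisterAllMasters() {
  for (masterUrl <- masterUrls) {
    logInfo("Connecting to master " + masterUrl + "...")
    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
    // appDescription carries the app name, its core/memory demands, and the executor command
    actor ! RegisterApplication(appDescription)
  }
}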
Master.scala
case RegisterApplication(description) => {
  val app = createApplication(description, sender)
  registerApplication(app)
  persistenceEngine.addApplication(app)
  sender ! RegisteredApplication(app.id, masterUrl)   // tell the ClientActor that the Application has been registered
  schedule()   // try to launch executors for the newly registered Application
}
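schedule() is where the Master turns waiting applications into executors. A simplified sketch of the 1.x logic (the real method also schedules drivers and supports a "pack onto few nodes" mode when spreadOutApps is false):

private def schedule() {
  if (state != RecoveryState.ALIVE) { return }
  // FIFO over the waiting apps: hand each one free cores on ALIVE Workers that
  // can still satisfy its per-executor memory requirement
  for (app <- waitingApps if app.coresLeft > 0) {
    val usableWorkers = workers.toArray
      .filter(w => w.state == WorkerState.ALIVE && canUse(app, w))
      .sortBy(_.coresFree).reverse
    // ... split app.coresLeft across the usable Workers (spread-out mode), then
    // for every Worker that received cores:
    //   val exec = app.addExecutor(worker, coresForThisWorker)
    //   launchExecutor(worker, exec)
    //   app.state = ApplicationState.RUNNING
  }
}

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
  worker.addExecutor(exec)
  // ask the chosen Worker to start an executor process for this application ...
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // ... and tell the application's driver that an executor has been granted
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}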
================== The above steps complete the connection between Application and Master ==================
Summary:
1. Main functions of the Master:
1) Master leader election;
2) Management of Workers and Applications: it accepts Worker registrations and manages all Workers, accepts Applications submitted by clients, schedules waiting Applications (FIFO), and assigns them to Workers.
2. Main functions of the Worker:
1) Register with the Master via RegisterWorker;
2) Send heartbeats to the Master periodically;
3) Set up the process environment for each application sent by the Master and start StandaloneExecutorBackend.
3. Running spark-shell:
1) The ClientActor registers with the Master via RegisterApplication;
2) After the Master receives RegisterApplication, it calls schedule(); if there are Workers that meet the requirements, it sends LaunchExecutor to the corresponding Workers, which then start StandaloneExecutorBackend (see the sketch below).
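For reference, this is roughly what the 1.x Worker does when the LaunchExecutor message from point 3.2 arrives (a sketch; some constructor arguments are elided): it builds the executor's environment from the application description and starts a StandaloneExecutorBackend process through an ExecutorRunner.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // appDesc.command describes how to launch the executor backend process
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
    self, workerId, host, sparkHome, workDir, ...)
  executors(appId + "/" + execId) = manager
  manager.start()                     // forks the executor JVM
  coresUsed += cores_
  memoryUsed += memory_
  master ! ExecutorStateChanged(appId, execId, manager.state, None, None)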