Spark source analysis: how the Master, Worker and Application establish connections

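Master.scala

override def preStart() {
  webUi.bind()
  // Periodically send CheckForWorkerTimeOut to ourselves to detect dead workers
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
}

// Handler in the Master actor's receive
case CheckForWorkerTimeOut => {
  timeOutDeadWorkers()
}

/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
  ...
  // Drop the worker once it has been silent for (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT ms
  if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
    workers -= worker
  }
}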
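The reaping rule can be tried out in isolation. The sketch below is plain Scala with no Akka; `WorkerEntry`, `DeadWorkerReaper` and the timeout values are hypothetical stand-ins chosen for illustration, not Spark's classes or configured defaults. It models only the branch shown above: a worker is dropped once its last heartbeat is older than `(REAPER_ITERATIONS + 1) * WORKER_TIMEOUT`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the Master's per-worker record.
final case class WorkerEntry(id: String, lastHeartbeat: Long)

object DeadWorkerReaper {
  // Illustrative values only (assumptions, not Spark's configuration).
  val WorkerTimeoutMs: Long = 60000L
  val ReaperIterations: Int = 15

  /** Removes every worker whose last heartbeat is older than
    * (ReaperIterations + 1) * WorkerTimeoutMs and returns the removed ids, sorted. */
  def timeOutDeadWorkers(workers: mutable.Set[WorkerEntry], currentTime: Long): List[String] = {
    val cutoff = currentTime - (ReaperIterations + 1) * WorkerTimeoutMs
    // Collect first, then remove, so the set is never modified while being iterated.
    val toRemove = workers.filter(_.lastHeartbeat < cutoff).toList
    workers --= toRemove
    toRemove.map(_.id).sorted
  }
}
```

With these illustrative values a worker must stay silent for more than 16 × 60 000 ms = 960 000 ms (16 minutes) before it is culled; a worker whose last heartbeat sits exactly on the cutoff is kept, because the comparison is strict.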


Worker.scala

override def preStart() {
  webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
  webUi.bind()
  registerWithMaster()  // the Worker registers itself with the Master
}

// Called from registerWithMaster(): send RegisterWorker to every configured master
def tryRegisterAllMasters() {
  for (masterUrl <- masterUrls) {
    logInfo("Connecting to master " + masterUrl + "...")
    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
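`tryRegisterAllMasters` fans the same `RegisterWorker` message out to every configured master URL. A minimal Akka-free sketch of that fan-out follows. `registrationMessages` is hypothetical, `RegisterWorker` here is a plain case class mirroring the fields sent above, and `MasterUrls.toAkkaUrl` is a re-implementation that assumes the Spark 1.x Akka address layout (actor system `sparkMaster`, actor named `Master`), not Spark's own code.

```scala
// Hypothetical message mirroring the fields the Worker sends in tryRegisterAllMasters.
final case class RegisterWorker(
    workerId: String, host: String, port: Int, cores: Int,
    memory: Int, webUiPort: Int, publicAddress: String)

object MasterUrls {
  private val SparkUrlRegex = "spark://([^:]+):([0-9]+)".r

  /** Assumed Spark 1.x layout: actor system "sparkMaster", actor named "Master". */
  def toAkkaUrl(sparkUrl: String): String = sparkUrl match {
    case SparkUrlRegex(host, port) => s"akka.tcp://sparkMaster@$host:$port/user/Master"
    case _ => throw new IllegalArgumentException(s"Invalid master URL: $sparkUrl")
  }
}

/** Pairs every master's actor path with the same RegisterWorker message,
  * i.e. what the loop sends with `actor ! ...` (no Akka involved here). */
def registrationMessages(masterUrls: Seq[String], msg: RegisterWorker): Seq[(String, RegisterWorker)] =
  masterUrls.map(url => MasterUrls.toAkkaUrl(url) -> msg)
```

Sending to all masters means the Worker does not need to know in advance which one is currently the leader.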

 

Master.scala



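case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
  ...
  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl)  // acknowledge the registration to the Worker
  schedule()  // re-run scheduling now that new resources are available
}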
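On the Master side, registration boils down to recording the worker, replying, and rescheduling. The sketch below models that as a pure method that returns the reply instead of using `sender !`. All names are hypothetical, and rejecting a duplicate worker ID with a `RegisterWorkerFailed` reply is an assumption added for illustration; the excerpt above does not show that branch.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages and state; not Spark's actual classes.
sealed trait RegisterWorkerResponse
final case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends RegisterWorkerResponse
final case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse

final case class WorkerInfo(id: String, host: String, cores: Int, memoryMb: Int)

final class MasterRegistry(masterUrl: String, masterWebUiUrl: String) {
  private val idToWorker = mutable.LinkedHashMap.empty[String, WorkerInfo]
  var scheduleCalls = 0 // counts how often the stand-in for schedule() ran

  private def schedule(): Unit = scheduleCalls += 1

  /** Handles a registration; the returned value stands in for `sender ! ...`. */
  def handleRegisterWorker(worker: WorkerInfo): RegisterWorkerResponse =
    if (idToWorker.contains(worker.id)) {
      // Assumed behaviour: refuse a second registration under the same ID.
      RegisterWorkerFailed(s"Duplicate worker ID ${worker.id}")
    } else {
      idToWorker.update(worker.id, worker) // stands in for addWorker / persistenceEngine.addWorker
      schedule()                           // new cores may let waiting applications start
      RegisteredWorker(masterUrl, masterWebUiUrl)
    }

  def workerIds: List[String] = idToWorker.keys.toList
}
```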


Worker.scala



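case RegisteredWorker(masterUrl, masterWebUiUrl) => {
  registered = true
  // Every HEARTBEAT_MILLIS ms, send SendHeartbeat to ourselves; the handler below forwards a Heartbeat to the Master
  context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
}

case SendHeartbeat =>
  masterLock.synchronized {
    if (connected) { master ! Heartbeat(workerId) }
  }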
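The heartbeat loop is a self-scheduled timer plus a guarded send. The sketch below swaps Akka's scheduler for the JDK's `ScheduledExecutorService` (a different mechanism, used here only so the example runs without Akka). `HeartbeatSender` and its `send` callback are hypothetical; the `connected` flag and the lock only mirror `connected` and `masterLock` from the excerpt.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical worker-side heartbeat loop built on a JDK scheduler instead of Akka's.
final class HeartbeatSender(workerId: String, intervalMillis: Long, send: String => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val masterLock = new Object     // mirrors the excerpt's masterLock
  @volatile var connected: Boolean = true // mirrors the excerpt's connected flag

  /** Sends the first heartbeat immediately, then one every intervalMillis ms. */
  def start(): Unit = {
    val task: Runnable = () => masterLock.synchronized {
      if (connected) send(workerId) // skip the heartbeat while disconnected
    }
    scheduler.scheduleAtFixedRate(task, 0L, intervalMillis, TimeUnit.MILLISECONDS)
    ()
  }

  def stop(): Unit = {
    scheduler.shutdownNow()
    ()
  }
}
```

Usage: `new HeartbeatSender("worker-1", 15000L, id => println(s"Heartbeat($id)")).start()` would print one line immediately and then one roughly every 15 seconds until `stop()` is called.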


Master.scala



case Heartbeat(workerId) => {
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      workerInfo.lastHeartbeat = System.currentTimeMillis()  // record when this worker was last heard from
    case None =>
      logWarning("Got heartbeat from unregistered worker " + workerId)
  }
}
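The Master's handler only refreshes a timestamp, which is exactly the value that `timeOutDeadWorkers` later compares against the timeout. Below is a small model of that handler with an injectable clock, so its behaviour can be checked deterministically; `HeartbeatTracker`, `WorkerRecord` and the `warnings` buffer (standing in for `logWarning`) are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, simplified per-worker record kept by the Master.
final class WorkerRecord(val id: String, var lastHeartbeat: Long)

// Hypothetical model of the Master's Heartbeat(workerId) handler with an injectable clock.
final class HeartbeatTracker(clock: () => Long) {
  private val idToWorker = mutable.Map.empty[String, WorkerRecord]
  val warnings = mutable.ArrayBuffer.empty[String] // stands in for logWarning

  def register(id: String): Unit = idToWorker.update(id, new WorkerRecord(id, clock()))

  def onHeartbeat(workerId: String): Unit = idToWorker.get(workerId) match {
    case Some(workerInfo) => workerInfo.lastHeartbeat = clock() // refresh the timestamp
    case None             => warnings += s"Got heartbeat from unregistered worker $workerId"
  }

  def lastHeartbeat(id: String): Option[Long] = idToWorker.get(id).map(_.lastHeartbeat)
}
```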

 
================== The steps above complete the connection between the Worker and the Master ==================
 
When SparkContext starts:

SparkContext.createTaskScheduler()
  ==> new SparkDeploySchedulerBackend()
    ==> AppClient
      ==> ClientActor.preStart(): registerWithMaster() { actor ! RegisterApplication(appDescription) }  // sends RegisterApplication to the Master


Master.scala



case RegisterApplication(description) => {
  val app = createApplication(description, sender)
  registerApplication(app)
  persistenceEngine.addApplication(app)
  sender ! RegisteredApplication(app.id, masterUrl)  // tell the client (ClientActor) that its application is registered
  schedule()  // try to allocate executors for the waiting applications
}
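`createApplication` and `registerApplication` give the application an ID and queue it for scheduling before the `RegisteredApplication` reply goes back to the client. The sketch below assumes an ID of the form `app-<yyyyMMddHHmmss>-<4-digit counter>` and a FIFO `waitingApps` buffer; both the format and the `AppRegistry` class are illustrative assumptions, not a copy of Spark's code. The date is formatted in UTC only to keep the example deterministic.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}
import scala.collection.mutable

// Hypothetical application record; a real Master keeps far more per application.
final case class AppInfo(id: String, name: String, coresRequested: Int)

final class AppRegistry(masterUrl: String) {
  private val dateFormat = {
    val f = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    f.setTimeZone(TimeZone.getTimeZone("UTC")) // UTC only to keep the example deterministic
    f
  }
  private var nextAppNumber = 0
  val waitingApps = mutable.ArrayBuffer.empty[AppInfo] // FIFO queue of apps awaiting executors

  // Assumed ID format: app-<submission timestamp>-<4-digit counter>.
  private def newApplicationId(submitDate: Date): String = {
    val id = "app-%s-%04d".format(dateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    id
  }

  /** Stands in for createApplication + registerApplication; returns the
    * (appId, masterUrl) pair that RegisteredApplication carries back to the client. */
  def register(name: String, cores: Int, submitDate: Date): (String, String) = {
    val app = AppInfo(newApplicationId(submitDate), name, cores)
    waitingApps += app
    (app.id, masterUrl)
  }
}
```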

======================== The steps above complete the connection between the Application and the Master ========================
 
Summary:
1. Main functions of the Master:
1) Master leader election;
2) Management of workers, applications and so on: it accepts worker registrations and manages all workers, accepts the applications submitted by clients, and schedules waiting applications in FIFO order, dispatching them to workers;
2. Main functions of the Worker:
1) Registers with the Master through RegisterWorker;
2) Sends heartbeats to the Master periodically;
3) Sets up the process environment for the application sent by the Master and starts StandaloneExecutorBackend.
3. What happens when spark-shell runs:
1) ClientActor registers with the Master through RegisterApplication;
2) After receiving RegisterApplication, the Master calls schedule(); if there are workers that meet the application's requirements, it sends LaunchExecutor to each of those Workers.
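The FIFO scheduling described in the summary can be illustrated with a deliberately simplified allocator: applications are served strictly in submission order, and each takes free cores from workers in list order, producing one `LaunchExecutor` per (worker, application) pair. This is an assumption-laden sketch, not Spark's `schedule()`, which also has to consider memory, drivers and whether executors are spread across workers; every type here is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, simplified types: one mutable free-core counter per worker and
// one outstanding core demand per application.
final case class FreeWorker(id: String, var coresFree: Int)
final case class WaitingApp(id: String, var coresLeft: Int)
final case class LaunchExecutor(workerId: String, appId: String, cores: Int)

/** Serves applications strictly in submission (FIFO) order; each takes as many free
  * cores as it still needs from the workers, in list order. Updates the counters in place. */
def scheduleFifo(waitingApps: Seq[WaitingApp], workers: Seq[FreeWorker]): List[LaunchExecutor] = {
  val launches = mutable.ListBuffer.empty[LaunchExecutor]
  for (app <- waitingApps; worker <- workers) {
    if (app.coresLeft > 0 && worker.coresFree > 0) {
      val cores = math.min(app.coresLeft, worker.coresFree)
      worker.coresFree -= cores
      app.coresLeft -= cores
      launches += LaunchExecutor(worker.id, app.id, cores)
    }
  }
  launches.toList
}
```

For example, with two 4-core workers, an application asking for 6 cores gets 4 on the first worker and 2 on the second, leaving only 2 cores for the next application in the queue, which keeps waiting for the remainder.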