spark 소스 코드 분석 - master 프로 세 스 분석

spark 버 전: 2.0.0
1. 개념
master 는 spark 의 주요 메타 데 이 터 를 관리 하고 클 러 스 터, 자원 관리 등에 사용 합 니 다.
2. master 시작 과정
2.1 Master. main 방법
start - master. sh 스 크 립 트 에서 최종 호출 된 것 은 org.apache.spark.deploy.master.Master main 방법 임 을 알 수 있 습 니 다.이제 이 방법 을 분석 해 보 자.
  def main(argStrings: Array[String]) {
     
  //   
    Utils.initDaemon(log)
    // spark     
    val conf = new SparkConf
    // master    ,        ,  :--host ,--webui-port 
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) =
    //   master   (    )
    startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }


2.2 Master. startRpcEnvAndEndpoint 방법
 def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
     
    //      
    val securityMgr = new SecurityManager(conf)
    //   rpc    ,     netty
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    //   master   ,         【1】   
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    //  Master         ,         【2】
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)

    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

핵심 위치 분석: 【 1 】
Dispatcher.scala
----------------------------
  /**
    *   rpc   
    * @param name
    * @param endpoint
    * @return
    */
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
     
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    //   rpc      ,      
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
     
      if (stopped) {
     
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      //   endpoint            
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
     
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      //   endpoint  
      endpointRefs.put(data.endpoint, data.ref)
      //           ,         
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

위 에 가장 핵심 적 인 코드 가 있 습 니 다.
receivers.offer(data)

요청 한 데 이 터 를 receivers 대기 열 에 넣 는 것 처럼 보이 지만 정시 작업 처리 요청 을 실행 합 니 다. 자세 한 내용 은 다음 과 같 습 니 다.
Dispatcher.scala
-------------------

 /**         MessageLoop run   */
  private val threadpool: ThreadPoolExecutor = {
     
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, Runtime.getRuntime.availableProcessors()))
    //           
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
     
      pool.execute(new MessageLoop)
    }
    pool
  }

  /** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
     
    override def run(): Unit = {
     
      try {
     
      //     
        while (true) {
     
          try {
     
            val data = receivers.take()
            //     
            if (data == PoisonPill) {
     
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            //         
            data.inbox.process(Dispatcher.this)
          } catch {
     
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
     
        case ie: InterruptedException => // exit
      }
    }
  }
  

위의 data.inbox.process(Dispatcher.this) 를 설명 하기 위해 data. inbox 속성 을 중점적으로 소개 합 니 다.
Dispatcher.scala
-------------------


  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
     
      //           ,      Inbox  
    val inbox = new Inbox(ref, endpoint)
  }
  

private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {
     

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  //     ,              ,      Dispatcher.receivers ,         
  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  /** True if the inbox (and its associated endpoint) is stopped. */
  //         
  @GuardedBy("this")
  private var stopped = false

  /** Allow multiple threads to process messages at the same time. */
  //       
  @GuardedBy("this")
  private var enableConcurrent = false

  /** The number of threads processing messages for this inbox. */
  // inbox      
  @GuardedBy("this")
  private var numActiveThreads = 0

  // OnStart should be the first message to process
  //     Inbox   ,       OnStart  
  inbox.synchronized {
     
    messages.add(OnStart)
  }

위의 분석 을 통 해 알 수 있 듯 이 Endpoint Data 대상 을 만 들 때마다 OnStart 메 시 지 를 inbox 대상 에 추가 합 니 다.그래서 등록 할 때 receivers.offer(data) OnStart 메 시 지 를 추가 하여 처 리 를 기다 리 고 있 습 니 다. 이제 진정한 처리 방법 (즉, data. inbox. process (Dispatcher. this) 을 살 펴 보 겠 습 니 다.
  def process(dispatcher: Dispatcher): Unit = {
     
    var message: InboxMessage = null
    inbox.synchronized {
     
      //       
      if (!enableConcurrent && numActiveThreads != 0) {
     
        return
      }
      //     
      message = messages.poll()
      if (message != null) {
     
        numActiveThreads += 1
      } else {
     
        return
      }
    }
    while (true) {
     
      safelyCall(endpoint) {
     
        /**
          *          
          */
        message match {
     
           .......
           //        OnStart    
          case OnStart =>
            //    endpoint Master  ,      Master.onStart   
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
     
              inbox.synchronized {
     
                if (!stopped) {
     
                  enableConcurrent = true
                }
              }
            }
           .......
        }
      }
 .......
   
  }


이 어 위 에서 분석 한 리듬 에 이 어 Master. onStart 방법 을 분석 해 보 자.
Master.scala
----------------------


 override def onStart(): Unit = {
     
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    //   jetty  web ui    
    webUi = new MasterWebUI(this, webUiPort)
    webUi.bind()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    //     
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
     
      override def run(): Unit = Utils.tryLogNonFatalError {
     
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    //      rest server,    rest  ,        master      
    if (restServerEnabled) {
     
      val port = conf.getInt("spark.master.rest.port", 6066)
      restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
    }
    restServerBoundPort = restServer.map(_.start())
    //     (    ,      )
    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
    // started.
    //         web ui 
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)


// ------------    master HA  ,      ---------------
    //    java     ,         
    val serializer = new JavaSerializer(conf)
    //         ,      leader  
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
     
      //        ZOOKEEPER,    zookeeper        
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      //            ,                
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      //           ,             ,                
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      //       
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }

그 중에서 master. onStart 는 매우 간단 합 니 다. 바로 감청 서 비 스 를 만 들 고 ui 포트 를 방문 하여 master HA 복구 모델 에 이렇게 많은 것 을 소 개 했 는 지 확인 하 는 것 입 니 다. 사실은 startRpcEnvAndEndpoint 방법 중의 핵심 코드 중 하나 인 val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) 만 소 개 했 습 니 다. 지금 소개 하 겠 습 니 다. val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)【2】:
RpcEndpointRef.scala
-----------------------



  /**
   *               
   */
  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     
    // TODO: Consider removing multiple attempts
    var attempts = 0
    var lastException: Exception = null
    //             
    while (attempts < maxRetries) {
     
      attempts += 1
      try {
     
        //     (  )
        val future = ask[T](message, timeout)
        //       
        val result = timeout.awaitResult(future)
        if (result == null) {
     
          throw new SparkException("RpcEndpoint returned null")
        }
        return result
      } catch {
     
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
      }
      //            
      if (attempts < maxRetries) {
     
        Thread.sleep(retryWaitMs)
      }
    }

    throw new SparkException(
      s"Error sending message [message = $message]", lastException)
  }

요청 코드 처리 ask[T](message, timeout) (message = BoundPorts Request) 를 분석 해 보 세 요.
NettyRpcEnv.scala 
---------------------


  private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
     
    val promise = Promise[Any]()
    //     
    val remoteAddr = message.receiver.address

    def onFailure(e: Throwable): Unit = {
     
      if (!promise.tryFailure(e)) {
     
        logWarning(s"Ignored failure: $e")
      }
    }

    def onSuccess(reply: Any): Unit = reply match {
     
      case RpcFailure(e) => onFailure(e)
      case rpcReply =>
        if (!promise.trySuccess(rpcReply)) {
     
          logWarning(s"Ignored message: $reply")
        }
    }

    try {
     
      //             
      if (remoteAddr == address) {
     
        val p = Promise[Any]()
        //       
        p.future.onComplete {
     
          //     ,   onSuccess  ,promise.future         
          case Success(response) => onSuccess(response)
          case Failure(e) => onFailure(e)
        }(ThreadUtils.sameThread)
        //       
        dispatcher.postLocalMessage(message, p)
      } else {
     
        //   rpc    
        val rpcMessage = RpcOutboxMessage(serialize(message),
          onFailure,
          (client, response) => onSuccess(deserialize[Any](client, response)))
        //
        postToOutbox(message.receiver, rpcMessage)
        promise.future.onFailure {
     
          case _: TimeoutException => rpcMessage.onTimeout()
          case _ =>
        }(ThreadUtils.sameThread)
      }
      //     
      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
     
        override def run(): Unit = {
     
          onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
        }
      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
      promise.future.onComplete {
      v =>
        timeoutCancelable.cancel(true)
      }(ThreadUtils.sameThread)
    } catch {
     
      case NonFatal(e) =>
        onFailure(e)
    }
    //          ,     T    ;          
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
  }

위의 코드 는 매우 길지 만 주로 두 가지 요청 수신 자 를 구분 합 니 다. (1) reoteAddr = = address, 요청 과 수신 자 는 서버 의 핵심 코드 입 니 다. dispatcher.postLocalMessage(message, p)(2) remoteAddr != address, 서로 다른 서버 핵심 코드 는: postToOutbox(message.receiver, rpcMessage) 하지만 master 가 시작 되 기 때문에 보통 이 컴퓨터 에서 실행 되 기 때문에 먼저 reoteAddr = = address 의 요청 상황 을 분석 하고 나중에 outbox 처 리 를 소개 합 니 다.
그 다음 에 저 는 이 코드 를 차례대로 분석 하고 보고 싶 습 니 다. dispatcher.postLocalMessage(message, p) 메시지 배포 기 를 통 해 message 를 본 컴퓨터 에 보 내 는 것 을 표시 합 니 다.
Dispatcher.scala 
-------------------


  def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
     
    val rpcCallContext =
      new LocalNettyRpcCallContext(message.senderAddress, p)
    //   rpc    
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    //     **
    postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
  }


  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
     
    val error = synchronized {
     
      val data = endpoints.get(endpointName)
      if (stopped) {
     
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
     
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
     
        //          inbox       ,    receivers        
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

이 코드 는 익숙 하지 않 습 니까? 사실은 message 를 endpoint 의 inbox 에 보 낸 다음 에 정기 적 으로 요청 을 처리 하 는 것 입 니 다.앞의 분석 에 따 르 면 최종 호출 inbox.process 방법 에 해당 하 는 것 을 알 수 있 습 니 다. 요청 유형 은 RpcMessage 즉:
 def process(dispatcher: Dispatcher): Unit = {
     
 .....       ,         
  message match {
     
          case RpcMessage(_sender, content, context) =>
            try {
     
            //   endpoint = master    master.receiveAndReply  
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, {
      msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
     
              case NonFatal(e) =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall function.
                // The endpoint's onError function will be called.
                throw e
            }
......



Master.scala  -> 

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     
.......
    case BoundPortsRequest =>
      context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
......

Master endpoint 는 BoundPorts Request 요청 처리 논리 가 매우 간단 하여 설명 을 많이 하지 않 습 니 다.
이로써 마스터 시작 과 관련 된 핵심 대상 과 방법 을 소개 했다.

좋은 웹페이지 즐겨찾기