spark 소스 코드 분석 - master 프로 세 스 분석
69494 단어 spark빅 데이터spark 소스 코드 분석소스 코드
1. 개념
master 는 spark 의 주요 메타 데 이 터 를 관리 하고 클 러 스 터, 자원 관리 등에 사용 합 니 다.
2. master 시작 과정
2.1 Master. main 방법
start - master. sh 스 크 립 트 에서 최종 호출 된 것 은
org.apache.spark.deploy.master.Master
main 방법 임 을 알 수 있 습 니 다.이제 이 방법 을 분석 해 보 자. def main(argStrings: Array[String]) {
//
Utils.initDaemon(log)
// spark
val conf = new SparkConf
// master , , :--host ,--webui-port
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) =
// master ( )
startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
2.2 Master. startRpcEnvAndEndpoint 방법
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
//
val securityMgr = new SecurityManager(conf)
// rpc , netty
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
// master , 【1】
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
// Master , 【2】
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
핵심 위치 분석: 【 1 】
Dispatcher.scala
----------------------------
/**
* rpc
* @param name
* @param endpoint
* @return
*/
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
// rpc ,
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
// endpoint
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
// endpoint
endpointRefs.put(data.endpoint, data.ref)
// ,
receivers.offer(data) // for the OnStart message
}
endpointRef
}
위 에 가장 핵심 적 인 코드 가 있 습 니 다.
receivers.offer(data)
요청 한 데 이 터 를 receivers 대기 열 에 넣 는 것 처럼 보이 지만 정시 작업 처리 요청 을 실행 합 니 다. 자세 한 내용 은 다음 과 같 습 니 다.
Dispatcher.scala
-------------------
/** MessageLoop run */
private val threadpool: ThreadPoolExecutor = {
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(2, Runtime.getRuntime.availableProcessors()))
//
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
}
pool
}
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
//
while (true) {
try {
val data = receivers.take()
//
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
//
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
위의
data.inbox.process(Dispatcher.this)
를 설명 하기 위해 data. inbox 속성 을 중점적으로 소개 합 니 다.Dispatcher.scala
-------------------
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
// , Inbox
val inbox = new Inbox(ref, endpoint)
}
private[netty] class Inbox(
val endpointRef: NettyRpcEndpointRef,
val endpoint: RpcEndpoint)
extends Logging {
inbox => // Give this an alias so we can use it more clearly in closures.
// , , Dispatcher.receivers ,
@GuardedBy("this")
protected val messages = new java.util.LinkedList[InboxMessage]()
/** True if the inbox (and its associated endpoint) is stopped. */
//
@GuardedBy("this")
private var stopped = false
/** Allow multiple threads to process messages at the same time. */
//
@GuardedBy("this")
private var enableConcurrent = false
/** The number of threads processing messages for this inbox. */
// inbox
@GuardedBy("this")
private var numActiveThreads = 0
// OnStart should be the first message to process
// Inbox , OnStart
inbox.synchronized {
messages.add(OnStart)
}
위의 분석 을 통 해 알 수 있 듯 이 Endpoint Data 대상 을 만 들 때마다 OnStart 메 시 지 를 inbox 대상 에 추가 합 니 다.그래서 등록 할 때
receivers.offer(data)
OnStart 메 시 지 를 추가 하여 처 리 를 기다 리 고 있 습 니 다. 이제 진정한 처리 방법 (즉, data. inbox. process (Dispatcher. this) 을 살 펴 보 겠 습 니 다. def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
//
if (!enableConcurrent && numActiveThreads != 0) {
return
}
//
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
safelyCall(endpoint) {
/**
*
*/
message match {
.......
// OnStart
case OnStart =>
// endpoint Master , Master.onStart
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
.......
}
}
.......
}
이 어 위 에서 분석 한 리듬 에 이 어 Master. onStart 방법 을 분석 해 보 자.
Master.scala
----------------------
override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
// jetty web ui
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
//
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
// rest server, rest , master
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
restServerBoundPort = restServer.map(_.start())
// ( , )
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
// started.
// web ui
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
// ------------ master HA , ---------------
// java ,
val serializer = new JavaSerializer(conf)
// , leader
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
// ZOOKEEPER, zookeeper
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
// ,
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
// , ,
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
//
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
그 중에서 master. onStart 는 매우 간단 합 니 다. 바로 감청 서 비 스 를 만 들 고 ui 포트 를 방문 하여 master HA 복구 모델 에 이렇게 많은 것 을 소 개 했 는 지 확인 하 는 것 입 니 다. 사실은
startRpcEnvAndEndpoint
방법 중의 핵심 코드 중 하나 인 val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
만 소 개 했 습 니 다. 지금 소개 하 겠 습 니 다. val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
【2】: RpcEndpointRef.scala
-----------------------
/**
*
*/
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
//
while (attempts < maxRetries) {
attempts += 1
try {
// ( )
val future = ask[T](message, timeout)
//
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("RpcEndpoint returned null")
}
return result
} catch {
case ie: InterruptedException => throw ie
case e: Exception =>
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}
//
if (attempts < maxRetries) {
Thread.sleep(retryWaitMs)
}
}
throw new SparkException(
s"Error sending message [message = $message]", lastException)
}
요청 코드 처리
ask[T](message, timeout)
(message = BoundPorts Request) 를 분석 해 보 세 요.NettyRpcEnv.scala
---------------------
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
val promise = Promise[Any]()
//
val remoteAddr = message.receiver.address
def onFailure(e: Throwable): Unit = {
if (!promise.tryFailure(e)) {
logWarning(s"Ignored failure: $e")
}
}
def onSuccess(reply: Any): Unit = reply match {
case RpcFailure(e) => onFailure(e)
case rpcReply =>
if (!promise.trySuccess(rpcReply)) {
logWarning(s"Ignored message: $reply")
}
}
try {
//
if (remoteAddr == address) {
val p = Promise[Any]()
//
p.future.onComplete {
// , onSuccess ,promise.future
case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
//
dispatcher.postLocalMessage(message, p)
} else {
// rpc
val rpcMessage = RpcOutboxMessage(serialize(message),
onFailure,
(client, response) => onSuccess(deserialize[Any](client, response)))
//
postToOutbox(message.receiver, rpcMessage)
promise.future.onFailure {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
}
//
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
override def run(): Unit = {
onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
promise.future.onComplete {
v =>
timeoutCancelable.cancel(true)
}(ThreadUtils.sameThread)
} catch {
case NonFatal(e) =>
onFailure(e)
}
// , T ;
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
위의 코드 는 매우 길지 만 주로 두 가지 요청 수신 자 를 구분 합 니 다. (1) reoteAddr = = address, 요청 과 수신 자 는 서버 의 핵심 코드 입 니 다.
dispatcher.postLocalMessage(message, p)
(2) remoteAddr != address, 서로 다른 서버 핵심 코드 는: postToOutbox(message.receiver, rpcMessage)
하지만 master 가 시작 되 기 때문에 보통 이 컴퓨터 에서 실행 되 기 때문에 먼저 reoteAddr = = address 의 요청 상황 을 분석 하고 나중에 outbox 처 리 를 소개 합 니 다.그 다음 에 저 는 이 코드 를 차례대로 분석 하고 보고 싶 습 니 다.
dispatcher.postLocalMessage(message, p)
메시지 배포 기 를 통 해 message 를 본 컴퓨터 에 보 내 는 것 을 표시 합 니 다.Dispatcher.scala
-------------------
def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
val rpcCallContext =
new LocalNettyRpcCallContext(message.senderAddress, p)
// rpc
val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
// **
postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
}
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
// inbox , receivers
data.inbox.post(message)
receivers.offer(data)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
이 코드 는 익숙 하지 않 습 니까? 사실은 message 를 endpoint 의 inbox 에 보 낸 다음 에 정기 적 으로 요청 을 처리 하 는 것 입 니 다.앞의 분석 에 따 르 면 최종 호출
inbox.process
방법 에 해당 하 는 것 을 알 수 있 습 니 다. 요청 유형 은 RpcMessage 즉: def process(dispatcher: Dispatcher): Unit = {
..... ,
message match {
case RpcMessage(_sender, content, context) =>
try {
// endpoint = master master.receiveAndReply
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, {
msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
......
Master.scala ->
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
.......
case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
......
Master endpoint 는 BoundPorts Request 요청 처리 논리 가 매우 간단 하여 설명 을 많이 하지 않 습 니 다.
이로써 마스터 시작 과 관련 된 핵심 대상 과 방법 을 소개 했다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.