Spark 소스 읽기 (1) - Spark 초기화

4403 단어
1. 워드 카운트부터
코드는 다음과 같습니다.
  1
    def main(args:Array[String]){
        val sparkConf = new SparkConf().setAppName("WordCount")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("README.md",1)
        val words = lines.flatMap(line => line.split(" "))
        val wordOne = words.map(word => (word,1))
        val wordCount = wordOne.reduceByKey(_ + _)
        wordCount.foreach(println)

1.1 SparkConf 만들기
SparkConf는 Spark가 실행될 때의 설정 정보를 가지고 있으며 SparkConf 구조 방법에 파라미터를 제공하지 않습니다. 기본 파라미터를 사용하려면 아래 코드를 호출해서 기본 설정을 불러옵니다. (spark.로 자바 환경 변수를 시작합니다.)
 if (loadDefaults) {
     loadFromSystemProperties(false)
 }
    
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
           // Load any spark.* system properties
           for ((key, value) 

SparkConf 내부 사용 private val settings = new ConcurrentHashMap[String, String]() 은name에서value로 설정된 맵을 저장합니다.
1.2 SparkContext 만들기
SparkContext는 RDD를 바탕으로 하는 프로그래밍 모델의 핵심이자 Spark 응용 프로그램을 개발하는 입구이다. 데이터 원본에서 RDD를 만들면job가 실행될 때 SparkContext를 사용한다.Spark Streaming과 Spark SQL 프로그래밍에서 Streaming Context(Spark Streaming)와 SQLcontext를 먼저 만들 것이다. 실제로 내부는 모두 Spark Context를 가지고 있다. 왜냐하면streaming과 sql는 최종적으로 RDD의 작업으로 전환될 것이다.SparkContext는 실례화할 때 여러 개의 구성 요소의 생성과 관련되기 때문에, 여기서는 SparkContext만 대략적으로 이야기하고, 뒤에는 모든 구성 요소의 작업만 세밀하게 이야기할 것이다.
SparkContext가 제공하는 기본 방법은 코드 블록 1val lines = sc.textFile("README.md",1)에서 파일README.md에서 하나의partition만 있는 RDD를 만드는 것입니다. 이것은 SparkContext가 제공하는 기본 기능으로 데이터 원본에서 RDD를 만드는 것입니다.
SparkContext는hadoop,text,sequenceFile 및 집합 등 다양한 데이터 원본에서 RDD를 만드는 일련의 방법을 제공합니다.
이외에SparkContext는 일부runJob 방법으로 작업을 제출했다. 코드1에서wordCount.foreach(println) 이런 action 조작을 통해job의 제출을 촉발하는 것은SparkContext#runJob을 호출해서 제출하는 것이다. 물론 최종적으로DAGscheduler 구성 요소를 통해job의 제출을 실현한다(Sparkjob 제출 참조)
SparkContext 초기화 프로세스
  • JobProgressListener 생성
  •  _jobProgressListener = new JobProgressListener(_conf)
     listenerBus.addListener(jobProgressListener)
    

    Job Progress Listener가 listener Bus에 등록하면 spark 집단에서 Job 상태 변화, Stage 변화,executor 변화 등 정보는 모두 등록된 XXXXListener가 처리한다. 이 Job Progree Listener는 말 그대로 Job의 실행 과정 중의 정보를 추적하는데 이런 정보는 Spark UI에 사용된다.Listener Bus는 여러 개의 listener를 등록할 수 있으며, listener Bus에 보내는 이벤트 (Event) 는 각listener가 처리합니다.
  • SparkEnvSparkEnv를 만드는 것은 매우 중요한 종류로 Spark job이 실행되는 동안 저장 블록 관리(BlockManager), 작업 상태 관리(Map OutputTracker), 데이터 블록 전송 서비스(BlockTransfer Service), 각Task 출력 중간 파일 관리 등 서비스를 가지고 있다.Spark job을 만들고 각excutor에 제출하여 실행할 때 Spark Env는driver단과executor단에서 만들지만driver단과executor단은 구체적인 실례화 과정이 다르다. 간단하게 말하면 위에서 언급한 Spark Env가 가지고 있는 각종 서비스는 기본적으로 서버-client의 모델이고driver단은 rpc 서버를 실행하고executor는 rpc client를 실행한다. 예를 들어executor가 임무를 보고하고자 하는 상태,rpc client를 통해 호출을 시작하여driver 측에 보고합니다.스파크에 대한 rpc는 스파크 rpc를 참고하여 실현할 수 있습니다.

  • SparkEnv는 다음 섹션에서 설명합니다.
  • TaskScheduler와 SchedulerBackend
  • 만들기
     val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
    

    Scheduler Backend는 일부 excutor 정보(executorId, 소재host, 분배된 cpu 수량 등)를 가지고 있으며,Task Scheduler와 Scheduler Backend는 일정한 규칙에 따라task를excutor에 전송하여 운행하고,Task Scheduler와Scheduler Backend에 대해 구체적으로 참고한다.
  • DAGscheduler
  • 작성
        _dagScheduler = new DAGScheduler(this)
    

    한 job은 RDD 변환에 대응하는 DAG입니다. job이 제출할 때 먼저 DAGscheduler에서 DAG를 Stage로 변환하고 Stage에서 task를 생성합니다.task는TaskScheduler를 통해 제출하고DAGscheduler는spark job를 참고하여 제출합니다.
  • ExecutorAllocationManager
  • 생성 및 시작
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
        _executorAllocationManager =
          if (dynamicAllocationEnabled) {
            schedulerBackend match {
              case b: ExecutorAllocationClient =>
                Some(new ExecutorAllocationManager(
                  schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
              case _ =>
                None
            }
          } else {
            None
          }
        _executorAllocationManager.foreach(_.start())
    

    이것은job가 실행되는 동안task의 운행 진도를 감시하고 동적 확장이 필요한지 여부를 판단합니다. 설정 항목 spark.dynamicAllocation.enabled 에 따라true이고 현재spark배치 모드가local이 아닌 상황에서 작용합니다.
    1.2.1 SparkEnv
    ???????????????????

    좋은 웹페이지 즐겨찾기