Spark 소스 읽기 (1) - Spark 초기화
코드는 다음과 같습니다.
1
def main(args:Array[String]){
val sparkConf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("README.md",1)
val words = lines.flatMap(line => line.split(" "))
val wordOne = words.map(word => (word,1))
val wordCount = wordOne.reduceByKey(_ + _)
wordCount.foreach(println)
1.1 SparkConf 만들기
SparkConf는 Spark가 실행될 때의 설정 정보를 가지고 있으며 SparkConf 구조 방법에 파라미터를 제공하지 않습니다. 기본 파라미터를 사용하려면 아래 코드를 호출해서 기본 설정을 불러옵니다. (spark.로 자바 환경 변수를 시작합니다.)
if (loadDefaults) {
loadFromSystemProperties(false)
}
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value)
SparkConf 내부 사용
private val settings = new ConcurrentHashMap[String, String]()
은name에서value로 설정된 맵을 저장합니다.1.2 SparkContext 만들기
SparkContext는 RDD를 바탕으로 하는 프로그래밍 모델의 핵심이자 Spark 응용 프로그램을 개발하는 입구이다. 데이터 원본에서 RDD를 만들면job가 실행될 때 SparkContext를 사용한다.Spark Streaming과 Spark SQL 프로그래밍에서 Streaming Context(Spark Streaming)와 SQLcontext를 먼저 만들 것이다. 실제로 내부는 모두 Spark Context를 가지고 있다. 왜냐하면streaming과 sql는 최종적으로 RDD의 작업으로 전환될 것이다.SparkContext는 실례화할 때 여러 개의 구성 요소의 생성과 관련되기 때문에, 여기서는 SparkContext만 대략적으로 이야기하고, 뒤에는 모든 구성 요소의 작업만 세밀하게 이야기할 것이다.
SparkContext가 제공하는 기본 방법은 코드 블록 1
val lines = sc.textFile("README.md",1)
에서 파일README.md
에서 하나의partition만 있는 RDD를 만드는 것입니다. 이것은 SparkContext가 제공하는 기본 기능으로 데이터 원본에서 RDD를 만드는 것입니다.SparkContext는hadoop,text,sequenceFile 및 집합 등 다양한 데이터 원본에서 RDD를 만드는 일련의 방법을 제공합니다.
이외에SparkContext는 일부runJob 방법으로 작업을 제출했다. 코드1에서
wordCount.foreach(println)
이런 action 조작을 통해job의 제출을 촉발하는 것은SparkContext#runJob을 호출해서 제출하는 것이다. 물론 최종적으로DAGscheduler 구성 요소를 통해job의 제출을 실현한다(Sparkjob 제출 참조)SparkContext 초기화 프로세스
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
Job Progress Listener가 listener Bus에 등록하면 spark 집단에서 Job 상태 변화, Stage 변화,executor 변화 등 정보는 모두 등록된 XXXXListener가 처리한다. 이 Job Progree Listener는 말 그대로 Job의 실행 과정 중의 정보를 추적하는데 이런 정보는 Spark UI에 사용된다.Listener Bus는 여러 개의 listener를 등록할 수 있으며, listener Bus에 보내는 이벤트 (Event) 는 각listener가 처리합니다.
SparkEnv는 다음 섹션에서 설명합니다.
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
Scheduler Backend는 일부 excutor 정보(executorId, 소재host, 분배된 cpu 수량 등)를 가지고 있으며,Task Scheduler와 Scheduler Backend는 일정한 규칙에 따라task를excutor에 전송하여 운행하고,Task Scheduler와Scheduler Backend에 대해 구체적으로 참고한다.
_dagScheduler = new DAGScheduler(this)
한 job은 RDD 변환에 대응하는 DAG입니다. job이 제출할 때 먼저 DAGscheduler에서 DAG를 Stage로 변환하고 Stage에서 task를 생성합니다.task는TaskScheduler를 통해 제출하고DAGscheduler는spark job를 참고하여 제출합니다.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
이것은job가 실행되는 동안task의 운행 진도를 감시하고 동적 확장이 필요한지 여부를 판단합니다. 설정 항목
spark.dynamicAllocation.enabled
에 따라true이고 현재spark배치 모드가local이 아닌 상황에서 작용합니다.1.2.1 SparkEnv
???????????????????
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.