Spark Shuffle Write 단계 디스크 파일 분석

4122 단어
전언
전편에서 Spark Shuffle 메모리 분석을 쓴 후, 많은 사람들이 의문을 제기했다. 모두들 파일을 어떻게 낙하하는지에 대해 매우 흥미를 느끼기 때문에 이 글은Sort Based Shuffle Write 단계에서 디스크를 어떻게 낙하하는지 상세하게 소개할 것이다.
프로세스 분석
입구:
org.apache.spark.scheduler.ShuffleMapTask.runTask

runTask 의 코드는 다음과 같습니다.
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
                              dep.shuffleHandle, 
                              partitionId, 
                              context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: product2="" any="" writer.stop="" true=""/>

여기 관리자가 받은 건요.
   org.apache.spark.shuffle.sort.SortShuffleWriter

우리는 그가 어떻게 디스크를 쓸 수 있는 그sorter를 얻었는지 보았다.우리가 분석한 회선 가설은 맵시드 컴바인을 해야 한다고 가정한다
 sorter = if (dep.mapSideCombine) {  
 require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")  
 new ExternalSorter[K, V, C](
                 dep.aggregator, 
                 Some(dep.partitioner), 
                 dep.keyOrdering, de.serializer)

이어서 맵의 출력을sorter에 넣습니다:
sorter.insertAll(records)

여기서 insertAll 프로세스는 다음과 같습니다.
 while (records.hasNext) {  
 addElementsRead()  kv = records.next() 
 map.changeValue((getPartition(kv._1), kv._1), update)
 maybeSpillCollection(usingMap = true)}

안에 있는 맵은 사실Partitioned Append Only Map입니다. 이것은 전체 메모리의 구조입니다.이것을 가득 써야만 spill 조작을 촉발할 수 있습니다.maybe Spill Collection이Partitioned Append Only Map에서 업데이트될 때마다 호출되는 것을 볼 수 있습니다.
어떤 spill이 발생하면 생성된 파일의 이름은 다음과 같습니다.
    "temp_shuffle_" + id

로직은 다음과 같습니다.
val (blockId, file) = diskBlockManager.createTempShuffleBlock() 

  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {  
  var blockId = new TempShuffleBlockId(UUID.randomUUID()) 
        while (getFile(blockId).exists()) {   
           blockId = new TempShuffleBlockId(UUID.randomUUID())  
        }  
  (blockId, getFile(blockId))
  }

생성된 모든 spill 파일이 하나의 배열에 기록됩니다.
  private val spills = new ArrayBuffer[SpilledFile]

task에 대응하는partition 데이터를 교체한 후에merge 작업을 해서 디스크에 있는spill 파일과 메모리를 교체하여 처리하여 새로운iterator를 얻습니다. 이iterator의 요소는 다음과 같습니다.
 (p, mergeWithAggregation(  
             iterators, 
             aggregator.get.mergeCombiners, keyComparator,
             ordering.isDefined))

그 중에서 p는 Reduce에 대응하는partitionId이고 p에 대응하는 모든 데이터는 그에 대응하는iterator에 있다.
그러면 마지막 출력 파일 이름이 표시됩니다.
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

파일 이름 형식은 다음과 같습니다.
 "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

여기서 ReduceId는 고정값 NOOP 입니다.REDUCE_ID, 기본값은 0입니다.
그리고 실제 쓰기 시작
   val partitionLengths = sorter.writePartitionedFile(
     blockId, 
     context, 
     outputFile)

파일을 쓰는 프로세스는 다음과 같습니다.
for ((id, elements) 

방금 우리가 말했어, 이거this.partitionedIterator의 내부 요소는 ReducepartitionID->실제record의iterator이기 때문에 각 구역의 기록을 순서대로 쓰고 fileSegment를 형성하며 편이량을 기록합니다.이렇게 하면 후속적인 모든 Reduce는 편이량에 따라 자신이 필요로 하는 데이터를 얻을 수 있다.해당 파일 이름은 앞에서 설명한 대로 다음과 같습니다.
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"

방금 우리가 말한 편이량은 사실 메모리에 존재하기 때문에 계속해서 지속해야 한다. 아래의 writeIndexFile를 통해 완성한다.
 shuffleBlockResolver.writeIndexFile(
           dep.shuffleId,
           mapId, 
          partitionLengths)

구체적인 파일 이름은 다음과 같습니다.
  "shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"

이로써 하나의task 쓰기 작업이 완료되고 파일에 대응합니다.
최종 결론
그래서 마지막 결론은 Executor가 최종적으로 대응하는 파일 수는 다음과 같다는 것이다.
MapNum ( :   index  )

동시에 보유되어 쓰기 가능한 파일 수는 다음과 같습니다.
 CoreNum

좋은 웹페이지 즐겨찾기