Spark Shuffle Write 단계 디스크 파일 분석
전편에서 Spark Shuffle 메모리 분석을 쓴 후, 많은 사람들이 의문을 제기했다. 모두들 파일을 어떻게 낙하하는지에 대해 매우 흥미를 느끼기 때문에 이 글은Sort Based Shuffle Write 단계에서 디스크를 어떻게 낙하하는지 상세하게 소개할 것이다.
프로세스 분석
입구:
org.apache.spark.scheduler.ShuffleMapTask.runTask
runTask 의 코드는 다음과 같습니다.
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
partitionId,
context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: product2="" any="" writer.stop="" true=""/>
여기 관리자가 받은 건요.
org.apache.spark.shuffle.sort.SortShuffleWriter
우리는 그가 어떻게 디스크를 쓸 수 있는 그sorter를 얻었는지 보았다.우리가 분석한 회선 가설은 맵시드 컴바인을 해야 한다고 가정한다
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
dep.aggregator,
Some(dep.partitioner),
dep.keyOrdering, de.serializer)
이어서 맵의 출력을sorter에 넣습니다:
sorter.insertAll(records)
여기서 insertAll 프로세스는 다음과 같습니다.
while (records.hasNext) {
addElementsRead() kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)}
안에 있는 맵은 사실Partitioned Append Only Map입니다. 이것은 전체 메모리의 구조입니다.이것을 가득 써야만 spill 조작을 촉발할 수 있습니다.maybe Spill Collection이Partitioned Append Only Map에서 업데이트될 때마다 호출되는 것을 볼 수 있습니다.
어떤 spill이 발생하면 생성된 파일의 이름은 다음과 같습니다.
"temp_shuffle_" + id
로직은 다음과 같습니다.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
var blockId = new TempShuffleBlockId(UUID.randomUUID())
while (getFile(blockId).exists()) {
blockId = new TempShuffleBlockId(UUID.randomUUID())
}
(blockId, getFile(blockId))
}
생성된 모든 spill 파일이 하나의 배열에 기록됩니다.
private val spills = new ArrayBuffer[SpilledFile]
task에 대응하는partition 데이터를 교체한 후에merge 작업을 해서 디스크에 있는spill 파일과 메모리를 교체하여 처리하여 새로운iterator를 얻습니다. 이iterator의 요소는 다음과 같습니다.
(p, mergeWithAggregation(
iterators,
aggregator.get.mergeCombiners, keyComparator,
ordering.isDefined))
그 중에서 p는 Reduce에 대응하는partitionId이고 p에 대응하는 모든 데이터는 그에 대응하는iterator에 있다.
그러면 마지막 출력 파일 이름이 표시됩니다.
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
파일 이름 형식은 다음과 같습니다.
"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
여기서 ReduceId는 고정값 NOOP 입니다.REDUCE_ID, 기본값은 0입니다.
그리고 실제 쓰기 시작
val partitionLengths = sorter.writePartitionedFile(
blockId,
context,
outputFile)
파일을 쓰는 프로세스는 다음과 같습니다.
for ((id, elements)
방금 우리가 말했어, 이거this.partitionedIterator의 내부 요소는 ReducepartitionID->실제record의iterator이기 때문에 각 구역의 기록을 순서대로 쓰고 fileSegment를 형성하며 편이량을 기록합니다.이렇게 하면 후속적인 모든 Reduce는 편이량에 따라 자신이 필요로 하는 데이터를 얻을 수 있다.해당 파일 이름은 앞에서 설명한 대로 다음과 같습니다.
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"
방금 우리가 말한 편이량은 사실 메모리에 존재하기 때문에 계속해서 지속해야 한다. 아래의 writeIndexFile를 통해 완성한다.
shuffleBlockResolver.writeIndexFile(
dep.shuffleId,
mapId,
partitionLengths)
구체적인 파일 이름은 다음과 같습니다.
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"
이로써 하나의task 쓰기 작업이 완료되고 파일에 대응합니다.
최종 결론
그래서 마지막 결론은 Executor가 최종적으로 대응하는 파일 수는 다음과 같다는 것이다.
MapNum ( : index )
동시에 보유되어 쓰기 가능한 파일 수는 다음과 같습니다.
CoreNum
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.