MapReduce가 Shuffle 단계에서 메모리 넘침 원인 분석 및 처리 방법
현상
Reduce 실행 중 메모리 넘침 오류가 발생할 수 있습니다. 다음과 같은 이상 메시지가 표시됩니다.
··· Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:387) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:56) at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:46) at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.(InMemoryMapOutput.java:63) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:309) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:299) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:511) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:333) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:193) ···
Redue Shuffle 프로세스 및 매개변수 확인
MergeManager
MergeManager는 shuffle의 데이터를 관리하는 중요한 데이터 구조입니다.그것은 가능한 한 메모리를 사용하여 shuffle의 데이터를 캐시하여 효율을 높이고 캐시가 되지 않으면 하드디스크에 출력합니다.MergeManager의 주요 매개변수
mapreduce.reduce.shuffle.input.buffer.percent:
shuffle , 0.7。Shuffle * 0.7。
mapreduce.reduce.shuffle.memory.limit.percent:
shuffle , 0.25, Shuffle * 0.25。
, 。
mapreduce.reduce.shuffle.merge.percent: 0.9。
shuffle Shuffle * 0.9 , 。
MergeManager는 다음 매개변수를 통해 변수를 계산합니다.
memoryLimit:shuffle
maxSingleShuffleLimit: shuffle
mergeThreshold:
Fetcher 스레드는 Map 서버를 실행하는 Shuffle 감청 프로그램을 연결하여 ShuffleHead를 가져옵니다. 정보는 다음과 같습니다.
String mapId; // Map
long uncompressedLength; //
long compressedLength; //
int forReduce; // Reduce
Fetcher 스레드에서 맵의 ShuffleHead 정보를 가져온 후 호출
merger.reserve(mapId, decompressedLength, id);
을 사용합니다.merge
InMemoryMapOutput
OnDiskMapOutput ,
null`를 사용하면 ShuffleScheduler에서 작업을 가져옵니다.Fetcher 스레드가 데이터를 가져온 후mapOutput의commit 작업을 진행하면 정보 읽기가 끝났다는 것을 설명합니다. 이 맵Output은 다른 맵Output과 통합할 수 있습니다.
메모리 공간을 Fetcher에 나누어 주면 상태는 allocated,commit는 committed,commit 상태의 메모리만merge가 됩니다.따라서 MergeManager에는 다음과 같은 매개변수가 있습니다.
usedMemory: 。
commitMemory: 。
MergeManager의 reserve는 다음과 같이 처리됩니다.
public synchronized MapOutput reserve(TaskAttemptID mapId,
long requestedSize,
int fetcher
) throws IOException {
if (requestedSize > maxSingleShuffleLimit) {
return new OnDiskMapOutput(mapId, reduceId, this, requestedSize,
jobConf, mapOutputFile, fetcher, true);
}
if (usedMemory > memoryLimit) {
return null;
}
return unconditionalReserve(mapId, requestedSize, true);
}
unconditionalReserve 방법은 다음과 같습니다. usedMemory를 추가하고 InMemoryMapOutput 대상을 되돌려줍니다.
private synchronized InMemoryMapOutput unconditionalReserve(
TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
usedMemory += requestedSize;
return new InMemoryMapOutput(jobConf, mapId, this, (int)requestedSize,
codec, primaryMapOutput);
}
이 코드 문제는 다음과 같습니다.
이 문제를 해결하기 위해서는 조정이 필요하다.목표는 Shuffle 메모리가 전체 메모리를 차지하는 비율이 70%를 초과하지 않는 것입니다. 그렇지 않으면 OutOfMemoryError가 발생합니다. ##시나리오 1.shuffle 메모리 0.7을 유지하면commit 메모리는 0.75로 변경됩니다.reserve 프로그램을 동시에 수정합니다.reserve 방법에서usedMemory가 Shuffle 메모리의 75%보다 작으면 항상 분배에 성공할 수 있습니다.75% 보다 크면 성공할 수도 있고 실패할 수도 있다.그러나 75% 를 할당했을 때, 이 Fetcher의 작업이commit 메모리를 끝낼 때, 모두merge 작업을 터치할 수 있습니다.merge 후에 메모리를 방출합니다.시스템 효율을 높이기 위해
mapreduce.reduce.shuffle.merge.percent
를 0.5로 설정할 수 있으며,commit 메모리가 0.5에 이르면 메르그를 시작하고, 이때fetcher와 메모리를 신청할 때 충돌할 기회가 낮아진다.Reduce가 2G 메모리라고 하더라도merge의 데이터 양은 최소 2G * 0.7 * 0.5는 700MB입니다.Reduce가 커지면 Merge 수가 한 번에 더 많아집니다. mapreduce.reduce.shuffle.input.buffer.percent:
shuffle , 0.7。
mapreduce.reduce.shuffle.memory.limit.percent:
shuffle , 0.25, Shuffle * 0.25。
, 。
mapreduce.reduce.shuffle.merge.percent: 0.75。
shuffle Shuffle ** 0.75 , 。
reserve 방법
if (usedMemory + requestedSize > memoryLimit) { // if (usedMemory > memoryLimit) {
return null;
}
방안 2 shuffle 메모리 비율 0.6, 단일 shuffle 최대 0.15,merge의 메모리 비율은 바꾸지 않고reserve 방법은 바꾸지 않습니다.이 방안의 셔플 메모리는 100%에 가까울 때 최대 15%의 셔플 메모리를 분배할 수 있다.총 Shuffle 메모리는 0.6 + 0.6 * 0.15 = 0.69를 초과하지 않습니다.reserve 방법에서usedMemory가 Shuffle 메모리보다 100퍼센트 작으면 항상 분배에 성공할 수 있습니다. 그렇지 않으면 실패합니다.그러나 이미 100%를 분배했을 때, 이 Fetcher의 작업이commit 메모리를 끝낼 때,merge 작업을 촉발할 수 있습니다.merge 후에 메모리를 방출합니다.
mapreduce.reduce.shuffle.input.buffer.percent:
shuffle 0.6。
mapreduce.reduce.shuffle.memory.limit.percent:
shuffle , 0.15, Shuffle * 0.15。
, 。
mapreduce.reduce.shuffle.merge.percent: 0.9。
shuffle Shuffle ** 0.9 , 。
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.