MapReduce가 Shuffle 단계에서 메모리 넘침 원인 분석 및 처리 방법

6357 단어

현상


Reduce 실행 중 메모리 넘침 오류가 발생할 수 있습니다. 다음과 같은 이상 메시지가 표시됩니다.
··· Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#1 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:387) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:56) at org.apache.hadoop.io.BoundedByteArrayOutputStream.(BoundedByteArrayOutputStream.java:46) at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.(InMemoryMapOutput.java:63) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:309) at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:299) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:511) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:333) at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:193) ···

Redue Shuffle 프로세스 및 매개변수 확인

  • EventFetcher는 맵 번호와 맵을 실행하는 서버를 포함하여 MRAppMaster에 이미 실행된 맵 정보를 얻는 것을 책임진다.
  • ShuffleScheduler는 Shuffle 스케줄링 작업을 담당합니다.
  • 각 Fetcher 라인은 Shuffle Scheduler에서 작업을 가져와 실제 맵 데이터를 가져옵니다.기본 Fetcher 스레드 5개.

  • MergeManager


    MergeManager는 shuffle의 데이터를 관리하는 중요한 데이터 구조입니다.그것은 가능한 한 메모리를 사용하여 shuffle의 데이터를 캐시하여 효율을 높이고 캐시가 되지 않으면 하드디스크에 출력합니다.MergeManager의 주요 매개변수
        mapreduce.reduce.shuffle.input.buffer.percent:
              shuffle , 0.7。Shuffle  * 0.7。
        mapreduce.reduce.shuffle.memory.limit.percent: 
             shuffle , 0.25,  Shuffle  * 0.25。
             , 。
       mapreduce.reduce.shuffle.merge.percent: 0.9。
           shuffle Shuffle  * 0.9 , 。
    

    MergeManager는 다음 매개변수를 통해 변수를 계산합니다.
    memoryLimit:shuffle 
    maxSingleShuffleLimit: shuffle 
    mergeThreshold: 
    

    Fetcher 스레드는 Map 서버를 실행하는 Shuffle 감청 프로그램을 연결하여 ShuffleHead를 가져옵니다. 정보는 다음과 같습니다.
      String mapId;  // Map  
      long uncompressedLength; //  
      long compressedLength;  //  
      int forReduce;    // Reduce 
    

    Fetcher 스레드에서 맵의 ShuffleHead 정보를 가져온 후 호출merger.reserve(mapId, decompressedLength, id);을 사용합니다.merge InMemoryMapOutput OnDiskMapOutput , null`를 사용하면 ShuffleScheduler에서 작업을 가져옵니다.
    Fetcher 스레드가 데이터를 가져온 후mapOutput의commit 작업을 진행하면 정보 읽기가 끝났다는 것을 설명합니다. 이 맵Output은 다른 맵Output과 통합할 수 있습니다.
    메모리 공간을 Fetcher에 나누어 주면 상태는 allocated,commit는 committed,commit 상태의 메모리만merge가 됩니다.따라서 MergeManager에는 다음과 같은 매개변수가 있습니다.
     usedMemory:  。
     commitMemory:  。
    

    MergeManager의 reserve는 다음과 같이 처리됩니다.
    public synchronized MapOutput reserve(TaskAttemptID mapId, 
                                                 long requestedSize,
                                                 int fetcher
                                                 ) throws IOException {
        if (requestedSize > maxSingleShuffleLimit) {
          return new OnDiskMapOutput(mapId, reduceId, this, requestedSize,
                                          jobConf, mapOutputFile, fetcher, true);
        }
        
        if (usedMemory > memoryLimit) {
          return null;
        }
       
        return unconditionalReserve(mapId, requestedSize, true);
      }
    

    unconditionalReserve 방법은 다음과 같습니다. usedMemory를 추가하고 InMemoryMapOutput 대상을 되돌려줍니다.
     private synchronized InMemoryMapOutput unconditionalReserve(
          TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
        usedMemory += requestedSize;
        return new InMemoryMapOutput(jobConf, mapId, this, (int)requestedSize,
                                          codec, primaryMapOutput);
      }
    

    이 코드 문제는 다음과 같습니다.
  • 만약에 이전 4개의 Fetcher가 전체 shuffle 메모리의 99%를 사용했다면 5번째 Fetcher가 추출한 데이터는 하나의 shuffle 작업이 사용할 수 있는 메모리 한도액에 가깝다.fetcher commit이 없습니다.이 때 다섯 번째fetcher에 25%의 메모리를 분배합니다.분배 메모리를 shuffle 메모리의 124%에 이르게 하고 메모리가 넘칩니다.
  • 앞에 있는fetcher 4개는 shuffle 메모리의 89를 사용하고 commit를 사용했습니다.Merge가 시작되지 않습니다.마지막 Fetcher의 데이터 양은 하나의 shuffle 작업이 사용할 수 있는 메모리 한도액에 가깝다.이때 총 shuffle 사용량은 114%이고 메모리가 넘칩니다.

  • 이 문제를 해결하기 위해서는 조정이 필요하다.목표는 Shuffle 메모리가 전체 메모리를 차지하는 비율이 70%를 초과하지 않는 것입니다. 그렇지 않으면 OutOfMemoryError가 발생합니다. ##시나리오 1.shuffle 메모리 0.7을 유지하면commit 메모리는 0.75로 변경됩니다.reserve 프로그램을 동시에 수정합니다.reserve 방법에서usedMemory가 Shuffle 메모리의 75%보다 작으면 항상 분배에 성공할 수 있습니다.75% 보다 크면 성공할 수도 있고 실패할 수도 있다.그러나 75% 를 할당했을 때, 이 Fetcher의 작업이commit 메모리를 끝낼 때, 모두merge 작업을 터치할 수 있습니다.merge 후에 메모리를 방출합니다.시스템 효율을 높이기 위해 mapreduce.reduce.shuffle.merge.percent를 0.5로 설정할 수 있으며,commit 메모리가 0.5에 이르면 메르그를 시작하고, 이때fetcher와 메모리를 신청할 때 충돌할 기회가 낮아진다.Reduce가 2G 메모리라고 하더라도merge의 데이터 양은 최소 2G * 0.7 * 0.5는 700MB입니다.Reduce가 커지면 Merge 수가 한 번에 더 많아집니다.
        mapreduce.reduce.shuffle.input.buffer.percent:
              shuffle , 0.7。
        mapreduce.reduce.shuffle.memory.limit.percent: 
             shuffle , 0.25,  Shuffle  * 0.25。
             , 。
       mapreduce.reduce.shuffle.merge.percent: 0.75。
           shuffle Shuffle  ** 0.75 , 。
    

    reserve 방법
        if (usedMemory  + requestedSize > memoryLimit) {   //    if (usedMemory  > memoryLimit) {
          return null;
        }
    

    방안 2 shuffle 메모리 비율 0.6, 단일 shuffle 최대 0.15,merge의 메모리 비율은 바꾸지 않고reserve 방법은 바꾸지 않습니다.이 방안의 셔플 메모리는 100%에 가까울 때 최대 15%의 셔플 메모리를 분배할 수 있다.총 Shuffle 메모리는 0.6 + 0.6 * 0.15 = 0.69를 초과하지 않습니다.reserve 방법에서usedMemory가 Shuffle 메모리보다 100퍼센트 작으면 항상 분배에 성공할 수 있습니다. 그렇지 않으면 실패합니다.그러나 이미 100%를 분배했을 때, 이 Fetcher의 작업이commit 메모리를 끝낼 때,merge 작업을 촉발할 수 있습니다.merge 후에 메모리를 방출합니다.
        mapreduce.reduce.shuffle.input.buffer.percent:
              shuffle 0.6。
        mapreduce.reduce.shuffle.memory.limit.percent: 
             shuffle , 0.15,  Shuffle  * 0.15。
             , 。
       mapreduce.reduce.shuffle.merge.percent: 0.9。
           shuffle Shuffle  ** 0.9 , 。
    

    좋은 웹페이지 즐겨찾기