HDFS Centrailzed Cache 를 배치할 DataNode

7545 단어
며칠 전 Hadoop User Email List를 보던 중 HDFS Centrailzed Cache에 관한 문제가 발견되었습니다.마침 나는 이 덩어리에 익숙하지 않아서, 심지어는 이전에도 들어 본 적이 없어서, 한 번 잘 이해했다.
사실 원리는 매우 간단하다. 여러분이 아래의 몇 개의 링크를 읽으면 어떻게 된 일인지 알 수 있다.
  • Centralized Cache Management in HDFS
  • HDFS 중앙 집중식 캐시 관리 원리와 코드 분석
  • In-memory Caching in HDFS: Lower Latency, Same Great Taste

  • 사실 위 두 번째 글은 Cache를 이 Block의 세 개의 Replica가 있는 Data Node 중 가장 많은 사용 가능한 메모리에 놓을 것이라고 소개했다.그러나 그때 나는 자세히 보지 않고 원본을 읽어서 이 문제를 탐구했다.
    관련 코드는 주로 CacheReplicationMonitor.chooseDatanodesForCaching:
      /**
       * Chooses datanode locations for caching from a list of valid possibilities.
       * Non-stale nodes are chosen before stale nodes.
       *
       * @param possibilities List of candidate datanodes
       * @param neededCached Number of replicas needed
       * @param staleInterval Age of a stale datanode
       * @return A list of chosen datanodes
       */
      private static List chooseDatanodesForCaching(
          final List possibilities, final int neededCached,
          final long staleInterval) {
        // Make a copy that we can modify
        List targets =
            new ArrayList(possibilities);
        // Selected targets
        List chosen = new LinkedList();
    
        // Filter out stale datanodes
        List stale = new LinkedList();
        Iterator it = targets.iterator();
        while (it.hasNext()) {
          DatanodeDescriptor d = it.next();
          if (d.isStale(staleInterval)) {
            it.remove();
            stale.add(d);
          }
        }
        // Select targets
        while (chosen.size() < neededCached) {
          // Try to use stale nodes if we're out of non-stale nodes, else we're done
          if (targets.isEmpty()) {
            if (!stale.isEmpty()) {
              targets = stale;
            } else {
              break;
            }
          }
          // Select a random target
          DatanodeDescriptor target =
              chooseRandomDatanodeByRemainingCapacity(targets);
          chosen.add(target);
          targets.remove(target);
        }
        return chosen;
      }
    

    그리고 CacheReplicationMonitor.addNewPendingCached:
     /**
      * Add new entries to the PendingCached list.
      *
      * @param neededCached     The number of replicas that need to be cached.
      * @param cachedBlock      The block which needs to be cached.
      * @param cached           A list of DataNodes currently caching the block.
      * @param pendingCached    A list of DataNodes that will soon cache the
      *                         block.
      */
     private void addNewPendingCached(final int neededCached,
         CachedBlock cachedBlock, List cached,
         List pendingCached) {
       // To figure out which replicas can be cached, we consult the
       // blocksMap.  We don't want to try to cache a corrupt replica, though.
       BlockInfoContiguous blockInfo = blockManager.
             getStoredBlock(new Block(cachedBlock.getBlockId()));
       if (blockInfo == null) {
         LOG.debug("Block {}: can't add new cached replicas," +
             " because there is no record of this block " +
             "on the NameNode.", cachedBlock.getBlockId());
         return;
       }
       if (!blockInfo.isComplete()) {
         LOG.debug("Block {}: can't cache this block, because it is not yet"
             + " complete.", cachedBlock.getBlockId());
         return;
       }
       // Filter the list of replicas to only the valid targets
       List possibilities =
           new LinkedList();
       int numReplicas = blockInfo.getCapacity();
       Collection corrupt =
           blockManager.getCorruptReplicas(blockInfo);
       int outOfCapacity = 0;
       for (int i = 0; i < numReplicas; i++) {
         DatanodeDescriptor datanode = blockInfo.getDatanode(i);
         if (datanode == null) {
           continue;
         }
         if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
           continue;
         }
         if (corrupt != null && corrupt.contains(datanode)) {
           continue;
         }
         if (pendingCached.contains(datanode) || cached.contains(datanode)) {
           continue;
         }
         long pendingBytes = 0;
         // Subtract pending cached blocks from effective capacity
         Iterator it = datanode.getPendingCached().iterator();
         while (it.hasNext()) {
           CachedBlock cBlock = it.next();
           BlockInfoContiguous info =
               blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
           if (info != null) {
             pendingBytes -= info.getNumBytes();
           }
         }
         it = datanode.getPendingUncached().iterator();
         // Add pending uncached blocks from effective capacity
         while (it.hasNext()) {
           CachedBlock cBlock = it.next();
           BlockInfoContiguous info =
               blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
           if (info != null) {
             pendingBytes += info.getNumBytes();
           }
         }
         long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
         if (pendingCapacity < blockInfo.getNumBytes()) {
           LOG.trace("Block {}: DataNode {} is not a valid possibility " +
               "because the block has size {}, but the DataNode only has {}" +
               "bytes of cache remaining ({} pending bytes, {} already cached.",
               blockInfo.getBlockId(), datanode.getDatanodeUuid(),
               blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
               datanode.getCacheRemaining());
           outOfCapacity++;
           continue;
         }
         possibilities.add(datanode);
       }
       List chosen = chooseDatanodesForCaching(possibilities,
           neededCached, blockManager.getDatanodeManager().getStaleInterval());
       for (DatanodeDescriptor datanode : chosen) {
         LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
             blockInfo.getBlockId(), datanode.getDatanodeUuid());
         pendingCached.add(datanode);
         boolean added = datanode.getPendingCached().add(cachedBlock);
         assert added;
       }
       // We were unable to satisfy the requested replication factor
       if (neededCached > chosen.size()) {
         LOG.debug("Block {}: we only have {} of {} cached replicas."
                 + " {} DataNodes have insufficient cache capacity.",
             blockInfo.getBlockId(),
             (cachedBlock.getReplication() - neededCached + chosen.size()),
             cachedBlock.getReplication(), outOfCapacity
         );
       }
     }
    

    다음과 같은 몇 가지 조건을 충족하는 DataNode 에서 가장 많은 Cache 메모리를 사용할 수 있는 노드를 선택하는 것이 논리적인 방법입니다.
  • 이 DataNode의 상태는 정상이며 decommission
  • 이 없습니다.
  • 이 블록은 이 DataNode에서 corrupt 상태가 아니다
  • 이 블록은 DataNode에 캐시되지 않았습니다(pendingCached 및cached에 이 DataNode가 없습니다)
  • 이 블록이 닫혔습니다. (예를 들어 블록이 Replication을 진행할 때 두 번째 Replicate 2와 Replication 3이 완성되지 않으면 이 DataNode를 선택할 수 없습니다)
  • 그 책략은 소개가 끝났다.여기서 나는 새로운 의문이 하나 생겼다. 우리는 Map Reduce의 Mapper 임무가 해당 블록이 있는 노드에 최대한 분배될 수 있다는 것을 안다.그럼 여기서 Centrailzed Cache를 생각해 볼까요?스파크 등 다른 프레임은요?
    Mapper 임무를 할당할 때 Centrailzed Cache를 고려하는지 여부에 관해서는 관련 원본을 보고 정리할 것입니다.

    좋은 웹페이지 즐겨찾기