HDFS Centrailzed Cache 를 배치할 DataNode
사실 원리는 매우 간단하다. 여러분이 아래의 몇 개의 링크를 읽으면 어떻게 된 일인지 알 수 있다.
사실 위 두 번째 글은 Cache를 이 Block의 세 개의 Replica가 있는 Data Node 중 가장 많은 사용 가능한 메모리에 놓을 것이라고 소개했다.그러나 그때 나는 자세히 보지 않고 원본을 읽어서 이 문제를 탐구했다.
관련 코드는 주로
CacheReplicationMonitor.chooseDatanodesForCaching
: /**
* Chooses datanode locations for caching from a list of valid possibilities.
* Non-stale nodes are chosen before stale nodes.
*
* @param possibilities List of candidate datanodes
* @param neededCached Number of replicas needed
* @param staleInterval Age of a stale datanode
* @return A list of chosen datanodes
*/
private static List chooseDatanodesForCaching(
final List possibilities, final int neededCached,
final long staleInterval) {
// Make a copy that we can modify
List targets =
new ArrayList(possibilities);
// Selected targets
List chosen = new LinkedList();
// Filter out stale datanodes
List stale = new LinkedList();
Iterator it = targets.iterator();
while (it.hasNext()) {
DatanodeDescriptor d = it.next();
if (d.isStale(staleInterval)) {
it.remove();
stale.add(d);
}
}
// Select targets
while (chosen.size() < neededCached) {
// Try to use stale nodes if we're out of non-stale nodes, else we're done
if (targets.isEmpty()) {
if (!stale.isEmpty()) {
targets = stale;
} else {
break;
}
}
// Select a random target
DatanodeDescriptor target =
chooseRandomDatanodeByRemainingCapacity(targets);
chosen.add(target);
targets.remove(target);
}
return chosen;
}
그리고
CacheReplicationMonitor.addNewPendingCached
: /**
* Add new entries to the PendingCached list.
*
* @param neededCached The number of replicas that need to be cached.
* @param cachedBlock The block which needs to be cached.
* @param cached A list of DataNodes currently caching the block.
* @param pendingCached A list of DataNodes that will soon cache the
* block.
*/
private void addNewPendingCached(final int neededCached,
CachedBlock cachedBlock, List cached,
List pendingCached) {
// To figure out which replicas can be cached, we consult the
// blocksMap. We don't want to try to cache a corrupt replica, though.
BlockInfoContiguous blockInfo = blockManager.
getStoredBlock(new Block(cachedBlock.getBlockId()));
if (blockInfo == null) {
LOG.debug("Block {}: can't add new cached replicas," +
" because there is no record of this block " +
"on the NameNode.", cachedBlock.getBlockId());
return;
}
if (!blockInfo.isComplete()) {
LOG.debug("Block {}: can't cache this block, because it is not yet"
+ " complete.", cachedBlock.getBlockId());
return;
}
// Filter the list of replicas to only the valid targets
List possibilities =
new LinkedList();
int numReplicas = blockInfo.getCapacity();
Collection corrupt =
blockManager.getCorruptReplicas(blockInfo);
int outOfCapacity = 0;
for (int i = 0; i < numReplicas; i++) {
DatanodeDescriptor datanode = blockInfo.getDatanode(i);
if (datanode == null) {
continue;
}
if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
continue;
}
if (corrupt != null && corrupt.contains(datanode)) {
continue;
}
if (pendingCached.contains(datanode) || cached.contains(datanode)) {
continue;
}
long pendingBytes = 0;
// Subtract pending cached blocks from effective capacity
Iterator it = datanode.getPendingCached().iterator();
while (it.hasNext()) {
CachedBlock cBlock = it.next();
BlockInfoContiguous info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes -= info.getNumBytes();
}
}
it = datanode.getPendingUncached().iterator();
// Add pending uncached blocks from effective capacity
while (it.hasNext()) {
CachedBlock cBlock = it.next();
BlockInfoContiguous info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes += info.getNumBytes();
}
}
long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
if (pendingCapacity < blockInfo.getNumBytes()) {
LOG.trace("Block {}: DataNode {} is not a valid possibility " +
"because the block has size {}, but the DataNode only has {}" +
"bytes of cache remaining ({} pending bytes, {} already cached.",
blockInfo.getBlockId(), datanode.getDatanodeUuid(),
blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
datanode.getCacheRemaining());
outOfCapacity++;
continue;
}
possibilities.add(datanode);
}
List chosen = chooseDatanodesForCaching(possibilities,
neededCached, blockManager.getDatanodeManager().getStaleInterval());
for (DatanodeDescriptor datanode : chosen) {
LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
blockInfo.getBlockId(), datanode.getDatanodeUuid());
pendingCached.add(datanode);
boolean added = datanode.getPendingCached().add(cachedBlock);
assert added;
}
// We were unable to satisfy the requested replication factor
if (neededCached > chosen.size()) {
LOG.debug("Block {}: we only have {} of {} cached replicas."
+ " {} DataNodes have insufficient cache capacity.",
blockInfo.getBlockId(),
(cachedBlock.getReplication() - neededCached + chosen.size()),
cachedBlock.getReplication(), outOfCapacity
);
}
}
다음과 같은 몇 가지 조건을 충족하는 DataNode 에서 가장 많은 Cache 메모리를 사용할 수 있는 노드를 선택하는 것이 논리적인 방법입니다.
Mapper 임무를 할당할 때 Centrailzed Cache를 고려하는지 여부에 관해서는 관련 원본을 보고 정리할 것입니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.