Spark 소스 코드 분석의 BlockManager

BlockManager 는 대외 적 으로 제공 하 는 블록 에 통일 적 으로 접근 하 는 인터페이스 로 Master 와 Slave 에 하나의 인 스 턴 스 가 있 습 니 다. 그 는 데 이 터 를 읽 고 쓰 는 방법 을 제공 하고 StorageLevel 에 따라 서로 다른 BlockStore 를 호출 하여 데 이 터 를 읽 고 씁 니 다.
프로그램 이 시 작 될 때 SparkContext 는 Driver 엔 드 의 SpakEnv 를 만 들 고 이 SparkEnv 에서 BlockManager 와 BlockManager Master 를 예화 하여 내부 에 메시지 통신 의 터미널 BlockManager MasterEndpoint 를 만 듭 니 다.
Executor 가 시 작 될 때 도 SparkEnv 를 만 듭 니 다. 이 SparkEnv 에서 BlockManager 와 BlockTransfer Service 를 실례 화 합 니 다. BlockManager 초기 화 과정 에서 BlockManager SlaveEndpoint 의 메시지 단말 기 를 추가 하고 이 BlockManager SlaveEndpoint 의 이 종 단 을 Driver 에 참조 하여 등록 합 니 다.이렇게 하면 Driver 와 Executor 는 서로 통신 단말기 의 인용 을 가지 고 있 을 수 있다.
 
/ / BlockManager Masterval blockManager Master = new BlockManager Master (registerOrLookupEndpoint (registerOrLookupEndpoint) 만 들 기  BlockManagerMaster.DRIVER_ENDPOINT_NAME,   // BlockManager MasterEndpoint 만 들 기  new BlockManagerMasterEndpoint(rpcEnv,isLocal, conf,listenerBus)),   conf, isDriver) / / NB: blockManager 는 initialize () 가 나중에 호출 될 때 까지 유효 하지 않 습 니 다. / / BlockManagerval blockManager = new BlockManager (executorId, rpcEnv, blockManagerMaster,  serializer, conf,mapOutputTracker, shuffleManager, blockTransferService, securityManager,   numUsableCores)
 
핵심 속성
String executorId: executorId 또는 driverId
RpcEnv rpcEnv: rpc 통신 환경
BlockManagerMaster master: 전체 프로그램 이 실행 되 는 동안 block 메타 데이터 의 관리 와 유지, 명령 의 전송 을 책임 집 니 다.
Serializer default Serializer: 기본 직렬 화 메커니즘
Long max Memory: 최대 사용 가능 한 메모리 할당
Int numUsableCore: cpu 핵 수 를 사용 할 수 있 습 니 다.
MapOutputTracker mapOutputTracker: map 단 shuffle 프로 세 스 의 출력 상태
셔 플 관리자 셔 플 관리자: 셔 플 관리자
BlockTransferService blockTransferService: 원 격 으로 데 이 터 를 전송 하고 침대 블록 을 가 져 오 는 데 사용 합 니 다.
DiskBlockManager diskBlockManager: 디스크 에 있 는 block 및 대응 하 는 파일 과 디 렉 터 리 를 관리 하 는 데 사 용 됩 니 다.
TimeStamped HashMap [BlockId, BlockInfo] blockInfo: 유지 보수 맵 구축
Boolean externalBlockStore 초기 화: 외부 저장 소 를 사용 할 지 여부
Memory Store memory Store: 메모리 대상
DiskStore diskStore: 디스크 저장 대상
ExternalBlockStore externalBlockStore: 외부 촌 처 대상
BlockManager Id blockManager Id: 현재 BlockManager 에 대응 하 는 id
ShuffleClient shuffleClient: 다른 executor 의 shuffle 파일 을 읽 는 클 라 이언 트 는 외부 서비스 일 수도 있 고 표준 데이터 블록 전송 서비스 일 수도 있 습 니 다. 외부 shuffle 서 비 스 를 사용 하면 External ShuffleClient 를 만 들 지 않 으 면 BlockTransferService 를 만 듭 니 다.
Boolean compressBroadcast: 방송 데 이 터 를 압축 할 지 여부
Boolean copressShuffle: map 출력 파일 을 압축 할 지 여 부 는 일반적으로 열 어 보 는 것 을 권장 합 니 다. 그러나 cpu 자원 소모 가 너무 많 으 면 true 로 설정 하 는 것 을 권장 하지 않 습 니 다.
Boolean compressRdds: 직렬 화 된 RDD 파 티 션 을 압축 할 지 여부
Boolean compressShuffle Spill: 맵 에 넘 쳐 쓰 인 임시 파일 을 압축 할 지 여부
BlockManager SlaveEndpoint slaveEndpoint: 보유 한 BlockManager SlaveEndpoint 통신 단말기
 
 
중요 한 방법
2.1initialize 초기 화 방법, 지정 한 application id 로 BlockManager 를 예화 합 니 다.
\ # BlockTransferService 초기 화, rpc server 구축 등 BlockTransferService 는 주로 노드 간 데이터 전송 에 사 용 됩 니 다.
\ # ShuffleClient 를 초기 화하 고 다른 executor 의 shuffle 파일 을 읽 는 클 라 이언 트
\ # blockManager Id 구축
\ # shuffleServerId 구축
\ # BlockManager Master 에 BlockManager 등록
\ # 외부 셔 플 서비스 가 시작 되 고 Executor 노드 가 되면 외부 셔 플 서 비 스 를 등록 합 니 다.
def initialize(appId: String): Unit = {
  //    BlockTransferServicerpc serverBlockTransferService            
  blockTransferService.init(this)
  //    ShuffleClientexecutor  shuffle      
  shuffleClient.init(appId)
  //   blockManagerId
  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)
  //   shuffleServerId
  shuffleServerId = if (externalShuffleServiceEnabled) {
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  //  BlockManagerMaster  BlockManagerslaveEndpoint,   BlockManagerMaster  
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

  //     Shuffle       ExecutorShuffle  
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}

 
2.2reportAllBlocks 는 BlockManager 의 모든 데이터 블록 을 보고 합 니 다.
private def reportAllBlocks(): Unit = {
  logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
  for ((blockId, info) blockInfo) {
    //      block     
    val status = getCurrentBlockStatus(blockId, info)
    if (!tryToReportBlockStatus(blockId, info, status)) {
      logError(s"Failed to report $blockId to master; giving up.")
      return
    }
  }
}

 
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
  info.synchronized {
    info.level match {
      case null =>
        BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
      case level =>
        val inMem = level.useMemory && memoryStore.contains(blockId)
        val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val deserialized = if (inMem) level.deserialized else false
        val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
        val storageLevel =
          StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
        val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
        val externalBlockStoreSize =
          if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
    }
  }
}

 
2.3reregister 는 BlockManager 를 등록 하고 BlockManager 의 모든 데이터 블록 상 태 를 보고 합 니 다.
def reregister(): Unit = {
  // TODO: We might need to rate limit re-registering.
  logInfo("BlockManager re-registering with master")
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  reportAllBlocks()
}

 
2.4 getBlockData 로 컬 데이터 블록 데이터 가 져 오기
\ # shuffle 의 데이터 블록 이 라면 ShuffleBlock Resolver 를 통 해 데이터 블록 을 가 져 옵 니 다. 그렇지 않 으 면 doGetLocal 을 호출 하여 로 컬 에서 가 져 옵 니 다.
\ # 결 과 는 buffer 로 봉 인 된 다음 NioManaged Buffer 를 만 들 고 돌아 갑 니 다.
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  //        shuffle    
  if (blockId.isShuffle) {
    //    shuffleShuffleBlockResolver     
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    //     block  
    val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
      .asInstanceOf[Option[ByteBuffer]]
    if (blockBytesOpt.isDefined) {
      //   bufferNioManagedBuffer
      val buffer = blockBytesOpt.get
      new NioManagedBuffer(buffer)
    } else {
      throw new BlockNotFoundException(blockId.toString)
    }
  }
}

 
\ # doGetLocal: 지정 한 BlockId 에 따라 로 컬 block 데 이 터 를 가 져 옵 니 다. 메모리 에 존재 하 는 경우 메모리 에서 직접 가 져 옵 니 다.디스크 에 저 장 된 것 이 라면 디스크 에서 가 져 오고, MEMORY 라면AND_DISK, 메모 리 를 먼저 넣 고 데 이 터 를 되 돌려 줍 니 다. 다음 에 메모리 에서 가 져 올 수 있 습 니 다. 그렇지 않 으 면 바로 되 돌려 줍 니 다.
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  //   blockId  blockInfo
  val info = blockInfo.get(blockId).orNull
  //   blockInfo   
  if (info != null) {
    info.synchronized {
      //     blockInfo    
      if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId had been removed")
        return None
      }

      //    blockblock  ready  
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning(s"Block $blockId was marked as failure.")
        return None
      }
      //    Block     
      val level = info.level
      logDebug(s"Level for block $blockId is $level")

      // MemoryStore    
      if (level.useMemory) {
        logDebug(s"Getting block $blockId from memory")
        val result = if (asBlockResult) {
          memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
        } else {
          memoryStore.getBytes(blockId)
        }
        result match {
          case Some(values) =>
            return result
          case None =>
            logDebug(s"Block $blockId not found in memory")
        }
      }

      // ExternalBlockStore    
      if (level.useOffHeap) {
        logDebug(s"Getting block $blockId from ExternalBlockStore")
        if (externalBlockStore.contains(blockId)) {
          val result = if (asBlockResult) {
            externalBlockStore.getValues(blockId)
              .map(new BlockResult(_, DataReadMethod.Memory, info.size))
          } else {
            externalBlockStore.getBytes(blockId)
          }
          result match {
            case Some(values) =>
              return result
            case None =>
              logDebug(s"Block $blockId not found in ExternalBlockStore")
          }
        }
      }

      // DiskStore    
      if (level.useDisk) {
        logDebug(s"Getting block $blockId from disk")
        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
          case Some(b) => b
          case None =>
            throw new BlockException(
              blockId, s"Block $blockId not found on disk, though it should be")
        }
        assert(0 == bytes.position())
        // 
        if (!level.useMemory) {
          // If the block shouldn't be stored in memory, we can just return it
          if (asBlockResult) {
            return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
              info.size))
          } else {
            return Some(bytes)
          }
        } else {
          //        ,            ,        block
          if (!level.deserialized || !asBlockResult) {
            memoryStore.putBytes(blockId, bytes.limit, () => {
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              //         blockOOMByteBuffer     
              copyForMemory.put(bytes)
            })
            bytes.rewind()
          }
          if (!asBlockResult) {
            return Some(bytes)
          } else {//      BlockResult  
            //            
            val values = dataDeserialize(blockId, bytes)
            //            
            if (level.deserialized) {
              //            
              val putResult = memoryStore.putIterator(
                blockId, values, level, returnValues = true, allowPersistToDisk = false)
              putResult.data match {
                case Left(it) =>
                  return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                case _ =>
                  // This only happens if we dropped the values back to disk (which is never)
                  throw new SparkException("Memory store did not return an iterator!")
              }
            } else {
              return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
            }
          }
        }
      }
    }
  } else {
    logDebug(s"Block $blockId not registered locally")
  }
  None
}

 
2.5 getLocal 로 컬 BlockManager 에서 데이터 가 져 오기
def getLocal(blockId: BlockId): Option[BlockResult] = {
  logDebug(s"Getting local block $blockId")
  doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}

 
2.6 getLocalgetLocalbytes 로 컬 BlockManager 에서 데 이 터 를 가 져 오고 직렬 화 된 결과
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
  logDebug(s"Getting local block $blockId as bytes")

  if (blockId.isShuffle) {
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    Option(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
  } else {
    doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
  }
}

 
2.7doGetRemote  원 격 에서 데이터 가 져 오기
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  //  blockId   BlockManagerId
  val locations = Random.shuffle(master.getLocations(blockId))
  //      BlockManagerId,  blockTransferService fetchBlockSync
  for (loc locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

    if (data != null) {
      if (asBlockResult) {
        return Some(new BlockResult(
          dataDeserialize(blockId, data),
          DataReadMethod.Network,
          data.limit()))
      } else {
        return Some(data)
      }
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}

 
2.8doPut 는 StorageLevel 에 따라 데 이 터 를 저장 합 니 다.
private def doPut(blockId: BlockId, data: BlockValues,
    level: StorageLevel, tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None)
  : Seq[(BlockId, BlockStatus)] = {
  //   BlockIdStorageLevel    
  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach { level =>
    require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }

  // blockId block  
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  //   BlockInfo
  val putBlockInfo = {
    //   BlockInfo  
    val tinfo = new BlockInfo(level, tellMaster)
    //   blockId   BlockInfokey    value   value
    //      key   value,   ,         ,    ,       
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    //        BlockInfo,         ,             (BlockId, BlockStatus)  
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo //       blockInfo
    }
  }

  val startTimeMs = System.currentTimeMillis
  var valuesAfterPut: Iterator[Any] = null
  var bytesAfterPut: ByteBuffer = null
  //      
  var size = 0L
  //      
  val putLevel = effectiveStorageLevel.getOrElse(level)
  //              ,               ,             
  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      //        buffer
      val bufferView = b.buffer.duplicate()
      Future {
        //          
        replicate(blockId, bufferView, putLevel)
      }(futureExecutionContext)
    case _ => null
  }
  //       put  block,            marked  true
  putBlockInfo.synchronized {
    logTrace("Put for block %s took %s to get into synchronized block"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

    var marked = false
    try {
      // returnValues:    put    
      // blockStore
      val (returnValues, blockStore: BlockStore) = {
        //       true MemoryStore
        if (putLevel.useMemory) {
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          //      false ExternalBlockStore
          (false, externalBlockStore)
        } else if (putLevel.useDisk) {
          // 1  true DiskStorefalse DiskStore
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(
            blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }

      //   putblockStore  
      val result = data match {
        // BlockStore putIterator  
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        // BlockStore putArray  
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        // ByteBufferValues  ,  BlockStore putBytes  
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
      //       
      size = result.size
      result.data match {
        case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
        case Right (newBytes) => bytesAfterPut = newBytes
        case _ =>
      }

      // result    droppedBlocks,       block   updateBlock   
      if (putLevel.useMemory) {
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }
      //     block  
      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        //  marked    true
        marked = true
        //   blockInfo markReady   block     
        putBlockInfo.markReady(size)
        //  Master  block  
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        //   updatedBlocks
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      // If we failed in putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning(s"Putting block $blockId failed")
      }
    }
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

  //         1
  if (putLevel.replication > 1) {
    data match {
      case ByteBufferValues(bytes) =>
        if (replicationFuture != null) {
          Await.ready(replicationFuture, Duration.Inf)
        }
      case _ =>
        val remoteStartTime = System.currentTimeMillis
        // Serialize the block if not already done
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException(
              "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, putLevel)
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }
  // ByteBuffer
  BlockManager.dispose(bytesAfterPut)

  if (putLevel.replication > 1) {
    logDebug("Putting block %s with replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  } else {
    logDebug("Putting block %s without replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  }

  updatedBlocks
}

 
2. getPeers 클 러 스 터 에 있 는 현재 BlockManager Id 와 드라이버 가 아 닌 BlockManager Id 의 모든 BlockManager Id 가 져 오기
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
  peerFetchLock.synchronized {
    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
    if (cachedPeers == null || forceFetch || timeout) {
      cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
      lastPeerFetchTime = System.currentTimeMillis
      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
    }
    cachedPeers
  }
}

 
2.10 replicate 데이터 블록 을 다른 노드 로 복사 합 니 다.
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
  //            
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  //         BlockManagerId  
  val numPeersToReplicateTo = level.replication - 1
  //       BlockManagerId  
  val peersForReplication = new ArrayBuffer[BlockManagerId]
  //         BlockManagerId  
  val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
  //      BlockManagerId  
  val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
  //       
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
  val startTime = System.currentTimeMillis
  val random = new Random(blockId.hashCode)

  var replicationFailed = false
  var failures = 0
  var done = false

  //   Executor   BlockManagerId  
  peersForReplication ++= getPeers(forceFetch = false)
  //        BlockManagerId
  def getRandomPeer(): Option[BlockManagerId] = {
    //       
    if (replicationFailed) {
      //     
      peersForReplication.clear()
      //    Executor   BlockManagerId       
      peersForReplication ++= getPeers(forceFetch = true)
      //           BlockManagerId
      peersForReplication --= peersReplicatedTo
      //           BlockManagerId
      peersForReplication --= peersFailedToReplicateTo
    }
    //         BlockManagerIdBlockManagerId
    if (!peersForReplication.isEmpty) {
      Some(peersForReplication(random.nextInt(peersForReplication.size)))
    } else {
      None
    }
  }
  // 
  while (!done) {
    //        BlockManagerId
    getRandomPeer() match {
      case Some(peer) =>
        try {
          val onePeerStartTime = System.currentTimeMillis
          data.rewind()
          logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
          //   BlockTransferService uploadBlockSyncblock
          blockTransferService.uploadBlockSync(
            peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
          logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
            .format(System.currentTimeMillis - onePeerStartTime))
          //       BlockManagerId   
          peersReplicatedTo += peer
          //      BlockManagerId        BlockManagerIdBlockManager
          peersForReplication -= peer
          replicationFailed = false
          // 
          if (peersReplicatedTo.size == numPeersToReplicateTo) {
            done = true
          }
        } catch {
          case e: Exception =>
            logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
            failures += 1
            replicationFailed = true
            peersFailedToReplicateTo += peer
            if (failures > maxReplicationFailures) { // too many failures in replcating to peers
              done = true
            }
        }
      case None => // no peer left to replicate to
        done = true
    }
  }
  val timeTakeMs = (System.currentTimeMillis - startTime)
  logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
    s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
  if (peersReplicatedTo.size < numPeersToReplicateTo) {
    logWarning(s"Block $blockId replicated to only " +
      s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
  }
}

 
2.11 dropFromMemory 메모리 에서 어떤 block 을 포기 합 니 다. 메모리 가 가득 찼 는 지 디스크 에 두 는 것 이 좋 습 니 다.
def dropFromMemory(
    blockId: BlockId,
    data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

  logInfo(s"Dropping block $blockId from memory")
  //   BlockInfo
  val info = blockInfo.get(blockId).orNull

  // If the block has not already been dropped
  if (info != null) {
    info.synchronized {
      if (!info.waitForReady()) {
        logWarning(s"Block $blockId was marked as failure. Nothing to drop")
        return None
      } else if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId was already dropped.")
        return None
      }
      var blockIsUpdated = false
      val level = info.level

      //                block,     
      if (level.useDisk && !diskStore.contains(blockId)) {
        logInfo(s"Writing block $blockId to disk")
        data() match {
          case Left(elements) =>
            diskStore.putArray(blockId, elements, level, returnValues = false)
          case Right(bytes) =>
            diskStore.putBytes(blockId, bytes, level)
        }
        blockIsUpdated = true
      }

      //         blockId,     blocl  
      val droppedMemorySize =
        if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      // MemoryStore    block
      val blockIsRemoved = memoryStore.remove(blockId)
      if (blockIsRemoved) {
        blockIsUpdated = true
      } else {
        logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
      }
      //       block  
      val status = getCurrentBlockStatus(blockId, info)
      if (info.tellMaster) {
        //  master       
        reportBlockStatus(blockId, info, status, droppedMemorySize)
      }
      //     blockId
      if (!level.useDisk) {
        // The block is completely gone from this node; forget it so we can put() it again later.
        blockInfo.remove(blockId)
      }
      if (blockIsUpdated) {
        return Some(status)
      }
    }
  }
  None
}

 
2.12 removeRdd 현재 RDD 의 데이터 블록 을 모두 삭제 합 니 다.
def removeRdd(rddId: Int): Int = {
  // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
  logInfo(s"Removing RDD $rddId")
  val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
  blocksToRemove.size
}

 
2.13 removeBlock 메모리 와 디스크 에서 Block 을 제거 합 니 다.
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logDebug(s"Removing block $blockId")
  //   blockId  blockInfo
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      //       block
      val removedFromMemory = memoryStore.remove(blockId)
      //       block
      val removedFromDisk = diskStore.remove(blockId)
      // ExternalBlockStore    block
      val removedFromExternalBlockStore =
        if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
        logWarning(s"Block $blockId could not be removed as it was not found in either " +
          "the disk, memory, or external block store")
      }
      //       blockId
      blockInfo.remove(blockId)
      // master       
      if (tellMaster && info.tellMaster) {
        val status = getCurrentBlockStatus(blockId, info)
        reportBlockStatus(blockId, info, status)
      }
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}

 
 
 

좋은 웹페이지 즐겨찾기