Spark 소스 코드 분석의 BlockManager
93548 단어 빅 데이터 / spark / 소스 코드
프로그램 이 시 작 될 때 SparkContext 는 Driver 엔 드 의 SpakEnv 를 만 들 고 이 SparkEnv 에서 BlockManager 와 BlockManager Master 를 예화 하여 내부 에 메시지 통신 의 터미널 BlockManager MasterEndpoint 를 만 듭 니 다.
Executor 가 시 작 될 때 도 SparkEnv 를 만 듭 니 다. 이 SparkEnv 에서 BlockManager 와 BlockTransfer Service 를 실례 화 합 니 다. BlockManager 초기 화 과정 에서 BlockManager SlaveEndpoint 의 메시지 단말 기 를 추가 하고 이 BlockManager SlaveEndpoint 의 이 종 단 을 Driver 에 참조 하여 등록 합 니 다.이렇게 하면 Driver 와 Executor 는 서로 통신 단말기 의 인용 을 가지 고 있 을 수 있다.
/ / BlockManager Masterval blockManager Master = new BlockManager Master (registerOrLookupEndpoint (registerOrLookupEndpoint) 만 들 기 BlockManagerMaster.DRIVER_ENDPOINT_NAME, // BlockManager MasterEndpoint 만 들 기 new BlockManagerMasterEndpoint(rpcEnv,isLocal, conf,listenerBus)), conf, isDriver) / / NB: blockManager 는 initialize () 가 나중에 호출 될 때 까지 유효 하지 않 습 니 다. / / BlockManagerval blockManager = new BlockManager (executorId, rpcEnv, blockManagerMaster, serializer, conf,mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
핵심 속성
String executorId: executorId 또는 driverId
RpcEnv rpcEnv: rpc 통신 환경
BlockManagerMaster master: 전체 프로그램 이 실행 되 는 동안 block 메타 데이터 의 관리 와 유지, 명령 의 전송 을 책임 집 니 다.
Serializer default Serializer: 기본 직렬 화 메커니즘
Long max Memory: 최대 사용 가능 한 메모리 할당
Int numUsableCore: cpu 핵 수 를 사용 할 수 있 습 니 다.
MapOutputTracker mapOutputTracker: map 단 shuffle 프로 세 스 의 출력 상태
셔 플 관리자 셔 플 관리자: 셔 플 관리자
BlockTransferService blockTransferService: 원 격 으로 데 이 터 를 전송 하고 침대 블록 을 가 져 오 는 데 사용 합 니 다.
DiskBlockManager diskBlockManager: 디스크 에 있 는 block 및 대응 하 는 파일 과 디 렉 터 리 를 관리 하 는 데 사 용 됩 니 다.
TimeStamped HashMap [BlockId, BlockInfo] blockInfo: 유지 보수 맵 구축
Boolean externalBlockStore 초기 화: 외부 저장 소 를 사용 할 지 여부
Memory Store memory Store: 메모리 대상
DiskStore diskStore: 디스크 저장 대상
ExternalBlockStore externalBlockStore: 외부 촌 처 대상
BlockManager Id blockManager Id: 현재 BlockManager 에 대응 하 는 id
ShuffleClient shuffleClient: 다른 executor 의 shuffle 파일 을 읽 는 클 라 이언 트 는 외부 서비스 일 수도 있 고 표준 데이터 블록 전송 서비스 일 수도 있 습 니 다. 외부 shuffle 서 비 스 를 사용 하면 External ShuffleClient 를 만 들 지 않 으 면 BlockTransferService 를 만 듭 니 다.
Boolean compressBroadcast: 방송 데 이 터 를 압축 할 지 여부
Boolean copressShuffle: map 출력 파일 을 압축 할 지 여 부 는 일반적으로 열 어 보 는 것 을 권장 합 니 다. 그러나 cpu 자원 소모 가 너무 많 으 면 true 로 설정 하 는 것 을 권장 하지 않 습 니 다.
Boolean compressRdds: 직렬 화 된 RDD 파 티 션 을 압축 할 지 여부
Boolean compressShuffle Spill: 맵 에 넘 쳐 쓰 인 임시 파일 을 압축 할 지 여부
BlockManager SlaveEndpoint slaveEndpoint: 보유 한 BlockManager SlaveEndpoint 통신 단말기
중요 한 방법
2.1initialize 초기 화 방법, 지정 한 application id 로 BlockManager 를 예화 합 니 다.
\ # BlockTransferService 초기 화, rpc server 구축 등 BlockTransferService 는 주로 노드 간 데이터 전송 에 사 용 됩 니 다.
\ # ShuffleClient 를 초기 화하 고 다른 executor 의 shuffle 파일 을 읽 는 클 라 이언 트
\ # blockManager Id 구축
\ # shuffleServerId 구축
\ # BlockManager Master 에 BlockManager 등록
\ # 외부 셔 플 서비스 가 시작 되 고 Executor 노드 가 되면 외부 셔 플 서 비 스 를 등록 합 니 다.
def initialize(appId: String): Unit = {
// BlockTransferService, rpc server ,BlockTransferService
blockTransferService.init(this)
// ShuffleClient, executor shuffle
shuffleClient.init(appId)
// blockManagerId
blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
// shuffleServerId
shuffleServerId = if (externalShuffleServiceEnabled) {
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}
// BlockManagerMaster BlockManager, slaveEndpoint, BlockManagerMaster
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
// Shuffle Executor , Shuffle
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
}
2.2reportAllBlocks 는 BlockManager 의 모든 데이터 블록 을 보고 합 니 다.
private def reportAllBlocks(): Unit = {
logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
for ((blockId, info) blockInfo) {
// block
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
}
}
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
info.synchronized {
info.level match {
case null =>
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
val storageLevel =
StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val externalBlockStoreSize =
if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
}
}
}
2.3reregister 는 BlockManager 를 등록 하고 BlockManager 의 모든 데이터 블록 상 태 를 보고 합 니 다.
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo("BlockManager re-registering with master")
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
reportAllBlocks()
}
2.4 getBlockData 로 컬 데이터 블록 데이터 가 져 오기
\ # shuffle 의 데이터 블록 이 라면 ShuffleBlock Resolver 를 통 해 데이터 블록 을 가 져 옵 니 다. 그렇지 않 으 면 doGetLocal 을 호출 하여 로 컬 에서 가 져 옵 니 다.
\ # 결 과 는 buffer 로 봉 인 된 다음 NioManaged Buffer 를 만 들 고 돌아 갑 니 다.
override def getBlockData(blockId: BlockId): ManagedBuffer = {
// shuffle
if (blockId.isShuffle) {
// shuffle , ShuffleBlockResolver
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
// block
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
.asInstanceOf[Option[ByteBuffer]]
if (blockBytesOpt.isDefined) {
// buffer, NioManagedBuffer
val buffer = blockBytesOpt.get
new NioManagedBuffer(buffer)
} else {
throw new BlockNotFoundException(blockId.toString)
}
}
}
\ # doGetLocal: 지정 한 BlockId 에 따라 로 컬 block 데 이 터 를 가 져 옵 니 다. 메모리 에 존재 하 는 경우 메모리 에서 직접 가 져 옵 니 다.디스크 에 저 장 된 것 이 라면 디스크 에서 가 져 오고, MEMORY 라면AND_DISK, 메모 리 를 먼저 넣 고 데 이 터 를 되 돌려 줍 니 다. 다음 에 메모리 에서 가 져 올 수 있 습 니 다. 그렇지 않 으 면 바로 되 돌려 줍 니 다.
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
// blockId blockInfo
val info = blockInfo.get(blockId).orNull
// blockInfo
if (info != null) {
info.synchronized {
// blockInfo
if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
// block , block ready
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure.")
return None
}
// Block
val level = info.level
logDebug(s"Level for block $blockId is $level")
// , MemoryStore
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asBlockResult) {
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
}
// , ExternalBlockStore
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
val result = if (asBlockResult) {
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
// , DiskStore
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
case Some(b) => b
case None =>
throw new BlockException(
blockId, s"Block $blockId not found on disk, though it should be")
}
assert(0 == bytes.position())
// ,
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
if (asBlockResult) {
return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
info.size))
} else {
return Some(bytes)
}
} else {
// , , block ,
if (!level.deserialized || !asBlockResult) {
memoryStore.putBytes(blockId, bytes.limit, () => {
val copyForMemory = ByteBuffer.allocate(bytes.limit)
// block , OOM, ByteBuffer
copyForMemory.put(bytes)
})
bytes.rewind()
}
if (!asBlockResult) {
return Some(bytes)
} else {// BlockResult
//
val values = dataDeserialize(blockId, bytes)
//
if (level.deserialized) {
//
val putResult = memoryStore.putIterator(
blockId, values, level, returnValues = true, allowPersistToDisk = false)
putResult.data match {
case Left(it) =>
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
case _ =>
// This only happens if we dropped the values back to disk (which is never)
throw new SparkException("Memory store did not return an iterator!")
}
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
}
}
} else {
logDebug(s"Block $blockId not registered locally")
}
None
}
2.5 getLocal 로 컬 BlockManager 에서 데이터 가 져 오기
def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}
2.6 getLocalgetLocalbytes 로 컬 BlockManager 에서 데 이 터 를 가 져 오고 직렬 화 된 결과
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
logDebug(s"Getting local block $blockId as bytes")
if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
}
2.7doGetRemote 원 격 에서 데이터 가 져 오기
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
// blockId BlockManagerId,
val locations = Random.shuffle(master.getLocations(blockId))
// BlockManagerId, blockTransferService fetchBlockSync ,
for (loc locations) {
logDebug(s"Getting remote block $blockId from $loc")
val data = blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
if (data != null) {
if (asBlockResult) {
return Some(new BlockResult(
dataDeserialize(blockId, data),
DataReadMethod.Network,
data.limit()))
} else {
return Some(data)
}
}
logDebug(s"The value of block $blockId is null")
}
logDebug(s"Block $blockId not found")
None
}
2.8doPut 는 StorageLevel 에 따라 데 이 터 를 저장 합 니 다.
private def doPut(blockId: BlockId, data: BlockValues,
level: StorageLevel, tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {
// BlockId,StorageLevel
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
effectiveStorageLevel.foreach { level =>
require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
}
// , blockId block
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
// BlockInfo
val putBlockInfo = {
// BlockInfo
val tinfo = new BlockInfo(level, tellMaster)
// blockId BlockInfo , key value value,
// key value, , , ,
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
// BlockInfo, , (BlockId, BlockStatus)
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return updatedBlocks
}
oldBlockOpt.get
} else {
tinfo // blockInfo
}
}
val startTimeMs = System.currentTimeMillis
var valuesAfterPut: Iterator[Any] = null
var bytesAfterPut: ByteBuffer = null
//
var size = 0L
//
val putLevel = effectiveStorageLevel.getOrElse(level)
// , ,
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// buffer
val bufferView = b.buffer.duplicate()
Future {
//
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
// put block, marked true
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
var marked = false
try {
// returnValues: put
// blockStore:
val (returnValues, blockStore: BlockStore) = {
// true MemoryStore
if (putLevel.useMemory) {
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// false ExternalBlockStore
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// , 1 true DiskStore, false DiskStore
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
// put , blockStore
val result = data match {
// , BlockStore putIterator
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
// , BlockStore putArray
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
// ByteBufferValues , BlockStore putBytes
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
}
//
size = result.size
result.data match {
case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}
// , result droppedBlocks, block updateBlock
if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}
// block
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// marked true
marked = true
// blockInfo markReady block
putBlockInfo.markReady(size)
// Master block
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
// updatedBlocks
updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
// If we failed in putting the block to memory/disk, notify other possible readers
// that it has failed, and then remove it from the block info map.
if (!marked) {
// Note that the remove must happen before markFailure otherwise another thread
// could've inserted a new BlockInfo before we remove it.
blockInfo.remove(blockId)
putBlockInfo.markFailure()
logWarning(s"Putting block $blockId failed")
}
}
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
// 1,
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
// , ByteBuffer
BlockManager.dispose(bytesAfterPut)
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
updatedBlocks
}
2. getPeers 클 러 스 터 에 있 는 현재 BlockManager Id 와 드라이버 가 아 닌 BlockManager Id 의 모든 BlockManager Id 가 져 오기
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
peerFetchLock.synchronized {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
if (cachedPeers == null || forceFetch || timeout) {
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
cachedPeers
}
}
2.10 replicate 데이터 블록 을 다른 노드 로 복사 합 니 다.
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
//
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
// BlockManagerId
val numPeersToReplicateTo = level.replication - 1
// BlockManagerId
val peersForReplication = new ArrayBuffer[BlockManagerId]
// BlockManagerId
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
// BlockManagerId
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
//
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
var replicationFailed = false
var failures = 0
var done = false
// Executor BlockManagerId
peersForReplication ++= getPeers(forceFetch = false)
// BlockManagerId
def getRandomPeer(): Option[BlockManagerId] = {
//
if (replicationFailed) {
//
peersForReplication.clear()
// Executor BlockManagerId
peersForReplication ++= getPeers(forceFetch = true)
// BlockManagerId
peersForReplication --= peersReplicatedTo
// BlockManagerId
peersForReplication --= peersFailedToReplicateTo
}
// BlockManagerId , BlockManagerId
if (!peersForReplication.isEmpty) {
Some(peersForReplication(random.nextInt(peersForReplication.size)))
} else {
None
}
}
// ,
while (!done) {
// BlockManagerId
getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
// BlockTransferService uploadBlockSync , block
blockTransferService.uploadBlockSync(
peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
// BlockManagerId
peersReplicatedTo += peer
// BlockManagerId BlockManagerId, BlockManager
peersForReplication -= peer
replicationFailed = false
// ,
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true
}
} catch {
case e: Exception =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
replicationFailed = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) { // too many failures in replcating to peers
done = true
}
}
case None => // no peer left to replicate to
done = true
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}
2.11 dropFromMemory 메모리 에서 어떤 block 을 포기 합 니 다. 메모리 가 가득 찼 는 지 디스크 에 두 는 것 이 좋 습 니 다.
def dropFromMemory(
blockId: BlockId,
data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
logInfo(s"Dropping block $blockId from memory")
// BlockInfo
val info = blockInfo.get(blockId).orNull
// If the block has not already been dropped
if (info != null) {
info.synchronized {
if (!info.waitForReady()) {
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
var blockIsUpdated = false
val level = info.level
// block,
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
blockIsUpdated = true
}
// blockId, blocl
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
// MemoryStore block
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
}
// block
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
// master
reportBlockStatus(blockId, info, status, droppedMemorySize)
}
// , blockId
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
return Some(status)
}
}
}
None
}
2.12 removeRdd 현재 RDD 의 데이터 블록 을 모두 삭제 합 니 다.
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}
2.13 removeBlock 메모리 와 디스크 에서 Block 을 제거 합 니 다.
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
// blockId blockInfo
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
// block
val removedFromMemory = memoryStore.remove(blockId)
// block
val removedFromDisk = diskStore.remove(blockId)
// ExternalBlockStore block
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
// blockId
blockInfo.remove(blockId)
// , master
if (tellMaster && info.tellMaster) {
val status = getCurrentBlockStatus(blockId, info)
reportBlockStatus(blockId, info, status)
}
}
} else {
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
}
}