Spark 분석의 BlockManager
참조: BlockId, 데이터, level, tellMaster
1) Block에 대해 BlockInfo를 생성하고 다른 스레드에 액세스할 수 없도록 잠금을 설정합니다.
2) Block의 저장 수준에 따라:useMemory,useOffHeap,useDisk를 저장하고 이 Block이 다른 라인에 접근할 수 있음을 표시한다.
주:useMemory를 사용하면 useDisk를 사용해도 처음에는 메모리만 존재하고 하드디스크에 바로 저장되지 않으며 메모리가 부족할 때만 일부partition 데이터drop을 하드디스크에 저장합니다.
3)tellMaster=true(기본값은true):reportBlockStatus(BlockId,putBlockInfo,putBlockStatus)
BlockManagerMaster에 새 데이터 쓰기가 있음을 알리고 BlockManagerMaster에서 Block 정보를 업데이트합니다.
4) Block의 Replication 수에 따라 이 Block을 다른 노드에 백업할지 여부 결정 (비동기)
1) 메모리 결과는 서열화된 바이트 그룹입니다
2) 스토리지 결과는 서열화된 값이 없음
백업 데이터의 서열화: 바이트 그룹으로 서열화하기;먼저 압축한 다음에 서열화하다
기본 압축은 snappy입니다.spark를 통과할 수 있습니다.io.compression.codec 매개 변수를 설정합니다.
서열화는 기본적으로 org를 사용합니다.apache.spark.serializer.Java Serializer, spark를 통과할 수 있습니다.serializer 파라미터를 설정하기;BlockManager를 만들 때 설정합니다.
bytesAfterPut = dataSerialize(blockId, valuesAfterPut) //
replicate(blockId, bytesAfterPut, putLevel){ //
val putBlock = PutBlock(blockId, data, eLevel)
val cmId = new ConnectionManagerId(host, port)
BlockManagerWorker.syncPutBlock(putBlock, cmId)
}
BlockManagerWorker는 데이터 분실을 방지할 때 복구할 수 있도록 데이터 백업 작업을 진행하고, 데이터를 다른 노드로 복사(비동기)ConnectionManager는 다른 계산 결점과 연결하며, 데이터의 발송과 수신을 책임진다.
BlockManager에서 Block을 가져오는 절차:get () 방법
//Get a block from the block manager (either local or remote).
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId) // doGetLocal()
if (local.isDefined) {
return local
}
val remote = getRemote(blockId) // doGetRemote()
if (remote.isDefined) {
return remote
}
None
}
1) 로컬 블록 관리자에서 먼저 찾기:useMemory,useOffHeap,useDisk에서 순서대로 찾기;
Blockid에 따라 대응하는 Blockinfo(이 Blockinfo가 잠겼음)를 획득하고 이 Blockinfo의storage level을 획득하여 다음과 같은 지점에 들어가서 찾습니다.
level.useMemory는 Memory에서 Block을 꺼내서 되돌려줍니다. 없으면 다음 지점으로 들어갑니다.
level.useOffHeap은 Tachyon에서 Block을 꺼내서 되돌려줍니다. 없으면 다음 지점으로 들어갑니다.
level.useDisk
level.다음에 사용할 때 메모리에서 가져올 수 있도록 Block을 디스켓에서 읽고 쓰기;
level.useMemory=false는 Block을 disk에서 읽고 되돌려줍니다.
2) 원격(executor)의 BlockManager에서 찾을 수 없음(BlockManagerWorker.syncGetBlock)
이 Block의location 정보 얻기;
location에 따라 원격으로 요청을 보내서 Block을 가져옵니다. 원격으로 Block을 되돌려주면 이 함수는 되돌려주고 요청을 계속 보내지 않습니다.
주: 일반적인 경우spark 작업의 분배는 Block의 분포에 따라 결정됩니다. 작업은 Block이 있는 노드에 분배되기 때문에 getLocal()에서 필요한 Block을 찾을 수 있습니다.그러나 자원에 한계가 있는 경우,spark는 Block과 다른 노드에 작업을 스케줄링합니다. 그러면 getRemote () 를 통해 Block을 얻어야 합니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.