Spark 분석의 BlockManager

3519 단어
Block Manager에서 Block을 저장하는 프로세스:doPut() 방법
참조: BlockId, 데이터, level, tellMaster
1) Block에 대해 BlockInfo를 생성하고 다른 스레드에 액세스할 수 없도록 잠금을 설정합니다.
2) Block의 저장 수준에 따라:useMemory,useOffHeap,useDisk를 저장하고 이 Block이 다른 라인에 접근할 수 있음을 표시한다.
주:useMemory를 사용하면 useDisk를 사용해도 처음에는 메모리만 존재하고 하드디스크에 바로 저장되지 않으며 메모리가 부족할 때만 일부partition 데이터drop을 하드디스크에 저장합니다.
3)tellMaster=true(기본값은true):reportBlockStatus(BlockId,putBlockInfo,putBlockStatus)
BlockManagerMaster에 새 데이터 쓰기가 있음을 알리고 BlockManagerMaster에서 Block 정보를 업데이트합니다.
4) Block의 Replication 수에 따라 이 Block을 다른 노드에 백업할지 여부 결정 (비동기)
1) 메모리 결과는 서열화된 바이트 그룹입니다
2) 스토리지 결과는 서열화된 값이 없음
백업 데이터의 서열화: 바이트 그룹으로 서열화하기;먼저 압축한 다음에 서열화하다
기본 압축은 snappy입니다.spark를 통과할 수 있습니다.io.compression.codec 매개 변수를 설정합니다.
서열화는 기본적으로 org를 사용합니다.apache.spark.serializer.Java Serializer, spark를 통과할 수 있습니다.serializer 파라미터를 설정하기;BlockManager를 만들 때 설정합니다.
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)  //     
replicate(blockId, bytesAfterPut, putLevel){ //         
    val putBlock = PutBlock(blockId, data, eLevel)
    val cmId = new ConnectionManagerId(host, port)
    BlockManagerWorker.syncPutBlock(putBlock, cmId)
}

 
BlockManagerWorker는 데이터 분실을 방지할 때 복구할 수 있도록 데이터 백업 작업을 진행하고, 데이터를 다른 노드로 복사(비동기)ConnectionManager는 다른 계산 결점과 연결하며, 데이터의 발송과 수신을 책임진다.
 
BlockManager에서 Block을 가져오는 절차:get () 방법
//Get a block from the block manager (either local or remote).
def get(blockId: BlockId): Option[BlockResult] = {
    val local = getLocal(blockId)  //  doGetLocal()  
    if (local.isDefined) {
      return local
    }
    val remote = getRemote(blockId) //  doGetRemote()  
    if (remote.isDefined) {
      return remote
    }
    None
}

 
1) 로컬 블록 관리자에서 먼저 찾기:useMemory,useOffHeap,useDisk에서 순서대로 찾기;
Blockid에 따라 대응하는 Blockinfo(이 Blockinfo가 잠겼음)를 획득하고 이 Blockinfo의storage level을 획득하여 다음과 같은 지점에 들어가서 찾습니다.
  level.useMemory는 Memory에서 Block을 꺼내서 되돌려줍니다. 없으면 다음 지점으로 들어갑니다.
  level.useOffHeap은 Tachyon에서 Block을 꺼내서 되돌려줍니다. 없으면 다음 지점으로 들어갑니다.
  level.useDisk
    level.다음에 사용할 때 메모리에서 가져올 수 있도록 Block을 디스켓에서 읽고 쓰기;
    level.useMemory=false는 Block을 disk에서 읽고 되돌려줍니다.
2) 원격(executor)의 BlockManager에서 찾을 수 없음(BlockManagerWorker.syncGetBlock)
이 Block의location 정보 얻기;
location에 따라 원격으로 요청을 보내서 Block을 가져옵니다. 원격으로 Block을 되돌려주면 이 함수는 되돌려주고 요청을 계속 보내지 않습니다.
주: 일반적인 경우spark 작업의 분배는 Block의 분포에 따라 결정됩니다. 작업은 Block이 있는 노드에 분배되기 때문에 getLocal()에서 필요한 Block을 찾을 수 있습니다.그러나 자원에 한계가 있는 경우,spark는 Block과 다른 노드에 작업을 스케줄링합니다. 그러면 getRemote () 를 통해 Block을 얻어야 합니다.

좋은 웹페이지 즐겨찾기