spark 2.2.0 소스 코드 읽 기 - spark 코어 패키지 - storage

11621 단어 spark 소스 코드
1. 본 고의 목표 와 다른 설명:
    본 고 는 주로 storage 패키지 아래 의 종 류 를 소개 한다.
2. storage 패키지 아래 의 데이터 구조 설명
sealed abstract class BlockId { 
          。     rddblockid / shuffle / broadcast / task / stream / temlocal/ 

temshuffle 등등
private[storage] class BlockInfo(
    val level: StorageLevel,
    val classTag: ClassTag[_],
    val tellMaster: Boolean) {

데이터 블록 의 메타 정 보 를 유지 하고 추적 합 니 다.
private[storage] class BlockInfoManager extends Logging {

이 관리 자 는 스 레 드 (작업 ID: taskAttemptId) 의 읽 기, 쓰기 속도 (Blockid) 의 동기 화 정 보 를 동시에 유지 합 니 다. blockInfo 에는 읽 기 및 쓰기 잠 금 접근 정보 가 포함 되 어 있 습 니 다.
private[spark] trait BlockData {

데이터 블록 에 대한 추상 적 이 고 추상 적 인 데 이 터 를 어떻게 저장 하 는 지, 그리고 잠재 적 인 데 이 터 를 방문 하 는 방법 을 제공 했다.
private[spark] class ByteBufferBlockData(
    val buffer: ChunkedByteBuffer,
    val shouldDispose: Boolean) extends BlockData {

BlockData 의 구현 클래스, 바이트 버퍼 블록 데이터.
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

로 컬 및 원 격 데이터 블록 을 저장 하고 가 져 오 는 인 터 페 이 스 를 제공 합 니 다. 저장 방식 은 메모리, 디스크, 그리고 of - heap 세 가지 로 나 눌 수 있 습 니 다.memory Store / diskStore 를 통 해 데 이 터 를 저장 합 니 다.
class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int,
    private var topologyInfo_ : Option[String])
  extends Externalizable {

BlockManager 의 유일한 표식 을 대표 합 니 다.
private[storage] class BlockManagerManagedBuffer(
    blockInfoManager: BlockInfoManager,
    blockId: BlockId,
    data: BlockData,
    dispose: Boolean) extends ManagedBuffer {

바 텀 호출 은 여전히 BlockData 데이터 구조의 방법 으로 사실은 BlockData 에 대해 소 포 를 한 번 한 것 이다.
class BlockManagerMaster(
    var driverEndpoint: RpcEndpointRef,
    conf: SparkConf,
    isDriver: Boolean)
  extends Logging {
              RpcEndpointRef   ,   driver   。    blockmanager, ,rdd,

방송 변수, 등등 driver 단 에 있 는 상태 입 니 다.
private[spark]
class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {

이 종 류 는 driver 단 에 만 존재 합 니 다. 바로 slave 의 blockmanager 를 관리 하 는 것 입 니 다.
sealed trait ToBlockManagerSlave

이것 은 master 에서 노드 로 보 내 는 메시지 입 니 다.
sealed trait ToBlockManagerMaster

이것 은 slaves 에서 master 노드 로 보 낸 메시지 입 니 다.
private[storage]
class BlockManagerSlaveEndpoint(
    override val rpcEnv: RpcEnv,
    blockManager: BlockManager,
    mapOutputTracker: MapOutputTracker)
  extends ThreadSafeRpcEndpoint with Logging {

주로 master 에서 보 내 온 메 시 지 를 받 습 니 다. 보통 블록 삭제, 백업 블록, 블록 정보 획득 등 입 니 다.
private[spark] class BlockManagerSource(val blockManager: BlockManager)
    extends Source {

데이터 원본, blockmanager 에서 유래 한 데이터 원본 정보
trait BlockReplicationPolicy {

백업 정책 도 하나의 방법 입 니 다. prioritize 는 선착순 으로 저 장 된 blockmanager 를 되 돌려 줍 니 다.
case class BlockUpdatedInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long)

블록 업데이트 정보
private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) 
extends Logging {
                  
private[spark] class DiskBlockObjectWriter(

jvm 대상 을 디스크 에 쓰 고 추가 로 쓸 수 있 습 니 다.
private[spark] class DiskStore(

Blockmanager 안의 블록 을 디스크 에 저장 합 니 다.
class RDDInfo(
    val id: Int,
    var name: String,
    val numPartitions: Int,
    var storageLevel: StorageLevel,
    val parentIds: Seq[Int],
    val callSite: String = "",
    val scope: Option[RDDOperationScope] = None)
  extends Ordered[RDDInfo] {

이 세 가지 방법: isCached, toString, compare
4. 567913. 원 격 데이터 블록 을 캡 처 하여 현재 데이터 구조의 차단 대기 열 에 저장 합 니 다.이 차단 대기 열 에서 교체 되 었 습 니 다.
private[spark]
final class ShuffleBlockFetcherIterator(
    context: TaskContext,
    shuffleClient: ShuffleClient,
    blockManager: BlockManager,
    blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long)])],
    streamWrapper: (BlockId, InputStream) => InputStream,
    maxBytesInFlight: Long,
    maxReqsInFlight: Int,
    maxBlocksInFlightPerAddress: Int,
    maxReqSizeShuffleToMem: Long,
    detectCorrupt: Boolean)
  extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging {

캡 처 요청 은 원 격 블록 관리자 와 해당 하 는 블록 입 니 다
case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long)]) {
  val size = blocks.map(_._2).sum
}

캡 처 한 반환 결 과 는 두 가지 종류 가 있 습 니 다.
private[storage] sealed trait FetchResult {
  val blockId: BlockId
  val address: BlockManagerId
}

캡 처 성공, 데 이 터 를 되 돌려 줍 니 다.
private[storage] case class SuccessFetchResult(
    blockId: BlockId,
    address: BlockManagerId,
    size: Long,
    buf: ManagedBuffer,
    isNetworkReqDone: Boolean) extends FetchResult {
  require(buf != null)
  require(size >= 0)
}

캡 처 실패, 이상 반환
private[storage] case class FailureFetchResult(
    blockId: BlockId,
    address: BlockManagerId,
    e: Throwable)
  extends FetchResult

현재 Blockmanager 에 대응 하 는 빠 른 상태 정보, 메모리 와 디스크 사용 상황 이 저장 되 어 있 습 니 다.

좋은 웹페이지 즐겨찾기