spark 2.2.0 소스 코드 읽 기 - spark 코어 패키지 - storage
11621 단어 spark 소스 코드
본 고 는 주로 storage 패키지 아래 의 종 류 를 소개 한다.
2. storage 패키지 아래 의 데이터 구조 설명
sealed abstract class BlockId {
。 rddblockid / shuffle / broadcast / task / stream / temlocal/
temshuffle 등등
private[storage] class BlockInfo(
val level: StorageLevel,
val classTag: ClassTag[_],
val tellMaster: Boolean) {
데이터 블록 의 메타 정 보 를 유지 하고 추적 합 니 다.
private[storage] class BlockInfoManager extends Logging {
이 관리 자 는 스 레 드 (작업 ID: taskAttemptId) 의 읽 기, 쓰기 속도 (Blockid) 의 동기 화 정 보 를 동시에 유지 합 니 다. blockInfo 에는 읽 기 및 쓰기 잠 금 접근 정보 가 포함 되 어 있 습 니 다.
private[spark] trait BlockData {
데이터 블록 에 대한 추상 적 이 고 추상 적 인 데 이 터 를 어떻게 저장 하 는 지, 그리고 잠재 적 인 데 이 터 를 방문 하 는 방법 을 제공 했다.
private[spark] class ByteBufferBlockData(
val buffer: ChunkedByteBuffer,
val shouldDispose: Boolean) extends BlockData {
BlockData 의 구현 클래스, 바이트 버퍼 블록 데이터.
private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
val serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with BlockEvictionHandler with Logging {
로 컬 및 원 격 데이터 블록 을 저장 하고 가 져 오 는 인 터 페 이 스 를 제공 합 니 다. 저장 방식 은 메모리, 디스크, 그리고 of - heap 세 가지 로 나 눌 수 있 습 니 다.memory Store / diskStore 를 통 해 데 이 터 를 저장 합 니 다.
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
private var topologyInfo_ : Option[String])
extends Externalizable {
BlockManager 의 유일한 표식 을 대표 합 니 다.
private[storage] class BlockManagerManagedBuffer(
blockInfoManager: BlockInfoManager,
blockId: BlockId,
data: BlockData,
dispose: Boolean) extends ManagedBuffer {
바 텀 호출 은 여전히 BlockData 데이터 구조의 방법 으로 사실은 BlockData 에 대해 소 포 를 한 번 한 것 이다.
class BlockManagerMaster(
var driverEndpoint: RpcEndpointRef,
conf: SparkConf,
isDriver: Boolean)
extends Logging {
RpcEndpointRef , driver 。 blockmanager, ,rdd,
방송 변수, 등등 driver 단 에 있 는 상태 입 니 다.
private[spark]
class BlockManagerMasterEndpoint(
override val rpcEnv: RpcEnv,
val isLocal: Boolean,
conf: SparkConf,
listenerBus: LiveListenerBus)
extends ThreadSafeRpcEndpoint with Logging {
이 종 류 는 driver 단 에 만 존재 합 니 다. 바로 slave 의 blockmanager 를 관리 하 는 것 입 니 다.
sealed trait ToBlockManagerSlave
이것 은 master 에서 노드 로 보 내 는 메시지 입 니 다.
sealed trait ToBlockManagerMaster
이것 은 slaves 에서 master 노드 로 보 낸 메시지 입 니 다.
private[storage]
class BlockManagerSlaveEndpoint(
override val rpcEnv: RpcEnv,
blockManager: BlockManager,
mapOutputTracker: MapOutputTracker)
extends ThreadSafeRpcEndpoint with Logging {
주로 master 에서 보 내 온 메 시 지 를 받 습 니 다. 보통 블록 삭제, 백업 블록, 블록 정보 획득 등 입 니 다.
private[spark] class BlockManagerSource(val blockManager: BlockManager)
extends Source {
데이터 원본, blockmanager 에서 유래 한 데이터 원본 정보
trait BlockReplicationPolicy {
백업 정책 도 하나의 방법 입 니 다. prioritize 는 선착순 으로 저 장 된 blockmanager 를 되 돌려 줍 니 다.
case class BlockUpdatedInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long)
블록 업데이트 정보
private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)
extends Logging {
private[spark] class DiskBlockObjectWriter(
jvm 대상 을 디스크 에 쓰 고 추가 로 쓸 수 있 습 니 다.
private[spark] class DiskStore(
Blockmanager 안의 블록 을 디스크 에 저장 합 니 다.
class RDDInfo(
val id: Int,
var name: String,
val numPartitions: Int,
var storageLevel: StorageLevel,
val parentIds: Seq[Int],
val callSite: String = "",
val scope: Option[RDDOperationScope] = None)
extends Ordered[RDDInfo] {
이 세 가지 방법: isCached, toString, compare
4. 567913. 원 격 데이터 블록 을 캡 처 하여 현재 데이터 구조의 차단 대기 열 에 저장 합 니 다.이 차단 대기 열 에서 교체 되 었 습 니 다.
private[spark]
final class ShuffleBlockFetcherIterator(
context: TaskContext,
shuffleClient: ShuffleClient,
blockManager: BlockManager,
blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long)])],
streamWrapper: (BlockId, InputStream) => InputStream,
maxBytesInFlight: Long,
maxReqsInFlight: Int,
maxBlocksInFlightPerAddress: Int,
maxReqSizeShuffleToMem: Long,
detectCorrupt: Boolean)
extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging {
캡 처 요청 은 원 격 블록 관리자 와 해당 하 는 블록 입 니 다
case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long)]) {
val size = blocks.map(_._2).sum
}
캡 처 한 반환 결 과 는 두 가지 종류 가 있 습 니 다.
private[storage] sealed trait FetchResult {
val blockId: BlockId
val address: BlockManagerId
}
캡 처 성공, 데 이 터 를 되 돌려 줍 니 다.
private[storage] case class SuccessFetchResult(
blockId: BlockId,
address: BlockManagerId,
size: Long,
buf: ManagedBuffer,
isNetworkReqDone: Boolean) extends FetchResult {
require(buf != null)
require(size >= 0)
}
캡 처 실패, 이상 반환
private[storage] case class FailureFetchResult(
blockId: BlockId,
address: BlockManagerId,
e: Throwable)
extends FetchResult
현재 Blockmanager 에 대응 하 는 빠 른 상태 정보, 메모리 와 디스크 사용 상황 이 저장 되 어 있 습 니 다.