DT 빅 데이터 드 림 웍 스 Spark 맞 춤 형 반 노트 (013)
개술
Driver 는 세 가지 측면 을 잘못 사용 합 니 다.
1. 데이터 차원: Received BlockTracker 는 Spark Streaming 애플 리 케 이 션 의 메타 데 이 터 를 관리 합 니 다.
2. 논리 적 차원: DStream
3. 작업 스케줄 링 차원 에서 JobGenerator 는 Job 스케줄 링 차원 에서 구체 적 으로 어느 정도 까지 스케줄 링 을 했 는 지 모니터링 한다.
소스 코드 분석
ReceivedBlockTracker 먼저 들 어가 기 (ReceivedBlockTracker. scala 55 - 71)
/**
* Class that keep track of all the received blocks, and allocate them to batches
* when required. All actions taken by this class can be saved to a write ahead log
* (if a checkpoint directory has been provided), so that the state of the tracker
* (received blocks and block-to-batch allocations) can be recovered after driver failure.
*
* Note that when any instance of this class is created with a checkpoint directory,
* it will try reading events from logs in the directory.
*/
private[streaming] class ReceivedBlockTracker(
conf: SparkConf,
hadoopConf: Configuration,
streamIds: Seq[Int],
clock: Clock,
recoverFromWriteAheadLog: Boolean,
checkpointDirOption: Option[String])
extends Logging
그 중에서 receover FromWrite Ahead Log 는 WAL 을 사용 한 명확 한 증거 이다.
ReceivedBlockTracker 。
ReceiverBlockTracker
ReceiverBlockTracker.addBlock (ReceiverBlockTracker.scala 87-106)
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) if (writeResult) { synchronized { getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } logDebug(s"Stream ${receivedBlockInfo.streamId} received " + s"block ${receivedBlockInfo.blockStoreResult.blockId}") } else { logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") } writeResult } catch { case NonFatal(e) => logError(s"Error adding block $receivedBlockInfo", e) false } }
ReceiverBlockTracker WAL , 。
allocateBlocksToBatch (receiverBlockTracker.scala 112-134)
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { val streamIdToBlocks = streamIds.map { streamId => (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } else { logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") } } else { // This situation occurs when: // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent, // possibly processed batch job or half-processed batch job need to be processed again, // so the batchTime will be equal to lastAllocatedBatchTime. // 2. Slow checkpointing makes recovered batch time older than WAL recovered // lastAllocatedBatchTime. // This situation will only occurs in recovery time. logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery") } }
writeToLog WAL 。
allocatedBlocks , job 。
Job WAL, job ,
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.