DT Big Data Dream Factory Spark Customized Class Notes (013)

Spark Streaming source code reading: Driver fault tolerance
Overview
Driver fault tolerance is handled at three levels.
1. Data level: ReceivedBlockTracker manages the metadata of the Spark Streaming application.
2. Logical level: DStream
3. Job scheduling level: JobGenerator tracks how far job scheduling has progressed.
Source code analysis
Start with ReceivedBlockTracker (ReceivedBlockTracker.scala 55-71)
/**
 * Class that keep track of all the received blocks, and allocate them to batches
 * when required. All actions taken by this class can be saved to a write ahead log
 * (if a checkpoint directory has been provided), so that the state of the tracker
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
 *
 * Note that when any instance of this class is created with a checkpoint directory,
 * it will try reading events from logs in the directory.
 */
private[streaming] class ReceivedBlockTracker(
    conf: SparkConf,
    hadoopConf: Configuration,
    streamIds: Seq[Int],
    clock: Clock,
    recoverFromWriteAheadLog: Boolean,
    checkpointDirOption: Option[String])
  extends Logging

Among the constructor parameters, recoverFromWriteAheadLog is clear evidence that a WAL (write-ahead log) is used.
ReceivedBlockTracker                。
 
  
    ReceiverBlockTracker            
ReceivedBlockTracker.addBlock (ReceivedBlockTracker.scala 87-106)
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
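ReceivedBlockTracker first writes the block metadata to the WAL, and only adds it to the in-memory queue if that write succeeds.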
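
The ordering in addBlock (log first, then update memory) can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical stand-in and not Spark's API: BlockInfo, ToyLog and ToyTracker are names invented for illustration, and the "log" is just an in-memory buffer with a switch that simulates a failed write.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ReceivedBlockInfo (not a Spark class).
final case class BlockInfo(streamId: Int, blockId: String)

// Toy in-memory "log"; failWrites simulates a WAL that cannot be written.
final class ToyLog(var failWrites: Boolean = false) {
  val records = mutable.ArrayBuffer.empty[BlockInfo]

  // Returns true only if the record was durably "written".
  def write(record: BlockInfo): Boolean =
    if (failWrites) false
    else { records += record; true }
}

// Toy tracker that mirrors the write-ahead ordering of addBlock.
final class ToyTracker(log: ToyLog) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

  // Log first; touch in-memory state only if the log write succeeded.
  def addBlock(info: BlockInfo): Boolean = {
    val written = log.write(info)
    if (written) synchronized {
      queues.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }
    written
  }

  // Snapshot of the blocks currently queued for one stream.
  def queued(streamId: Int): List[BlockInfo] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

With this ordering, a block that never reached the log is never visible in memory, so the in-memory queues cannot hold anything that a recovery from the log would be unable to reproduce.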

allocateBlocksToBatch (ReceivedBlockTracker.scala 112-134)
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}
Here too, writeToLog records the event in the WAL before the in-memory state is updated.
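allocatedBlocks holds, for each stream, the blocks dequeued for this batch; these are the blocks the batch's job will process.
Because the allocation is written to the WAL before the job runs, a batch whose job was interrupted by a driver failure can be identified during recovery and processed again.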
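
How a tracker can rebuild its state by replaying logged events, and why a repeated allocation for an already allocated batch is skipped, can be sketched as follows. This is a simplified model under stated assumptions, not Spark's implementation: Event, BlockAdded, BatchAllocated and ReplayableTracker are hypothetical names, batch times are plain Long values, and the log is an in-memory buffer that survives a simulated restart.

```scala
import scala.collection.mutable

// Hypothetical event types, loosely modeled on the events named in the listings above.
sealed trait Event
final case class BlockAdded(streamId: Int, blockId: String) extends Event
final case class BatchAllocated(batchTime: Long, blocks: Map[Int, List[String]]) extends Event

// Toy tracker whose state is fully determined by the events in `log`.
final class ReplayableTracker(streamIds: Seq[Int], log: mutable.ArrayBuffer[Event]) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[String]]
  private val allocated = mutable.HashMap.empty[Long, Map[Int, List[String]]]
  private var lastAllocated: Option[Long] = None

  // Recovery: rebuild in-memory state from whatever the log already contains.
  log.foreach(applyEvent)

  private def queue(id: Int): mutable.Queue[String] =
    queues.getOrElseUpdate(id, mutable.Queue.empty[String])

  // Single place where state changes, used both live and during replay.
  private def applyEvent(e: Event): Unit = e match {
    case BlockAdded(id, block) =>
      queue(id) += block
    case BatchAllocated(time, blocks) =>
      queues.values.foreach(_.clear()) // allocated blocks leave the queues
      allocated(time) = blocks
      lastAllocated = Some(time)
  }

  def addBlock(streamId: Int, blockId: String): Unit = {
    val e = BlockAdded(streamId, blockId)
    log += e // write ahead
    applyEvent(e)
  }

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    // Only allocate for a batch newer than the last one recorded in the log.
    if (lastAllocated.forall(batchTime > _)) {
      val blocks = streamIds.map(id => id -> queue(id).toList).toMap
      val e = BatchAllocated(batchTime, blocks)
      log += e // write ahead
      applyEvent(e)
    }
    // Otherwise the batch was allocated before the restart; keep the logged allocation.
  }

  def blocksOfBatch(batchTime: Long): Map[Int, List[String]] =
    allocated.getOrElse(batchTime, Map.empty[Int, List[String]])
}
```

A second ReplayableTracker built over the same log ends up with the same queues and allocations as the first. Calling allocateBlocksToBatch again with a recovered batch time leaves the logged allocation untouched, so the re-run job sees the same blocks as the original attempt.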

    
