Spark의 RDD 탄력성

14799 단어
RDD는 신축성 분포식 데이터 집합으로서 그 신축성은 다음과 같은 7개 방면에 구체적으로 나타난다.
1. 메모리와 디스크 데이터 저장소의 자동 전환
Spark는 우선적으로 데이터를 메모리에 넣습니다. 만약 메모리가 정말 놓이지 않으면 디스크에 넣습니다. 메모리가 놓인 데이터를 계산할 수 있을 뿐만 아니라 메모리가 놓이지 않는 데이터도 계산할 수 있습니다.실제 데이터가 메모리보다 크면 데이터 배치 전략과 최적화 알고리즘을 고려해야 한다.응용 프로그램의 메모리가 부족할 때, 스파크 응용 프로그램은 데이터를 자동으로 메모리 저장소에서 디스크 저장소로 전환시켜 효율적인 운행을 보장한다.
2. Lineage(혈통) 기반의 효율적인 오류 처리 메커니즘
Lineage는 Spark RDD의 의존 관계를 바탕으로 이루어진 것(의존은 좁은 의존과 넓은 의존 두 가지 형태로 나뉘어진다)으로, 각 조작은 부모 조작만 관련되고 각 조각의 데이터 간에 서로 영향을 주지 않으며, 오류가 발생할 경우 단일 스플릿의 특정 부분만 복원하면 된다.일반적인 용량 오류는 두 가지 방식이 있는데 하나는 데이터 검사점이다.또 하나는 데이터를 기록하는 업데이트다.데이터 체크포인트의 기본적인 작업 방식은 데이터 센터의 네트워크를 통해 서로 다른 기계를 연결하고 매번 조작할 때 데이터 집합을 복제하는 것이다. 이것은 매번 복제하는 것과 같다. 복제는 네트워크를 통해 전송해야 한다. 네트워크 대역폭은 분포식 병목이고 저장 자원에도 큰 소모가 된다.기록 데이터 업데이트는 매번 데이터가 변할 때마다 기록하는 것이다. 이런 방식은 데이터를 다시 복제할 필요가 없지만 비교적 복잡하고 성능을 소모한다.Spark의 RDD는 데이터 업데이트를 기록하는 방식으로 왜 효율적입니까?① RDD는 변할 수 없고 레이지이기 때문이다.② RDD의 쓰기 작업은 굵은 입자입니다.그러나 RDD 읽기 작업은 굵은 입도일 수도 있고 가는 입도일 수도 있다.
3. Task 가 실패하면 특정 횟수의 재시도가 자동으로 수행됩니다.
기본 재시도 횟수는 4회입니다.TaskSchedulerImpl의 소스는 다음과 같습니다.
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))

  val conf = sc.conf

TaskScheduler Impl은 밑바닥의 작업 스케줄링 인터페이스인 TaskScheduler의 실현이다. 이 Schedulers들은 모든 Stage의DAGscheduler에서 TaskSet을 가져와 실행하고 고장이 있는지 시도한다.DAGscheduler는 모든 Job의 Stage의 DAG를 계산한 다음 Stage를 제출하고 TaskSets 형식으로 밑바닥의 TaskScheduler 스케줄링을 시작하여 그룹에서 실행합니다.
4. Stage가 실패하면 특정 횟수의 재시도가 자동으로 수행됩니다.
이렇게 하면 Stage 대상은 여러 개의 StageInfo(SparkListeners가 감청한 Stage의 정보를 저장하고 Stage 정보를 Listeners나 웹 UI에 전달할 수 있다)를 추적할 수 있다.기본 재시도 횟수는 4회이며 계산이 실패한 단계를 직접 실행하고 실패한 데이터 슬라이스만 계산할 수 있습니다. Stage의 소스는 다음과 같습니다.
private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to.        Stage */
  val jobIds = new HashSet[Int]

  val pendingPartitions = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage.   Stage     attempt ID */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   *    [StageInfo] Object  ,      ,  attempts        ,  DAGScheduler  
   * StageInfo  SparkListeners      (             )
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   *   Stage attempt IDs            ,      ,            
   *      attempt,          ,     stage       (SPARK-5945)
   */
  private val fetchFailedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    fetchFailedAttemptIds.clear()
  }

  /**
   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
   *                    stage
   * This method updates the running set of failed stage attempts and returns
   * true if the number of failures exceeds the allowable number of failures.
   *               ,       stage attempts       
   */
  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  //  stage       attempt
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  //     stage    stageinfo
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  //                 
  def findMissingPartitions(): Seq[Int]
}

private[scheduler] object Stage {
  // The number of consecutive failures allowed before a stage is aborted
  //     stage        
  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}

Stage는 Spark Job이 동일한 논리 기능과 병렬 컴퓨팅 작업을 실행하는 기본 단위입니다.Stage의 모든 임무는 같은 Shuffle에 의존한다. 모든 DAG 임무는 DAGscheduler를 통해 Stage의 경계에서 Shuffle가 발생하여 Stage를 형성하고 DAGscheduler가 이 단계의 토폴로지 순서를 운행한다.모든 Stage는 ShuffleMapStage일 수 있습니다. ShuffleMapStage라면 출력 노드(nodes)의 출력 파일 구역을 추적합니다. 작업 결과는 다른 Stage(s)를 입력하거나 ResultStage를 입력하십시오. ResultStage를 입력하면 이 ResultStage의 작업은 이 RDDD에서 이 Spark Action을 계산하는 함수(예를 들어count(),save() 등을 직접 실행합니다.shuffleDep 등 필드에서 Stage를 설명하고 outputLocs와numAvailableOutputs와 같은 변수를 생성하여 맵 출력을 추적할 준비를 합니다.모든 Stage에firstjobid가 있습니다. 첫 번째로 Stage를 제출한 Job을 확인하고 FIFO 스케줄링을 사용할 때 앞의 Job를 먼저 계산하거나 빠르게 복구합니다. (실패할 때) 
ShuffleMap Stage는 DAG가 데이터를 생성하여 Shuffle을 진행하는 중간 단계로 매번 Shuffle 작업을 하기 전에 여러 개의 Pipelined 작업을 포함할 수 있다. ResultStage 단계에서 함수를 포획하여 RDD의 구역에서 Action 산자 계산 결과를 실행한다. 일부 Stage는 RDD의 모든 구역에서 실행되지 않는다. 예를 들어 first(),lookup() 등이다.SparkListener는 Spark 스케줄러의 이벤트 수신 인터페이스입니다.이 인터페이스는 Spark 버전에 따라 달라질 수 있습니다.
5.checkpoint와persist(검사점과 지속화), 주동적이거나 수동적으로 촉발할 수 있음
checkpoint는 RDD에 대한 표시로 일련의 파일을 생성하고 모든 부모 의존이 삭제되며 전체 의존(Lineage)의 종점입니다.checkpoint도 레이지급이야.persist 이후 RDD가 작업할 때 모든 작업 노드는 계산된 섹션 결과를 메모리나 디스크에 저장합니다. 다음에 같은 RDD를 다른 Action 계산을 하면 다시 사용할 수 있습니다.
사용자는 드라이브 프로그램과만 상호작용하기 때문에 RDD의 캐치(cache) 방법으로만 캐치 사용자가 볼 수 있는 RDD를 제거할 수 있다.보이는 것은 Transformation 산자 처리를 거쳐 생성된 RDD를 가리키며, Transformation 산자 중 스파크가 스스로 생성한 일부 RDD는 사용자가 직접 캐치할 수 없다.예를 들어 ReduceByKey () 에서 생성되는 ShuffleRDD,MapPartitionsRDD는 사용자가 직접 캐치할 수 없습니다.Driver Program에서 RDD.를 설정합니다.cache() 후 시스템은cache를 어떻게 진행합니까?우선 RDD의Partition을 계산하기 전에Partition이cache에 걸렸는지 아닌지를 판단하고,cache에 걸렸으면 Partition을 계산한 다음,cache를 메모리에 넣는다.cache는 memory를 사용할 수 있습니다. HDFS 디스크에 쓰면 checkpoint를 검사해야 합니다.RDD. 호출cache () 후 RDD는persistRDD로 변했고 Storage Level은MEMORYOnly, persist RDD는 드라이브에게 자신이 persist에 걸려야 한다고 알려준다.RDD가 호출됩니다.iterator(). RDD.scala의 iterator()의 소스는 다음과 같습니다.
  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   * RDD     ,          ,     。           ,           RDD
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

RDD.iterator () 가 호출되었을 때, 이 RDD의 어떤Partition을 계산할 때, 우선cache Manager에 가서 BlockId를 가져오고, Block Manager에서 이Partition이 checkpoint에 일치하는지 여부를 계산합니다. 만약 그렇다면 이Partition을 계산하지 않고, 이Partition의 모든records를 checkpoint에서 읽고Array Buffer에 넣습니다.checkpoint가 없으면 Partition을 계산한 다음 모든 Records를cache에 넣습니다.전체적으로 RDD가 중복 사용될 때(너무 크면 안 된다) RDD는 캐치가 필요하다.스파크는 각 노드 캐시의 사용 상황을 자동으로 모니터링하고 최근 최소 사용 원칙을 이용하여 오래된 데이터를 삭제한다.RDD를 수동으로 제거하려면 RDD를 사용하십시오.unpersist () 방법입니다.  
또한 각각의 영구적인 RDD는 서로 다른 스토리지 레벨로 저장할 수 있습니다.예를 들어 디스크에 집합을 영구화하고, 집합을 서열화된 자바 대상으로 메모리에 영구화하며, 노드 간에 집합을 복사하거나, Alluxio에 저장할 수 있다.StorageLevel 객체를 persist()에 전달하여 이러한 스토리지 수준을 설정할 수 있습니다.cache() 메서드는 기본 스토리지 수준 - Storage Level을 사용합니다.MEMORY_ONLY.RDD는useDisk,useMemory,useOffHeap,deserialized,replication 5개 파라미터의 조합에 따라 자주 사용하는 12가지 기본 저장소를 제공하고 완전한 저장소 등급은 다음과 같다.StorageLevel.scala의 소스는 다음과 같습니다.
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

Storage Level은 RDD를 메모리로 사용하는지, External BlockStore 저장소를 사용하는지, RDDD가 메모리나 External BlockStore에서 벗어났는지, RDD를 버렸는지, 메모리에 있는 데이터의 서열화 형식을 보존했는지, 여러 노드의 RDD 구역을 복제했는지 여부를 제어하는 표지이다.그리고 org.apache.spark.storage.Storage Level은 단일 실례(singleton) 대상으로 정적 상수와 자주 사용하는 저장 수준을 포함하고 singleton 대상 공장 방법인 Storage Level(...)맞춤형 스토리지 레벨을 만듭니다.
Spark의 여러 스토리지 수준은 메모리 사용률과 CPU 사용률 간의 차이점을 의미합니다.다음 절차를 통해 적당한 저장 레벨을 선택하는 것을 추천합니다: ① RDD가 기본 저장 레벨(MEMORY ONLY)에 적합하면 기본 저장 레벨을 선택하십시오.이것은 CPU 사용률이 가장 높은 옵션이기 때문에 RDD의 조작을 최대한 빨리 할 수 있다.② 기본 레벨이 적합하지 않으면 MEMORYONLY_SER.더 빠른 서열화 라이브러리를 선택하여 대상의 공간 사용률을 높일 수 있지만, 상당히 빠르게 접근할 수 있습니다.③ 산자가 RDD를 계산하는 데 비용이 많이 들거나 대량의 데이터를 필터해야 하기 때문에 RDD를 디스크에 저장하지 마라. 그렇지 않으면 하나의 구역을 반복해서 계산하면 디스크에서 데이터를 읽는 것과 같이 느리다.④ 오류를 빨리 복구하고자 한다면 Replicated 메모리 메커니즘을 이용하여 모든 메모리 레벨은 Replicated를 통해 잃어버린 데이터를 계산하여 완전한 오류를 지원할 수 있다.또한 Replicated의 데이터는 RDD에서 작업을 계속 실행할 수 있으며, 잃어버린 데이터를 중복 계산할 필요가 없다.많은 메모리를 보유한 환경 또는 다중 어플리케이션 환경에서 OffHeap(개체를 무더기에서 분리하여 서열화한 후 디스크에 저장하는 것과 같지만 RAM 메모리에 저장됩니다. Off Heap 개체는 이 상태에서 직접 사용할 수 없으며 서열화 및 반서열화를 해야 합니다. 서열화와 반서열화는 성능에 영향을 줄 수 있으므로 Off Heap 무더기 밖의 메모리는 GC를 사용할 필요가 없습니다.)Off_Heap의 장점: OffHeap은 여러 수행자가 공유하는 Alluxio의 동일한 메모리 풀을 실행하여 GC를 현저하게 감소시킵니다.단일 Executor가 충돌하면 캐시된 데이터도 손실되지 않습니다.
6. 데이터 스케줄링 유연성, DAGscheduler, TASKScheduler는 자원 관리와 무관
Spark는 실행 모델을 일반적인 유방향 무환도 계획 (DAG) 으로 추상화합니다. 이것은 다중 Stage의 작업을 직렬로 연결하거나 병렬로 실행할 수 있으며, Stage의 중간 결과를 HDFS로 출력할 필요가 없습니다. 노드 운행 고장이 발생할 때 다른 사용 가능한 노드가 이 고장 노드를 대체하여 운행할 수 있습니다.
7. 데이터 슬라이스의 높은 탄력성(coalesce)
Spark에서 데이터를 조각화할 때 기본적으로 메모리에 데이터를 넣고, 안에 저장할 수 없으면 일부분은 디스크에 저장합니다.
  RDD.scala의 coalesce 산자 코드는 다음과 같습니다.
  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      //        ,             
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          // key     key  ,HashPartitioner             
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      //     shuffle  ,               
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

예를 들어 계산하는 과정에서 많은 데이터 파편이 발생할 수 있는데 이때 하나의 Partition이 발생하면 매우 작을 수 있다. 만약에 하나의 Partition이 매우 작으면 매번 하나의 라인을 소모하여 처리한다. 이때 그의 처리 효율을 떨어뜨릴 수 있기 때문에 많은 작은 Partition을 비교적 큰 Partition으로 합쳐서 처리하는 것을 고려해야 효율을 높일 수 있다.또한 메모리가 그렇게 많지 않고 각 Partition의 데이터 블록이 비교적 클 수 있으므로 Partition을 더 작은 데이터 블록으로 바꾸어 스파크가 더 많은 횟수를 처리하지만 OOM이 나타나지 않도록 고려해야 한다.  

좋은 웹페이지 즐겨찾기