Hadoop 학습 노트: Spark 에 대하 여

8372 단어 Hadoopspark
스파크 는 빅 데이터 처리 에 사용 되 는 클 러 스 터 컴 퓨 팅 프레임 워 크 로, MapReduce 를 실행 엔진 으로 사용 하지 않 고 자체 분산 실행 환경 을 사용 해 클 러 스 터 에서 작업 을 수행 했다.Spark 는 YARN 에서 실행 할 수 있 고 Hadoop 파일 형식 과 저장 백 엔 드 (예: HDFS 등) 를 지원 하 는 Hadoop 과 긴밀 하 게 통합 되 어 있다.
Spark 는 작업 과 작업 사이 에 발생 하 는 대규모 작업 데이터 세트 를 메모리 에 저장 하고 MapReduce 보다 성능 이 한 단계 높 습 니 다.Spark 처리 모델 에서 가장 큰 이익 을 얻 은 것 은 교체 알고리즘 (데이터 세트 에 대해 특정한 함 수 를 중복 응용 하여 종료 조건 을 만족 시 킬 때 까지) 과 상호작용 분석 (사용자 가 데이터 세트 에 일련의 전용 탐색 적 조회 로 보 내 는 것) 이다.
Spark 소프트웨어 스 택
  • Spark Core 는 Spark 의 기본 기능 을 실현 하 는데 작업 스케줄 링, 메모리 관리, 오류 복구, 저장 시스템 과 의 상호작용 등 모듈 을 포함한다.탄성 분산 데이터 세트 (RDD) 에 대한 API 정의
  • Spark SQL 은 구조 화 된 데 이 터 를 조작 하 는 프로그램 패키지 로 SQL 이나 HQL 로 데 이 터 를 조회 할 수 있 으 며 Hive 표, Parquet, JSON 등 다양한 데이터 원본 을 지원 합 니 다.
  • Spark Streaming 은 실시 간 데 이 터 를 흐름 식 으로 계산한다.
  • MLib, 기계 학습 기능 창고.
  • Graph X, 조작 도 라 이브 러 리, 병렬 도 계산 이 가능 합 니 다.
  • 클 러 스 터 관리자, Hadoop YARN, Apache Mesos, Spark 자체 스케줄 러 를 지원 합 니 다.

  • 스파크 핵심
  • 모든 Spark 응용 프로그램 은 하나의 드라이브 프로그램 (driver program) 으로 클 러 스 터 의 각종 병행 작업 을 시작 합 니 다.
  • 드라이브 프로그램 은 Spark Context 대상 을 통 해 Spark 에 접근 합 니 다. 이 대상 은 계산 클 러 스 터 에 대한 연결 을 대표 합 니 다.
  • 드라이브 프로그램 은 일반적으로 여러 개의 실행 기 (executor) 노드 를 관리 해 야 한다.

  • 단어 주파수 통계
    object WordCount {
      def main(args: Array[String]) {
        val inputFile = args(0)
        val outputFile = args(1)
    
        val conf = new SparkConf().setAppName("wordCount")
        val sc = new SparkContext(conf)
    
        val input =  sc.textFile(inputFile)
        val words = input.flatMap(line => line.split(" "))
        val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
        counts.saveAsTextFile(outputFile)
      }
    }
    

    구축 및 실행
    sbt clean package spark-submit --class WordCount target/scala-2.11/scala_demo_2.11-0.1.jar file/1.txt file/wordcounts
    RDD 프로 그래 밍
    RDD 는 가 변 적 이지 않 은 분포 식 대상 집합 으로 각 RDD 는 여러 개의 파 티 션 으로 나 뉘 어 있 으 며 파 티 션 은 클 러 스 터 의 서로 다른 노드 에서 실 행 됩 니 다.
    RDD 를 만 드 는 두 가지 방법:
  • 외부 데이터 세트 textFile 등 을 읽 습 니 다.
  • 드라이버 에서 드라이브 프로그램의 대상 집합 을 나 누 어 줍 니 다. parallelize

  • RDD 가 지원 하 는 두 가지 동작:
  • 전환 작업: 하나의 RDD 에서 filter 와 같은 새로운 RDD 를 생 성 합 니 다.
  • 행동 조작: RDD 에 대한 결 과 를 계산 합 니 다. 예 를 들 어 count.
  • 차이 점: 새로운 RDD 를 정의 할 때 Spark 는 타성 계산 을 하고 첫 번 째 행동 조작 을 실행 할 때 만 진정한 계산 을 할 수 있 습 니 다.

  • RDD 는 기본적으로 영구 화 되 지 않 습 니 다. 여러 동작 에서 같은 RDD 를 다시 사용 하려 면 같은 RDD 를 여러 번 계산 하지 않도록 RDD.persist() Spark 를 사용 하여 이 RDD 를 캐 시 할 수 있 습 니 다.
    상용 전환 조작
    함수 명
    목적.
    실례
    map()
    모든 요소 에 함 수 를 적용 합 니 다.
    rdd.map(x => x + 1)
    flatMap()
    함수 와 모든 요 소 를 적용 합 니 다. 모든 반환 값 은 교체 기 입 니 다.
    rdd.flatMap(x => x.to(3))
    filter()
    조건 을 충족 시키다
    distinct()
    무 거 운 것 을 제거 하 다
    union()
    RDD 의 모든 요 소 를 포함 합 니 다.
    rdd.union(other)
    intersection()
    아울러
    rdd.intersection(other)
    subtract()
    첫 번 째 에 존재 하고 두 번 째 에 존재 하지 않 는 다.
    위 와 같다
    cartesian()
    피리 칼 적
    위 와 같다
    상용 행동 조작
    함수 명
    목적.
    collect()
    RDD 의 모든 요 소 를 되 돌려 줍 니 다.
    count()
    원소 개수
    countByValue()
    각 원소 출현 횟수
    take(num)
    num 개 요소 되 돌리 기
    top(num)
    맨 앞 에 있 는 num 개 요 소 를 되 돌려 줍 니 다.
    takerdered(num)(ordering)
    RDD 에서 제 공 된 순서대로 맨 앞 에 있 는 num 요 소 를 되 돌려 줍 니 다.
    reduce(func)
    sum 과 같은 RDD 의 모든 메타 데 이 터 를 병렬 통합 합 니 다.
    fold(zero)(func)
    reduce 와 마찬가지 로 초기 값 을 제공 해 야 합 니 다.
    aggregate(zeroValue)(seqOp, combOp)
    reduce 와 비슷 합 니 다. 보통 다른 유형의 함 수 를 되 돌려 줍 니 다.
    foreach(func)
    RDD 의 각 요소 에 주어진 함 수 를 사용 하고 결과 가 되 돌아 오지 않 습 니 다.
    키 쌍 조작
    키 쌍 을 포함 하 는 RDD 를 Pair RDD 라 고 하고 Pair RDD 는 reduceByKey () 방법 을 제공 하 며 각 키 에 대응 하 는 데 이 터 를 각각 요약 할 수 있 습 니 다.join () 은 두 개의 RDD 중 키 가 같은 요 소 를 조합 하여 하나의 RDD 로 합 칠 수 있다.
    맵 으로 일반 RDD 를 Pair RDD 로 변환 할 수 있 습 니 다.
    val pairs = linex.map(x => (x.split(" ")(0), x))
    

    Pair RDD 의 전환 작업
    함수.
    목적.
    reduceByKey(func)
    같은 키 를 가 진 값 을 합 칩 니 다.
    groupByKey()
    같은 키 를 가 진 값 을 그룹 으로 나 눕 니 다.
    combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
    서로 다른 반환 형식 을 사용 하여 같은 키 를 가 진 값 을 합 칩 니 다.
    mapValues(func)
    pair RDD 의 모든 값 에 키 를 바 꾸 지 않 고 함 수 를 적용 합 니 다.
    flatMapValues(func)
    pair RDD 의 모든 값 에 리 턴 교체 기 함 수 를 적용 한 다음 돌아 오 는 모든 요소 에 원 키 에 대응 하 는 키 쌍 기록 을 생 성 합 니 다.기호 화
    keys()
    키 만 포 함 된 RDD 되 돌리 기
    values()
    값 만 포 함 된 RDD 되 돌리 기
    sortByKey()
    키 에 따라 정렬 된 RDD 되 돌리 기
    pair RDD 두 개 에 대한 전환 작업
    함수.
    목적.
    subtractByKey
    RDD 의 키 와 other RDD 의 키 가 같은 요 소 를 삭제 합 니 다.
    join
    RDD 두 개 를 내부 연결 합 니 다.
    rightOuterJoin
    오른쪽 외부 연결, RDD 의 키 가 존재 해 야 합 니 다.
    leftOuterJoin
    왼쪽 외부 연결, other 의 키 가 존재 해 야 합 니 다.
    cogroup
    두 RDD 에서 같은 키 를 가 진 데 이 터 를 묶 습 니 다.
    combineByKey 상세 설명
    combineByKey 는 Spark 에서 비교적 핵심 적 인 고급 함수 로 다른 고급 키 값 은 함수 밑 에 모두 이 를 통 해 이 루어 집 니 다.groupByKey, reduceByKey 등
    정의:
    def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)
    

    인자:
  • createCombiner: V => C 각 파 티 션 에서 새로운 요 소 를 만 났 을 때 그 키 에 대응 하 는 누적 기의 초기 값
  • 을 만 듭 니 다.
  • mergeValue: (C, V) => C 이미 만난 키 에 대해 키 누산기 에 대응 하 는 현재 값 을 이 새 값 과 합 칩 니 다
  • mergeCombiners: (C, C) => C, 서로 다른 구역 의 같은 키 를 합 친 누산기
  • 흐름 설명도
    인 스 턴 스 코드
    val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
    val d1 = sc.parallelize(initialScores)
    type MVType = (Int, Double) //        (     ,  )
    d1.combineByKey(
      score => (1, score),
      (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
      (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
    ).map { case (name, (num, socre)) => (name, socre / num) }.collect
    

    매개 변수 의미 의 해석
  • score => (1, score) 우 리 는 점 수 를 매개 변수 로 하고 추 가 된 원조 유형 을 되 돌려 주 었 다."Fred" 를 열 로 현재 점 수 는 88.0 = > (1, 88.0) 1 로 현재 과목 의 계수 기 를 표시 합 니 다. 이 때 는 한 과목 만 있 습 니 다
  • .
  • (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore) 여기 있 는 c1 은 createCombiner 초기 화 된 (1, 88.0) 입 니 다.한 구역 내 에서 우 리 는 또 'Fred' 의 새로운 점수 91.0 을 만 났 다.물론 우 리 는 이전의 과목 점수 와 현재 의 점 수 를 합치 면 c1._2 + newScore, 그리고 과목 계산 기 를 1 즉 c1._1 + 1
  • (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) 'Fred' 가 학 패 일 수 있 으 니 주의 하 세 요. 그 가 선택 한 과목 이 너무 많아 서 서로 다른 구역 에 분 산 될 수 있 습 니 다.모든 분 구 를 진행 mergeValue 한 후, 그 다음은 분 구 간 을 합병 하 였 으 며, 분 구 간 과목 수 와 과목 수 를 더 하면 총 점 과 총 과목 수
  • 를 얻 었 다.
    실행 결 과 는 다음 과 같다. res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))데이터 파 티 션
    데이터 집합 이 노드 간 의 파 티 션 을 제어 합 니 다.
    Spark 가 제공 하 는 파 티 션 방식:
  • 해시 구역
  • 범위 구역 구분
  • 데이터 세트 가 어떻게 구분 되 는 지 모 르 면 두 데이터 세트 를 join 작업 을 할 때 두 데이터 세트 의 모든 요소 의 해시 값 을 구하 고 해시 값 과 같은 기록 을 네트워크 를 통 해 같은 기계 에 전송 한 다음 에 이 기계 에서 같은 키 로 기록 한 양해 작업 을 한다.한 측의 파 티 션 방식 을 알 게 되면 다른 측 이 대응 하 는 키 의 데 이 터 를 파 티 션 을 아 는 한 측의 키 에 대응 하 는 파 티 션 으로 전송 하여 많은 네트워크 통신 시간 을 절약 할 수 있다.사용 하 는 RDD 에 대해 지속 적 인 작업 을 해 야 합 니 다.
    파 티 션 에서 이득 을 보 는 작업
    데 이 터 를 키 의 크로스 노드 에 따라 혼합 하 는 것 을 도입 해 야 이득 을 볼 수 있다.
    예 를 들 어 cogroup (), groupWith (), join (), leftOuterJoin (), rightOuterJoin (), groupByKey (), reduceByKey (), combineByKey (), loopup ()
    파 티 션 방식 에 영향 을 주 는 작업
    파 티 션 을 설정 합 니 다: cogroup (), groupWith (), join (), left OuterJoin (), rightOuterJoin (), groupByKey (), reduceByKey (), combineByKey (), partition By (), sort (), mapValues () (부모 RDD 에 파 티 션 이 있 으 면), flatMapValues () (부모 RDD 에 파 티 션 이 있 으 면)
    데이터 읽 기와 저장
    Spark 는 다양한 입 출력 원 을 지원 합 니 다. 일부 원인 은 Spark 가 Hadoop 생태 권 을 기반 으로 구축 되 었 기 때 문 입 니 다. Hadoop MapReduce 에서 사용 하 는 InputFormat 과 OutputFormat 인 터 페 이 스 를 통 해 데 이 터 를 방문 할 수 있 고 대부분 흔히 볼 수 있 는 파일 형식 과 저장 시스템 이 이 인 터 페 이 스 를 지원 합 니 다.
    지원 하 는 파일 형식: 텍스트 파일, JSON, CSV, SequenceFiles, Protocol buffers, 대상 파일
    지원 하 는 파일 시스템: 로 컬 / "일반" 파일 시스템, Amazon S3, HDFS

    좋은 웹페이지 즐겨찾기