spark RDD 상용 함수 / 조작

28657 단어 spark빅 데이터
spark RDD 상용 함수 / 조작
글 의 코드 는 모두 spark - shell 에서 실 행 될 수 있 습 니 다.
transformations
map(func)
집합 내의 모든 요 소 는 function 을 통 해 새로운 요소 로 매 핑 됩 니 다.
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.map( _ + 1)

모든 transformation 작업 에 대해 생 성 된 것 은 새로운 RDD (여기 가 resultRdd) 입 니 다. 실제 연산 을 하지 않 고 RDD 에 대해 action 작업 을 할 때 만 실제 계산 하고 결 과 를 얻 을 수 있 습 니 다.
scala> resultRdd.collect
res3: Array[Int] = Array(2, 3, 4, 5)

다음 transformation 작업 은 동일 합 니 다.
filter(func) func 을 통 해 집합 요 소 를 걸 러 내 고 func 의 반환 값 은 Boolean 유형 이 어야 합 니 다.
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.filter( _ > 1)
scala> resultRdd.collect
res4: Array[Int] = Array(2, 3, 4)

flatmap(func) func 을 통 해 집합 안의 모든 요 소 를 하나의 서열 로 매 핑 합 니 다 (구체 적 으로 TraversableOnce [?] 입 니 다. 여 기 는 이 유형 을 상관 하지 않 아 도 됩 니 다. spark 는 스스로 암시 적 으로 전환 하고 일반적인 순서 로 교체 할 수 있 는 서열 이 모두 가능 합 니 다).말하자면 이해 하기 어 려 울 수도 있 으 니 예 를 들 어 보 자.아니면 [1,2,3,4] 입 니 다. 만약 에 func 가 이렇다 고 가정 합 니 다. x => Array(x+0.1, x+0.2) 즉, 하나의 순서 로 돌아 가 는 것 입 니 다. flatMap 절 차 는 모든 요소 에 대해 func 를 먼저 실행 한 것 으로 볼 수 있 습 니 다.
[(1.1, 1.2), (2.1, 2.2), (3.1, 3.2), (4.1, 4.2)]

마지막 으로 모든 서열 을 평평 하 게 펼 치면 다음 과 같은 것 을 얻 을 수 있 습 니 다.
[1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2]

코드 형식:
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.flatMap( x => Array(x+0.1,x+0.2) )
scala> resultRdd.collect
res8: Array[Double] = Array(1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2)

mapPartitions(func)
맵 과 유사 하지만 RDD 의 각 파 티 션 에서 각각 실 행 됩 니 다. 한 파 티 션 에 있 는 요 소 를 새로운 파 티 션 으로 표시 하고 마지막 으로 모든 새 파 티 션 을 합 쳐 새로운 RDD 로 만 드 는 것 으로 이해 할 수 있 습 니 다.T 형식의 RDD 에 이 함 수 를 사용 할 때 func 의 서명 은 Iterator => Iterator 이 어야 합 니 다.
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.mapPartitions(iter => iter.map(_+1)) //       iter.map RDD map   ,  scala          
scala> resultRdd.collect
res11: Array[Int] = Array(2, 3, 4, 5)

mapPartitionsWithIndex(func)
유사 mapPartitions(func) 하지만 하나의 파 티 션 의 색인 번호 정 보 를 더 제공 하기 때문에 요소 T 유형의 RDD, func 의 유형 서명 은 (Int, Iterator) => Iterator
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.mapPartitionsWithIndex( (index, iter) => iter.map( x => (x,s"  $index")) ) 
scala> resultRdd.collect
res14: Array[(Int, String)] = Array((1,  0), (2,  1), (3,  2), (4,  3))

각 분 구 의 상황 을 볼 수 있 는데 여기 서 군집 구조 가 다 를 때 결과 도 다 를 수 있 는데 이것 은 RDD 가 어떤 분 구 에 분포 하 느 냐 에 달 려 있다.
sample(withReplacement, fraction, seed)
샘플링 함수, 일정한 확률 로 데 이 터 를 샘플링 합 니 다.
  • with Replacement: 첫 번 째 매개 변 수 는 샘플링 이 끝 난 후에 샘플 을 다시 넣 을 지 여 부 를 결정 합 니 다. 추첨 이 끝 난 후에 다시 서명 을 뒤에 있 는 사람 에 게 남 겨 두 는 것 과 같 습 니 다.
  • fraction: 확률 로 이해 하 는 것 이 좋 습 니 다. 즉, 모든 요소 가 fraction 의 확률 로 추출 된 것 입 니 다.이 답안 은 비교적 잘 해석 되 었 다
  • seed: 난수 생 성기 의 씨앗
  • val a = Array(1,2,3,4)
    val pa = sc.parallelize(a)
    val resultRdd = pa.sample(true,0.5)
    resultRdd.collect

    여러 번 실 행 된 결과 가 다 릅 니 다:
    scala> pa.sample(false,0.5).collect
    res52: Array[Int] = Array(1, 3)
    
    scala> pa.sample(false,0.5).collect
    res53: Array[Int] = Array(1, 2)
    withReplacementtrue, 즉 돌려 놓 을 수 있 는 상황 으로 설정 하면 중복 요소 가 발생 할 수 있다.
    scala> pa.sample(true,0.5).collect
    res60: Array[Int] = Array(1, 2, 2)
    
    scala> pa.sample(true,0.5).collect
    res61: Array[Int] = Array(3, 4)

    union(otherDataset)
    병집 을 구하 다.
    예시:
    val pa = sc.parallelize( Array(1,2))
    val pb = sc.parallelize(Array(3,4))
    pa.union(pb).collect

    결과:
    res63: Array[Int] = Array(1, 2, 34

    intersection(otherDataset)
    교 집합 을 구하 다.
    예시:
    val pa = sc.parallelize( Array(1,2,3))
    val pb = sc.parallelize(Array(3,4,5))
    pa.intersection(pb).collect

    결과:
    res66: Array[Int] = Array(3)

    distinct([numTasks]))
    무 거 운 것 을 제거 하 다.numTasks 선택 할 수 있 는 매개 변수 로 몇 개의 작업 으로 분 배 된 것 을 나타 낸다.
    예시:
    val pa = sc.parallelize(Array(0,1,1,2,2,3))
    pa.distinct.collect

    결과:
    res92: Array[Int] = Array(0, 1, 2, 3)

    groupByKey([numTasks])
    패 킷 함수. -형식 (K, V) 의 데이터 세트 를 사용 하고 (K, Iterable) 형식의 데이터 세트 를 되 돌려 줍 니 다. - 그룹 을 나 눈 후에 sum,average 등 취 합 함 수 를 사용 하려 면 reduceByKey 또는 aggregateByKey 를 사용 하 는 것 이 좋 습 니 다. 더 좋 은 성능 을 얻 을 수 있 습 니 다. - 기본 병렬 도 는 부모 RDD 에 의존 하고 선택 가능 한 인자 numTasks 가 지정 한 병렬 작업 수량 을 입력 할 수 있 습 니 다.
    예시:
    val pa = sc.parallelize(Array( "a" -> 1,"a" ->2, "b" -> 3))
    pa.groupByKey.collect

    결과:
    res99: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))

    reduceByKey(func, [numTasks])
    key 에 따라 그룹 을 나 누 어 모 으 고 SQL 의 groupby 와 유사 한 후에 모 으 기 함 수 를 사용 합 니 다.
    (K, V) 형식의 데이터 세트 가 이 함 수 를 호출 하면 같은 (K, V) 형식의 데이터 세트 를 되 돌려 줍 니 다.
    예시:
    val pa = sc.parallelize(Array( "a" -> 1,"a" ->2,"a" ->3, "b" -> 4))
    pa.reduceByKey( (x,y) => x+y).collect

    결과:
    res110: Array[(String, Int)] = Array((a,6), (b,4))

    사실은 먼저 조 를 나 누고 각 조 내 에 reduce 를 하 는 것 이다.
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    먼저 조 를 나 눈 다음 에 각 조 에서 호출 aggregate 하여 접 으 면 reduceByKey 과 차이 점 은 접 은 결과 가 다른 유형 일 수 있다 는 것 이다.
    (K, V) 의 데이터 세트 를 호출 하고 (K, U) 형식의 데이터 세트 를 되 돌려 줍 니 다.
    참고 가능 aggregate.
    val pa = sc.parallelize(Array("a" -> 1,"a" ->2,"a" ->3, "b" -> 4))
    val r = pa.aggregateByKey("")(
      (x:String,y:Int) => x+y.toString*2,
      (x:String,y:String) => x+y
    )

    결과:
    scala> r.collect
    res135: Array[(String, String)] = Array((a,112233), (b,44))

    sortByKey([ascending], [numTasks])
    정렬 - ascending: 선택 할 수 있 습 니 다. 오름차 정렬 여부 - numTasks: 선택 할 수 있 습 니 다. 동시 작업 수량 은 (K, V) 의 데이터 세트 를 조작 하고 같은 (K, V) 형식의 데이터 세트 를 되 돌려 줍 니 다. 그 중에서 K 는 Ordered trait 를 실 현 했 습 니 다. 즉, 정렬 할 수 있 습 니 다.
    val pa = sc.parallelize(Array("b" -> 1,"d" ->2,"a" ->3, "c" -> 4))
    pa.sortByKey().collect

    결과:
    res139: Array[(String, Int)] = Array((a,3), (b,1), (c,4), (d,2))

    join(otherDataset, [numTasks])
    두 집합의 내 적 은 데이터베이스 안의 inner join 에 대응한다.
    (K, V) 와 (K, W) 형식의 데이터 세트 를 조작 하여 (K, (V, W) 형식의 데이터 세트 를 되 돌려 줍 니 다.
    또한 대외 적 인 지원 도 있다. leftOuterJoin, rightOuterJoin, fullOuterJoin 데이터베이스 에 있 는 해당 개념 과 같다.
    예 를 들 어:
    val pa = sc.parallelize(
      Array("a" -> 1,
            "b" -> 2, "b" -> 3)
    )
    val pb = sc.parallelize(
      Array("b" -> 2, "b" -> 3,
            "d" -> 4)
    )

    내 적:
    scala> pa.join(pb).collect
    res140: Array[(String, (Int, Int))] = Array((b,(2,2)), (b,(2,3)), (b,(3,2)), (b,(3,3)))

    왼쪽 외 적:
    scala> pa.leftOuterJoin(pb).collect
    res141: Array[(String, (Int, Option[Int]))] = Array((a,(1,None)), (b,(2,Some(2))), (b,(2,Some(3))), (b,(3,Some(2))), (b,(3,Some(3))))

    오른쪽 외 적:
    scala> pa.rightOuterJoin(pb).collect
    res142: Array[(String, (Option[Int], Int))] = Array((d,(None,4)), (b,(Some(2),2)), (b,(Some(2),3)), (b,(Some(3),2)), (b,(Some(3),3)))

    전 외적:
    scala> pa.fullOuterJoin(pb).collect
    res143: Array[(String, (Option[Int], Option[Int]))] = Array((d,(None,Some(4))), (a,(Some(1),None)), (b,(Some(2),Some(2))), (b,(Some(2),Some(3))), (b,(Some(3),Some(2))), (b,(Some(3),Some(3))))

    주의해 야 할 것 은 외적 결과 의 원소 가 Option 유형 으로 바 뀌 었 다 는 것 이다
    cogroup(otherDataset, [numTasks])
    두 데이터 세트 를 각각 그룹 으로 나 눈 다음 그룹 결 과 를 요소 로 연결 합 니 다.
    (K, V) 와 (K, W) 형식 을 조작 하고 (K, (Iterable, Iterable) 형식 을 되 돌려 줍 니 다. 별명 groupWith.
    val pa = sc.parallelize(Array("a" -> 1, "b" -> 2, "b" -> 3))
    val pb = sc.parallelize(Array("a" -> "a", "b" -> "b", "b" -> "c"))
    pa.cogroup(pb).collect

    결과:
    res152: Array[(String, (Iterable[Int], Iterable[String]))] = Array((a,(CompactBuffer(1),CompactBuffer(a))), (b,(CompactBuffer(2, 3),CompactBuffer(b, c))))

    cartesian(otherDataset)
    두 집합 에 대해 피리 칼 적 을 구하 고 T 와 U 유형 을 조작 하여 (T, U) 유형 을 되 돌려 줍 니 다.
    val pa = sc.parallelize(Array(1,2))
    val pb = sc.parallelize(Array(3,4))
    pa.cartesian(pb).collect

    결과:
    res156: Array[(Int, Int)] = Array((1,3), (1,4), (2,3), (2,4))

    pipe(command, [envVars])
    각 파 티 션 을 stdin 에 출력 한 다음 명령 을 실행 하고 마지막 으로 stdout 을 읽 어 모든 행동 요소 로 새로운 RDD 를 생 성 합 니 다.명령 을 수행 하 는 단 위 는 파 티 션 이지 요소 가 아 닙 니 다.
    / home / zeta / 디 렉 터 리 에 새 스 크 립 트 test. sh:
    #!/bin/bash
    while read LINE; do
       echo e${LINE}
    done

    그리고 spark - shell 에서 실행:
    val pa = sc.parallelize(Array(1,2,3,4,5,6))
    pa.pipe("/home/zeta/test.sh").collect

    결과:
    res181: Array[String] = Array(e1, e2, e3, e4, e5, e6)

    coalesce(numPartitions)
    RDD 의 파 티 션 수 를 numPartitions 개 로 줄 이 고 하나의 빅 데이터 세트 filter 작업 을 한 후에 파 티 션 수 를 줄 이면 효율 을 높 일 수 있다.
    repartition(numPartitions)
    RDD 내 모든 파 티 션 의 데 이 터 를 무 작위 로 흐 트 러 뜨리 고 균형 을 잡 으 세 요.
    actions
    collect()
    집합 내의 모든 요 소 를 배열 로 되 돌려 줍 니 다.
    count()
    데이터 세트 내의 요소 개 수 를 되 돌려 줍 니 다.
    foreach(func)
    데이터 세트 의 모든 요소 에 func 를 실행 합 니 다.
    몇 가지 주의: 1. 부작용: 분포 식 상황 에서 모든 executor 는 자신의 실행 공간 을 가지 기 때문에 변 수 는 전역 적 으로 공유 되 지 않 고 변수 에 대한 부작용 은 정의 되 지 않 은 행 위 를 초래 할 수 있 습 니 다.이 럴 때 는 Accumulator 를 사용 해 야 한다.2. 패 킷 닫 기: 패 킷 참조 문제 Understanding closures
    first()
    데이터 세트 의 첫 번 째 요 소 를 되 돌려 줍 니 다.
    take(n)
    데이터 세트 내 n 개의 요 소 를 배열 로 되 돌려 줍 니 다.
    reduce(func)
    reduce 작업 은 병렬 계산 에서 정확 한 결 과 를 얻 을 수 있 도록 이 함 수 는 교환 율 과 결합 율 을 만족 시 켜 야 합 니 다. 즉, 데이터 세트 와 이 연산 은 반드시 교환 군 을 구성 해 야 합 니 다.
    val pa = sc.parallelize(Array(1,2,3))
    pa.reduce( (x,y) => x+y )

    결과:
    res184: Int = 6

    aggregate(zeroValue)(seqOp, combOp)
    이 함수 의 작업 절 차 는 두 단계 로 볼 수 있 습 니 다. 1. RDD 의 각 파 티 션 에서 호출 seqOp 작업 을 접 습 니 다. fold 2. 호출 combOp 과 유사 합 니 다. 각 파 티 션 의 결 과 를 집합 합 니 다.
    그것 의 서명 은 다음 과 같다.
    def aggregate[U](zeroValue: U)(
      seqOp: (U, Int) => U,
      combOp: (U, U) => U
    )

    설명 하기 가 너무 귀찮아 서...한 마디 로 각 파 티 션 에서 fold 를 한 번 더 하고 결 과 를 모 으 는 것 이다.
    예:
    val pa = sc.parallelize(Array(1,2,3,4))
    def seqOp(x:String,y:Int) = x+y.toString
    def combOp(x:String,y:String) = x+y
    pa.aggregate("")(seqOp,combOp)

    취 합 시 파 티 션 의 순서 가 일정 하지 않 기 때문에 위의 코드 의 실행 결과 도 확실 하지 않 습 니 다.
    scala> pa.aggregate("")(seqOp,combOp)
    res201: String = 2413
    
    scala> pa.aggregate("")(seqOp,combOp)
    res202: String = 1234

    takeSample(withReplacement, num, [seed])
    무 작위 추출 num 개 샘플, - with Replacement: 추출 후 원 소 를 되 돌려 놓 을 지 여부 - num: 추출 개수 - [seed]: 선택 가능 한 매개 변수, 무 작위 피 드
    val pa = sc.parallelize(Array(1,2,3,4,5,6))
    pa.takeSample(false,3)

    무 작위 결과:
    res185: Array[Int] = Array(3, 2, 5)

    takeOrdered[T](n:Int)(implicit ord: Ordering[T])
    정렬 후 n 의 요 소 를 되 돌려 줍 니 다. 두 번 째 암시 적 매개 변수 ordering 는 컴 파일 러 가 스스로 찾 고 사용자 가 정의 할 수 있 습 니 다.
    val pa = sc.parallelize(Array(1,2,3,4,5,6))
    pa.takeOrdered(3)

    결과:
    res193: Array[Int] = Array(1, 2, 3)

    비교 기 사용자 정의 시도:
    object Ord extends Ordering[Int] {
      override def compare(x:Int,y:Int):Int = {
        if(x1 else -1;
      }
    }
    val pa = sc.parallelize(Array(1,2,3,4,5,6))
    pa.takeOrdered(3)(Ord)

    이번 결 과 는:
    res195: Array[Int] = Array(6, 5, 4)

    countByKey()
    키 값 은 종류 RDD 에 유효 하 며, 키 마다 대응 하 는 요소 개 수 를 집계 합 니 다.
    saveAsTextFile(path)
    모든 요 소 는 한 줄 로 텍스트 파일 (또는 일련의 텍스트 파일) 을 기록 합 니 다.인자 path 가 지정 한 기록 디 렉 터 리 로 로 컬 파일 시스템, HDFS 및 기타 Hadoop 이 지원 하 는 파일 시스템 을 지원 합 니 다.
    saveAsSequenceFile(path)
    자바 와 스칼라 지원) 모든 요 소 를 Hadoop SequenceFile 에 기록 하고 로 컬 파일 시스템, HDFS, Hadoop 이 지원 하 는 모든 파일 시스템 을 지원 합 니 다.
    Hadoop Writable 인 터 페 이 스 를 실현 하 는 키 값 만 이 형식의 RDD 를 지원 합 니 다.
    스칼라 에 서 는 암시 적 으로 Writable 으로 전환 할 수 있 는 형식 도 이 동작 을 지원 합 니 다. (Spark 는 기본 유형 인 Int, Double, String 등에 대해 암시 적 으로 변환 되 었 습 니 다)
    saveAsObjectFile(path)
    자바 의 직렬 화 형식 직렬 화 대상 을 사용 하여 자바 와 스칼라 를 지원 합 니 다. 불 러 오 려 면 SparkContext. object File () 을 사용 하 십시오.

    좋은 웹페이지 즐겨찾기