Spark 심층 분석 (9): SparkCore 의 RDD 전환 - 더 블 Value 유형

17878 단어 [빅 데이터] Spark
목차
  • union (otherDataset) 사례
  • subtract (otherDataset) 사례
  • 교차로 (otherDataset) 사례
  • cartesian (otherDataset) 사례
  • zip (otherDataset) 사례
  • union (otherDataset) 사례
    역할: 원본 RDD 와 인자 RDD 를 합 친 후 새로운 RDD 로 되 돌려 줍 니 다.
    필요: RDD 2 개 만 들 기, 병합
    (1)     RDD
    scala> val rdd1 = sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
    
    (2)     RDD
    scala> val rdd2 = sc.parallelize(5 to 10)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
    
    (3)    RDD   
    scala> val rdd3 = rdd1.union(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28
    
    (4)      
    scala> rdd3.collect()
    res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
    

    subtract (otherDataset) 사례
    역할: 계산 이 떨 어 지 는 함수 입 니 다. 두 RDD 에서 같은 요 소 를 제거 하고 서로 다른 RDD 는 유 지 됩 니 다.
    필요: RDD 두 개 를 만 들 고 첫 번 째 RDD 와 두 번 째 RDD 의 차 이 를 구 합 니 다.
    (1)     RDD
    scala> val rdd = sc.parallelize(3 to 8)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24
    
    (2)     RDD
    scala> val rdd1 = sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24
    
    (3)     RDD    RDD      
    scala> rdd.subtract(rdd1).collect()
    res27: Array[Int] = Array(8, 6, 7)
    

    intersection (otherDataset) 사례
    역할: 원본 RDD 와 인자 RDD 를 교차 시 킨 후 새로운 RDD 를 되 돌려 줍 니 다.
    필요: RDD 2 개 만 들 기, RDD 2 개 만 들 기
    (1)     RDD
    scala> val rdd1 = sc.parallelize(1 to 7)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
    
    (2)     RDD
    scala> val rdd2 = sc.parallelize(5 to 10)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
    
    (3)    RDD   
    scala> val rdd3 = rdd1.intersection(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28
    
    (4)      
    scala> rdd3.collect()
    res19: Array[Int] = Array(5, 6, 7)
    

    cartesian (otherDataset) 사례
    역할: 피리 칼 적 필요: RDD 두 개 를 만 들 고 RDD 두 개의 피리 칼 적 을 계산 합 니 다.
    (1)     RDD
    scala> val rdd1 = sc.parallelize(1 to 3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24
    
    (2)     RDD
    scala> val rdd2 = sc.parallelize(2 to 5)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24
    
    (3)    RDD        
    scala> rdd1.cartesian(rdd2).collect()
    res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))
    

    zip (otherDataset) 사례
    역할: 두 RDD 를 Key / Value 형식의 RDD 로 조합 합 니 다. 여 기 는 기본적으로 두 RDD 의 partition 수량 과 요소 수량 이 같 습 니 다. 그렇지 않 으 면 이상 을 던 집 니 다.
    필요: RDD 2 개 를 만 들 고, RDD 2 개 를 하나 로 묶 어서 (k, v) RDD 를 만 듭 니 다.
    (1)     RDD
    scala> val rdd1 = sc.parallelize(Array(1,2,3),3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    (2)     RDD( 1     )
    scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
    
    (3)   RDD     RDD   
    scala> rdd1.zip(rdd2).collect
    res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
    
    (4)   RDD     RDD   
    scala> rdd2.zip(rdd1).collect
    res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3))
    
    (5)     RDD( 1,2     )
    scala> val rdd3 = sc.parallelize(Array("a","b","c"),2)
    rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
    
    (6)   RDD     RDD   
    scala> rdd1.zip(rdd3).collect
    java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2)
    
    //       RDD    RDD      ,     
      at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
      ... 48 elided
    

    좋은 웹페이지 즐겨찾기