Spark 핵심 프로 그래 밍 - RDD 변환 작업

1. 기초 전환 작업
  • map RDD 의 모든 요소 에 대해 지정 한 함 수 를 실행 하여 새로운 RDD 를 만 듭 니 다. 원래 RDD 의 요 소 는 새 RDD 에 있 고 하나의 요소 만 대응 합 니 다.
  • public static void mapTest(JavaSparkContext sc) {
        List words = Arrays.asList("hello", "world");
        JavaRDD wordsRDD = sc.parallelize(words);
        // map         
        JavaRDD> wordCountRDD = wordsRDD
            .map(new Function>() {
              private static final long serialVersionUID = 4883828149185152684L;
    
              public Tuple2 call(String v1) throws Exception {
                return new Tuple2(v1, 1);
              }
            });
        //       ,         
        wordCountRDD.foreach(new VoidFunction>() {
          private static final long serialVersionUID = 4892545561975184834L;
    
          public void call(Tuple2 t) throws Exception {
            System.out.println("  :" + t._1 + ",  :" + t._2);
          }
    
        });
    }
  • distinct RDD 중복 요 소 를 제거 하고 모든 요소 가 중복 되 지 않 는 RDD 를 되 돌려 줍 니 다.
  • public static void distinctTest(JavaSparkContext sc) {
        List nums = Arrays.asList(1, 2, 2, 3, 5);
        JavaRDD numsRDD = sc.parallelize(nums);
        // distinct    
        JavaRDD distinceNumsRDD = numsRDD.distinct();
        distinceNumsRDD.foreach(new VoidFunction() {
          private static final long serialVersionUID = 647204360041943265L;
    
          public void call(Integer t) throws Exception {
            System.out.println(t);
          }
    
        });
    }
  • flatMap 먼저 flat 편평 화 를 하고 맵 작업 을 합 니 다.
  • public static void flatMapTest(JavaSparkContext sc) {
        List words = Arrays.asList("spark core", "spark sql", "spark streaming");
        JavaRDD wordsRDD = sc.parallelize(words, 3);
    
        JavaRDD splitedRDD = wordsRDD.flatMap(new FlatMapFunction() {
          private static final long serialVersionUID = 840597214907231645L;
    
          public Iterable call(String t) throws Exception {
            return Arrays.asList(t.split(" "));
          }
    
        });
        splitedRDD.foreach(new VoidFunction() {
          private static final long serialVersionUID = 4032309929532415386L;
    
          public void call(String t) throws Exception {
            System.out.println("  :"+t);
          }
    
        });
    }
  • coalesce RDD 는 지정 한 파 티 션 수 에 따라 재 파 티 션 을 하고 두 번 째 매개 변 수 는 shuffle 을 진행 할 지 여 부 를 지정 합 니 다.메모: 지정 한 파 티 션 수가 원 파 티 션 보다 작 으 면 순조롭게 진행 할 수 있 습 니 다. 그러나 원 파 티 션 수 보다 크 면 shuffle 인 자 를 true 로 지정 해 야 합 니 다. 그렇지 않 으 면 파 티 션 이 바 뀌 지 않 습 니 다.
  • repartition 도 지정 한 파 티 션 수 에 따라 재 파 티 션 을 진행 하지만 두 번 째 매개 변 수 는 기본적으로 true 입 니 다. 즉, 기본 값 은 shuffle 작업
  • 이 필요 합 니 다.
    public static void repartitionTest(JavaSparkContext sc) {
        List words = Arrays.asList("spark core", "spark sql", "spark streaming");
        JavaRDD wordsRDD = sc.parallelize(words, 3);
        System.out.println("         :" + wordsRDD.partitions().size());
        JavaRDD coalesce1RDD = wordsRDD.coalesce(2);
        System.out.println("       2     :"+coalesce1RDD.partitions().size());
        JavaRDD coalesce2RDD = wordsRDD.coalesce(4);
        System.out.println("       4    shuffle     :"+coalesce2RDD.partitions().size());
        JavaRDD coalesce3RDD = wordsRDD.coalesce(4,true);
        System.out.println("       4   shuffle     :"+coalesce3RDD.partitions().size());
        
        JavaRDD repartitionRDD = wordsRDD.repartition(4);
        System.out.println("repartition     :" + repartitionRDD.partitions().size());
    }
  • randomSplit (weights: Array [Double], seed: Long = Utils. random. nextLong): Array [RDD [T] 는 weights 가중치 에 따라 하나의 RDD 를 여러 개의 RDD 로 나 누 었 다.
  • public static void randomSplit(JavaSparkContext sc) {
        List nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        JavaRDD numsRDD = sc.parallelize(nums, 10);
        //      1
        double[] weights = new double[] {0.1,0.2,0.3,0.4};
        JavaRDD[] RDDs = numsRDD.randomSplit(weights);
        
        int index = 1;
        for (JavaRDD rdd : RDDs) {
          System.out.println(" "+(index++)+" RDD");
          rdd.foreach(new VoidFunction() {
            private static final long serialVersionUID = 510345733961440792L;
    
            public void call(Integer t) throws Exception {
              System.out.print(t+" ");
            }
            
          });
        }
    }
  • glom (): RDD [Array [T]] 는 RDD 의 각 파 티 션 의 모든 유형 이 T 인 데 이 터 를 요소 형식 이 T 인 배열 Array [T]
  • 로 변환 합 니 다.
    public static void glomTest(JavaSparkContext sc) {
        List nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        JavaRDD numsRDD = sc.parallelize(nums, 3);
        JavaRDD> glomRDD = numsRDD.glom();
        glomRDD.foreach(new VoidFunction>() {
    
          private static final long serialVersionUID = -3717022904720402200L;
    
          public void call(List t) throws Exception {
            System.out.println(t.toString());
          }
          
        });
    }
  • union (other: RDD [T]): RDD [T] RDD 를 합병 하여 두 RDD 의 집합 을 되 돌려 줍 니 다. 원 소 를 되 돌려 주 고 무 겁 지 않 습 니 다
  • intersection (other: RDD [T]): RDD [T] 는 두 개의 RDD 의 교 집합 을 되 돌려 줍 니 다. SQL 의 inner join 과 유사 하고 요 소 를 되 돌려 줍 니 다
  • intersection(other:RDD[T],numPartitions:Int):RDD[T]
  • intersection(other:RDD[T],partitioner:Partitioner):RDD[T]
  • subtract (other: RDD [T]): RDD [T] 는 RDD 에 나타 나 지만 other RDD 에 나타 나 지 않 는 원 소 를 되 돌려 줍 니 다. 원 소 를 되 돌려 주 는 것 은 무 겁 지 않 습 니 다
  • subtract(other:RDD[T],numPartitions:Int):RDD[T]
  • subtract(other:RDD[T],partitioner:Partitioner):RDD[T]
  • public static void unionTest(JavaSparkContext sc) {
        List num1s = Arrays.asList(1,2,3,4,5);
        List num2s = Arrays.asList(2,3,4,6,7);
        JavaRDD num1sRDD = sc.parallelize(num1s);
        JavaRDD num2sRDD = sc.parallelize(num2s);
        
        JavaRDD unionRDD = num1sRDD.union(num2sRDD);
        System.out.println("union(     )    :"+unionRDD.collect().toArray());
        
        JavaRDD intersectionRDD = num1sRDD.intersection(num2sRDD);
        System.out.println("intersection(    )    :"+intersectionRDD.collect().toArray());
        
        JavaRDD subtractRDD = num1sRDD.subtract(num2sRDD);
        System.out.println("subtract(     )    :"+subtractRDD.collect().toArray());
    }
  • mapPartitions[U](f: (Iterator[T]) =>Iterator[U],preserversPartitions:BooLean = false):RDD[U]

  • 맵 작업 과 유사 합 니 다. 맵 의 매개 변 수 는 RDD 의 모든 요소 에서 RDD 의 모든 파 티 션 의 교체 기 가 되 었 습 니 다. 그 중에서 preservers Partitions 는 부모 RDD 의 paritions 파 티 션 정 보 를 보존 할 지 여 부 를 표시 합 니 다.맵 과정 에서 자주 생 성 되 는 추가 대상 이 필요 하 다 면 맵 파 티 션 스 를 사용 하 는 것 이 맵 을 조작 하 는 것 보다 효율 적 입 니 다. 예 를 들 어 RDD 의 모든 데 이 터 는 JDBC 를 통 해 데이터 베 이 스 를 기록 합 니 다. 맵 함 수 를 사용 하면 모든 요소 에 연결 을 만 들 수 있 고 맵 파 티 션 스 를 사용 하면 각 파 티 션 에 연결 만 만들어 야 합 니 다.그러나 mapPartitions 는 큰 대상 에 적용 되 지 않 습 니 다. 메모리 에 한꺼번에 불 러 오 면 메모리 가 넘 치기 쉽 기 때 문 입 니 다.
    public static void mapPartitionsTest(JavaSparkContext sc) {
        List nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD numsRDD = sc.parallelize(nums, 3);
        //          
        JavaRDD totalRDD = numsRDD
            .mapPartitions(new FlatMapFunction, Integer>() {
              private static final long serialVersionUID = 8064196197615893270L;
    
              public Iterable call(Iterator t) throws Exception {
                Integer total = 0;
                while (t.hasNext()) {
                  total = total + t.next();
                }
                return Arrays.asList(total);
              }
    
            });
        System.out.println(totalRDD.collect().toArray());
    }
  • mapPartitionsWithIndex[U](f: (Int,Iterator[T] )=> Iterator[U],preserversPartitions:BooLean = false)):RDD[U]

  • mapPartitions 와 유사 합 니 다. 파 라 메 터 를 입력 하면 파 티 션 색인 이 하나 더 생 깁 니 다.
    public static void mapPartitionsWithIndexTest(JavaSparkContext sc) {
        List nums = Arrays.asList("1", "2", "3", "4", "5");
        JavaRDD numsRDD = sc.parallelize(nums, 2);
        JavaRDD totalRDD = numsRDD
            .mapPartitionsWithIndex(new Function2, Iterator>() {
              private static final long serialVersionUID = 2818512306761968511L;
    
              public Iterator call(Integer index, Iterator nums) throws Exception {
                StringBuilder builder = new StringBuilder();
                while (nums.hasNext()) {
                  builder.append(nums.next() + "、");
                }
                return Arrays.asList("      :" + index + ",   :" + builder.toString()).iterator();
              }
    
            }, false);
        System.out.println(totalRDD.collect());
    }
  • zip U: RDD [(T, U)] 두 RDD 를 Key / Value 형식의 RDD 로 조합 하 는 데 사용 되 며, 기본적으로 두 RDD 의 구분 수 와 요소 의 수량 이 같 습 니 다. 그렇지 않 으 면 이상 을 던 집 니 다. 이것 도 지퍼 조작
  • 입 니 다.
    val rdd1 = sc.makeRDD(1 to 5,2)
    val rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
    
    rdd1.zip(rdd2).collect
    //     
    res14: Array[(Int, String)] = Array((1,A), (2,B),(3,C), (4,D), (5,E))
    
    val rdd3 = sc.makeRDD(Seq("A","B","C"),3)
    rdd1.zip(rdd3).collect
    //     RDD     ,    。(Can't zip RDDs with unequal numbers of partitions:List(2,3))
    
  • zipPartitions: 여러 개의 RDD 를 partition 에 따라 새로운 RDD 로 조합 합 니 다. 이 작업 은 RDD 파 티 션 수가 같 아야 하지만 각 파 티 션 내 요소 의 수량 에 제한 이 없습니다.
  • zipWithindex (): RDD [(T, Long)] RDD 의 요소 와 이 요 소 를 RDD 의 id 색인 번호 로 조합 하여 키 쌍
  • public static void zipWithIndexTest(JavaSparkContext sc) {
        List strs = Arrays.asList("a", "b", "c", "d", "e");
    
        JavaRDD strsRDD = sc.parallelize(strs, 2);
        JavaPairRDD pairRDD = strsRDD.zipWithIndex();
        System.out.println(pairRDD.collect());
    }
  • zip With UniqueId (): RDD [(T, Long)] 는 RDD 의 요소 와 유일한 ID 를 키 쌍 으로 조합 합 니 다.

  • 이 유일한 ID 생 성 알고리즘 은 다음 과 같 습 니 다. (1) 각 파 티 션 의 첫 번 째 요소 의 유일한 ID 값 은: 이 파 티 션 색인 번호 입 니 다.(2) 각 파 티 션 에서 N 번 째 요소 의 유일한 ID 값 은: 이전 요소 의 유일한 ID 값 + 이 RDD 의 전체 파 티 션 수 입 니 다.
    그 중에서 zipWith Index 는 각 구역 의 시작 색인 번 호 를 계산 하기 위해 Spark 작업 을 시작 해 야 하 며, zipWith UniqueId 는 필요 하지 않 습 니 다.
    public static void zipWithUniqueIdTest(JavaSparkContext sc) {
        List strs = Arrays.asList("a", "b", "c", "d", "e");
    
        JavaRDD strsRDD = sc.parallelize(strs, 2);
        JavaPairRDD pairRDD = strsRDD.zipWithUniqueId();
        System.out.println(pairRDD.collect());
    }

    2. 키 값 변환 작업
  • partition By (p: Partitioner): RDD [(K, V)] Partition 함수 에 따라 새로운 ShuffleRDD 를 생 성하 여 원래 의 RDD 를 다시 구분 합 니 다
  • mapValues [U]: (f: (V) = > U): RDD [(K, V)] 맵 과 유사 한 것 은 [K, V] 의 value 값 에 대한 맵 작업
  • 에 불과 합 니 다.
  • flatMapValues [U]: (f: (V) = > TraversableOnce [U]): RDD [(K, V)] 는 flatMap 작업 과 유사 하 며 [K, V] 의 value 값 에 대해 flatMap 작업
  • 에 불과 합 니 다.
    public static void mapValuesTest(JavaSparkContext sc) {
        List strs = Arrays.asList("spark core", "spark sql", "spark streaming");
        JavaRDD strsRDD = sc.parallelize(strs);
    
        JavaRDD splitedRDD = strsRDD.flatMap(new FlatMapFunction() {
    
          private static final long serialVersionUID = -984130321206766818L;
    
          public Iterable call(String t) throws Exception {
            return Arrays.asList(t.split(" "));
          }
        });
    
        JavaPairRDD pairRDD = splitedRDD
            .mapToPair(new PairFunction() {
              private static final long serialVersionUID = 2043541493697396334L;
    
              public Tuple2 call(String t) throws Exception {
                return new Tuple2(1, t);
              }
    
            });
    
        JavaPairRDD resultRDD = pairRDD.mapValues(new Function() {
          private static final long serialVersionUID = 924538234523756151L;
    
          public String call(String value) throws Exception {
            return value.toUpperCase();
          }
    
        });
        System.out.println(resultRDD.collect());
    }
  • reduceByKey(func:(V,V) => V):RDD[(K,V)]
  • reduceByKey(func:(V,V) => V,numPartitions:Int):RDD[(K,V)]
  • reduceByKey(p:Partitioner,func:(V,V) =>V):RDD[(K,V)]
  • reduceByKeyLocally(func:(V,V) => V):Map[(K,V)]

  • reduceByKey: RDD [K, V] 의 모든 K 에 대응 하 는 V 값 을 매 핑 함수 에 따라 계산 하 는 데 사 용 됩 니 다. 솔직히 같은 key 의 value 를 reduce 작업 을 합 니 다. 내부 에서 사실은 combineByKey 를 호출 하고 numPartitions 는 지정 한 파 티 션 에 사 용 됩 니 다.reduceByKey Locally 는 연산 결 과 를 RDD 가 아 닌 맵 에 표시 합 니 다.
    public static void reduceByKeyTest(JavaSparkContext sc) {
        List strs = Arrays.asList("spark core", "spark sql", "spark streaming");
        JavaRDD strsRDD = sc.parallelize(strs);
    
        JavaRDD splitedRDD = strsRDD.flatMap(new FlatMapFunction() {
    
          private static final long serialVersionUID = -984130321206766818L;
    
          public Iterable call(String t) throws Exception {
            return Arrays.asList(t.split(" "));
          }
        });
    
        JavaPairRDD pairRDD = splitedRDD
            .mapToPair(new PairFunction() {
              private static final long serialVersionUID = 2043541493697396334L;
    
              public Tuple2 call(String t) throws Exception {
                return new Tuple2(t, 1);
              }
    
            });
    
        JavaPairRDD resultRDD = pairRDD
            .reduceByKey(new Function2() {
              private static final long serialVersionUID = 4852162726837426718L;
    
              public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
              }
    
            });
        System.out.println(resultRDD.collect());
    }
  • groupByKey():RDD[(K,Iterable[V])]
  • groupByKey(numPartitions:Int):RDD[(K,Iterable[V])]
  • groupByKey(p:Partitioner):RDD[(K,Iterable[V])]

  • groupbyKey: RDD [K, V] 의 각 K 에 대응 하 는 V 값 을 하나의 집합 Iterable [V] 에 합 치 는 데 사용 합 니 다. 즉, key 에 따라 그룹 을 나 누 는 것 입 니 다.
    public static void groupByKeyTest(JavaSparkContext sc) {
        List strs = Arrays.asList("spark core", "spark sql", "spark streaming");
        JavaRDD strsRDD = sc.parallelize(strs);
    
        JavaRDD splitedRDD = strsRDD.flatMap(new FlatMapFunction() {
    
          private static final long serialVersionUID = -984130321206766818L;
    
          public Iterable call(String t) throws Exception {
            return Arrays.asList(t.split(" "));
          }
        });
    
        JavaPairRDD pairRDD = splitedRDD
            .mapToPair(new PairFunction() {
              private static final long serialVersionUID = 2043541493697396334L;
    
              public Tuple2 call(String t) throws Exception {
                return new Tuple2(t, 1);
              }
    
            });
    
        JavaPairRDD> resultRDD = pairRDD.groupByKey();
        System.out.println(resultRDD.collect());
    }
  • cogroup SQL 구문 의 모든 외부 연결 에 해당 하 며 좌우 RDD 의 기록 을 되 돌려 줍 니 다. 연결 되 지 않 은 것 은 비어 있 습 니 다.
  • val sparkConf = new SparkConf().setAppName("Client Main").
            setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A",1),("B",2),("C",3)),2)
    val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("D","d")),2)
    val rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2)
    val rdd4 = rdd1.cogroup(rdd2,rdd3).collect
    
    /**
     * Array[(String, (Iterable[Int],Iterable[String], Iterable[String]))] 
     * =Array((B,(CompactBuffer(2),CompactBuffer(b),CompactBuffer())), 
     *(D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())), 
     * (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))),
     *(C,(CompactBuffer(3),CompactBuffer(),CompactBuffer())), 
     *(E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))
     */
    
  • join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  • join[W](other: RDD[(K, W)], numPartitions: Int):RDD[(K, (V, W))]

  • 내부 연결 은 cogroup 을 기반 으로 이 루어 집 니 다. 두 RDD 사이 의 똑 같은 key 를 연결 하고 서로 다른 것 을 버 립 니 다.
    val sparkConf = new SparkConf().setAppName("Client Main").
            setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
    val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
    rdd1.join(rdd2).collect
    //Array[(String, (String, String))] = Array((B,(2,b)), (A,(1,a)))
    
  • leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V,Option[W]))]
  • leftOuterJoin[W](other: RDD[(K, W)], numPartitions:Int): RDD[(K, (V, Option[W]))]

  • 왼쪽 외부 연결 은 cogroup 기반 으로 이 루어 지 며 왼쪽 RDD 의 key 를 기준 으로 연결 되 며 다른 RDD 가 없 으 면 None 입 니 다.
    val sparkConf = new SparkConf().setAppName("Client Main").
            setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
    val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
    rdd1.leftOuterJoin(rdd2).collect
    // Array[(String,(String, Option[String]))] = Array((B,(2,Some(b))), (A,(1,Some(a))),(C,(3,None)))
    
  • rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K,(Option[V], W))]
  • rightOuterJoin[W](other: RDD[(K, W)],numPartitions:Int): RDD[(K, (Option[V], W))]

  • 오른쪽 외부 연결 은 cogroup 기반 으로 이 루어 지 며 오른쪽 RDD 의 key 를 기준 으로 연결 되 며 다른 RDD 가 없 으 면 None 입 니 다.
    val sparkConf = new SparkConf().setAppName("Client Main").
            setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
    val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
    rdd1.rightOuterJoin(rdd2).collect
    //Array[(String, (Option[String], String))] = Array((B,(Some(2),b)),(F,(None,f)), (A,(Some(1),a)))
    
  • fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K,(Option[V], Option[W]))]
  • fullOuterJoin[W](other: RDD[(K, W)],numPartitions:Int): RDD[(K, (Option[V], Option[W]))]

  • 모든 연결 은 cogroup 을 기반 으로 이 루어 집 니 다. 두 RDD 의 모든 키 값 쌍 은 연결 이 필요 합 니 다. 다른 쪽 이 없 으 면 None 입 니 다.
    val sparkConf = new SparkConf().setAppName("Client Main").
            setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
    val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
    rdd1.fullOuterJoin(rdd2).collect
    //Array[(String, (Option[String], Option[String]))] =Array((B,(Some(2),Some(b))), 
    // (F,(None,Some(f))),(A,(Some(1),Some(a))), (C,(Some(3),None)))
    
  • subtractByKey[W: ClassTag](other: RDD[(K, W)]):RDD[(K, V)]
  • subtractByKey[W: ClassTag](other: RDD[(K,W)],numPartitions: Int): RDD[(K, V)]

  • 첫 번 째 RDD 와 두 번 째 RDD 의 차 집, 즉 첫 번 째 RDD 가 두 번 째 RDD 에 존재 하지 않 는 요소, 예 를 들 어 {1, 2, 3, 5} 과 {1, 2, 4} 은 1, 2 가 두 번 째 집합 에 있 기 때문에 되 돌아 오지 않 습 니 다. 3 과 5 는 두 번 째 집합 에 없어 서 돌아 갑 니 다.
    val sparkConf = new SparkConf().setAppName("Client Main").
            setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
    val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
    rdd1.subtractByKey(rdd2).collect
    //Array[(String, String)] = Array((C,3))

    좋은 웹페이지 즐겨찾기