Spark의 RDD 일반 작업

67470 단어 Spark
본고는 SPARK 조작 RDD의 일부 코드 사례를 공유하였다.

1. RDD란 무엇인가?


Resilient Distributed Datasets(resilient의 분산 데이터 세트)
RDDs are immutable, fault-tolerant,parallel data structures that let users explicitly persist intermediate results in memory,control their partitioning to optimize data placement, and manipulate them using a rich set of operators. (개인적으로는 영어를 아주 분명하게 썼으니 번역할 필요가 없다)

2. RDD의 특징

  • Immutable(변경 불가)
  • Fault Tolerant(내결함성)
  • Parallel Data Structures(병렬 데이터 구조)
  • In-Memory Computing(메모리 컴퓨팅)
  • Data Partitioning and Placement
  • Rich Set of Operations
  • 3. RDD 작업


    transformation vs action ?
    In short, RDDs are immutable, RDD transformations are lazily evaluated, and RDD actions are eagerly evaluated and trigger the computation of your data processing logic.(개인적으로는 영어를 아주 분명하게 썼으니 번역할 필요가 없다).

    4. RDD 생성 방법


    전반적으로 세 가지가 있어요.
  • 작성된 Object collection에서
  • HDFS와 같은 외부 파일에서 가져오기
  • 다른 RDD에서transformation을 통해
  • 사례 1.만든 Object collection에서 RDD 가져오기
     def main(args: Array[String]): Unit = {
        val conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.setAppName("create RDD by Array Object");
        conf.set("spark.testing.memory", "2147480000");
        val str_list = Array[String]("How to create RDD", "funny game","Spark is cool!");
        val sc = new SparkContext(conf);
        val RDD_str = sc.parallelize(str_list,2);
        print(RDD_str.count());
    
      }
    

    사례 2.HDFS에서 RDD 가져오기
    def main(args: Array[String]): Unit = {
        //   conf  
        val sparkConf = new SparkConf()
          .setAppName("WordCount sample")
         // .setMaster("192.168.1.10:4040")
          .setMaster("local[2]")
          .set("spark.testing.memory", "2147480000");
        val sc = new SparkContext(sparkConf);
        val rdd = sc.textFile("/user/hadoop/worddir/word.txt");
        val tupleRDD = rdd.flatMap(line => {line.split(" ")
            .toList.map(word => (word.trim,1))
        });
        val resultRDD :RDD[(String,Int)] =tupleRDD.reduceByKey((a,b)=> a + b);
        resultRDD.foreach(elm => println(elm._1+"="+elm._2));
    
        Thread.sleep(10000);
        sc.stop();
    
    
      }
    

    사례 3.다른 RDD에서 RDD 가져오기
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf();
        conf.setAppName("Get RDD from existed RDD");
        conf.setMaster("local[2]");
        conf.set("spark.testing.memory","2147480000");
        //conf.set("","")
        val str_array = Array("one","two","three");
        val sc = new SparkContext(conf);
        val RDD_list = sc.parallelize(str_array,2);
        val RDD_list1 = RDD_list.map(l=>{l+" ******"});
        RDD_list1.foreach(elm=>{println(elm)});
    
      }
    

    5. RDD transformation 작업


    일반적으로 사용되는 몇 가지 RDD 작업은 다음과 같습니다.
    이름:
    묘사
    map(func)
    this applies the provided function to each row as iterating through the rows in the dataset. the returned rDD will contain whatever the provided func returns
    flatMap(func)
    similar to map(func), the func should return a collection rather than a single element, and this method will flatten out the returned collection. this allows an input item to map to zero or more output items.
    filter(func)
    Only the elements that the func function returns true will be collected in the returned rDD. in other words, collect only the rows that meet the condition defined in the given func function.
    mapPartitions(func)
    similar to map(func), but this applies at the partition (chunk) level. this requires the func function to take the input as an iterator to iterate through each row in the partition.
    mapParitionsWithIndex(func)
    this is similar to mapPartitions, but an additional partition index number is provided to the func function.
    mapParitionsWithIndex(func)
    this is similar to mapPartitions, but an additional partition index number is provided to the func function.
    union(otherRDD)
    this is similar to mapPartitions, but an additional partition index number is provided to the func function.
    intersection(otherRDD)
    Only the rows that exist in both the source rDD and Only the rows that exist in both the source rDD and
    substract(otherRDD)
    this subtracts the rows in otherRDD from the source rDD.
    distinct([numTasks])
    this removes duplicate rows from the source rDD.
    sample(withReplace, fraction,seed)
    this is usually used to reduce a large dataset to a smaller one by randomly selecting a fraction of rows using the given seed and with or without replacements.
    사례 1.map (func): 집합의 줄마다 교체합니다. 되돌아오는 RDD 형식은func의 논리를 포함합니다.
    object RDDTest08 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.setAppName("RDD map transformation");
        conf.set("spark.testing.memory","2147480000");
        // define class Person
        case class Person(id:Int,name:String,phoneNum:String);
        val personArray = Array("1#Jason#1233242432","2#Mike#1232131312");
        val sc = new SparkContext(conf);
        val personRDD = sc.parallelize(personArray);
        val personObjectRDD = personRDD.map(person=>{
          val personInfo = person.split("#");
          Person(personInfo(0).toInt,personInfo(1),personInfo(2));
        })
        personObjectRDD.collect.foreach(println);
      }
    

    결과 내보내기
    Person(1,Jason,1233242432)
    Person(2,Mike,1232131312)
    

    사례 2.flatMap(func): 맵과 같은 동작으로 줄마다 되돌아오는 대상을 다시 편평하게 합니다
    ***map vs flatMap
    예를 들어string 형식의collection array("this is one", "this is two") 역시 공백으로 문자열을 분할하여 되돌아오는 결과가 다르다
  • 맵은 두 개의 집합array("this", "is", "one),array("this","is","two")
  • 를 되돌려줍니다.
  • flatMap의 경우 큰 집합으로 되돌아온다("this", "is", "one", "this", "is", "two") 즉 맵의 되돌아오는 결과를 편평하게 출력
  • 맵이 반환하는 유형은 Array[Array[string]입니다.
    val strArray = Array("this is one","this is two");
       val sc = new SparkContext(conf);
       val strRDD = sc.parallelize(strArray);
       val resultRDD = strRDD.map(line=>{
         line.split(" ")
       })
      val array:Array[Array[String]] = resultRDD.collect();
       array.foreach(print);
    

    flatMap이 반환하는 유형은 Array[string]입니다.
     val strArray = Array("this is one","this is two");
        val sc = new SparkContext(conf);
        val strRDD = sc.parallelize(strArray);
        val resultRDD = strRDD.flatMap(line=>{
          line.split(" ")
        })
       val array:Array[String] = resultRDD.collect();
        array.foreach(print);
    

    사례 3.filter(func): 행 기반 필터링, RDD 구조로 반환
        val strArray = Array("this is one","this is two");
        val sc = new SparkContext(conf);
        val strRDD = sc.parallelize(strArray);
        val resultRDD = strRDD.filter(line=>{line.contains("two")});
        resultRDD.foreach(print);
    

    사례 4.mapPartitions(func):맵을 파티션 처리
     case class Person(id:Int,name:String,phoneNum:String);
        val personArray = Array("1#Jason#1233242432","2#Mike#1232131312","3#James#01902992888","4#Tom#1231232222");
        val sc = new SparkContext(conf);
        val personRDD = sc.parallelize(personArray,2);
        val personObjectRDD = personRDD.mapPartitions((iter: Iterator[String])=> {
          iter.map(person=>{
            val personInfo = person.split("#");
            Person(personInfo(0).toInt,personInfo(1),personInfo(2))
          });
    
        });
    
        personObjectRDD.collect.foreach(println);
    

    사례 5.mapPartitionsWithIndex: 맵을 섹션 처리하기 (섹션 키 포함)
    case class Person(id:Int,name:String,phoneNum:String,key:Int);
        val personArray = Array("1#Jason#1233242432","2#Mike#1232131312","3#James#01902992888","4#Tom#1231232222");
        val sc = new SparkContext(conf);
        val personRDD = sc.parallelize(personArray,2);
        val personObjectRDD = personRDD.mapPartitionsWithIndex((idx:Int,iter: Iterator[String])=> {
          iter.map(person=>{
            val personInfo = person.split("#");
            Person(personInfo(0).toInt,personInfo(1),personInfo(2),idx);
          });
    
        });
    
        personObjectRDD.collect.foreach(println);
    

    4개의 개인 개체에 대한 partition 조회: Jason과 Mike는 partiton 0, James와 Tom은 partiton 1
    Person(1,Jason,1233242432,0)
    Person(2,Mike,1232131312,0)
    Person(3,James,01902992888,1)
    Person(4,Tom,1231232222,1)
    

    사례 6.union(otherRDD): 통합 작업기존 SQL의 union과 달리 이 작업은 무겁지 않습니다.
     val intArray1 = Array(0,1,3,5,7,9);
     val intArray2 = Array(0,2,4,6,8,10);
     val sc = new SparkContext(conf);
     val intRDD1 = sc.parallelize(intArray1);
     val intRDD2 = sc.parallelize(intArray2);
     val unionRDD = intRDD1.union(intRDD2);
     println(unionRDD.collect().toList);
    

    결과 내보내기
    List(0, 1, 3, 5, 7, 9, 0, 2, 4, 6, 8, 10) 
    

    사례 6.intersection(otherRDD):교차 작업
     val sc = new SparkContext(conf);
     val strRDD1 = sc.parallelize(Array("one","two","three"));
     val strRDD2 = sc.parallelize(Array("two","three"));
     val intersectionRDD = strRDD1.intersection(strRDD2);
     println(intersectionRDD.collect().toList);
    

    결과 내보내기
    List(two, three)
    

    사례 7.substract(otherRDD):차집합 작업
     val sc = new SparkContext(conf);
     val strRDD1 = sc.parallelize(Array("one","two","three"));
     val strRDD2 = sc.parallelize(Array("two","three"));
     val subtractRDD = strRDD1.subtract(strRDD2);
     println(subtractRDD.collect().toList);
    

    결과 내보내기
    List(one)
    

    사례 8.distinct () 중복된 값 제거
    val sc = new SparkContext(conf);
    val duplicatedRDD = sc.parallelize(List("one",1,"two",2,"three",3,"four","four"));
    print(duplicatedRDD.distinct().collect().toList);
    

    결과 내보내기
    List(two, one, three, four, 2, 1, 3)
    

    사례 9.sample(withReplacement, fraction, seed)
    이 transform 은 분석할 데이터의 통계 정보를 나타냅니다.
    withReplacement 필수 옵션:true 는 중복 값을 사용할 수 있음을 나타내고false 는 중복 값을 사용할 수 없음을 나타냅니다.
    fraction 필수 옵션: 0-1 사이에서 Sample의 백분율을 얻습니다
    seed 옵션: 무작위 값을 추출하는 함수를 정의합니다. 참고하지 않으면 기본적으로 하나를 지정합니다.
    val sc = new SparkContext(conf);
    val intArray= sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10));
    print(intArray.sample(false,0.2).collect().toList);
    

    결과 내보내기
    List(1, 4)
    

    6. RDD action 작업


    사례 1.collect (): RDD를 집합으로 변환
    *** 만약 당신의 결과가 너무 크면 out-of-memory를 일으킬 수 있으니 주의하십시오
    ***결과 집합 세척 여과 후 변환하는 것이 좋습니다
    val sc = new SparkContext(conf);
    print(sc.parallelize(Array(1,2,3,4,5)).collect().toList);
    

    결과 내보내기
    List(1, 2, 3, 4, 5)
    

    사례 2.count(): 카운트
    val sc = new SparkContext(conf);
    print(sc.parallelize(Array(1,2,3,4,5)).count());
    

    결과 내보내기
    5
    

    사례 3.first (): 첫 번째
    val sc = new SparkContext(conf);
    print(sc.parallelize(Array(1,2,3,4,5)).first());
    

    결과 내보내기
    1
    

    사례 4.take(n): N 요소 가져오기
    val sc = new SparkContext(conf);
    sc.parallelize(Array(1,2,3,4,5)).take(3).foreach(print);
    

    결과 내보내기
    123
    

    사례 6.reduce(func): 데이터 처리 병합
    val sc = new SparkContext(conf);
    val sumRDD = sc.parallelize(Array(1,2,3,4,5)).reduce((a1:Int,a2:Int)=>{a1+a2});
    print(sumRDD.toInt);
    

    결과 내보내기
    15
    

    사례 7.takeSample(withReplacement, n, [seed])
    transform의 Sample과 유사
    val sc = new SparkContext(conf);
    print(sc.parallelize(Array(1,2,3,4,5)).takeSample(false,2).toList);
    

    결과 내보내기
    List(1, 5)
    

    사례 8.takeOrdered(n, [ordering])
    출력 정렬
    val sc = new SparkContext(conf);
    sc.setLogLevel("ERROR");
    println(sc.parallelize(Array(1,2,3,4,5)).takeOrdered(3) (Ordering[Int].reverse).toList);
    println(sc.parallelize(Array(1,2,3,4,5)).takeOrdered(3).toList);
    

    결과 내보내기
    List(5, 4, 3)
    List(1, 2, 3)
    

    사례 9.top(n, [ordering])
    top N 요소 내보내기
    val sc = new SparkContext(conf);
    sc.setLogLevel("ERROR");
    println(sc.parallelize(Array(1,2,3,4,5)).top(3).toList);
    println(sc.parallelize(Array(1,2,3,4,5)).top(3)(Ordering[Int]).toList);
    

    결과 내보내기
    List(5, 4, 3)
    List(5, 4, 3)
    

    좋은 웹페이지 즐겨찾기