RDD를 DataFrame으로 변환하는 두 가지 방법

4014 단어 Spark

개술


Spark SQL은 RDD를 DataFrame으로 변환하는 두 가지 방법을 지원합니다.첫 번째는 반사로 특정 유형의 대상을 포함하는 RDD를 추정하는 모델이다. 반사에 기반한 방식은 더욱 간결한 코드를 제공할 수 있다. 만약에 스파크 프로그램을 작성할 때 schema가 명확해지면 이런 방식을 사용할 수 있다.두 번째 방법은 프로그래밍 가능한 인터페이스를 통해 schema를 구축하고 이를 기존 RDD에 응용하는 것이다.이 방식으로 작성된 코드는 더 지루하지만,colum과 type을 모르는 상황에서 사용할 수 있습니다.
다음 사례의 데이터 집합은 다음과 같다people.txt:
Michael, 29
Andy, 30 
Justin, 19

2. RDD에서 DataFrame으로 전환하는 사례


    1.반사식


Spark SQL의 Scala 인터페이스는 샘플 클래스를 포함하는 RDD를 DataFrame으로 자동 변환할 수 있습니다.샘플 클래스 정의표의 schema.샘플 클래스의 매개 변수 이름을 반사해서 읽고column의 이름으로 비추십시오.
package com.company.sparksql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object RDD2DF_m1 {
  //     
  case class  Person(name: String, age: Int)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    //      ,  RDD  DataFrame
    import spark.implicits._
    //        RDD,      DataFrame
    val peopleDF = spark.sparkContext
      .textFile("file:///E:/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // DataFrame       
    peopleDF.createOrReplaceTempView("people")
    //   SQL  
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    //          
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    //         
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
  }
}

  2.schema를 구축하는 방식을 통해


schema를 구성하여 DataFrame을 생성하려면 주로 3단계가 포함됩니다.
(1) 원시 RDD에서 RDD를 만드는 것(2) StructType을 사용하고, schema를 만드는 것(3)createDataFrame 방법을 통해 RDD에 schema를 적용한다.
package com.company.sparksql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object RDD2DF_m2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession) = {
    //      ,  RDD  DataFrame
    import spark.implicits._
    //    RDD
    val peopleRDD = spark.sparkContext.textFile("file:///E:/people.txt")
    //step 1    RDD   ROW   RDD
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    //step 2   schema
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    //step 3   DF
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    //  DataFrame       
    peopleDF.createOrReplaceTempView("people")
    //   SQL  
    val results = spark.sql("SELECT name FROM people")
    //          
    results.map(attributes => "Name: " + attributes(0)).show()
    // +-------------+
    // |        value|
    // +-------------+
    // |Name: Michael|
    // |   Name: Andy|
    // | Name: Justin|
    // +-------------+
  }
}

좋은 웹페이지 즐겨찾기