Spark 세 가지 방식 으로 데 이 터 를 조회 합 니 다.

1. 표 한 장 씩 데이터: studentscores.txt
필드: 학급 번호, 학급 이름, 입학 날짜, 소속 학과 중국어 이름
  ,  ,  ,      ,    
170401011001 ,   , ,0101,467
170401011002 ,  , ,0101,518
170401011003 ,  , ,0101,509
170401011004 ,   , ,0101,508
170401011005 ,  , ,0101,494
170401011006 ,  , ,0101,500
170401011007 ,  , ,0101,490
170401011008 ,   , ,0101,466

 
세 가지 방식 을 사용 하 다
제 1 종: 열 이름 을 지정 하여 Schema 추가  조회 데이터
package SparkSql


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, sql}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Created by      on 2020/4/13.
  *      
  *           Schema
  */
object CreateDFDS {


  def main(args: Array[String]): Unit = {

    //    1   SparkSeesion
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("01").getOrCreate()

    //   sc
    val sc: SparkContext = spark.sparkContext

    //    
    val fileRDD: RDD[String] = sc.textFile("E:\\student_scores.txt")
    //                  
    val DataRDD: RDD[(String, String,String, String,String)] = fileRDD.map(a => a.split(",")).map(b => (b(0), b(1),b(2),b(3),b(4)))
    //    222
    import spark.implicits._
    val DataDF: DataFrame = DataRDD.toDF()

    DataDF.show()
    DataDF.printSchema()

    //  
    sc.stop()
    spark.stop()
  }


}

 
두 번 째: StructType 을 통 해 Schema 지정  조회 데이터
package SparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


/**
  * Created by      on 2020/4/13.
  *
  *      
  *
  *
  *
  */
object CreateDFDS_02 {

  def main(args: Array[String]): Unit = {


    //    1   SparkSeesion
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("01").getOrCreate()

    //   sc
    val sc: SparkContext = spark.sparkContext

    //    
    val fileRDD: RDD[String] = sc.textFile("E:\\student_scores.txt")

    //                  
    val RowRDD: RDD[Row] = fileRDD.map(a => a.split(",")).map(b => Row(b(0), b(1),b(2),b(3),b(4)))

    //     
    val structType: StructType = StructType(Seq(
      StructField("id", StringType, true), //    
      StructField("name", StringType, true),
      StructField("rux", StringType, true),
      StructField("score", StringType, true)
    )
    )

    val DataDF: DataFrame = spark.createDataFrame(RowRDD, structType)


//    DataDF.show()
//    DataDF.printSchema()

    //            
    DataDF.createOrReplaceTempView("DataDF")
    spark.sql("select * from DataDF  ").show()

    //  
    sc.stop()
    spark.stop()


  }


}

 
세 번 째: 샘플 류 를 작성 하고 반사 체 제 를 이용 하여 Schema 를 추정 합 니 다.  조회 데이터
package SparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * Created by      on 2020/4/13.
  */
object CreateDFDS_03 {

  //     
//  case class Student(id: String, name: String, sex: String,classname:String,day:String)
//    case class department(id:String,name:String)
  case class tudent_scores(id:String,name:String,rux:String,score:String)



  def main(args: Array[String]): Unit = {


    //    1   SparkSeesion
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("01").getOrCreate()

    //   sc
    val sc: SparkContext = spark.sparkContext

    //    

    val fileRDD: RDD[String] = sc.textFile("E:\\student_scores.txt")

    //                  
    val DataRDD: RDD[Array[String]] = fileRDD.map(x => x.split(","))
    val tudent_scoresRDD: RDD[tudent_scores] = DataRDD.map(a => tudent_scores(a(0), a(1),a(2),a(3)))
    //       
    import spark.implicits._

    //RDD   DF
    val  tudent_scoresDF: DataFrame = tudent_scoresRDD.toDF()


    //            
    tudent_scoresDF.show()
    tudent_scoresDF.printSchema()

    //json      json   
//    studentDF.write.json("  ")



    //       
    tudent_scoresDF.createOrReplaceTempView("tudent_scoresDF")

//    var  sql =
//      """
//        |select  value , count(value) as count
//        |from personDF
//        |group by value
//        |order by count desc
//      """.stripMargin

//    spark.sql(sql).show()

    spark.sql("select * from tudent_scoresDF").show()


    //           
//    personDF.select("name", "age").filter($"age" > 25).show()


    //  
    sc.stop()
    spark.stop()


  }


}

좋은 웹페이지 즐겨찾기