Spark 입문(6)-가장 완전한 Sark SQL 연산 자 소개 와 사용(상)

121122 단어 Spark빅 데이터
Spark SQL
  • Datasets&DataFrames 소개
  • 빠 른 입문
  • Dataset&DataFrame 실전
  • Dataset create
  • case-class
  • Tuple(원조)
  • json 데이터
  • RDD
  • Dataframe create
  • json 파일
  • case-class
  • Tuple(원조)
  • RDD 변환
  • DataFrame Operations(Untyped)DataFrame 무 유형 조작
  • printSchema 인쇄 Dataframe 의 표 구조(표 두)
  • show
  • select
  • selectExpr
  • withColumn
  • withColumnRenamed
  • drop
  • dropDuplicates
  • orderBy|sort
  • groupBy
  • agg
  • limit
  • where
  • pivot(행 열 전환)
  • na(현재 null 의 값 으로 교체)
  • join
  • cube(다 차원)
  • Dataset Oprations(Strong typed)데이터 세트 작업-강 한 유형
  • Spark SQL 은 RDD 위 에 구 축 된 ETL(Extract Transform Load)도구 입 니 다.SparkSQL 은 RDD 위 에서 추상 화Dataset/Dataframe라 는 두 가지 유형 은 RDD 와 유사 한 기능 을 제공 했다.즉,사용자 가 map,faltMap,filter 등 고급 연산 자 를 사용 할 수 있 음 을 의미 하 는 동시에 열 을 바탕 으로 하 는 명명 조 회 를 통 해 Dataset/Data Frame 은 두 가지 조작 수 거 리 를 제공 하 는 API 를 제공 했다.이런 API 는 Sark 엔진 에 더 많은 정 보 를 제공 할 수 있다.시스템 은 이러한 정보 에 근거 하여 계산 에 대해 일정한 최 적 화 를 실현 할 수 있다.현재 Spark SQL 은 두 가지 상호작용 방식 을 제공 합 니 다.
  • SQL
  • Dataset API(strong-type 유형,untyped 유형 조작)
  • Datasets&DataFrames 소개
    Dataset 은 분포 식 데이터 세트 로 Dataset 는 spark-1.6 에서 새로운 API 를 제시 합 니 다.이 API 는 RDD(strong type,lambda 표현 식 사용)에 구축 되 는 동시에 Spark SQL 이 실행 엔진 에 대한 장점 을 빌려 Dateset 을 사용 하여 일부 데 이 터 를 직접 사용 하 는 것 보다 RDD 연산 자 기능 과 성능 을 향상 시 킬 수 있 습 니 다.그래서 우 리 는 Dateset 이 강화 버 전의 RDD 라 고 볼 수 있다.Dataset 는 JVM 의 배열|집합 대상 을 만 드 는 것 외 에 도 임의의 RDD 를 Dataset 로 변환 할 수 있 습 니 다.Python does not have the support for the Dataset API.DataFrames 는 Dataset 의 특수 한 상황 이다.예 를 들 어 Dataset 에 서 는 임의의 대상 형식의 데 이 터 를 Dataset 요소 로 저장 할 수 있 습 니 다.그러나 Dataframe 의 요 소 는 한 가지 유형의 Row 유형 만 있 는데 이런 Row 조 회 는 전통 적 인 데이터 베이스 에서 ResultSet 작업 과 매우 비슷 하 다.Row 형식의 데 이 터 는 Dataframe 의 요 소 를 표시 하기 때문에 데이터베이스 의 한 줄 과 유사 합 니 다.이 줄 의 요 소 는 아래 표 시 를 하거나 column name 을 통 해 접근 할 수 있 습 니 다.Dateset 은 API 의 호환성 이나 지원 도가 그다지 좋 지 않 기 때문에 Dataframe 은 API 차원 에서 지원 하 는 Scala,Java,R,Python 지원 이 비교적 전면적 입 니 다.
    쾌속 입문
  • 도입 의존
  • <dependency> 
    	<groupId>org.apache.sparkgroupId> 
    	<artifactId>spark-core_2.11artifactId>
        <version>2.4.3version> 
    dependency> 
    <dependency> 
    	<groupId>org.apache.sparkgroupId> 
    	<artifactId>spark-sql_2.11artifactId> 
    	<version>2.4.3version> 
    dependency>
    
  • 문자 통계 만 들 기(untyped)
  • def main(args: Array[String]): Unit = {
    		//1.  SparkSeesion
    		val spark = SparkSession
    			.builder()
    			.appName("wordcount")
    			.master("local[6]")
    			.getOrCreate()
    
    		//2.  spark      |  
    		import spark.implicits._
    
    		//3.  dataset
    		val lines = Array("this is a demo", "hello spark")
    		val wordRDD = spark.sparkContext
    			.makeRDD(lines)
    			.flatMap(_.split("\\s+"))
    			.map((_, 1))
    
    		val ds: Dataset[(String, Int)] = wordRDD.toDS()
    		
    		//4. Dataset  sql    
    		ds.groupBy($"_1")
    			//     
    			.sum("_2")
    			.as("total")
    			.withColumnRenamed("_1", "word")
    			.withColumnRenamed("sum(_2)", "total")
    			.show()
    		//5.  spark
    		spark.stop()
    }
    
  • 문자 통계 만 들 기(strong typed)
  • def main(args: Array[String]): Unit = {
    		//1.  SparkSeesion 
    		val spark = SparkSession
    			.builder()
    			.appName("wordcount")
    			.master("local[6]")
    			.getOrCreate()
    
    		//2.  spark      |  
    
    		import spark.implicits._
    
    		//3.  dataset 
    		val lines = Array("this is a demo", "hello spark")
    		val wordRDD = spark.sparkContext
    			.makeRDD(lines)
    			.flatMap(_.split("\\s+"))
    			.map((_, 1))
    		val ds: Dataset[(String, Int)] = wordRDD.toDS()
    
    		//4. Dataset  sql     
    		ds.groupByKey(t => t._1)
    			.agg(typed.sum[(String, Int)](tuple => tuple._2).name("total"))
    			.show()
    
    		//5.  spark 
    		spark.stop()
    }
    

    Dataset&DataFrame 실전
    Dataset create
    Dataset 는 RDD 와 유사 하 며,다른 것 은 Spark SQL 은 Spark RDD(Java/Kryo 직렬 화)에 독립 된 직렬 화 규범 을 가지 고 있 으 며,Encoders 라 고 부른다.SparkRDD 직렬 화 와 달리 Dataset 는 형식 없 는 작업 을 지원 하기 때문에 사용 자 는 작업 의 유형 을 가 져 올 필요 가 없습니다.작업 은 열 이름 일 뿐 입 니 다.Spark SQL 은 연산 자 작업 을 수행 할 때 반 직렬 화 절 차 를 생략 하고 프로그램의 실행 효율 을 높 일 수 있 기 때 문 입 니 다.
    case-class
    // case-class
    case class Person(id: Int, name: String, age: Int, sex: Boolean)
    
    /**
    * case-class
    */
    val person: Dataset[Person] = List(
       Person(1, "zhangsan", 18, true),
       Person(2, "lisi", 28, true)
    )
       .toDS()
    person.select($"id", $"name")
       .show()
    

    주:
  • 암시 적 변환 import spark.implicits.이 문 구 는 spark 대상 의 문 구 를 가 져 온 후에 두 어야 합 니 다
  • case class Person(id:Int,name:String,age:Int,sex:Boolean)의 정 의 는 방법의 역할 영역 밖(즉Scala/Java의 구성원 변수 위치)
  • 에 두 어야 합 니 다.
  • case 류 가 자바 로 작 성 된 경우 자바 bean 은Serializable을 실현 하고get、set방법
  • 을 추가 해 야 합 니 다.
    결과:
    +---+--------+
    | id|    name|
    +---+--------+
    |  1|zhangsan|
    |  2|    lisi|
    +---+--------+
    

    Tuple(원본)
    /**
     * tuple
     */
    val person: Dataset[(Int, String, Int, Boolean)] = List(
    	(1, "zhangsan", 18, true),
    	(2, "lisi", 28, true)
    )
    	.toDS()
    person.select($"_1", $"_2").show()
    

    결과:
    +---+--------+
    | _1|      _2|
    +---+--------+
    |  1|zhangsan|
    |  2|    lisi|
    +---+--------+
    

    json 데이터
    데이터:
    {"name":"  ","age":18}
    {"name":"  ","age":28}
    {"name":"  ","age":38}
    
    //     long  
    case class User(name: String, age: Long)
    
    /**
     * json  
     */
    spark.read
    	.json("file:///Users/mashikang/IdeaProjects/spark_sql/src/main/resources/json/")
    	.as[User]
    	.show()
    

    결과:
    +---+----+
    |age|name|
    +---+----+
    | 18|  |
    | 28|  |
    | 38|  |
    +---+----+
    

    RDD
  • Tuple(원조)
  • /**
     * RDD   
     */
    val userRDD = spark.sparkContext.makeRDD(List((1,"  ",true,18,15000.0)))
    userRDD.toDS().show()
    

    결과:
    +---+----+----+---+-------+
    | _1|  _2|  _3| _4|     _5|
    +---+----+----+---+-------+
    |  1|  |true| 18|15000.0|
    +---+----+----+---+-------+
    
  • case-class
  • // case-class
    case class Person(id: Int, name: String, age: Int, sex: Boolean)
    
    /**
     * RDD case-class
     */
    val userRDD = spark.sparkContext.makeRDD(List(Person(1,"  ",18,true)))
    userRDD.toDS().show()
    

    결과:
    +---+----+---+----+
    | id|name|age| sex|
    +---+----+---+----+
    |  1|  | 18|true|
    +---+----+---+----+
    

    Dataframe create
    DataFrame 은 열 이름 을 가 진 데이터 세트 로 사용자 가 직접 조작 할 수 있 기 때문에 거의 모든 DataFrame 추천 작업 은column이다.사용자 도 하나의 DataFrame 을 형식의 데이터 세트 로 볼 수 있다.
    json 파일
    /**
     * json
     */
    val dataFrame: DataFrame = spark.read
    	.json("file:///Users/mashikang/IdeaProjects/spark_sql/src/main/resources/json/")
    dataFrame.printSchema()
    dataFrame.show()
    

    결과:
    root
     |-- age: long (nullable = true)
     |-- name: string (nullable = true)
    
    +---+----+
    |age|name|
    +---+----+
    | 18|  |
    | 28|  |
    | 38|  |
    +---+----+
    

    case-class
    case class User(id:Int,name:String,sex:Boolean)
    
    /**
     * case-class
     */
    var userDF=List(User(1,"  ",true))
    	.toDF()
    userDF.show()
    

    결과:
    +---+----+----+
    | id|name| sex|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    

    Tuple(원본)
    /**
     * Tuple
     */
    var userDF=List((1,"  ",true))
    	.toDF("id","name","sex") 
    userDF.show()
    

    결과:
    +---+----+----+
    | id|name| sex|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    

    RDD 변환
  • Tuple
  • /**
     * RDD Tuple
     */
    var userDF = spark.sparkContext.parallelize(List((1, "  ", true)))
    	.toDF("id", "name", "sex") //     
    userDF.show()
    

    결과:
    +---+----+----+
    | id|name| sex|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    
  • case-class
  • /**
     * RDD case-class
     */
    var userDF = spark.sparkContext.parallelize(List(User(1, "  ", true)))
    	.toDF()
    userDF.show()
    

    결과:
    +---+----+----+
    | id|name| sex|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    
  • RDD[Row]유형 이 DataFrame
  • 로 전환
    /**
     * RDD[Row]  
     */
    var userRDD: RDD[Row] = spark.sparkContext.parallelize(List(User(1, "  ", true)))
    	.map(u => Row(u.id, u.name, u.sex))
    var schema = new StructType()
    	.add("id", IntegerType)
    	.add("name", StringType)
    	.add("sex", BooleanType)
    var userDF = spark.createDataFrame(userRDD, schema)
    userDF.show()
    

    결과:
    +---+----+----+
    | id|name| sex|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    
  • RDD case-class
  • /**
     * RDD case-class
     */
    var userRDD: RDD[User] = spark.sparkContext
    	.makeRDD(List(User(1, "  ", true)))
    var userDF = spark.createDataFrame(userRDD)
    userDF.show()
    

    결과:
    +---+----+----+
    | id|name| sex|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    
  • RDD Tuple
  • /**
     * RDD Tuple
     */
    var userRDD:RDD[(Int,String,Boolean)]=spark.sparkContext
    	.makeRDD(List((1,"  ",true)))
    var userDF=spark.createDataFrame(userRDD)
    userDF.show()
    

    결과:
    +---+----+----+
    | _1|  _2|  _3|
    +---+----+----+
    |  1|  |true|
    +---+----+----+
    

    DataFrame Operations(Untyped)DataFrame 유형 조작 없 음
    printSchema 인쇄 Dataframe 의 표 구조(표 머리)
    var df = List((1, "  ", true))
    	.toDF("id", "name", "sex")
    df.printSchema()
    
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- sex: boolean (nullable = false)
    

    show
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 15000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    	
    df.select($"id", $"name", $"salary")
    	.show()
    
    +---+----+------+
    | id|name|salary|
    +---+----+------+
    |  1|  zs| 15000|
    |  2|  ls| 15000|
    +---+----+------+
    

    select
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 18000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    df.select($"id",$"name",$"sex",$"dept",$"salary" * 12 as "annual_salary")
    	.show()
    
    +---+----+-----+----+-------------+
    | id|name|  sex|dept|annual_salary|
    +---+----+-----+----+-------------+
    |  1|  zs| true|   1|       180000|
    |  2|  ls|false|   1|       216000|
    +---+----+-----+----+-------------+
    

    selectExpr
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 18000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    //   df.select($"id",$"name",$"sex",$"dept",$"salary" * 12 as "annual_salary")
    df.selectExpr("id", "name", "sex", "dept", "salary * 12 as annual_salary")
    	.show()
    
    +---+----+-----+----+-------------+
    | id|name|  sex|dept|annual_salary|
    +---+----+-----+----+-------------+
    |  1|  zs| true|   1|       180000|
    |  2|  ls|false|   1|       216000|
    +---+----+-----+----+-------------+
    

    withColumn
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 18000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.withColumn("annual_salary", $"salary" * 12)
    	.show()
    
    +---+----+-----+----+------+-------------+
    | id|name|  sex|dept|salary|annual_salary|
    +---+----+-----+----+------+-------------+
    |  1|  zs| true|   1| 15000|       180000|
    |  2|  ls|false|   1| 18000|       216000|
    +---+----+-----+----+------+-------------+
    

    withColumnRenamed
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 18000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.withColumn("annula_salary", $"salary" * 12)
    	.withColumnRenamed("dept", "department")
    	.withColumnRenamed("name", "username")
    	.show()
    
    +---+--------+-----+----------+------+-------------+
    | id|username|  sex|department|salary|annula_salary|
    +---+--------+-----+----------+------+-------------+
    |  1|      zs| true|         1| 15000|       180000|
    |  2|      ls|false|         1| 18000|       216000|
    +---+--------+-----+----------+------+-------------+
    

    drop
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 18000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    
    df.select($"id",$"name",$"sex",$"dept",$"salary")
    	.withColumn("annula_salary",$"salary" * 12)
    	.withColumnRenamed("dept","department")
    	.withColumnRenamed("name","username")
    	.drop("sex")
    	.show()
    
    +---+--------+----------+------+-------------+
    | id|username|department|salary|annula_salary|
    +---+--------+----------+------+-------------+
    |  1|      zs|         1| 15000|       180000|
    |  2|      ls|         1| 18000|       216000|
    +---+--------+----------+------+-------------+
    

    dropDuplicates
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 1, 18000),
    	(3, "ww", false, 1, 19000),
    	(4, "zl", false, 1, 18000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.dropDuplicates("sex", "salary")
    	.show()
    
    +---+----+-----+----+------+
    | id|name|  sex|dept|salary|
    +---+----+-----+----+------+
    |  3|  ww|false|   1| 19000|
    |  1|  zs| true|   1| 15000|
    |  2|  ls|false|   1| 18000|
    +---+----+-----+----+------+
    

    orderBy|sort
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 2, 18000),
    	(3, "ww", false, 2, 14000),
    	(4, "zl", false, 1, 18000),
    	(5, "zl", false, 1, 16000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    	
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.orderBy($"salary" desc, $"id" asc)
    	//.sort($"salary" desc,$"id" asc)
    	.show()
    
    +---+----+-----+----+------+
    | id|name|  sex|dept|salary|
    +---+----+-----+----+------+
    |  2|  ls|false|   2| 18000|
    |  4|  zl|false|   1| 18000|
    |  5|  zl|false|   1| 16000|
    |  1|  zs| true|   1| 15000|
    |  3|  ww|false|   2| 14000|
    +---+----+-----+----+------+
    

    groupBy
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 2, 18000),
    	(3, "ww", false, 2, 14000),
    	(4, "zl", false, 1, 18000),
    	(5, "zl", false, 1, 16000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    	
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.groupBy($"dept")
    	.max("salary")
    	.show()
    
    +----+-----------+
    |dept|max(salary)|
    +----+-----------+
    |   1|      18000|
    |   2|      18000|
    +----+-----------+
    

    유사 한 연산 자 는 max,min,avg|mean,sum,count 도 있 습 니 다.
    agg
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 2, 18000),
    	(3, "ww", false, 2, 14000),
    	(4, "zl", false, 1, 18000),
    	(5, "zl", false, 1, 16000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    	
    import org.apache.spark.sql.functions._
    
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.groupBy($"dept")
    	.agg(max("salary") as "max_salary", avg("salary") as "avg_salary")
    	.show()
    
    +----+----------+------------------+
    |dept|max_salary|        avg_salary|
    +----+----------+------------------+
    |   1|     18000|16333.333333333334|
    |   2|     18000|           16000.0|
    +----+----------+------------------+
    

    agg 는 표현 식 도 전달 할 수 있 습 니 다.
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 2, 18000),
    	(3, "ww", false, 2, 14000),
    	(4, "zl", false, 1, 18000),
    	(5, "zl", false, 1, 16000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    	
    import org.apache.spark.sql.functions._
    
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.groupBy($"dept")
    	.agg(Map("salary"->"max","id"->"count"))
    	.show()
    
    +----+-----------+---------+
    |dept|max(salary)|count(id)|
    +----+-----------+---------+
    |   1|      18000|        3|
    |   2|      18000|        2|
    +----+-----------+---------+
    

    limit
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 2, 18000),
    	(3, "ww", false, 2, 14000),
    	(4, "zl", false, 1, 18000),
    	(5, "zl", false, 1, 16000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	.orderBy($"id" desc)
    	.limit(4)
    	.show()
    
    +---+----+-----+----+------+
    | id|name|  sex|dept|salary|
    +---+----+-----+----+------+
    |  5|  zl|false|   1| 16000|
    |  4|  zl|false|   1| 18000|
    |  3|  ww|false|   2| 14000|
    |  2|  ls|false|   2| 18000|
    +---+----+-----+----+------+
    

    where
    var df = List(
    	(1, "zs", true, 1, 15000),
    	(2, "ls", false, 2, 18000),
    	(3, "ww", false, 2, 14000),
    	(4, "zl", false, 1, 18000),
    	(5, "win7", false, 1, 16000)
    )
    	.toDF("id", "name", "sex", "dept", "salary")
    	
    df.select($"id", $"name", $"sex", $"dept", $"salary")
    	//where("(name like '%s%' and salary > 15000) or name = 'win7'") 
    	.where(($"name" like "%s%" and $"salary" > 15000) or $"name" === "win7")
    	.show()
    
    +---+----+-----+----+------+
    | id|name|  sex|dept|salary|
    +---+----+-----+----+------+
    |  2|  ls|false|   2| 18000|
    |  5|win7|false|   1| 16000|
    +---+----+-----+----+------+
    

    pivot(줄 전환 열)
    var scoreDF = List(
    	(1, "math", 85),
    	(1, "chinese", 80),
    	(1, "english", 90),
    	(2, "math", 90),
    	(2, "chinese", 80)
    )
    	.toDF("id", "course", "score")
    	
    import org.apache.spark.sql.functions._
    
    //select id,max(case course when 'math' then score else 0 end )as math ,max(case course when 'chinese' then score else 0 end) as chinese from t_course group by id;
    scoreDF.selectExpr("id", "case course when 'math' then score else 0 end as math", "case course when 'chinese' then score else 0 end as chinese", "case course when 'english' then score else 0 end as english")
    	.groupBy("id")
    	.agg(max($"math"), max($"chinese"), max($"english"))
    	.show()
    
    +---+---------+------------+------------+
    | id|max(math)|max(chinese)|max(english)|
    +---+---------+------------+------------+
    |  1|       85|          80|          90|
    |  2|       90|          80|           0|
    +---+---------+------------+------------+
    

    간이 표기 법
    var scoreRDD = List(
    	(1, "math", 85),
    	(1, "chinese", 80),
    	(1, "english", 90),
    	(2, "math", 90),
    	(2, "chinese", 80)
    )
    scoreRDD.toDF("id", "course", "score")
    	.groupBy("id") 
    	// 					    				   
    	.pivot("course", scoreRDD.map(t => t._2).distinct)
    	.max("score")
    	.show()
    
    +---+----+-------+-------+
    | id|math|chinese|english|
    +---+----+-------+-------+
    |  1|  85|     80|     90|
    |  2|  90|     80|   null|
    +---+----+-------+-------+
    

    na(현재 null 값 으로 바 꾸 기)
    var scoreRDD = List(
    	(1, "math", 85),
    	(1, "chinese", 80),
    	(1, "english", 90),
    	(2, "math", 90),
    	(2, "chinese", 80),
    	(3, "math", 100)
    )
    
    scoreRDD.toDF("id", "course", "score")
    	.groupBy("id")
    	// 					    				   
    	.pivot("course", scoreRDD.map(t => t._2).distinct)
    	.max("score")
    	.na.fill(Map("english" -> -1, "chinese" -> 0))
    	.show()
    
    +---+----+-------+-------+
    | id|math|chinese|english|
    +---+----+-------+-------+
    |  1|  85|     80|     90|
    |  3| 100|      0|     -1|
    |  2|  90|     80|     -1|
    +---+----+-------+-------+
    

    join
    case class UserCost(id: Int, category: String, totalCost: Double)
    case class User(id: Int, name: String, sex: Boolean, age: Int, salary: Double)
    
    var userCostDF = spark.sparkContext
    	.parallelize(List(
    		UserCost(1, "    ", 100),
    		UserCost(1, "    ", 100),
    		UserCost(1, "    ", 100),
    		UserCost(2, "    ", 79),
    		UserCost(2, "    ", 80),
    		UserCost(2, "    ", 100)
    	))
    	.toDF()
    	.withColumnRenamed("id", "uid")
    val categories = userCostDF
    	.select("category")
    	.as[(String)]
    	.rdd
    	.distinct
    	.collect()
    var userDF = spark.sparkContext
    	.parallelize(List(
    		User(1, "   ", true, 18, 15000),
    		User(2, "   ", true, 18, 18000),
    		User(3, "   ", false, 18, 10000)
    	))
    	.toDF()
    userDF.join(userCostDF, $"id" === $"uid", "left_outer")
    	.drop("uid")
    	.groupBy("id", "name")
    	.pivot($"category", categories)
    	.sum("totalCost")
    	.na.fill(0.0)
    	.show()
    
    +---+------+--------+--------+--------+--------+--------+
    | id|  name|     |     |     |     |      |
    +---+------+--------+--------+--------+--------+--------+
    |  1|   |   100.0|   100.0|   100.0|     0.0|     0.0|
    |  3|   |     0.0|     0.0|     0.0|     0.0|     0.0|
    |  2|   |     0.0|   100.0|     0.0|    79.0|    80.0|
    +---+------+--------+--------+--------+--------+--------+
    

    cube(다 차원)
    import org.apache.spark.sql.functions._
    List(
    	(110, 50, 80, 80),
    	(120, 60, 95, 75),
    	(120, 50, 96, 70)
    )
    	.toDF("height", "weight", "IQ", "EQ")
    	.cube($"height", $"weight")
    	.agg(avg("IQ"), avg("EQ"))
    	.show()
    
    +------+------+-----------------+-------+
    |height|weight|          avg(IQ)|avg(EQ)|
    +------+------+-----------------+-------+
    |   110|    50|             80.0|   80.0|
    |   120|  null|             95.5|   72.5|
    |   120|    60|             95.0|   75.0|
    |  null|    60|             95.0|   75.0|
    |  null|  null|90.33333333333333|   75.0|
    |   120|    50|             96.0|   70.0|
    |   110|  null|             80.0|   80.0|
    |  null|    50|             88.0|   75.0|
    +------+------+-----------------+-------+
    

    Dataset Operations(Strong typed)데이터 세트 작업-강 한 유형
    강 한 타 입 작업 은 모두 타 입 기반 작업 이기 때문에 Spark SQL 작업 은 Dataframe 기반 열 작업 을 추천 하기 때문에 일반적인 경우 에는 추천 하지 않 습 니 다.
    val lines = Array("this is a demo", "hello spark")
    val wordRDD = spark.sparkContext.makeRDD(lines)
    	.flatMap(_.split("\\s+"))
    	.map((_, 1))
    	
    import org.apache.spark.sql.expressions.scalalang.typed
    
    val ds: Dataset[(String, Int)] = wordRDD.toDS()
    ds.groupByKey(t => t._1)
    	.agg(typed.sum[(String, Int)](tuple => tuple._2).name("total"))
    	.filter(tuple => tuple._1.contains("o"))
    	.show()
    
    +-----+-----+
    |value|total|
    +-----+-----+
    |hello|  1.0|
    | demo|  1.0|
    +-----+-----+
    

    좋은 웹페이지 즐겨찾기