Spark 입문(6)-가장 완전한 Sark SQL 연산 자 소개 와 사용(상)
Dataset/Dataframe
라 는 두 가지 유형 은 RDD 와 유사 한 기능 을 제공 했다.즉,사용자 가 map,faltMap,filter 등 고급 연산 자 를 사용 할 수 있 음 을 의미 하 는 동시에 열 을 바탕 으로 하 는 명명 조 회 를 통 해 Dataset/Data Frame 은 두 가지 조작 수 거 리 를 제공 하 는 API 를 제공 했다.이런 API 는 Sark 엔진 에 더 많은 정 보 를 제공 할 수 있다.시스템 은 이러한 정보 에 근거 하여 계산 에 대해 일정한 최 적 화 를 실현 할 수 있다.현재 Spark SQL 은 두 가지 상호작용 방식 을 제공 합 니 다.SQL
Dataset API
(strong-type 유형,untyped 유형 조작)Dataset 은 분포 식 데이터 세트 로 Dataset 는 spark-1.6 에서 새로운 API 를 제시 합 니 다.이 API 는 RDD(strong type,lambda 표현 식 사용)에 구축 되 는 동시에 Spark SQL 이 실행 엔진 에 대한 장점 을 빌려 Dateset 을 사용 하여 일부 데 이 터 를 직접 사용 하 는 것 보다 RDD 연산 자 기능 과 성능 을 향상 시 킬 수 있 습 니 다.그래서 우 리 는 Dateset 이 강화 버 전의 RDD 라 고 볼 수 있다.Dataset 는 JVM 의 배열|집합 대상 을 만 드 는 것 외 에 도 임의의 RDD 를 Dataset 로 변환 할 수 있 습 니 다.
Python does not have the support for the Dataset API.
DataFrames 는 Dataset 의 특수 한 상황 이다.예 를 들 어 Dataset 에 서 는 임의의 대상 형식의 데 이 터 를 Dataset 요소 로 저장 할 수 있 습 니 다.그러나 Dataframe 의 요 소 는 한 가지 유형의 Row 유형 만 있 는데 이런 Row 조 회 는 전통 적 인 데이터 베이스 에서 ResultSet 작업 과 매우 비슷 하 다.Row 형식의 데 이 터 는 Dataframe 의 요 소 를 표시 하기 때문에 데이터베이스 의 한 줄 과 유사 합 니 다.이 줄 의 요 소 는 아래 표 시 를 하거나 column name 을 통 해 접근 할 수 있 습 니 다.Dateset 은 API 의 호환성 이나 지원 도가 그다지 좋 지 않 기 때문에 Dataframe 은 API 차원 에서 지원 하 는 Scala,Java,R,Python 지원 이 비교적 전면적 입 니 다.쾌속 입문
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-core_2.11artifactId>
<version>2.4.3version>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-sql_2.11artifactId>
<version>2.4.3version>
dependency>
def main(args: Array[String]): Unit = {
//1. SparkSeesion
val spark = SparkSession
.builder()
.appName("wordcount")
.master("local[6]")
.getOrCreate()
//2. spark |
import spark.implicits._
//3. dataset
val lines = Array("this is a demo", "hello spark")
val wordRDD = spark.sparkContext
.makeRDD(lines)
.flatMap(_.split("\\s+"))
.map((_, 1))
val ds: Dataset[(String, Int)] = wordRDD.toDS()
//4. Dataset sql
ds.groupBy($"_1")
//
.sum("_2")
.as("total")
.withColumnRenamed("_1", "word")
.withColumnRenamed("sum(_2)", "total")
.show()
//5. spark
spark.stop()
}
def main(args: Array[String]): Unit = {
//1. SparkSeesion
val spark = SparkSession
.builder()
.appName("wordcount")
.master("local[6]")
.getOrCreate()
//2. spark |
import spark.implicits._
//3. dataset
val lines = Array("this is a demo", "hello spark")
val wordRDD = spark.sparkContext
.makeRDD(lines)
.flatMap(_.split("\\s+"))
.map((_, 1))
val ds: Dataset[(String, Int)] = wordRDD.toDS()
//4. Dataset sql
ds.groupByKey(t => t._1)
.agg(typed.sum[(String, Int)](tuple => tuple._2).name("total"))
.show()
//5. spark
spark.stop()
}
Dataset&DataFrame 실전
Dataset create
Dataset 는 RDD 와 유사 하 며,다른 것 은 Spark SQL 은 Spark RDD(Java/Kryo 직렬 화)에 독립 된 직렬 화 규범 을 가지 고 있 으 며,Encoders 라 고 부른다.SparkRDD 직렬 화 와 달리 Dataset 는 형식 없 는 작업 을 지원 하기 때문에 사용 자 는 작업 의 유형 을 가 져 올 필요 가 없습니다.작업 은 열 이름 일 뿐 입 니 다.Spark SQL 은 연산 자 작업 을 수행 할 때 반 직렬 화 절 차 를 생략 하고 프로그램의 실행 효율 을 높 일 수 있 기 때 문 입 니 다.
case-class
// case-class
case class Person(id: Int, name: String, age: Int, sex: Boolean)
/**
* case-class
*/
val person: Dataset[Person] = List(
Person(1, "zhangsan", 18, true),
Person(2, "lisi", 28, true)
)
.toDS()
person.select($"id", $"name")
.show()
주:
Scala/Java
의 구성원 변수 위치)Serializable
을 실현 하고get、set
방법결과:
+---+--------+
| id| name|
+---+--------+
| 1|zhangsan|
| 2| lisi|
+---+--------+
Tuple(원본)
/**
* tuple
*/
val person: Dataset[(Int, String, Int, Boolean)] = List(
(1, "zhangsan", 18, true),
(2, "lisi", 28, true)
)
.toDS()
person.select($"_1", $"_2").show()
결과:
+---+--------+
| _1| _2|
+---+--------+
| 1|zhangsan|
| 2| lisi|
+---+--------+
json 데이터
데이터:
{"name":" ","age":18}
{"name":" ","age":28}
{"name":" ","age":38}
// long
case class User(name: String, age: Long)
/**
* json
*/
spark.read
.json("file:///Users/mashikang/IdeaProjects/spark_sql/src/main/resources/json/")
.as[User]
.show()
결과:
+---+----+
|age|name|
+---+----+
| 18| |
| 28| |
| 38| |
+---+----+
RDD
/**
* RDD
*/
val userRDD = spark.sparkContext.makeRDD(List((1," ",true,18,15000.0)))
userRDD.toDS().show()
결과:
+---+----+----+---+-------+
| _1| _2| _3| _4| _5|
+---+----+----+---+-------+
| 1| |true| 18|15000.0|
+---+----+----+---+-------+
// case-class
case class Person(id: Int, name: String, age: Int, sex: Boolean)
/**
* RDD case-class
*/
val userRDD = spark.sparkContext.makeRDD(List(Person(1," ",18,true)))
userRDD.toDS().show()
결과:
+---+----+---+----+
| id|name|age| sex|
+---+----+---+----+
| 1| | 18|true|
+---+----+---+----+
Dataframe create
DataFrame 은 열 이름 을 가 진 데이터 세트 로 사용자 가 직접 조작 할 수 있 기 때문에 거의 모든 DataFrame 추천 작업 은
column
이다.사용자 도 하나의 DataFrame 을
형식의 데이터 세트 로 볼 수 있다.json 파일
/**
* json
*/
val dataFrame: DataFrame = spark.read
.json("file:///Users/mashikang/IdeaProjects/spark_sql/src/main/resources/json/")
dataFrame.printSchema()
dataFrame.show()
결과:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+---+----+
|age|name|
+---+----+
| 18| |
| 28| |
| 38| |
+---+----+
case-class
case class User(id:Int,name:String,sex:Boolean)
/**
* case-class
*/
var userDF=List(User(1," ",true))
.toDF()
userDF.show()
결과:
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| |true|
+---+----+----+
Tuple(원본)
/**
* Tuple
*/
var userDF=List((1," ",true))
.toDF("id","name","sex")
userDF.show()
결과:
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| |true|
+---+----+----+
RDD 변환
/**
* RDD Tuple
*/
var userDF = spark.sparkContext.parallelize(List((1, " ", true)))
.toDF("id", "name", "sex") //
userDF.show()
결과:
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| |true|
+---+----+----+
/**
* RDD case-class
*/
var userDF = spark.sparkContext.parallelize(List(User(1, " ", true)))
.toDF()
userDF.show()
결과:
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| |true|
+---+----+----+
/**
* RDD[Row]
*/
var userRDD: RDD[Row] = spark.sparkContext.parallelize(List(User(1, " ", true)))
.map(u => Row(u.id, u.name, u.sex))
var schema = new StructType()
.add("id", IntegerType)
.add("name", StringType)
.add("sex", BooleanType)
var userDF = spark.createDataFrame(userRDD, schema)
userDF.show()
결과:
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| |true|
+---+----+----+
/**
* RDD case-class
*/
var userRDD: RDD[User] = spark.sparkContext
.makeRDD(List(User(1, " ", true)))
var userDF = spark.createDataFrame(userRDD)
userDF.show()
결과:
+---+----+----+
| id|name| sex|
+---+----+----+
| 1| |true|
+---+----+----+
/**
* RDD Tuple
*/
var userRDD:RDD[(Int,String,Boolean)]=spark.sparkContext
.makeRDD(List((1," ",true)))
var userDF=spark.createDataFrame(userRDD)
userDF.show()
결과:
+---+----+----+
| _1| _2| _3|
+---+----+----+
| 1| |true|
+---+----+----+
DataFrame Operations(Untyped)DataFrame 유형 조작 없 음
printSchema 인쇄 Dataframe 의 표 구조(표 머리)
var df = List((1, " ", true))
.toDF("id", "name", "sex")
df.printSchema()
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- sex: boolean (nullable = false)
show
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 15000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"salary")
.show()
+---+----+------+
| id|name|salary|
+---+----+------+
| 1| zs| 15000|
| 2| ls| 15000|
+---+----+------+
select
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 18000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id",$"name",$"sex",$"dept",$"salary" * 12 as "annual_salary")
.show()
+---+----+-----+----+-------------+
| id|name| sex|dept|annual_salary|
+---+----+-----+----+-------------+
| 1| zs| true| 1| 180000|
| 2| ls|false| 1| 216000|
+---+----+-----+----+-------------+
selectExpr
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 18000)
)
.toDF("id", "name", "sex", "dept", "salary")
// df.select($"id",$"name",$"sex",$"dept",$"salary" * 12 as "annual_salary")
df.selectExpr("id", "name", "sex", "dept", "salary * 12 as annual_salary")
.show()
+---+----+-----+----+-------------+
| id|name| sex|dept|annual_salary|
+---+----+-----+----+-------------+
| 1| zs| true| 1| 180000|
| 2| ls|false| 1| 216000|
+---+----+-----+----+-------------+
withColumn
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 18000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
.withColumn("annual_salary", $"salary" * 12)
.show()
+---+----+-----+----+------+-------------+
| id|name| sex|dept|salary|annual_salary|
+---+----+-----+----+------+-------------+
| 1| zs| true| 1| 15000| 180000|
| 2| ls|false| 1| 18000| 216000|
+---+----+-----+----+------+-------------+
withColumnRenamed
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 18000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
.withColumn("annula_salary", $"salary" * 12)
.withColumnRenamed("dept", "department")
.withColumnRenamed("name", "username")
.show()
+---+--------+-----+----------+------+-------------+
| id|username| sex|department|salary|annula_salary|
+---+--------+-----+----------+------+-------------+
| 1| zs| true| 1| 15000| 180000|
| 2| ls|false| 1| 18000| 216000|
+---+--------+-----+----------+------+-------------+
drop
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 18000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id",$"name",$"sex",$"dept",$"salary")
.withColumn("annula_salary",$"salary" * 12)
.withColumnRenamed("dept","department")
.withColumnRenamed("name","username")
.drop("sex")
.show()
+---+--------+----------+------+-------------+
| id|username|department|salary|annula_salary|
+---+--------+----------+------+-------------+
| 1| zs| 1| 15000| 180000|
| 2| ls| 1| 18000| 216000|
+---+--------+----------+------+-------------+
dropDuplicates
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 1, 18000),
(3, "ww", false, 1, 19000),
(4, "zl", false, 1, 18000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
.dropDuplicates("sex", "salary")
.show()
+---+----+-----+----+------+
| id|name| sex|dept|salary|
+---+----+-----+----+------+
| 3| ww|false| 1| 19000|
| 1| zs| true| 1| 15000|
| 2| ls|false| 1| 18000|
+---+----+-----+----+------+
orderBy|sort
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 2, 18000),
(3, "ww", false, 2, 14000),
(4, "zl", false, 1, 18000),
(5, "zl", false, 1, 16000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
.orderBy($"salary" desc, $"id" asc)
//.sort($"salary" desc,$"id" asc)
.show()
+---+----+-----+----+------+
| id|name| sex|dept|salary|
+---+----+-----+----+------+
| 2| ls|false| 2| 18000|
| 4| zl|false| 1| 18000|
| 5| zl|false| 1| 16000|
| 1| zs| true| 1| 15000|
| 3| ww|false| 2| 14000|
+---+----+-----+----+------+
groupBy
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 2, 18000),
(3, "ww", false, 2, 14000),
(4, "zl", false, 1, 18000),
(5, "zl", false, 1, 16000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
.groupBy($"dept")
.max("salary")
.show()
+----+-----------+
|dept|max(salary)|
+----+-----------+
| 1| 18000|
| 2| 18000|
+----+-----------+
유사 한 연산 자 는 max,min,avg|mean,sum,count 도 있 습 니 다.
agg
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 2, 18000),
(3, "ww", false, 2, 14000),
(4, "zl", false, 1, 18000),
(5, "zl", false, 1, 16000)
)
.toDF("id", "name", "sex", "dept", "salary")
import org.apache.spark.sql.functions._
df.select($"id", $"name", $"sex", $"dept", $"salary")
.groupBy($"dept")
.agg(max("salary") as "max_salary", avg("salary") as "avg_salary")
.show()
+----+----------+------------------+
|dept|max_salary| avg_salary|
+----+----------+------------------+
| 1| 18000|16333.333333333334|
| 2| 18000| 16000.0|
+----+----------+------------------+
agg 는 표현 식 도 전달 할 수 있 습 니 다.
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 2, 18000),
(3, "ww", false, 2, 14000),
(4, "zl", false, 1, 18000),
(5, "zl", false, 1, 16000)
)
.toDF("id", "name", "sex", "dept", "salary")
import org.apache.spark.sql.functions._
df.select($"id", $"name", $"sex", $"dept", $"salary")
.groupBy($"dept")
.agg(Map("salary"->"max","id"->"count"))
.show()
+----+-----------+---------+
|dept|max(salary)|count(id)|
+----+-----------+---------+
| 1| 18000| 3|
| 2| 18000| 2|
+----+-----------+---------+
limit
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 2, 18000),
(3, "ww", false, 2, 14000),
(4, "zl", false, 1, 18000),
(5, "zl", false, 1, 16000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
.orderBy($"id" desc)
.limit(4)
.show()
+---+----+-----+----+------+
| id|name| sex|dept|salary|
+---+----+-----+----+------+
| 5| zl|false| 1| 16000|
| 4| zl|false| 1| 18000|
| 3| ww|false| 2| 14000|
| 2| ls|false| 2| 18000|
+---+----+-----+----+------+
where
var df = List(
(1, "zs", true, 1, 15000),
(2, "ls", false, 2, 18000),
(3, "ww", false, 2, 14000),
(4, "zl", false, 1, 18000),
(5, "win7", false, 1, 16000)
)
.toDF("id", "name", "sex", "dept", "salary")
df.select($"id", $"name", $"sex", $"dept", $"salary")
//where("(name like '%s%' and salary > 15000) or name = 'win7'")
.where(($"name" like "%s%" and $"salary" > 15000) or $"name" === "win7")
.show()
+---+----+-----+----+------+
| id|name| sex|dept|salary|
+---+----+-----+----+------+
| 2| ls|false| 2| 18000|
| 5|win7|false| 1| 16000|
+---+----+-----+----+------+
pivot(줄 전환 열)
var scoreDF = List(
(1, "math", 85),
(1, "chinese", 80),
(1, "english", 90),
(2, "math", 90),
(2, "chinese", 80)
)
.toDF("id", "course", "score")
import org.apache.spark.sql.functions._
//select id,max(case course when 'math' then score else 0 end )as math ,max(case course when 'chinese' then score else 0 end) as chinese from t_course group by id;
scoreDF.selectExpr("id", "case course when 'math' then score else 0 end as math", "case course when 'chinese' then score else 0 end as chinese", "case course when 'english' then score else 0 end as english")
.groupBy("id")
.agg(max($"math"), max($"chinese"), max($"english"))
.show()
+---+---------+------------+------------+
| id|max(math)|max(chinese)|max(english)|
+---+---------+------------+------------+
| 1| 85| 80| 90|
| 2| 90| 80| 0|
+---+---------+------------+------------+
간이 표기 법
var scoreRDD = List(
(1, "math", 85),
(1, "chinese", 80),
(1, "english", 90),
(2, "math", 90),
(2, "chinese", 80)
)
scoreRDD.toDF("id", "course", "score")
.groupBy("id")
//
.pivot("course", scoreRDD.map(t => t._2).distinct)
.max("score")
.show()
+---+----+-------+-------+
| id|math|chinese|english|
+---+----+-------+-------+
| 1| 85| 80| 90|
| 2| 90| 80| null|
+---+----+-------+-------+
na(현재 null 값 으로 바 꾸 기)
var scoreRDD = List(
(1, "math", 85),
(1, "chinese", 80),
(1, "english", 90),
(2, "math", 90),
(2, "chinese", 80),
(3, "math", 100)
)
scoreRDD.toDF("id", "course", "score")
.groupBy("id")
//
.pivot("course", scoreRDD.map(t => t._2).distinct)
.max("score")
.na.fill(Map("english" -> -1, "chinese" -> 0))
.show()
+---+----+-------+-------+
| id|math|chinese|english|
+---+----+-------+-------+
| 1| 85| 80| 90|
| 3| 100| 0| -1|
| 2| 90| 80| -1|
+---+----+-------+-------+
join
case class UserCost(id: Int, category: String, totalCost: Double)
case class User(id: Int, name: String, sex: Boolean, age: Int, salary: Double)
var userCostDF = spark.sparkContext
.parallelize(List(
UserCost(1, " ", 100),
UserCost(1, " ", 100),
UserCost(1, " ", 100),
UserCost(2, " ", 79),
UserCost(2, " ", 80),
UserCost(2, " ", 100)
))
.toDF()
.withColumnRenamed("id", "uid")
val categories = userCostDF
.select("category")
.as[(String)]
.rdd
.distinct
.collect()
var userDF = spark.sparkContext
.parallelize(List(
User(1, " ", true, 18, 15000),
User(2, " ", true, 18, 18000),
User(3, " ", false, 18, 10000)
))
.toDF()
userDF.join(userCostDF, $"id" === $"uid", "left_outer")
.drop("uid")
.groupBy("id", "name")
.pivot($"category", categories)
.sum("totalCost")
.na.fill(0.0)
.show()
+---+------+--------+--------+--------+--------+--------+
| id| name| | | | | |
+---+------+--------+--------+--------+--------+--------+
| 1| | 100.0| 100.0| 100.0| 0.0| 0.0|
| 3| | 0.0| 0.0| 0.0| 0.0| 0.0|
| 2| | 0.0| 100.0| 0.0| 79.0| 80.0|
+---+------+--------+--------+--------+--------+--------+
cube(다 차원)
import org.apache.spark.sql.functions._
List(
(110, 50, 80, 80),
(120, 60, 95, 75),
(120, 50, 96, 70)
)
.toDF("height", "weight", "IQ", "EQ")
.cube($"height", $"weight")
.agg(avg("IQ"), avg("EQ"))
.show()
+------+------+-----------------+-------+
|height|weight| avg(IQ)|avg(EQ)|
+------+------+-----------------+-------+
| 110| 50| 80.0| 80.0|
| 120| null| 95.5| 72.5|
| 120| 60| 95.0| 75.0|
| null| 60| 95.0| 75.0|
| null| null|90.33333333333333| 75.0|
| 120| 50| 96.0| 70.0|
| 110| null| 80.0| 80.0|
| null| 50| 88.0| 75.0|
+------+------+-----------------+-------+
Dataset Operations(Strong typed)데이터 세트 작업-강 한 유형
강 한 타 입 작업 은 모두 타 입 기반 작업 이기 때문에 Spark SQL 작업 은 Dataframe 기반 열 작업 을 추천 하기 때문에 일반적인 경우 에는 추천 하지 않 습 니 다.
val lines = Array("this is a demo", "hello spark")
val wordRDD = spark.sparkContext.makeRDD(lines)
.flatMap(_.split("\\s+"))
.map((_, 1))
import org.apache.spark.sql.expressions.scalalang.typed
val ds: Dataset[(String, Int)] = wordRDD.toDS()
ds.groupByKey(t => t._1)
.agg(typed.sum[(String, Int)](tuple => tuple._2).name("total"))
.filter(tuple => tuple._1.contains("o"))
.show()
+-----+-----+
|value|total|
+-----+-----+
|hello| 1.0|
| demo| 1.0|
+-----+-----+
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.