RDD를 DataFrame으로 변환하는 두 가지 방법
4014 단어 Spark
개술
Spark SQL은 RDD를 DataFrame으로 변환하는 두 가지 방법을 지원합니다.첫 번째는 반사로 특정 유형의 대상을 포함하는 RDD를 추정하는 모델이다. 반사에 기반한 방식은 더욱 간결한 코드를 제공할 수 있다. 만약에 스파크 프로그램을 작성할 때 schema가 명확해지면 이런 방식을 사용할 수 있다.두 번째 방법은 프로그래밍 가능한 인터페이스를 통해 schema를 구축하고 이를 기존 RDD에 응용하는 것이다.이 방식으로 작성된 코드는 더 지루하지만,colum과 type을 모르는 상황에서 사용할 수 있습니다.
다음 사례의 데이터 집합은 다음과 같다people.txt: Michael, 29
Andy, 30
Justin, 19
2. RDD에서 DataFrame으로 전환하는 사례
1.반사식
Spark SQL의 Scala 인터페이스는 샘플 클래스를 포함하는 RDD를 DataFrame으로 자동 변환할 수 있습니다.샘플 클래스 정의표의 schema.샘플 클래스의 매개 변수 이름을 반사해서 읽고column의 이름으로 비추십시오.package com.company.sparksql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object RDD2DF_m1 {
//
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RDD2DF_m1")
.master("local")
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runRDD2DF(spark)
}
private def runRDD2DF(spark: SparkSession) = {
// , RDD DataFrame
import spark.implicits._
// RDD, DataFrame
val peopleDF = spark.sparkContext
.textFile("file:///E:/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
//
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
//
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
}
}
2.schema를 구축하는 방식을 통해
schema를 구성하여 DataFrame을 생성하려면 주로 3단계가 포함됩니다.
(1) 원시 RDD에서 RDD를 만드는 것(2) StructType을 사용하고, schema를 만드는 것(3)createDataFrame 방법을 통해 RDD에 schema를 적용한다.package com.company.sparksql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
object RDD2DF_m2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RDD2DF_m1")
.master("local")
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runRDD2DF(spark)
}
private def runRDD2DF(spark: SparkSession) = {
// , RDD DataFrame
import spark.implicits._
// RDD
val peopleRDD = spark.sparkContext.textFile("file:///E:/people.txt")
//step 1 RDD ROW RDD
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim.toInt))
//step 2 schema
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
//step 3 DF
val peopleDF = spark.createDataFrame(rowRDD, schema)
// DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL
val results = spark.sql("SELECT name FROM people")
//
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수
1. socket 단어 수 통계
TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다.
2. maven 설정
3. 프로그래밍 코드
입력 내용
결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.
Michael, 29
Andy, 30
Justin, 19
1.반사식
Spark SQL의 Scala 인터페이스는 샘플 클래스를 포함하는 RDD를 DataFrame으로 자동 변환할 수 있습니다.샘플 클래스 정의표의 schema.샘플 클래스의 매개 변수 이름을 반사해서 읽고column의 이름으로 비추십시오.
package com.company.sparksql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object RDD2DF_m1 {
//
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RDD2DF_m1")
.master("local")
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runRDD2DF(spark)
}
private def runRDD2DF(spark: SparkSession) = {
// , RDD DataFrame
import spark.implicits._
// RDD, DataFrame
val peopleDF = spark.sparkContext
.textFile("file:///E:/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
//
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
//
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
}
}
2.schema를 구축하는 방식을 통해
schema를 구성하여 DataFrame을 생성하려면 주로 3단계가 포함됩니다.
(1) 원시 RDD에서 RDD를 만드는 것(2) StructType을 사용하고, schema를 만드는 것(3)createDataFrame 방법을 통해 RDD에 schema를 적용한다.
package com.company.sparksql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
object RDD2DF_m2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("RDD2DF_m1")
.master("local")
.getOrCreate()
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
runRDD2DF(spark)
}
private def runRDD2DF(spark: SparkSession) = {
// , RDD DataFrame
import spark.implicits._
// RDD
val peopleRDD = spark.sparkContext.textFile("file:///E:/people.txt")
//step 1 RDD ROW RDD
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim.toInt))
//step 2 schema
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
//step 3 DF
val peopleDF = spark.createDataFrame(rowRDD, schema)
// DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL
val results = spark.sql("SELECT name FROM people")
//
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.