Spark 세 가지 방식 으로 데 이 터 를 조회 합 니 다.
필드: 학급 번호, 학급 이름, 입학 날짜, 소속 학과 중국어 이름
, , , ,
170401011001 , , ,0101,467
170401011002 , , ,0101,518
170401011003 , , ,0101,509
170401011004 , , ,0101,508
170401011005 , , ,0101,494
170401011006 , , ,0101,500
170401011007 , , ,0101,490
170401011008 , , ,0101,466
세 가지 방식 을 사용 하 다
제 1 종: 열 이름 을 지정 하여 Schema 추가 조회 데이터
package SparkSql
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, sql}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Created by on 2020/4/13.
*
* Schema
*/
object CreateDFDS {
def main(args: Array[String]): Unit = {
// 1 SparkSeesion
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("01").getOrCreate()
// sc
val sc: SparkContext = spark.sparkContext
//
val fileRDD: RDD[String] = sc.textFile("E:\\student_scores.txt")
//
val DataRDD: RDD[(String, String,String, String,String)] = fileRDD.map(a => a.split(",")).map(b => (b(0), b(1),b(2),b(3),b(4)))
// 222
import spark.implicits._
val DataDF: DataFrame = DataRDD.toDF()
DataDF.show()
DataDF.printSchema()
//
sc.stop()
spark.stop()
}
}
두 번 째: StructType 을 통 해 Schema 지정 조회 데이터
package SparkSql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* Created by on 2020/4/13.
*
*
*
*
*
*/
object CreateDFDS_02 {
def main(args: Array[String]): Unit = {
// 1 SparkSeesion
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("01").getOrCreate()
// sc
val sc: SparkContext = spark.sparkContext
//
val fileRDD: RDD[String] = sc.textFile("E:\\student_scores.txt")
//
val RowRDD: RDD[Row] = fileRDD.map(a => a.split(",")).map(b => Row(b(0), b(1),b(2),b(3),b(4)))
//
val structType: StructType = StructType(Seq(
StructField("id", StringType, true), //
StructField("name", StringType, true),
StructField("rux", StringType, true),
StructField("score", StringType, true)
)
)
val DataDF: DataFrame = spark.createDataFrame(RowRDD, structType)
// DataDF.show()
// DataDF.printSchema()
//
DataDF.createOrReplaceTempView("DataDF")
spark.sql("select * from DataDF ").show()
//
sc.stop()
spark.stop()
}
}
세 번 째: 샘플 류 를 작성 하고 반사 체 제 를 이용 하여 Schema 를 추정 합 니 다. 조회 데이터
package SparkSql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* Created by on 2020/4/13.
*/
object CreateDFDS_03 {
//
// case class Student(id: String, name: String, sex: String,classname:String,day:String)
// case class department(id:String,name:String)
case class tudent_scores(id:String,name:String,rux:String,score:String)
def main(args: Array[String]): Unit = {
// 1 SparkSeesion
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("01").getOrCreate()
// sc
val sc: SparkContext = spark.sparkContext
//
val fileRDD: RDD[String] = sc.textFile("E:\\student_scores.txt")
//
val DataRDD: RDD[Array[String]] = fileRDD.map(x => x.split(","))
val tudent_scoresRDD: RDD[tudent_scores] = DataRDD.map(a => tudent_scores(a(0), a(1),a(2),a(3)))
//
import spark.implicits._
//RDD DF
val tudent_scoresDF: DataFrame = tudent_scoresRDD.toDF()
//
tudent_scoresDF.show()
tudent_scoresDF.printSchema()
//json json
// studentDF.write.json(" ")
//
tudent_scoresDF.createOrReplaceTempView("tudent_scoresDF")
// var sql =
// """
// |select value , count(value) as count
// |from personDF
// |group by value
// |order by count desc
// """.stripMargin
// spark.sql(sql).show()
spark.sql("select * from tudent_scoresDF").show()
//
// personDF.select("name", "age").filter($"age" > 25).show()
//
sc.stop()
spark.stop()
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.