자바 와 scala 가 Spark RDD 를 DataFrame 로 전환 하 는 두 가지 방법 소결
프로젝트 다음 에 student.txt 파일 을 새로 만 듭 니 다.내용 은:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
실현Java 버 전:
1.먼저 student 의 Bean 대상 을 새로 만 들 고 직렬 화 와 toString()방법 을 실현 합 니 다.구체 적 인 코드 는 다음 과 같 습 니 다.
package com.cxd.sql;
import java.io.Serializable;
@SuppressWarnings("serial")
public class Student implements Serializable {
String sid;
String sname;
int sage;
public String getSid() {
return sid;
}
public void setSid(String sid) {
this.sid = sid;
}
public String getSname() {
return sname;
}
public void setSname(String sname) {
this.sname = sname;
}
public int getSage() {
return sage;
}
public void setSage(int sage) {
this.sage = sage;
}
@Override
public String toString() {
return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
}
}
2.변환,구체 적 인 코드 는 다음 과 같다.
package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
reflectTransform(spark);//Java
dynamicTransform(spark);//
}
/**
* Java
* @param spark
*/
private static void reflectTransform(SparkSession spark)
{
JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
JavaRDD<Student> rowRDD = source.map(line -> {
String parts[] = line.split(",");
Student stu = new Student();
stu.setSid(parts[0]);
stu.setSname(parts[1]);
stu.setSage(Integer.valueOf(parts[2]));
return stu;
});
Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
df.select("sid", "sname", "sage").
coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
}
/**
*
* @param spark
*/
private static void dynamicTransform(SparkSession spark)
{
JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
JavaRDD<Row> rowRDD = source.map( line -> {
String[] parts = line.split(",");
String sid = parts[0];
String sname = parts[1];
int sage = Integer.parseInt(parts[2]);
return RowFactory.create(
sid,
sname,
sage
);
});
ArrayList<StructField> fields = new ArrayList<StructField>();
StructField field = null;
field = DataTypes.createStructField("sid", DataTypes.StringType, true);
fields.add(field);
field = DataTypes.createStructField("sname", DataTypes.StringType, true);
fields.add(field);
field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
}
}
scala 버 전:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {
case class Student(id:Int,name:String,age:Int)
def main(args:Array[String])
{
val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
import spark.implicits._
reflectCreate(spark)
dynamicCreate(spark)
}
/**
* Java
* @param spark
*/
private def reflectCreate(spark:SparkSession):Unit={
import spark.implicits._
val stuRDD=spark.sparkContext.textFile("student2.txt")
//toDF()
val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
//stuDf.select("id","name","age").write.text("result") //
stuDf.printSchema()
stuDf.createOrReplaceTempView("student")
val nameDf=spark.sql("select name from student where age<20")
//nameDf.write.text("result") //
nameDf.show()
}
/**
*
* @param spark
*/
private def dynamicCreate(spark:SparkSession):Unit={
val stuRDD=spark.sparkContext.textFile("student.txt")
import spark.implicits._
val schemaString="id,name,age"
val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema=StructType(fields)
val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
val stuDf=spark.createDataFrame(rowRDD, schema)
stuDf.printSchema()
val tmpView=stuDf.createOrReplaceTempView("student")
val nameDf=spark.sql("select name from student where age<20")
//nameDf.write.text("result") //
nameDf.show()
}
}
주:1.위의 코드 는 모두 테스트 에 통과 되 었 고 테스트 환경 은 spark 2.1.0,jdk 1.8 입 니 다.
2.이 코드 는 spark 2.0 이전 버 전에 적용 되 지 않 습 니 다.
이상 의 자바 와 scala 가 Spark RDD 를 DataFrame 로 전환 하 는 두 가지 방법 인 소결 은 바로 편집장 이 여러분 에 게 공유 한 모든 내용 입 니 다.여러분 께 참고 가 될 수 있 고 많은 응원 부 탁 드 리 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.