해결 오류:Unable to find encoder for typestored in a Dataset

4067 단어 Spark
Error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
오류: 데이터 집합에 저장된 형식의 인코더를 찾을 수 없습니다.원래 유형(Int, String 등)과 제품 유형(case 클래스)은 spark로 가져옵니다.implicits 지원.다른 유형에 대한 서열화를 지원하며 이후 버전에 추가됩니다.
문제 해결은 다음과 같습니다.
개발 과정에서 HDFS에 저장된 텍스트 파일을 CSV 형식의 파일로 저장하려고 합니다.이전 파일 형식은 다음과 같습니다 (여기 자체 만든 가짜 데이터 사용) 각 열에 대응하는 것은 id,name,no,sp,ep3303 용순 JD8 적벽 심강 5426정범 G58 용암 모종 밤입니다. 아래 형식은 CSV 기본 형식 id,name,no,sp,ep1309, 항경, BKZ, 소관, 호북성 3507, 영풍천, KY7, 하원, 자양 처리 코드는 다음과 같습니다
def main(args: Array[String]) {
	val sparkSession = SparkSession.builder().appName("Spark shell").getOrCreate()
	// 
	val path = "hdfs://master:9000/TestData/aviation9"
	// 
	val savePath = "hdfs://master:9000/TestData/aviation10/"
	val file = sparkSession.read.textFile(path)
	// , 
	val rd = file.map(line => {
	  val arr = line.split("\t")
	  (arr(0), arr(1), arr(2), arr(3), arr(4))
	})
	// DataFrame ,
	val res = rd.toDF("id", "name", "no", "sp", "ep")
	// 
	res.repartition(1).write.mode(SaveMode.Append).format("csv").option("header", true).save(savePath)
}

포장 운행 준비, 포장할 때 항상 위의 오류가 발생합니다
데이터 집합에 저장된 형식의 인코더를 찾을 수 없습니다.원래 유형(Int, String 등)과 제품 유형(case 클래스)은 spark로 가져옵니다.implicits 지원.다른 유형에 대한 서열화를 지원하며 이후 버전에 추가됩니다.그래서 우리 스스로 Dataset에 메타그룹을 추가하는 인코딩 해결 방법이 필요합니다. 처리 데이터 사이에 다음 줄 코드를 추가하는 implicit val matchError = org.apache.spark.sql.Encoders.tuple( Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)
마지막 코드는
def main(args: Array[String]) {
	val sparkSession = SparkSession.builder().appName("Spark shell").getOrCreate()
	// 
	val path = "hdfs://master:9000/TestData/aviation9"
	// 
	val savePath = "hdfs://master:9000/TestData/aviation10/"
	val file = sparkSession.read.textFile(path)
	implicit val matchError = org.apache.spark.sql.Encoders.tuple( Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)
	// , 
	val rd = file.map(line => {
	  val arr = line.split("\t")
	  (arr(0), arr(1), arr(2), arr(3), arr(4))
	})
	// DataFrame ,
	val res = rd.toDF("id", "name", "no", "sp", "ep")
	// 
	res.repartition(1).write.mode(SaveMode.Append).format("csv").option("header", true).save(savePath)
}

이 오류를 처리하는 과정에서 인코더에 기본적인tuple이 최대 5개의 요소인 것을 발견했습니다. 만약 우리가 열 데이터가 많다면 어떻게 처리해야 합니까?
예를 들어 현재의 데이터는 아래와 같다.
5426
평범하다
G56
2013-12-24 17:26:23
용암
활용단어참조
4413
서로 이어받다
TV7
2014-04-09 20:44:25
북표
개원
만약 아래의 코드로 또 같은 오류가 발생한다면
implicit val matchError = org.apache.spark.sql.Encoders.tuple( Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING, Encoders.STRING)
Encoders에서 최대 5개의 요소를 지원하는 tuple 때문에 DataSet을 Row 형식으로 처리하고 마지막에 RDD로 바꾸어 데이터와 헤더로 DataFrame을 만들어야 합니다
def main(args: Array[String]) {
    val sparkSession = SparkSession.builder().appName("Spark shell").getOrCreate()
    val fields = "id,name,no,time,sp,ep"
    val path = "hdfs://master:9000/TestData/aviation9"
    val savePath = "hdfs://master:9000/TestData/aviation10/"
    
    val file: Dataset[String] = sparkSession.read.textFile(path)
    
    implicit val matchError = org.apache.spark.sql.Encoders.kryo[Row]
    // Row
    val ds = file.map(x => {
      val arr = x.split("\t")
      Row.fromSeq(arr.toSeq)
    })
    // 
    val field_array = fields.split(",")
    val schema = StructType(field_array.map(fieldName => StructField(fieldName, StringType, true)))
    // DataFrame
    val df = sparkSession.createDataFrame(ds.rdd, schema)
    df.repartition(1).write.mode(SaveMode.Append).format("csv").option("header", true).save(savePath)
}

좋은 웹페이지 즐겨찾기