Spark DataFrame transformation 작업 오류 문제

3025 단어 spark
Spark 2.0 이후 릴리즈에서는 DataFrame 객체에 대한 transformation 작업을 수행할 때 컴파일 단계에서 오류가 발생하지 않지만 런타임 시 예외가 발생하여 오류 메시지가 표시됩니다.
:26: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       df.map(_.get(0))
             ^

대략적인 뜻은 현재의 데이터 집합에서 encoder를 찾을 수 없다는 것이다. 이 문제는 나를 오랫동안 괴롭혔다. 예전의 1.X버전에서는 맵/filter 등의 조작을 모두 사용할 수 있었고 인터넷에서도 관련 자료를 찾지 못했다.
그리고 홈페이지를 한 바퀴 돌아보니 홈페이지가 데이터셋에 대한 정의에 다음과 같은 설명이 있었다.
Operations available on Datasets are divided into transformations and actions. Transformations are the ones that produce new Datasets, and actions are the ones that trigger computation and return results. Example transformations include map, filter, select, and aggregate (groupBy). Example actions count, show, or writing data out to file systems.

Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner. To explore the logical plan as well as optimized physical plan, use the explain function.

To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain specific type T to Spark's internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code at runtime to serialize the Person object into a binary structure. This binary structure often has much lower memory footprint as well as are optimized for efficiency in data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function.

There are typically two ways to create a Dataset. The most common way is by pointing Spark to some files on storage systems, using the read function available on a SparkSession.


   val people = spark.read.parquet("...").as[Person]  // Scala
   Dataset people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java

위에서 말한 바와 같이 특정 유형의 대상을 효과적으로 사용하기 위해서는 인코더를 만들고 인코더는 지정한 유형 T를spark 내부의 유형 시스템에 비추어야 한다.이 점은 공식적으로 데이터 프레임 구조를 최적화하고 성능을 향상시켜 달라진 것으로 보인다.spark 내부에 비치는 유형은 RDD를 가리키는 것으로 인코더를 이용하여 데이터 프레임의 요소를 rdd로 변환한 다음transformation 조작을 하는 것이다.예는 홈페이지에서도 주었다. 즉, as[T] 조작이다.
DataSet 유형에 lazy 함수 변수 rdd가 있는 소스를 살펴보았습니다. 인터페이스는 다음과 같습니다.
lazy val rdd : org.apache.spark.rdd.RDD[T] = { /* compiled code */ }
이게 아마 1.x가 운행할 수 있는 곳, 2.x 예상이 lazy로 바뀐 것은 수동으로 Encoder를 지정하여 계산 성능을 향상시키도록 격려하기 위해서다.인코더를 지정하고 싶지 않으면 rdd 익명 함수를 직접 호출하여 RDD[T] 대상으로 전환한 다음transformation 작업을 할 수 있습니다.

좋은 웹페이지 즐겨찾기