실시간 빅데이터 처리를 위한 Spark & Flink Online 9) DataFrame

DataFrames

  • DataFrame의 사용법
    • DataFrame의 데이터 타입
    • DataFrame에서 가능한 연산들
    • DataFrame에서의 Aggregation 작업들

DataFrame은 관계형 데이터

  • 한마디로 : 관계형 데이터셋 : RDD + Relation
  • RDD가 함수형 API를 가졌다면 DataFrame은 선언형 API
  • 자동으로 최적화가 가능
  • 타입이 없다

DataFrame의 특징

DataFrame: RDD의 확장판

  • 지연 실행 (Lazy Execution)
  • 분산 저장
  • Immutable
  • 열 (Row) 객체가 있다
  • SQL 쿼리를 실행할 수 있다
  • 스키마를 가질 수 있고 이를 통해 성능을 더욱 최적화 할 수 있다
  • CSV, JSON, Hive 등으로 읽어오거나 변환이 가능하다

DataFrame의 스키마를 확인하는 법

  • dtypes
  • show()
    • 테이블 형태로 데이터를 출력
    • 첫 20개의 열만 보여준다
  • printSchema()
    • 스키마를 트리 형태로 볼 수 있다

복잡한 DataType들

  • ArrayType
  • MapType
  • StructType

DataFrame Operations

SQL과 비슷한 작업이 가능하다

  • Select
  • Where
  • Limit
  • OrderBy
  • GroupBy
  • Join

Select

사용자가 원하는 Column이나 데이터를 추출 하는데 사용

>>> df.select('*').collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
>>> df.select('name', 'age').collect()
[Row(name='Alice', age=2), Row(name='Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name='Alice', age=12), Row(name='Bob', age=15)]

Agg

Aggregate의 약자로, 그룹핑 후 데이터를 하나로 합치는 작업

>>> df.agg({"age": "max"}).collect()
[Row(max(age)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]

GroupBy

사용자가 지정한 Column을 기준으로 데이터를 Grouping하는 작업

>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age':'mean'}).collect())
[Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect())
[Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(['name', df.age]).count().collect())
[Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)]

Join

다른 DataFrame과 사용자가 지정한 Column을 기준으로 합치는 작업

>>> df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name='Bob', height=85)]

Summary

  • DataFrame 개념
  • DataFrame 조작법
  • 다음 강의에선: SQL

좋은 웹페이지 즐겨찾기