Apache Spark New Pandas API 탐색

39293 단어 pandassparkpython


Apache Spark™ 3.2 릴리스는 pandas API 발표와 함께 제공되었습니다.
이제 Pandas API에 익숙한 데이터 과학자/분석가는 Spark가 제공하는 것과 동일한 구문 및 분산 컴퓨팅 기능을 활용할 수 있습니다.

Spark에서 Pandas를 사용하는 이유는 무엇입니까?

• 최적화된 단일 시스템 성능: Databricks benchmark에 따르면 Spark의 Pandas는 Spark 엔진의 최적화 덕분에 단일 시스템에서도 Pandas보다 성능이 뛰어납니다.
• 단일 코드 기반: 데이터 과학 및 데이터 엔지니어 팀이 데이터 분석/데이터 변환 단계를 위한 통합 코드 기반으로 더 쉽게 협업



Source : https://databricks.com/fr/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html



이 블로그 게시물에서는 Spark Pandas API를 살펴보고 이를 표준 PySpark DataFrame 구문과 비교할 것입니다.

pyspark.pandas 데이터프레임 생성

from pyspark.pandas import DataFrame

pdf = DataFrame({
                   'foo': ['one', 'one', 'one', 'two', 'two','two'],
                   'bar': ['A', 'B', 'C', 'A', 'B','C'],
                   'baz': [1, 2, 3, 4, 5,6],
                   'zoo': ['x', 'y', 'z', 'q', 'w','t'],
                   "too": [[11,22],[22,78],[8,6],[78,2],[12],[2]]})

print(pdf)



   foo bar  baz zoo       too
0  one   A    1   x  [11, 22]
1  one   B    2   y  [22, 78]
2  one   C    3   z    [8, 6]
3  two   A    4   q   [78, 2]
4  two   B    5   w      [12]
5  two   C    6   t       [2]


apyspark.pandas.DataFrame를 표준pyspark.sql.dataframe.DataFrame으로 변환 가능

sdf = pdf.to_spark()
sdf.show()



+---+---+---+---+--------+
|foo|bar|baz|zoo|     too|
+---+---+---+---+--------+
|one|  A|  1|  x|[11, 22]|
|one|  B|  2|  y|[22, 78]|
|one|  C|  3|  z|  [8, 6]|
|two|  A|  4|  q| [78, 2]|
|two|  B|  5|  w|    [12]|
|two|  C|  6|  t|     [2]|
+---+---+---+---+--------+

print(type(pdf))
print(type(sdf))

<class 'pyspark.pandas.frame.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>



데이터 조작



칼럼의 생성



간단한 조건으로 열을 생성하는 것은 두 API 모두에서 매우 간단합니다.

# Standard PySpark Syntax
sdf.withColumn("new",sdf.baz + 1).show()



+---+---+---+---+--------+---+
|foo|bar|baz|zoo|     too|new|
+---+---+---+---+--------+---+
|one|  A|  1|  x|[11, 22]|  2|
|one|  B|  2|  y|[22, 78]|  3|
|one|  C|  3|  z|  [8, 6]|  4|
|two|  A|  4|  q| [78, 2]|  5|
|two|  B|  5|  w|    [12]|  6|
|two|  C|  6|  t|     [2]|  7|
+---+---+---+---+--------+---+



# Pandas Spark API
print(pdf.assign(new = pdf.baz + 1))



   foo bar  baz zoo       too  new
0  one   A    1   x  [11, 22]    2
1  one   B    2   y  [22, 78]    3
2  one   C    3   z    [8, 6]    4
3  two   A    4   q   [78, 2]    5
4  two   B    5   w      [12]    6
5  two   C    6   t       [2]    7


변환 작업



폭발의 예를 들어보겠습니다.
열을 분해하는 것은 목록과 같은 열의 각 요소가 행이 되고 인덱스 값이 있는 경우 복제되는 간단한 작업입니다.

#pyspark syntax
from pyspark.sql.functions import explode
sdf.withColumn("too", explode("too")).show()



+---+---+---+---+---+
|foo|bar|baz|zoo|too|
+---+---+---+---+---+
|one|  A|  1|  x| 11|
|one|  A|  1|  x| 22|
|one|  B|  2|  y| 22|
|one|  B|  2|  y| 78|
|one|  C|  3|  z|  8|
|one|  C|  3|  z|  6|
|two|  A|  4|  q| 78|
|two|  A|  4|  q|  2|
|two|  B|  5|  w| 12|
|two|  C|  6|  t|  2|
+---+---+---+---+---+




print(pdf.explode("too"))



   foo bar  baz zoo  too
0  one   A    1   x   11
0  one   A    1   x   22
1  one   B    2   y   22
1  one   B    2   y   78
2  one   C    3   z    8
2  one   C    3   z    6
3  two   A    4   q   78
3  two   A    4   q    2
4  two   B    5   w   12
5  two   C    6   t    2


두 API 모두 간단하지만 다음 예제에서는 상황이 달라집니다.

사용자 정의 변환 작업



PySpark UDF(일명 사용자 정의 함수):

조건 등에 따라 특정 변형을 적용하려는 경우 Spark udf를 만들어야 할 수 있습니다.

사용자 정의 함수는 PySpark SQL udf() 개체 내부에 래핑된 Python 함수 구문입니다. 먼저 등록한 다음 대상 DataFrame에서 호출합니다.

PySpark에서 가장 비용이 많이 드는 작업이지만 사용자 지정 변환 작업을 수행하려면 UDF를 만들어야 할 수 있습니다.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Create function
def encode(baz,too,bar):
  baz = int(baz)
  return ("Yes" if (baz % 2 == 0) 
  and ( len(too) == 2) 
  and (bar in ["A","B"]) else "No") 

# Register the function as UDF
encode_udf = udf(encode, StringType())

# Call it on the target Dataframe
sdf.withColumn("encode", encode_udf("baz","too","bar")).show()

+---+---+---+---+--------+------+
|foo|bar|baz|zoo|     too|encode|
+---+---+---+---+--------+------+
|one|  A|  1|  x|[11, 22]|    No|
|one|  B|  2|  y|[22, 78]|   Yes|
|one|  C|  3|  z|  [8, 6]|    No|
|two|  A|  4|  q| [78, 2]|   Yes|
|two|  B|  5|  w|    [12]|    No|
|two|  C|  6|  t|     [2]|    No|
+---+---+---+---+--------+------+



동일한 변환에는 Pandas Syntax에서 더 적은 코딩이 필요합니다.

pdf["encode"] = (pdf.apply(lambda x: "Yes" if (x["baz"] % 2 == 0 ) and ( len(x["too"]) == 2 ) 
          and (x["bar"]) in ["A","B"]
           else "No",axis=1)).to_list()

print(pdf)

   foo bar  baz zoo       too encode
0  one   A    1   x  [11, 22]     No
1  one   B    2   y  [22, 78]    Yes
2  one   C    3   z    [8, 6]     No
3  two   A    4   q   [78, 2]    Yes
4  two   B    5   w      [12]     No
5  two   C    6   t       [2]     No


창 기능



Spark Window 함수는 입력 행 범위에서 순위, 행 번호 등과 같은 결과를 계산하는 데 사용됩니다.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("foo").orderBy("baz")
sdf.withColumn("rank_", row_number().over(windowSpec)).show()



+---+---+---+---+--------+-----+
|foo|bar|baz|zoo|     too|rank_|
+---+---+---+---+--------+-----+
|one|  A|  1|  x|[11, 22]|    1|
|one|  B|  2|  y|[22, 78]|    2|
|one|  C|  3|  z|  [8, 6]|    3|
|two|  A|  4|  q| [78, 2]|    1|
|two|  B|  5|  w|    [12]|    2|
|two|  C|  6|  t|     [2]|    3|
+---+---+---+---+--------+-----+


동일한 변환에는 Pandas 구문에서 단순groupby이 필요합니다.

rank_df = pdf.copy()
rank_df["rank_"] = pdf.groupby(by=['foo'])['baz'].rank().to_list()
print(rank_df)



   foo bar  baz zoo       too  rank_
0  one   A    1   x  [11, 22]    1.0
1  one   B    2   y  [22, 78]    2.0
2  one   C    3   z    [8, 6]    3.0
3  two   A    4   q   [78, 2]    1.0
4  two   B    5   w      [12]    2.0
5  two   C    6   t       [2]    3.0


데이터 시각화

Pandas의 플롯에 대한 기본 백엔드는 matplotlib이지만 Spark의 Pandas는 정적 이미지뿐만 아니라 대화형 플롯을 플롯하는 기능을 제공하므로 plotly과 함께 제공됩니다.

pdf["plot_col"] = [2, 1, 2, 2, 0, 0]
pdf.plot_col.hist()


좋은 웹페이지 즐겨찾기