Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화

TL; DR



작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabledfalse로 설정합니다.

상세히



Apache Spark에는 런타임 통계를 기반으로 최적화를 수행하고 3.2.0부터 기본적으로 활성화되는 Adaptive Query Execution (AQE)이라는 멋진 기능이 있습니다. 이러한 최적화 중 하나는 동적 셔플 파티션 번호 튜닝을 위한 Coalescing Post Shuffle Partitions입니다. 후자는 대부분의 경우 쿼리 성능을 향상시키지만 장기 실행 컴퓨팅 집약적인 작업에는 반대 효과가 있을 수 있습니다. 아래 예를 살펴보십시오.

테스트 데이터:

from pyspark.sql.types import StringType, DateType
import pyspark.sql.functions as f

from datetime import date, timedelta

cities = ['Amsterdam', 'Oslo', 'Warsaw', 'Copenhagen', 'Prague', 'Helsinki', 'Paris', 'Berlin', 'Dublin', 'Reykjavik']
cities_df = spark.createDataFrame(cities, StringType()).toDF('city')

dates = [date(2021, 1, 1) + timedelta(days=x) for x in range(0, 365)]
dates_df = spark.createDataFrame(dates, DateType()).toDF('day')

df = cities_df.crossJoin(dates_df).withColumn('orders', f.floor(100 * f.rand(seed=1234)))
df.show()



+---------+----------+------+
|     city|       day|orders|
+---------+----------+------+
|Amsterdam|2021-01-01|    71|
|Amsterdam|2021-01-02|    83|
|Amsterdam|2021-01-03|    20|
|Amsterdam|2021-01-04|    23|
|Amsterdam|2021-01-05|    89|
|Amsterdam|2021-01-06|    42|
|Amsterdam|2021-01-07|    50|
|Amsterdam|2021-01-08|    47|
|Amsterdam|2021-01-09|    57|
|Amsterdam|2021-01-10|    65|
|Amsterdam|2021-01-11|    59|
|Amsterdam|2021-01-12|    45|
|Amsterdam|2021-01-13|    52|
|Amsterdam|2021-01-14|    62|
|Amsterdam|2021-01-15|    14|
|Amsterdam|2021-01-16|    66|
|Amsterdam|2021-01-17|    18|
|Amsterdam|2021-01-18|    51|
|Amsterdam|2021-01-19|    59|
|Amsterdam|2021-01-20|    53|
+---------+----------+------+
only showing top 20 rows



df.count() # shows 3650


이제 우리는 UDAF 내에서 어리석은 컴퓨팅 집약적 작업을 수행하여 시계열 데이터에 대한 가짜 모델 교육을 수행합니다.

from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
from pyspark.sql.functions import pandas_udf, PandasUDFType

import pandas as pd
import random

schema = StructType([
    StructField("city", StringType()),
    StructField("day", DateType()),
    StructField("orders", LongType()),
    StructField("ml_orders", LongType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_intensive(df):
    fake_fitted_orders = []
    for y in df['orders'].to_list():
        yhat = y
        for i in range(0, 10000):
            yhat += random.randint(-1, 1)

        fake_fitted_orders.append(yhat)

    df = df.assign(ml_orders = pd.Series(fake_fitted_orders))
    return df

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

result_df = df.groupBy("city").apply(compute_intensive)

result_df에는 spark.sql.shuffle.partitions개의 파티션이 있습니다.

assert(result_df.rdd.getNumPartitions() == int(spark.conf.get("spark.sql.shuffle.partitions")))


제 경우에는 기본적으로 250개의 파티션이 있지만 실제로는 데이터가 거의 없기 때문에 비어 있지 않은 파티션이 10개 있습니다.

num_non_empty_partitions_df = result_df \
    .withColumn('partition_id', f.spark_partition_id()) \
    .groupby('partition_id') \
    .count()

num_non_empty_partitions_df.show()



+------------+-----+
|partition_id|count|
+------------+-----+
|         145|  365|
|          25|  365|
|         108|  365|
|          92|  365|
|           8|  365|
|          40|  365|
|          84|  365|
|         235|  365|
|         152|  365|
|          34|  365|
+------------+-----+

groupby 작업과 관련된 스테이지에는 250개의 작업이 있지만 실제 작업을 수행한 작업은 10개뿐입니다.


바쁜 작업은 동일한 실행자 25에 배치되었으며 10개의 작업 중 8개가 동시에 시작되었습니다. 이 숫자는 CPU 코어의 양에 해당합니다.

이제 AQE coalescePartitions를 활성화하고 어떤 일이 발생하는지 살펴보겠습니다.

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Do the same computation again

AQEShuffleRead 셔플 파티션을 동적으로 통합하는 실행 계획에 연산자가 추가됩니다.



모든 작업은 이제 단 하나의 작업 내에서 수행됩니다.



전반적인 성능 저하를 측정해 보겠습니다.

import time

def with_execution_time(name, fn):
    st = time.time()
    fn()
    et = time.time()
    elapsed_time = et - st
    print("Execution time for '{}' {} seconds".format(name, str(elapsed_time)))

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
with_execution_time('coalescePartitions disabled', lambda: result_df.count())

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
with_execution_time('coalescePartitions enabled', lambda: result_df.count())



Execution time for 'coalescePartitions disabled' 11.219334125518799 seconds
Execution time for 'coalescePartitions enabled' 30.78111720085144 seconds


이제 쿼리는 합성 데이터에서 거의 3배 더 느리게 실행되지만 실제로는 Spark 3.2.0 업그레이드 직후 하나의 분산 교육 파이프라인에서 10배 성능 저하가 관찰되었습니다.

병합 후 셔플 파티션을 비활성화하는 것이 바람직하지 않은 경우 Spark에서 spark.sql.adaptive.coalescePartitions.parallelismFirst , park.sql.adaptive.coalescePartitions.minPartitionSizespark.sql.adaptive.advisoryPartitionSizeInBytes 옵션을 사용하여 동작을 미세 조정할 수 있습니다.

이 기사가 유용한 Spark 팁뿐만 아니라 앱에서 유사한 문제를 디버깅하기 위한 프레임워크를 제공했기를 바랍니다.

좋은 웹페이지 즐겨찾기