Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화
TL; DR
작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때
spark.sql.adaptive.coalescePartitions.enabled
를 false
로 설정합니다.상세히
Apache Spark에는 런타임 통계를 기반으로 최적화를 수행하고 3.2.0부터 기본적으로 활성화되는 Adaptive Query Execution (AQE)이라는 멋진 기능이 있습니다. 이러한 최적화 중 하나는 동적 셔플 파티션 번호 튜닝을 위한 Coalescing Post Shuffle Partitions입니다. 후자는 대부분의 경우 쿼리 성능을 향상시키지만 장기 실행 컴퓨팅 집약적인 작업에는 반대 효과가 있을 수 있습니다. 아래 예를 살펴보십시오.
테스트 데이터:
from pyspark.sql.types import StringType, DateType
import pyspark.sql.functions as f
from datetime import date, timedelta
cities = ['Amsterdam', 'Oslo', 'Warsaw', 'Copenhagen', 'Prague', 'Helsinki', 'Paris', 'Berlin', 'Dublin', 'Reykjavik']
cities_df = spark.createDataFrame(cities, StringType()).toDF('city')
dates = [date(2021, 1, 1) + timedelta(days=x) for x in range(0, 365)]
dates_df = spark.createDataFrame(dates, DateType()).toDF('day')
df = cities_df.crossJoin(dates_df).withColumn('orders', f.floor(100 * f.rand(seed=1234)))
df.show()
+---------+----------+------+
| city| day|orders|
+---------+----------+------+
|Amsterdam|2021-01-01| 71|
|Amsterdam|2021-01-02| 83|
|Amsterdam|2021-01-03| 20|
|Amsterdam|2021-01-04| 23|
|Amsterdam|2021-01-05| 89|
|Amsterdam|2021-01-06| 42|
|Amsterdam|2021-01-07| 50|
|Amsterdam|2021-01-08| 47|
|Amsterdam|2021-01-09| 57|
|Amsterdam|2021-01-10| 65|
|Amsterdam|2021-01-11| 59|
|Amsterdam|2021-01-12| 45|
|Amsterdam|2021-01-13| 52|
|Amsterdam|2021-01-14| 62|
|Amsterdam|2021-01-15| 14|
|Amsterdam|2021-01-16| 66|
|Amsterdam|2021-01-17| 18|
|Amsterdam|2021-01-18| 51|
|Amsterdam|2021-01-19| 59|
|Amsterdam|2021-01-20| 53|
+---------+----------+------+
only showing top 20 rows
df.count() # shows 3650
이제 우리는 UDAF 내에서 어리석은 컴퓨팅 집약적 작업을 수행하여 시계열 데이터에 대한 가짜 모델 교육을 수행합니다.
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import random
schema = StructType([
StructField("city", StringType()),
StructField("day", DateType()),
StructField("orders", LongType()),
StructField("ml_orders", LongType()),
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_intensive(df):
fake_fitted_orders = []
for y in df['orders'].to_list():
yhat = y
for i in range(0, 10000):
yhat += random.randint(-1, 1)
fake_fitted_orders.append(yhat)
df = df.assign(ml_orders = pd.Series(fake_fitted_orders))
return df
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
result_df = df.groupBy("city").apply(compute_intensive)
result_df
에는 spark.sql.shuffle.partitions
개의 파티션이 있습니다.assert(result_df.rdd.getNumPartitions() == int(spark.conf.get("spark.sql.shuffle.partitions")))
제 경우에는 기본적으로 250개의 파티션이 있지만 실제로는 데이터가 거의 없기 때문에 비어 있지 않은 파티션이 10개 있습니다.
num_non_empty_partitions_df = result_df \
.withColumn('partition_id', f.spark_partition_id()) \
.groupby('partition_id') \
.count()
num_non_empty_partitions_df.show()
+------------+-----+
|partition_id|count|
+------------+-----+
| 145| 365|
| 25| 365|
| 108| 365|
| 92| 365|
| 8| 365|
| 40| 365|
| 84| 365|
| 235| 365|
| 152| 365|
| 34| 365|
+------------+-----+
groupby
작업과 관련된 스테이지에는 250개의 작업이 있지만 실제 작업을 수행한 작업은 10개뿐입니다.바쁜 작업은 동일한 실행자 25에 배치되었으며 10개의 작업 중 8개가 동시에 시작되었습니다. 이 숫자는 CPU 코어의 양에 해당합니다.
이제 AQE coalescePartitions를 활성화하고 어떤 일이 발생하는지 살펴보겠습니다.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Do the same computation again
AQEShuffleRead
셔플 파티션을 동적으로 통합하는 실행 계획에 연산자가 추가됩니다.모든 작업은 이제 단 하나의 작업 내에서 수행됩니다.
전반적인 성능 저하를 측정해 보겠습니다.
import time
def with_execution_time(name, fn):
st = time.time()
fn()
et = time.time()
elapsed_time = et - st
print("Execution time for '{}' {} seconds".format(name, str(elapsed_time)))
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
with_execution_time('coalescePartitions disabled', lambda: result_df.count())
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
with_execution_time('coalescePartitions enabled', lambda: result_df.count())
Execution time for 'coalescePartitions disabled' 11.219334125518799 seconds
Execution time for 'coalescePartitions enabled' 30.78111720085144 seconds
이제 쿼리는 합성 데이터에서 거의 3배 더 느리게 실행되지만 실제로는 Spark 3.2.0 업그레이드 직후 하나의 분산 교육 파이프라인에서 10배 성능 저하가 관찰되었습니다.
병합 후 셔플 파티션을 비활성화하는 것이 바람직하지 않은 경우 Spark에서
spark.sql.adaptive.coalescePartitions.parallelismFirst
, park.sql.adaptive.coalescePartitions.minPartitionSize
및 spark.sql.adaptive.advisoryPartitionSizeInBytes
옵션을 사용하여 동작을 미세 조정할 수 있습니다.이 기사가 유용한 Spark 팁뿐만 아니라 앱에서 유사한 문제를 디버깅하기 위한 프레임워크를 제공했기를 바랍니다.
Reference
이 문제에 관하여(Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/aplotnikov/spark-tip-disable-adaptive-query-execution-aqe-for-compute-intensive-tasks-5bl0텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)