Spark에서 실행이 빠른 쿼리를 작성하려면 suffle을 줄입니다.
8488 단어 스파크
동기
SparkSQL에서는 같은 결과를 얻을 수 있어도 쿼리의 작성 방법에 의해 느리거나 빠르거나 한다.
어떤 쿼리를 쓰면 빠른가.
각각의 요구되는 사양에 따라 어떤 쿼리를 쓰면 빠른지 바뀌어 온다. 그러나 일정한 정책을 원한다.
정책
노드 간 통신을 줄이는 쿼리를 작성합니다.
Spark에서는 노드간 통신이 연발하는 쿼리를 작성하면 계산 시간에 시간이 걸린다.
아래의 단순히 최소값을 찾는 쿼리를 생각해 보자.
SELECT
min(_id)
FROM
dataframe
SQLServer와 비교해 생각해 본다. SQLServer에서는 보통 1대의 머신으로 동작이 완결한다. 혼자서 뭐든지 해야 한다.
쪽이나 Spark에서는 복수의 머신이 협력해 동작한다. 예를 들어 4명이 _id의 최소값을 열심히 찾아보자. . 마지막으로 4명 각자가 찾아낸 최소값을 비추어 최소값을 찾아낸다.
즉, group by, distinct, except 등, 레코드끼리의 비교가 필요한 SQL문에서는 노드간 통신이 발생하게 된다. 노드간 통신을 연발하면 계산의 지연으로 이어진다.
예제
예를 들면 다음과 같은 매출 데이터가 있다고 하자. 다음 3개의 컬럼이 있는 데이터가 있다.
노드 간 통신을 줄이는 쿼리를 작성합니다.
Spark에서는 노드간 통신이 연발하는 쿼리를 작성하면 계산 시간에 시간이 걸린다.
아래의 단순히 최소값을 찾는 쿼리를 생각해 보자.
SELECT
min(_id)
FROM
dataframe
SQLServer와 비교해 생각해 본다. SQLServer에서는 보통 1대의 머신으로 동작이 완결한다. 혼자서 뭐든지 해야 한다.
쪽이나 Spark에서는 복수의 머신이 협력해 동작한다. 예를 들어 4명이 _id의 최소값을 열심히 찾아보자. . 마지막으로 4명 각자가 찾아낸 최소값을 비추어 최소값을 찾아낸다.
즉, group by, distinct, except 등, 레코드끼리의 비교가 필요한 SQL문에서는 노드간 통신이 발생하게 된다. 노드간 통신을 연발하면 계산의 지연으로 이어진다.
예제
예를 들면 다음과 같은 매출 데이터가 있다고 하자. 다음 3개의 컬럼이 있는 데이터가 있다.
이 데이터에 새롭게 max_price라는 컬럼을 붙이면 좋겠다. max_price는 고객이 구입한 상품 중에서 가장 고액인 것에 1을 붙이면 좋겠다.
실행 환경 만들기
Azure의 HDinsight를 사용했다.
샘플 데이터 작성
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("sample").getOrCreate()
#適当な量のデータを生成
df = spark.range(10000000000)
df = df.withColumn("customer_id", F.expr("id % 1000000"))
df = df.withColumn("price", F.expr("id % 100000"))
df.write.orc("path/to/file")
해결 방법 1: JOIN을 사용하는 방법
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("sample").getOrCreate()
#適当な量のデータを生成
df = spark.range(10000000000)
df = df.withColumn("customer_id", F.expr("id % 1000000"))
df = df.withColumn("price", F.expr("id % 100000"))
df.write.orc("path/to/file")
해결 방법 1: JOIN을 사용하는 방법
코드
df.write.orc("path/to/file")
df.createOrReplaceTempView("df")
maxprice = spark.sql("""
SELECT
max(id) as id,
max(price) as tmp_maxprice
FROM
df
GROUP BY customer_id
""")
maxprice.createOrReplaceTempView("maxpricedf")
final_df = spark.sql("""
SELECT
a.*,
CASE
WHEN a.price = b.tmp_maxprice and a.id = b.id THEN 1
ELSE 0
END as maxprice
FROM
df as a
LEFT JOIN maxpricedf as b
ON
a.id = b.id and a.price = b.tmp_maxprice
""")
final_df.write.orc("wasbs:///jointest")
DAG SCHEDULER 관찰
실행 결과
36분이 걸렸다.
해법 2:
코드
df = spark.read.orc("path/to/file")
df.createOrReplaceTempView("df")
max_df = spark.sql("""
SELECT
*,
max( price ) over(partition by customer_id ) as tmp_max_price
from
df
""")
max_df.createOrReplaceTempView("max_df")
final_df2 = spark.sql("""
SELECT
*,
CASE
WHEN price = tmp_max_price then 1
else 0
END as max_price
FROM
max_df
""").drop("tmp_max_price")
final_df2.write.orc("wasbs:///jointest2")
DAG SCHEDULER 관찰
실행 결과
25분이 걸렸다.
요약
해법 1은 grpup by와 left join의 2회 suffle이 실행되는 명령이 있으므로 DAG의 stage가 4개 있다. (group by로 1스테이지, left join으로 2스테이지 실시)
해법 2는 max ... over 의 1회 suffle가 실행되는 커멘드가 있으므로, DAG의 Stage가 2개로 끝난다.
그 결과, 해법 2 쪽이 11분 정도 단축되었다.
DAG의 stage가 많을수록 suffle이 많이 발생한 것을 나타내고 있다. 같은 결과를 얻을 수 있다면 suffle이 적은 쿼리가 더 빠릅니다.
참고문헌
High Performance Spark 의 Chapeter 5
Reference
이 문제에 관하여(Spark에서 실행이 빠른 쿼리를 작성하려면 suffle을 줄입니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/neppysan/items/7406955f0e1af0d54019
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
High Performance Spark 의 Chapeter 5
Reference
이 문제에 관하여(Spark에서 실행이 빠른 쿼리를 작성하려면 suffle을 줄입니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/neppysan/items/7406955f0e1af0d54019텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)