Spark에서 실행이 빠른 쿼리를 작성하려면 suffle을 줄입니다.

8488 단어 스파크

동기



SparkSQL에서는 같은 결과를 얻을 수 있어도 쿼리의 작성 방법에 의해 느리거나 빠르거나 한다.
어떤 쿼리를 쓰면 빠른가.

각각의 요구되는 사양에 따라 어떤 쿼리를 쓰면 빠른지 바뀌어 온다. 그러나 일정한 정책을 원한다.

정책



노드 간 통신을 줄이는 쿼리를 작성합니다.
Spark에서는 노드간 통신이 연발하는 쿼리를 작성하면 계산 시간에 시간이 걸린다.

아래의 단순히 최소값을 찾는 쿼리를 생각해 보자.
SELECT 
min(_id)
FROM
dataframe

SQLServer와 비교해 생각해 본다. SQLServer에서는 보통 1대의 머신으로 동작이 완결한다. 혼자서 뭐든지 해야 한다.
쪽이나 Spark에서는 복수의 머신이 협력해 동작한다. 예를 들어 4명이 _id의 최소값을 열심히 찾아보자. . 마지막으로 4명 각자가 찾아낸 최소값을 비추어 최소값을 찾아낸다.



즉, group by, distinct, except 등, 레코드끼리의 비교가 필요한 SQL문에서는 노드간 통신이 발생하게 된다. 노드간 통신을 연발하면 계산의 지연으로 이어진다.

예제



예를 들면 다음과 같은 매출 데이터가 있다고 하자. 다음 3개의 컬럼이 있는 데이터가 있다.
  • id: 일련 번호. 하나의 레코드에 독특한 값이 있습니다. 더블리는 없다.
  • customer_id: 고객 번호. 고객마다 독특한 값이 흔들리고 있다.
  • price: 고객이 구입한 가격.

  • 이 데이터에 새롭게 max_price라는 컬럼을 붙이면 좋겠다. max_price는 고객이 구입한 상품 중에서 가장 고액인 것에 1을 붙이면 좋겠다.



    실행 환경 만들기



    Azure의 HDinsight를 사용했다.

    샘플 데이터 작성


    
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.appName("sample").getOrCreate()
    #適当な量のデータを生成
    df = spark.range(10000000000)
    df = df.withColumn("customer_id", F.expr("id % 1000000"))
    df = df.withColumn("price", F.expr("id % 100000"))
    df.write.orc("path/to/file")
    

    해결 방법 1: JOIN을 사용하는 방법


  • 레코드에서 customer_id로 그룹으로 나누어 가장 price가 큰 레코드만을 추출
  • 원래 레코드에 join

  • 코드


    df.write.orc("path/to/file")
    df.createOrReplaceTempView("df")
    
    maxprice = spark.sql("""
    SELECT
    max(id) as id,
    max(price) as tmp_maxprice
    FROM
    df
    GROUP BY customer_id
    """)
    
    maxprice.createOrReplaceTempView("maxpricedf")
    
    final_df = spark.sql("""
    SELECT 
    a.*,
    CASE 
        WHEN a.price = b.tmp_maxprice and a.id = b.id THEN 1
        ELSE 0
    END as maxprice
    FROM
    df as a
    LEFT JOIN maxpricedf as b
    ON 
    a.id = b.id and a.price = b.tmp_maxprice
    """)
    
    final_df.write.orc("wasbs:///jointest")
    

    DAG SCHEDULER 관찰





    실행 결과





    36분이 걸렸다.

    해법 2:


  • window 함수를 사용하여 price가 가장 높은 것의 열을 작성
  • price가 가장 높은 것에 플래그를 붙인다

  • 코드


    df = spark.read.orc("path/to/file")
    df.createOrReplaceTempView("df")
    
    max_df = spark.sql("""
    SELECT 
    *,
    max( price ) over(partition by customer_id ) as tmp_max_price
    from 
    df
    """)
    
    max_df.createOrReplaceTempView("max_df")
    
    final_df2 = spark.sql("""
    SELECT 
    *,
    CASE 
        WHEN price = tmp_max_price then 1 
        else 0
    END as max_price
    FROM 
    max_df
    """).drop("tmp_max_price")
    
    final_df2.write.orc("wasbs:///jointest2")
    

    DAG SCHEDULER 관찰





    실행 결과





    25분이 걸렸다.

    요약



    해법 1은 grpup by와 left join의 2회 suffle이 실행되는 명령이 있으므로 DAG의 stage가 4개 있다. (group by로 1스테이지, left join으로 2스테이지 실시)

    해법 2는 max ... over 의 1회 suffle가 실행되는 커멘드가 있으므로, DAG의 Stage가 2개로 끝난다.

    그 결과, 해법 2 쪽이 11분 정도 단축되었다.

    DAG의 stage가 많을수록 suffle이 많이 발생한 것을 나타내고 있다. 같은 결과를 얻을 수 있다면 suffle이 적은 쿼리가 더 빠릅니다.

    참고문헌



    High Performance Spark 의 Chapeter 5

    좋은 웹페이지 즐겨찾기