spark 성능 최적화 - 데이터 경사

배경:
       같은 품종 의 두 상품 의 싱크로 율 을 계산 합 니 다. 기 존의 데이터 구 조 는 [(cid, int), (pid, int), (features, vector)], 상품 수 4W, 상품 대 8W, 사용 시간 8h 입 니 다.분석 은 데이터 경사 로 인 한 것 입 니 다. 예 를 들 어 cid 1 은 100 개의 상품 이 있 고 cid 2 는 300 개의 상품 이 있 으 며 cid 3 는 1000 개의 상품 이 있 습 니 다. 분류 id 에 따라 상품 의 싱크로 율 을 계산 하기 때문에 cid 3 의 상품 은 하나의 task 에서 모든 작업 이 이 task 운행 이 끝 날 때 까지 기 다 립 니 다.
 
최적화 방안:
       1. 데이터 통신 시간 소모 최적화
       상품 특징 벡터 는 1024 차원 의 벡터 로 약 5K, 4W 상품 은 모두 200 M 으로 그림 의 특징 벡터 를 방송 하여 각 실행 기 executor 에 한 부 를 저장 하여 데이터 통신 비용 을 줄 일 수 있다.그렇지 않 으 면 상품 당 1 천 개 상품 쌍 을 계산 하면 총 데이터 통 신 량 은 4W * 5M = 200 G 로 실제 계산 에 서 는 200 G 를 훌쩍 넘는다.
       2. 데이터 경사 최적화
       연 결 된 상품 에 대해 서 는 (pid 1, pid 2) 에 따라 repartition 해시 분 구 를 합 니 다. 상품 이 유일 하기 때문에 상품 에 따라 shuffle 연산 을 한 후 싱크로 율 계산 을 통 해 각 task 에 골 고루 분포 하여 데이터 경사 문 제 를 제거 합 니 다.
최적화 후 운행 시간 은 1h 30min 이다.
 
코드 는 다음 과 같 습 니 다:
#              ,          
feature_broadcast = fn.broadcast(feature_vector)

#    id,     ,        shuffle  ,      
pid_pairs = cid_pid.join(cid_pid, "cid")\
            .toDF("cid", "pid1", "pid2")\
            .repartition(100, "pid1", "pid2").cache()

#        ,     
@fn.udf(returnType=FloatType())
def cos_sim(a, b):
    #        float  ,  string      (char   2B)
    return float(a.dot(b)/(a.norm(2) * b.norm(2)))

pid_pairs_feature = pid_pairs.join(feature_broadcast, feature_broadcast.products_id == pid_pairs.pid1)\
            .select("cid", "pid1", "pid2", feature_broadcast.features.alias("feature1"))\
            .join(feature_broadcast, feature_broadcast.products_id == pid_pairs.pid2)\
            .select("cid", "pid1", "pid2", "feature1", feature_broadcast.features.alias("feature2"))
pid_pairs_simscore = pid_pairs_feature.withColumn("simscore", cos_sim("feature1", "feature2"))\
            .select("pid1", "pid2", "simscore")

 

좋은 웹페이지 즐겨찾기