코드 에서 pyspark 성능 최적화
우 리 는 일상적인 특징 공사 에서 여러 장의 표를 관련 조작, 즉 join 이 필요 하 다.현재 세 장의 표 A, B, C 가 있 는데 그 중에서 A 표 데이터 의 총 크기 는 약 300 M 이 고 B 표 의 총 데이터 크기 는 약 15G 이 며 C 표 데이터 의 총 크기 는 약 400 G 이다. 현재 의 수 요 는 이 세 장의 표 에 대해 join 을 하 는 것 이다. 어떻게 실현 해 야 합 니까?
일반적인 방법
가장 간단 한 실현 은 그 중의 두 장의 시 계 를 join 한 다음 에 남 은 시 계 를 join 으로 하 는 것 이다. 코드 는 다음 과 같다.
sc = SparkSession\
.builder\
.appName("Test")\
.getOrCreate()
A = sc.sparkContext.textFile("...")
B = sc.sparkContext.textFile("...")
C = sc.sparkContext.textFile("...")
A_rdd = A.map(read_A)
B_rdd = B.map(read_B).reduceByKey(add, 40)
C_rdd = C.map(read_C)
BC_rdd = B_rdd.join(C_rdd).map(merge_BC)
result = A_rdd.join(BC_rdd, 320).mapValues(cal_final_score)
result.saveAsTextFile("...")
위의 코드 기 계 는 간단 하 다. 만약 데이터 의 양 이 적 으 면 겨우 OK 이지 만 현재 의 실제 장면 에 서 는 달 릴 수 없다.필 자 는 M10 (45 핵, 125 G 메모리) 8 대의 기계 가 만 든 클 러 스 터 에서 3 시간 동안 실 행 된 것 을 실측 하고 결국 끊 었 다.로그 에서 던 진 이상 을 보 려 면 먼저 org. apache. spark. shuffle. FetchFailed Exception 과 Lost Task 등 이 있 습 니 다. spark 는 계속 재 시도 하고 그 다음 에 OOM 문제 가 발생 했 습 니 다.이 문 제 를 어떻게 처리 해 야 합 니까?첫 번 째 반응 은 메모리 에 CPU 를 추가 하 는 것 이 분명 하지만 이것 은 지표 가 근본 적 인 문 제 를 해결 하지 못 하 는 방법 에 속 하고 반드시 문 제 를 해결 할 수 있 는 것 은 아니다.우 리 는 먼저 무 너 진 마음 에서 냉정 해 져 야 한다. 코드 에 최적화 할 수 있 는 부분 이 있 는 지 살 펴 봐 야 한다. 우 리 는 이 몇 가지 점 에서 고려 할 수 있다.
세 장의 시계 크기 가 다 르 고 차이 가 크다. 이 코드 는 처음부터 가장 큰 두 장의 시 계 를 join 하려 고 시도 했다. 이것 은 분명히 불필요 하 다.
시계 세 장의 join 이 아 닙 니까? spark 의 join 작업 이 두 번 필요 합 니까?
이 코드 가 작 동 하 는 데 이 터 는 양 이 많 지만 checkpoint 와 같은 조작 이 없 으 면 실패 할 때 반복 적 으로 계산 하여 헛 된 일 을 할 수 있 습 니 다.
단순 한 큰 시계 와 큰 시계 join 이 라면 방법 이 있 을 까?최적화 방법 에서 흔히 볼 수 있 는 최적화 전략 은 다음 과 같다.
그래서 우 리 는 다음 과 같은 전략 을 사용 하여 이 코드 를 최적화 할 수 있다. 큰 표 B 와 작은 표 A 에 대한 join.가장 작은 A broadcast 를 각 노드 에 연결 한 후 B 와 map 에서 join 을 하여 shuffle 을 피 할 수 있 습 니 다.
두 장의 표 join 에 앞서 브 로드 캐 스 트 에서 각 노드 의 A 를 이용 하여 데 이 터 를 filter 하여 미래 shuffle 의 규 모 를 줄인다.
join 작업 의 두 표 에 대해 같은 파 티 션 전략 을 사용 하여 같은 key 를 같은 노드 에 두 고 shuffle 과 네트워크 IO 를 감소 합 니 다.
checkpoint 메커니즘 을 추가 하여 중복 계산 을 피 합 니 다.주의해 야 할 것 은 checkpoint 전에 rdd 에 cache 를 하 는 것 이 좋 습 니 다. 그렇지 않 으 면 두 번 반복 해서 계산 합 니 다.
shuffle 관련 매개 변 수 를 최적화 합 니 다. 구체 적 으로 다음 과 같 습 니 다.
- conf spark. default. parallelism = 2000 \ # shuffle 단계 의 작업 병행 도 를 높이 고 단일 작업 의 메모리 사용량 을 낮 춥 니 다 - conf spark. shuffle. file. buffer = 128 k \ # shuffle 버퍼 크기 향상 - conf spark. yarn. executor. memory Overhead = 1g \ # shuffle 버퍼 크기 향상 - conf spark. shuffle. consolidateFiles = true \ # shuffle write 출력 파일 을 합 칩 니 다.디스크 IO 감소
수 정 된 코드 는 다음 과 같 습 니 다.
sc = SparkSession\
.builder\
.appName("...")\
.getOrCreate()
sc.sparkContext.setCheckpointDir('...')
A = sc.sparkContext.textFile(INPUT_PATH, PARTITIONS)
B = sc.sparkContext.textFile(B_PATH, PARTITIONS)
C = sc.sparkContext.textFile(C_PATH, PARTITIONS)
A_rdd = A.map(read_A).filter(lambda x: x != None)
A_mp = sc.sparkContext.broadcast(A_rdd.collectAsMap())
B_rdd = B.map(read_B).filter(lambda x: x != None and x[0] in A_mp.value)\
.reduceByKey(add, PARTITIONS).map(lambda line: join_A(line, A_mp)).partitionBy(PARTITIONS, lambda k: k).cache()
#printRddInfo(B_rdd)
B_rdd.checkpoint()
C_rdd = C.map(read_C).filter(lambda x: x != None and x[0] in A_mp.value).partitionBy(PARTITIONS, lambda k: k)
# printRddInfo(C_rdd)
result = B_rdd.join(C_rdd, PARTITIONS).mapValues(cal_final_score).filter(lambda x: x != None)
result.saveAsTextFile("...")
B.unpersist()
C.unpersist()
B_rdd.unpersist()
sc.stop()
이상 의 수정 을 거 친 후 이 코드 는 약 7 분 정도 면 실 행 될 수 있다.
큰 시계 도 위 와 같이 맵 을 만 들 수 있 을까요? - join?
이상 의 문 제 는 해 결 된 셈 이지 만 마음속 으로 는 또 하나의 의문 이 있다. 브 로드 캐 스 트 를 한 후에 map join 을 하면 shuffle 을 피 할 수 있다 면 큰 표 (2G 이상) 에 대해 서도 비슷 한 방법 을 사용 할 수 있 을 까?답 은 부정 적 이다. 특히 pyspark 에 게 는 전혀 바람 직 하지 않다.네트워크 IO 를 제외 하고 pyspark 는 모든 파 티 션 에서 하나의 프로 세 스 를 시작 하여 단독으로 대응 하기 때문에 모든 프로 세 스 는 broadcast 의 데이터 에 대해 직렬 화 된 복사 본 을 보존 하고 n 개의 프로 세 스 가 n 부 를 저장 하기 때문에 메모리 자원 이 심각하게 부족 할 수 있 습 니 다.메모리 사용량 은 다음 과 같 습 니 다.
spark 원본 코드 에서 work. py 대응 코드 는 다음 과 같 습 니 다.
결어
spark 의 최 적 화 는 구덩이 가 매우 깊 은 분야 에 속 하고 자주 밟 습 니 다. 좋 은 방법 이 있 으 면 저 를 찾 아와 서 토론 하 세 요 ~
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Azure HDInsight + Microsoft R Server에서 연산 처리 분산Microsoft Azure HDInsight는 Microsoft가 제공하는 Hadoop의 PaaS 서비스로 인프라 주변의 구축 노하우를 몰라도 훌륭한 Hadoop 클러스터를 구축할 수 있는 훌륭한 서비스입니다. 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.