Spark 예제 의 배열 정렬

2828 단어 빅 데이터
배열 정렬 은 흔히 볼 수 있 는 동작 이다.비교 정렬 알고리즘 을 바탕 으로 그 성능 하한 선 은 O (nlog (n) 이지 만 분포 식 환경 에서 우 리 는 병행 하여 성능 을 향상 시 킬 수 있다.Spark 에서 배열 정렬 의 실현 을 보 여 주 며 성능 을 분석 하 는 동시에 성능 향상 의 원인 을 찾 아 보 았 다.
공식 예시
import sys
 
from pyspark import SparkContext
 
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: sort "
        exit(-1)
    sc = SparkContext(appName="PythonSort")
    lines = sc.textFile(sys.argv[1], 1)
    sortedCount = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (int(x), 1)) \
        .sortByKey(lambda x: x)
    # This is just a demo on how to bring all the sorted data back to a single node.
    # In reality, we wouldn't want to collect all the data to the driver node.
    output = sortedCount.collect()
    for (num, unitcount) in output:
        print num
 
    sc.stop()

소유 하 다.
Spark
입구
SparkContext
실례그것 은 전체 이다.
Spark
환경의 추상분포 환경의 복잡성 을 고려 하여 프로 그래 밍 할 때 데이터 세트 의 구분 과 어느 기계 위 에서 계산 해 야 하 는 지 를 고려 해 야 한다 면
Spark
가용성 이 크게 떨 어 질 것 이다.
SparkContext
인 스 턴 스 는 전체 환경의 입구 이다. 자동차의 조작 인터페이스 처럼 프로 그래 밍 할 때 해당 하 는 인 터 페 이 스 를 호출 하여 데 이 터 를 전송 하면 분포 식 시스템 에서 배포 하고 집행 하 며 가능 한 한 최대 의 성능 을 달성 할 수 있다.프로그램의 마지막 에 그것 을 호출 해 야 한다.
stop
방법 으로 환경 을 끊다.
방법 textFile 은 텍스트 파일 을 읽 고 Spark 환경 에서 해당 하 는 RDD 집합 을 만 듭 니 다.이 데이터 세트 는 lines 변수 에 저 장 됩 니 다.방법 flatMap 은 map 와 달리 map 는 key, value 의 맞습니다. 얻 은 RDD 집합 과 해시 표 가 비슷 합 니 다.플랫 맵 의 출력 결 과 는 배열 이다.이 배열 은 모든 요소 에 들 어 오 는 lambda 함 수 를 호출 한 결과 입 니 다.이것 은 들 어 오 는 lambda 함수 가 0 개 이상 의 결 과 를 되 돌려 줄 수 있 음 을 의미 합 니 다. 예 를 들 어:
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]

이어서 호출 하 다
map
방법 은 모든 배열 을
(key,value)
마지막
sortByKey
정렬 하 겠 습 니 다.차례 를 정 한 후 얻다
RDD
모으다
sortedCount
호출
collect
방법 은 집합 안의 데 이 터 를 되 돌려 줄 수 있다.
이 프로그램 은 코드 에서 정렬 알고리즘 을 구현 하 는 것 이 아니 라 Spark 가 제공 하 는 정렬 인터페이스 sortByKey 만 호출 한 것 임 을 쉽게 알 수 있다.방법 sortByKey 의 밑바닥 실현 은 다음 과 같다.
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)      
	: RDD[(K, V)] =  {    
	val part = new RangePartitioner(numPartitions, self, ascending)    
	new ShuffledRDD[K, V, V](self, part)      
		.setKeyOrdering(if (ascending) ordering else ordering.reverse)  
}

그 사고방식 은 데 이 터 를 내 고 싶 지 않 은 구간 으로 나 누 는 것 이다.
RDD
파 티 션 의 개수) 그리고 각 구간 에 정렬 알고리즘 을 단독으로 사용 합 니 다.이 실현 은 매우 우아 해서 두 줄 의 코드 만 사용 했다.구역 의 개 수 를 가정 하면
m,
데이터 집합의 크기 는
n
그 시간의 복잡 도 는
O((n/m)log(n))

Spark
파 티 션 하나 가 실 행 됩 니 다.
task
최종 효 과 는 병행 집행 과 같다.
본인 블 로그 방문 하여 더 많은 정보 얻 기: magic 01

좋은 웹페이지 즐겨찾기