python 실전 spark(5)상용 API

16172 단어 빅 데이터
상용 API
Spark 공식 문서
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
RDD 저장 소 를 제어 하 는 데 사용 합 니 다.모든 StorageLevel 기록:메모리 사용 여부,메모리 가 부족 하면 RDD 를 디스크 에 저장 할 지,자바 에 지정 한 직렬 화 형식 으로 데 이 터 를 메모리 에 저장 할 지,여러 노드 에서 RDD 파 티 션 을 복사 할 지 여부.자주 사용 되 는 메모리 등급 의 정적 상수,MEMORY 도 포함 되 어 있 습 니 다.ONLY。데 이 터 는 항상 Python 에서 직렬 화 되 기 때문에 모든 상수 에서 직렬 화 형식 을 사용 합 니 다.
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None) SparkContext.broadcast()을 사용 하여 방송 변 수 를 만 듭 니 다.통과 값.value접근 값.
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()
>>> large_broadcast = sc.broadcast(range(10000))

1.destroy()이 방송 변수 와 관련 된 모든 데이터 와 메타 데 이 터 를 삭제 합 니 다.일단 방송 변수 가 소각 되면 다시 사용 할 수 없다.이 방법 은 삭제 가 완 료 될 때 까지 차단 합 니 다.2.dump(value,f)3.load(file)4.load_from_path(path)5.unpersist(blocking=False)실행 기 에서 이 방송의 캐 시 복사 본 을 삭제 합 니 다.호출 후 방송 을 사용 하려 면 모든 실행 프로그램 에 다시 보 내야 한다.인자:blocking-비 지구 화 6 이 완 료 될 때 까지 차단 할 지 여부 입 니 다.property valueclass pyspark.Accumulator(aid, value, accum_param)
누적 가능 한 공유 변 수 는 교환 가능 하고 연결 가능 한'add'동작 이 있 습 니 다.Spark 클 러 스 터 의 작업 작업 작업 은+=operator누산기 에 값 을 추가 할 수 있 지만 드라이버 만 value 를 사용 하여 값 에 접근 할 수 있 습 니 다.워 커 에서 온 업 데 이 트 는 자동 으로 driver 로 전 파 됩 니 다.Spark Context 는 기본 데이터 형식(예 를 들 어 int 와 float)의 누적 기 를 지원 하지만 사용 자 는 사용자 정의 Accumulator Param 대상 을 제공 하여 사용자 정의 형식 에 누적 기 를 정의 할 수 있 습 니 다.이 모듈 의 doctest 를 예 로 들 면.1. add(term) 2. property value
class pyspark.AccumulatorParam
주어진 형식의 값 을 축적 하 는 helper 대상 을 정의 합 니 다.1.addInPlace(value1, value2)누산기 데이터 형식의 두 값 을 추가 하여 새로운 값 을 되 돌려 줍 니 다.효율 성 을 높이 기 위해 서 는 적절 한 곳 에서 value 1 을 업데이트 하고 되 돌려 줄 수도 있다.2.zero(value)유형 에'0 값'을 제공 하고 차원 에서 제공 하 는 값 과 호 환(예 를 들 어 0 벡터)class pyspark.MarshalSerializer
Python 의 Marshal 직렬 화 대상 을 사용 합 니 다.이 직렬 화 는 빠 르 지만 소량의 데 이 터 를 지원 합 니 다.1.dumps(obj)2.loads(obj)class pyspark.PickleSerializer
이 직렬 화 기 는 거의 모든 Python 대상 을 지원 하지만 다른 전용 직렬 화 기 는 그렇게 빠 르 지 않 을 수 있 습 니 다.1. dumps(obj) 2. loads(obj) class pyspark.StatusTracker(jtracker)
job 와 stage progress 를 감시 하 는 저급 상태 보고 api.이러한 api 는 매우 약 한 일치 성 의 미 를 제공 하려 고 합 니 다.이 api 사용자 들 은 빈/잃 어 버 린 정 보 를 준비 해 야 합 니 다.예 를 들 어 작업 의 stage id 는 이미 알 고 있 을 수 있 지만 상태 API 는 이러한 stage 디 테 일 에 대한 정보 가 없 을 수 있 으 므 로getStageInfo효과 적 인 stage id 를 위해 None 로 돌아 갈 수 있 습 니 다.메모리 사용 을 제한 하기 위해 서 이 api 들 은 최근 jobs/stages 에 대한 정보 만 제공 합 니 다.이 api 는 마지막spark.ui.retainedStagesspark.ui.retainedJobs에 정 보 를 제공 합 니 다.1.getActiveJobsIds()모든 활성 화 된 jobs 를 포함 하 는 id 를 되 돌려 주 는 배열 2.getActiveStageIds()모든 활성 화 된 stages 를 포함 하 는 id 를 되 돌려 주 는 배열 3.getJobIdsForGroup(jobGroup=None)특정 작업 그룹 에서 알 고 있 는 모든 작업 목록 을 되 돌려 줍 니 다.job Group 이 None 이면 작업 그룹 과 무관 한 모든 알려 진 작업 을 되 돌려 줍 니 다.돌아 오 는 목록 은 실행 중,실패,완 료 된 작업 을 포함 할 수 있 으 며,이 방법 에 따라 호출 이 다 를 수 있 습 니 다.이 방법 은 그 결과 의 원소 의 순 서 를 보장 하지 않 는 다.4.getJobInfo(jobId)SparkJobInfo 대상 으로 돌아 가 작업 정보 나 작업 정보 가 쓰레기 로 수집 되 었 을 경우 None 로 돌아 갑 니 다.5.getStageInfo(stageId)SparkStageInfo 대상 으로 돌아 가 작업 정보 나 작업 정보 가 쓰레기 로 수집 되 었 을 경우 None 로 돌아 갑 니 다.class pyspark.SparkJobInfo
스파크 작업 에 대한 정 보 를 노출 하 다.class pyspark.SparkStageInfo
스파크 단계 에 대한 정 보 를 노출 하 다.class pyspark.Profiler(ctx)
PySpark 는 Basic Profiler 가 아 닌 사용자 정의 분석 기 를 지원 합 니 다.사용자 정의 분석 기 는 다음 과 같은 방법 을 정의 하거나 계승 해 야 합 니 다.
  • profile–특정한 유형의 시스템 프로필 을 생 성 합 니 다.
  • stats–수 집 된 통계 정 보 를 되 돌려 줍 니 다.
  • dump–개요 파일 을 경로 로 저장
  • add--개요 파일 을 기 존의 누적 개요 파일 에 추가
  • SparkContext 를 만 들 때 proffler 클래스 를 선택 하 십시오.
    >>> from pyspark import SparkConf, SparkContext
    >>> from pyspark import BasicProfiler
    >>> class MyCustomProfiler(BasicProfiler):
    ...     def show(self, id):
    ...         print("My custom profiles for RDD:%s" % id)
    ...
    >>> conf = SparkConf().set("spark.python.profile", "true")
    >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
    >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    >>> sc.parallelize(range(1000)).count()
    1000
    >>> sc.show_profiles()
    My custom profiles for RDD:1
    My custom profiles for RDD:3
    >>> sc.stop()
    

    1.dump(id, path)profflee 를 path 에 저장 합 니 다.id 는 RDD id 2.profile(func)함수 분석 3.show(id)profflee 상 태 를 출력 4 로 인쇄 합 니 다.stats()수 집 된 분석 상 태 를 되 돌려 줍 니 다.class pyspark.BasicProfiler(ctx)
    Basic Profiler 는 기본 proffler 입 니 다.cProfile 과 누적 기 를 기반 으로 하 는 1.profile(func)실행 및 전송 방법 을 설정 합 니 다 toprofile。프로필 대상 을 되 돌려 줍 니 다.2.stats()수 집 된 profiling 통계 정보(pstats.Stats)를 되 돌려 줍 니 다.class pyspark.TaskContext
    작업 의 상하 문 정 보 는 실행 과정 에서 읽 거나 수정 할 수 있 습 니 다.실행 중인 작업 에 접근 할 TaskContext 통과TaskContext.get().1.attemptNumber()"이 임 무 는 이미 몇 번 이나 시도 했다.첫 번 째 퀘 스 트 시 도 는 시도 번호=0 으로 배정 되 며,후속 시도 의 시도 번 호 는 계속 증가 할 것 이다.2.classmethod get()현재 이벤트 의 TaskContext 로 돌아 갑 니 다.이것 은 실행 중인 작업 에 대한 상하 문 정 보 를 사용자 함수 내부 에서 호출 할 수 있 습 니 다.주의:driver 가 아 닌 called on worker 여야 합 니 다.초기 화 되 지 않 으 면 None 으로 돌아 갑 니 다.3.getLocalProperty(key)driver 의 상류 에 로 컬 속성 을 설정 하고 잃 어 버 리 면 설정 하지 않 습 니 다.4.partitionId()이 퀘 스 트 가 계산 한 RDD 파 티 션 의 ID.5.stageId()이 퀘 스 트 가 속 한 단계 의 ID.6.taskAttemptId()이 작업 을 시도 하 는 유일한 ID(같은 SparkContext 에서 두 작업 이 시도 하 는 시도 id 가 다 르 지 않 음).이 는 대체로 Hadoop 의 TaskAttemptID 에 해당 한다.class pyspark.RDDBarrier(rdd)
    RDD 를 barrier 단계 에 포장 하여 Spark 와 함께 이 단계 의 작업 을 시작 하도록 합 니 다.RDDBarrier인 스 턴 스 는RDD.barrier()에 의 해 만들어 진 것 이다.1.mapPartitions(f, preservesPartitioning=False)포 장 된 RDD 의 각 파 티 션 에 함 수 를 적용 하여 새로운 RDD 를 되 돌려 줍 니 다.그 중의 작업 은 barrier 단계 에서 함께 시 작 됩 니 다.이 인 터 페 이 스 는 RDD.mapPartitions()와 같 습 니 다.class pyspark.BarrierTaskContext
    barrier stage 에 추가 컨 텍스트 정보 와 도 구 를 가 진 TaskContext.BarrierTaskContext.get()을 사용 하여 실행 중인 barrier 작업 의 barrier 컨 텍스트 를 가 져 옵 니 다.1.barrier()전역 장벽 을 설치 하고 이 단계 의 모든 작업 이 이 장벽 에 부 딪 힐 때 까지 기다린다.MPI 의 MPI 와Barrier 함수 와 유사 합 니 다.이 함 수 는 같은 단계 의 모든 작업 이 이 규칙 에 도달 할 때 까지 막 힙 니 다.barrier 단계 에서 모든 작업 이 가능 한 코드 분기 에서 barrier()를 호출 하 는 횟수 는 같 습 니 다.그렇지 않 으 면 작업 을 끊 거나 시간 을 초과 한 후에 SparkException 이 나타 날 수 있 습 니 다.2.classmethod get()현재 활동 중인 Barrier TaskContext 로 돌아 갑 니 다.이것 은 실행 중인 작업 에 대한 상하 문 정 보 를 사용자 함수 내부 에서 호출 할 수 있 습 니 다.주의:driver 가 아 닌 worker 에서 호출 해 야 합 니 다.초기 화 되 지 않 으 면 None 으로 돌아 갑 니 다.3.getTaskInfos()이 장벽 단계 의 모든 작업 의 Barrier TaskInfo 를 되 돌려 주 고 파 티 션 ID 에 따라 정렬 합 니 다.class pyspark.BarrierTaskInfo(address)
    barrier 작업 의 모든 작업 정 보 를 가지 고 있 습 니 다.변수:address-barrier 작업 이 실 행 된 실행 프로그램의 IPv 4 주소(호스트:포트)

    좋은 웹페이지 즐겨찾기