Hadoop 학습 노트: Spark 에 대하 여
Spark 는 작업 과 작업 사이 에 발생 하 는 대규모 작업 데이터 세트 를 메모리 에 저장 하고 MapReduce 보다 성능 이 한 단계 높 습 니 다.Spark 처리 모델 에서 가장 큰 이익 을 얻 은 것 은 교체 알고리즘 (데이터 세트 에 대해 특정한 함 수 를 중복 응용 하여 종료 조건 을 만족 시 킬 때 까지) 과 상호작용 분석 (사용자 가 데이터 세트 에 일련의 전용 탐색 적 조회 로 보 내 는 것) 이다.
Spark 소프트웨어 스 택
스파크 핵심
단어 주파수 통계
object WordCount {
def main(args: Array[String]) {
val inputFile = args(0)
val outputFile = args(1)
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
val input = sc.textFile(inputFile)
val words = input.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
counts.saveAsTextFile(outputFile)
}
}
구축 및 실행
sbt clean package spark-submit --class WordCount target/scala-2.11/scala_demo_2.11-0.1.jar file/1.txt file/wordcounts
RDD 프로 그래 밍
RDD 는 가 변 적 이지 않 은 분포 식 대상 집합 으로 각 RDD 는 여러 개의 파 티 션 으로 나 뉘 어 있 으 며 파 티 션 은 클 러 스 터 의 서로 다른 노드 에서 실 행 됩 니 다.
RDD 를 만 드 는 두 가지 방법:
textFile
등 을 읽 습 니 다.parallelize
RDD 가 지원 하 는 두 가지 동작:
RDD 는 기본적으로 영구 화 되 지 않 습 니 다. 여러 동작 에서 같은 RDD 를 다시 사용 하려 면 같은 RDD 를 여러 번 계산 하지 않도록
RDD.persist()
Spark 를 사용 하여 이 RDD 를 캐 시 할 수 있 습 니 다.상용 전환 조작
함수 명
목적.
실례
map()
모든 요소 에 함 수 를 적용 합 니 다.
rdd.map(x => x + 1)
flatMap()
함수 와 모든 요 소 를 적용 합 니 다. 모든 반환 값 은 교체 기 입 니 다.
rdd.flatMap(x => x.to(3))
filter()
조건 을 충족 시키다
distinct()
무 거 운 것 을 제거 하 다
union()
RDD 의 모든 요 소 를 포함 합 니 다.
rdd.union(other)
intersection()
아울러
rdd.intersection(other)
subtract()
첫 번 째 에 존재 하고 두 번 째 에 존재 하지 않 는 다.
위 와 같다
cartesian()
피리 칼 적
위 와 같다
상용 행동 조작
함수 명
목적.
collect()
RDD 의 모든 요 소 를 되 돌려 줍 니 다.
count()
원소 개수
countByValue()
각 원소 출현 횟수
take(num)
num 개 요소 되 돌리 기
top(num)
맨 앞 에 있 는 num 개 요 소 를 되 돌려 줍 니 다.
takerdered(num)(ordering)
RDD 에서 제 공 된 순서대로 맨 앞 에 있 는 num 요 소 를 되 돌려 줍 니 다.
reduce(func)
sum 과 같은 RDD 의 모든 메타 데 이 터 를 병렬 통합 합 니 다.
fold(zero)(func)
reduce 와 마찬가지 로 초기 값 을 제공 해 야 합 니 다.
aggregate(zeroValue)(seqOp, combOp)
reduce 와 비슷 합 니 다. 보통 다른 유형의 함 수 를 되 돌려 줍 니 다.
foreach(func)
RDD 의 각 요소 에 주어진 함 수 를 사용 하고 결과 가 되 돌아 오지 않 습 니 다.
키 쌍 조작
키 쌍 을 포함 하 는 RDD 를 Pair RDD 라 고 하고 Pair RDD 는 reduceByKey () 방법 을 제공 하 며 각 키 에 대응 하 는 데 이 터 를 각각 요약 할 수 있 습 니 다.join () 은 두 개의 RDD 중 키 가 같은 요 소 를 조합 하여 하나의 RDD 로 합 칠 수 있다.
맵 으로 일반 RDD 를 Pair RDD 로 변환 할 수 있 습 니 다.
val pairs = linex.map(x => (x.split(" ")(0), x))
Pair RDD 의 전환 작업
함수.
목적.
reduceByKey(func)
같은 키 를 가 진 값 을 합 칩 니 다.
groupByKey()
같은 키 를 가 진 값 을 그룹 으로 나 눕 니 다.
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
서로 다른 반환 형식 을 사용 하여 같은 키 를 가 진 값 을 합 칩 니 다.
mapValues(func)
pair RDD 의 모든 값 에 키 를 바 꾸 지 않 고 함 수 를 적용 합 니 다.
flatMapValues(func)
pair RDD 의 모든 값 에 리 턴 교체 기 함 수 를 적용 한 다음 돌아 오 는 모든 요소 에 원 키 에 대응 하 는 키 쌍 기록 을 생 성 합 니 다.기호 화
keys()
키 만 포 함 된 RDD 되 돌리 기
values()
값 만 포 함 된 RDD 되 돌리 기
sortByKey()
키 에 따라 정렬 된 RDD 되 돌리 기
pair RDD 두 개 에 대한 전환 작업
함수.
목적.
subtractByKey
RDD 의 키 와 other RDD 의 키 가 같은 요 소 를 삭제 합 니 다.
join
RDD 두 개 를 내부 연결 합 니 다.
rightOuterJoin
오른쪽 외부 연결, RDD 의 키 가 존재 해 야 합 니 다.
leftOuterJoin
왼쪽 외부 연결, other 의 키 가 존재 해 야 합 니 다.
cogroup
두 RDD 에서 같은 키 를 가 진 데 이 터 를 묶 습 니 다.
combineByKey 상세 설명
combineByKey 는 Spark 에서 비교적 핵심 적 인 고급 함수 로 다른 고급 키 값 은 함수 밑 에 모두 이 를 통 해 이 루어 집 니 다.groupByKey, reduceByKey 등
정의:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)
인자:
createCombiner: V => C
각 파 티 션 에서 새로운 요 소 를 만 났 을 때 그 키 에 대응 하 는 누적 기의 초기 값 mergeValue: (C, V) => C
이미 만난 키 에 대해 키 누산기 에 대응 하 는 현재 값 을 이 새 값 과 합 칩 니 다 mergeCombiners: (C, C) => C
, 서로 다른 구역 의 같은 키 를 합 친 누산기 인 스 턴 스 코드
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val d1 = sc.parallelize(initialScores)
type MVType = (Int, Double) // ( , )
d1.combineByKey(
score => (1, score),
(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, socre)) => (name, socre / num) }.collect
매개 변수 의미 의 해석
score => (1, score)
우 리 는 점 수 를 매개 변수 로 하고 추 가 된 원조 유형 을 되 돌려 주 었 다."Fred" 를 열 로 현재 점 수 는 88.0 = > (1, 88.0) 1 로 현재 과목 의 계수 기 를 표시 합 니 다. 이 때 는 한 과목 만 있 습 니 다 (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore)
여기 있 는 c1 은 createCombiner
초기 화 된 (1, 88.0) 입 니 다.한 구역 내 에서 우 리 는 또 'Fred' 의 새로운 점수 91.0 을 만 났 다.물론 우 리 는 이전의 과목 점수 와 현재 의 점 수 를 합치 면 c1._2 + newScore
, 그리고 과목 계산 기 를 1 즉 c1._1 + 1
(c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
'Fred' 가 학 패 일 수 있 으 니 주의 하 세 요. 그 가 선택 한 과목 이 너무 많아 서 서로 다른 구역 에 분 산 될 수 있 습 니 다.모든 분 구 를 진행 mergeValue
한 후, 그 다음은 분 구 간 을 합병 하 였 으 며, 분 구 간 과목 수 와 과목 수 를 더 하면 총 점 과 총 과목 수 실행 결 과 는 다음 과 같다.
res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))
데이터 파 티 션데이터 집합 이 노드 간 의 파 티 션 을 제어 합 니 다.
Spark 가 제공 하 는 파 티 션 방식:
파 티 션 에서 이득 을 보 는 작업
데 이 터 를 키 의 크로스 노드 에 따라 혼합 하 는 것 을 도입 해 야 이득 을 볼 수 있다.
예 를 들 어 cogroup (), groupWith (), join (), leftOuterJoin (), rightOuterJoin (), groupByKey (), reduceByKey (), combineByKey (), loopup ()
파 티 션 방식 에 영향 을 주 는 작업
파 티 션 을 설정 합 니 다: cogroup (), groupWith (), join (), left OuterJoin (), rightOuterJoin (), groupByKey (), reduceByKey (), combineByKey (), partition By (), sort (), mapValues () (부모 RDD 에 파 티 션 이 있 으 면), flatMapValues () (부모 RDD 에 파 티 션 이 있 으 면)
데이터 읽 기와 저장
Spark 는 다양한 입 출력 원 을 지원 합 니 다. 일부 원인 은 Spark 가 Hadoop 생태 권 을 기반 으로 구축 되 었 기 때 문 입 니 다. Hadoop MapReduce 에서 사용 하 는 InputFormat 과 OutputFormat 인 터 페 이 스 를 통 해 데 이 터 를 방문 할 수 있 고 대부분 흔히 볼 수 있 는 파일 형식 과 저장 시스템 이 이 인 터 페 이 스 를 지원 합 니 다.
지원 하 는 파일 형식: 텍스트 파일, JSON, CSV, SequenceFiles, Protocol buffers, 대상 파일
지원 하 는 파일 시스템: 로 컬 / "일반" 파일 시스템, Amazon S3, HDFS
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Java 액세스 Hadoop 분산 파일 시스템 HDFS 구성 설명프로파일 m103은hdfs 서비스 주소로 바꿉니다. Java 클라이언트를 이용하여 HDFS의 파일을 액세스하려면 프로필hadoop-0.20.2/conf/core-site를 사용해야 합니다.xml입니다. 처음에 저는 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.