spark RDD 상용 함수 / 조작
글 의 코드 는 모두 spark - shell 에서 실 행 될 수 있 습 니 다.
transformations
map(func)
집합 내의 모든 요 소 는 function 을 통 해 새로운 요소 로 매 핑 됩 니 다.
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.map( _ + 1)
모든 transformation 작업 에 대해 생 성 된 것 은 새로운 RDD (여기 가 resultRdd) 입 니 다. 실제 연산 을 하지 않 고 RDD 에 대해 action 작업 을 할 때 만 실제 계산 하고 결 과 를 얻 을 수 있 습 니 다.
scala> resultRdd.collect
res3: Array[Int] = Array(2, 3, 4, 5)
다음 transformation 작업 은 동일 합 니 다.
filter(func)
func
을 통 해 집합 요 소 를 걸 러 내 고 func
의 반환 값 은 Boolean
유형 이 어야 합 니 다.val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.filter( _ > 1)
scala> resultRdd.collect
res4: Array[Int] = Array(2, 3, 4)
flatmap(func)
func
을 통 해 집합 안의 모든 요 소 를 하나의 서열 로 매 핑 합 니 다 (구체 적 으로 TraversableOnce [?] 입 니 다. 여 기 는 이 유형 을 상관 하지 않 아 도 됩 니 다. spark 는 스스로 암시 적 으로 전환 하고 일반적인 순서 로 교체 할 수 있 는 서열 이 모두 가능 합 니 다).말하자면 이해 하기 어 려 울 수도 있 으 니 예 를 들 어 보 자.아니면 [1,2,3,4]
입 니 다. 만약 에 func 가 이렇다 고 가정 합 니 다. x => Array(x+0.1, x+0.2)
즉, 하나의 순서 로 돌아 가 는 것 입 니 다. flatMap 절 차 는 모든 요소 에 대해 func 를 먼저 실행 한 것 으로 볼 수 있 습 니 다.[(1.1, 1.2), (2.1, 2.2), (3.1, 3.2), (4.1, 4.2)]
마지막 으로 모든 서열 을 평평 하 게 펼 치면 다음 과 같은 것 을 얻 을 수 있 습 니 다.
[1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2]
코드 형식:
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.flatMap( x => Array(x+0.1,x+0.2) )
scala> resultRdd.collect
res8: Array[Double] = Array(1.1, 1.2, 2.1, 2.2, 3.1, 3.2, 4.1, 4.2)
mapPartitions(func)
맵 과 유사 하지만 RDD 의 각 파 티 션 에서 각각 실 행 됩 니 다. 한 파 티 션 에 있 는 요 소 를 새로운 파 티 션 으로 표시 하고 마지막 으로 모든 새 파 티 션 을 합 쳐 새로운 RDD 로 만 드 는 것 으로 이해 할 수 있 습 니 다.
T
형식의 RDD 에 이 함 수 를 사용 할 때 func
의 서명 은 Iterator => Iterator
이 어야 합 니 다.val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.mapPartitions(iter => iter.map(_+1)) // iter.map RDD map , scala
scala> resultRdd.collect
res11: Array[Int] = Array(2, 3, 4, 5)
mapPartitionsWithIndex(func)
유사
mapPartitions(func)
하지만 하나의 파 티 션 의 색인 번호 정 보 를 더 제공 하기 때문에 요소 T
유형의 RDD, func
의 유형 서명 은 (Int, Iterator) => Iterator
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.mapPartitionsWithIndex( (index, iter) => iter.map( x => (x,s" $index")) )
scala> resultRdd.collect
res14: Array[(Int, String)] = Array((1, 0), (2, 1), (3, 2), (4, 3))
각 분 구 의 상황 을 볼 수 있 는데 여기 서 군집 구조 가 다 를 때 결과 도 다 를 수 있 는데 이것 은 RDD 가 어떤 분 구 에 분포 하 느 냐 에 달 려 있다.
sample(withReplacement, fraction, seed)
샘플링 함수, 일정한 확률 로 데 이 터 를 샘플링 합 니 다.
val a = Array(1,2,3,4)
val pa = sc.parallelize(a)
val resultRdd = pa.sample(true,0.5)
resultRdd.collect
여러 번 실 행 된 결과 가 다 릅 니 다:
scala> pa.sample(false,0.5).collect
res52: Array[Int] = Array(1, 3)
scala> pa.sample(false,0.5).collect
res53: Array[Int] = Array(1, 2)
withReplacement
은 true
, 즉 돌려 놓 을 수 있 는 상황 으로 설정 하면 중복 요소 가 발생 할 수 있다.scala> pa.sample(true,0.5).collect
res60: Array[Int] = Array(1, 2, 2)
scala> pa.sample(true,0.5).collect
res61: Array[Int] = Array(3, 4)
union(otherDataset)
병집 을 구하 다.
예시:
val pa = sc.parallelize( Array(1,2))
val pb = sc.parallelize(Array(3,4))
pa.union(pb).collect
결과:
res63: Array[Int] = Array(1, 2, 3, 4)
intersection(otherDataset)
교 집합 을 구하 다.
예시:
val pa = sc.parallelize( Array(1,2,3))
val pb = sc.parallelize(Array(3,4,5))
pa.intersection(pb).collect
결과:
res66: Array[Int] = Array(3)
distinct([numTasks]))
무 거 운 것 을 제거 하 다.
numTasks
선택 할 수 있 는 매개 변수 로 몇 개의 작업 으로 분 배 된 것 을 나타 낸다.예시:
val pa = sc.parallelize(Array(0,1,1,2,2,3))
pa.distinct.collect
결과:
res92: Array[Int] = Array(0, 1, 2, 3)
groupByKey([numTasks])
패 킷 함수. -형식 (K, V) 의 데이터 세트 를 사용 하고 (K, Iterable) 형식의 데이터 세트 를 되 돌려 줍 니 다. - 그룹 을 나 눈 후에
sum,average
등 취 합 함 수 를 사용 하려 면 reduceByKey
또는 aggregateByKey
를 사용 하 는 것 이 좋 습 니 다. 더 좋 은 성능 을 얻 을 수 있 습 니 다. - 기본 병렬 도 는 부모 RDD 에 의존 하고 선택 가능 한 인자 numTasks
가 지정 한 병렬 작업 수량 을 입력 할 수 있 습 니 다.예시:
val pa = sc.parallelize(Array( "a" -> 1,"a" ->2, "b" -> 3))
pa.groupByKey.collect
결과:
res99: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))
reduceByKey(func, [numTasks])
key 에 따라 그룹 을 나 누 어 모 으 고 SQL 의 groupby 와 유사 한 후에 모 으 기 함 수 를 사용 합 니 다.
(K, V) 형식의 데이터 세트 가 이 함 수 를 호출 하면 같은 (K, V) 형식의 데이터 세트 를 되 돌려 줍 니 다.
예시:
val pa = sc.parallelize(Array( "a" -> 1,"a" ->2,"a" ->3, "b" -> 4))
pa.reduceByKey( (x,y) => x+y).collect
결과:
res110: Array[(String, Int)] = Array((a,6), (b,4))
사실은 먼저 조 를 나 누고 각 조 내 에 reduce 를 하 는 것 이다.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
먼저 조 를 나 눈 다음 에 각 조 에서 호출
aggregate
하여 접 으 면 reduceByKey
과 차이 점 은 접 은 결과 가 다른 유형 일 수 있다 는 것 이다.(K, V) 의 데이터 세트 를 호출 하고 (K, U) 형식의 데이터 세트 를 되 돌려 줍 니 다.
참고 가능
aggregate
.val pa = sc.parallelize(Array("a" -> 1,"a" ->2,"a" ->3, "b" -> 4))
val r = pa.aggregateByKey("")(
(x:String,y:Int) => x+y.toString*2,
(x:String,y:String) => x+y
)
결과:
scala> r.collect
res135: Array[(String, String)] = Array((a,112233), (b,44))
sortByKey([ascending], [numTasks])
정렬 - ascending: 선택 할 수 있 습 니 다. 오름차 정렬 여부 - numTasks: 선택 할 수 있 습 니 다. 동시 작업 수량 은 (K, V) 의 데이터 세트 를 조작 하고 같은 (K, V) 형식의 데이터 세트 를 되 돌려 줍 니 다. 그 중에서 K 는
Ordered
trait 를 실 현 했 습 니 다. 즉, 정렬 할 수 있 습 니 다.val pa = sc.parallelize(Array("b" -> 1,"d" ->2,"a" ->3, "c" -> 4))
pa.sortByKey().collect
결과:
res139: Array[(String, Int)] = Array((a,3), (b,1), (c,4), (d,2))
join(otherDataset, [numTasks])
두 집합의 내 적 은 데이터베이스 안의
inner join
에 대응한다.(K, V) 와 (K, W) 형식의 데이터 세트 를 조작 하여 (K, (V, W) 형식의 데이터 세트 를 되 돌려 줍 니 다.
또한 대외 적 인 지원 도 있다.
leftOuterJoin
, rightOuterJoin
, fullOuterJoin
데이터베이스 에 있 는 해당 개념 과 같다.예 를 들 어:
val pa = sc.parallelize(
Array("a" -> 1,
"b" -> 2, "b" -> 3)
)
val pb = sc.parallelize(
Array("b" -> 2, "b" -> 3,
"d" -> 4)
)
내 적:
scala> pa.join(pb).collect
res140: Array[(String, (Int, Int))] = Array((b,(2,2)), (b,(2,3)), (b,(3,2)), (b,(3,3)))
왼쪽 외 적:
scala> pa.leftOuterJoin(pb).collect
res141: Array[(String, (Int, Option[Int]))] = Array((a,(1,None)), (b,(2,Some(2))), (b,(2,Some(3))), (b,(3,Some(2))), (b,(3,Some(3))))
오른쪽 외 적:
scala> pa.rightOuterJoin(pb).collect
res142: Array[(String, (Option[Int], Int))] = Array((d,(None,4)), (b,(Some(2),2)), (b,(Some(2),3)), (b,(Some(3),2)), (b,(Some(3),3)))
전 외적:
scala> pa.fullOuterJoin(pb).collect
res143: Array[(String, (Option[Int], Option[Int]))] = Array((d,(None,Some(4))), (a,(Some(1),None)), (b,(Some(2),Some(2))), (b,(Some(2),Some(3))), (b,(Some(3),Some(2))), (b,(Some(3),Some(3))))
주의해 야 할 것 은 외적 결과 의 원소 가
Option
유형 으로 바 뀌 었 다 는 것 이다cogroup(otherDataset, [numTasks])
두 데이터 세트 를 각각 그룹 으로 나 눈 다음 그룹 결 과 를 요소 로 연결 합 니 다.
(K, V) 와 (K, W) 형식 을 조작 하고 (K, (Iterable, Iterable) 형식 을 되 돌려 줍 니 다. 별명 groupWith.
val pa = sc.parallelize(Array("a" -> 1, "b" -> 2, "b" -> 3))
val pb = sc.parallelize(Array("a" -> "a", "b" -> "b", "b" -> "c"))
pa.cogroup(pb).collect
결과:
res152: Array[(String, (Iterable[Int], Iterable[String]))] = Array((a,(CompactBuffer(1),CompactBuffer(a))), (b,(CompactBuffer(2, 3),CompactBuffer(b, c))))
cartesian(otherDataset)
두 집합 에 대해 피리 칼 적 을 구하 고 T 와 U 유형 을 조작 하여 (T, U) 유형 을 되 돌려 줍 니 다.
val pa = sc.parallelize(Array(1,2))
val pb = sc.parallelize(Array(3,4))
pa.cartesian(pb).collect
결과:
res156: Array[(Int, Int)] = Array((1,3), (1,4), (2,3), (2,4))
pipe(command, [envVars])
각 파 티 션 을 stdin 에 출력 한 다음 명령 을 실행 하고 마지막 으로 stdout 을 읽 어 모든 행동 요소 로 새로운 RDD 를 생 성 합 니 다.명령 을 수행 하 는 단 위 는 파 티 션 이지 요소 가 아 닙 니 다.
/ home / zeta / 디 렉 터 리 에 새 스 크 립 트 test. sh:
#!/bin/bash
while read LINE; do
echo e${LINE}
done
그리고 spark - shell 에서 실행:
val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.pipe("/home/zeta/test.sh").collect
결과:
res181: Array[String] = Array(e1, e2, e3, e4, e5, e6)
coalesce(numPartitions)
RDD 의 파 티 션 수 를
numPartitions
개 로 줄 이 고 하나의 빅 데이터 세트 filter
작업 을 한 후에 파 티 션 수 를 줄 이면 효율 을 높 일 수 있다.repartition(numPartitions)
RDD 내 모든 파 티 션 의 데 이 터 를 무 작위 로 흐 트 러 뜨리 고 균형 을 잡 으 세 요.
actions
collect()
집합 내의 모든 요 소 를 배열 로 되 돌려 줍 니 다.
count()
데이터 세트 내의 요소 개 수 를 되 돌려 줍 니 다.
foreach(func)
데이터 세트 의 모든 요소 에 func 를 실행 합 니 다.
몇 가지 주의: 1. 부작용: 분포 식 상황 에서 모든 executor 는 자신의 실행 공간 을 가지 기 때문에 변 수 는 전역 적 으로 공유 되 지 않 고 변수 에 대한 부작용 은 정의 되 지 않 은 행 위 를 초래 할 수 있 습 니 다.이 럴 때 는 Accumulator 를 사용 해 야 한다.2. 패 킷 닫 기: 패 킷 참조 문제 Understanding closures
first()
데이터 세트 의 첫 번 째 요 소 를 되 돌려 줍 니 다.
take(n)
데이터 세트 내 n 개의 요 소 를 배열 로 되 돌려 줍 니 다.
reduce(func)
reduce 작업 은 병렬 계산 에서 정확 한 결 과 를 얻 을 수 있 도록 이 함 수 는 교환 율 과 결합 율 을 만족 시 켜 야 합 니 다. 즉, 데이터 세트 와 이 연산 은 반드시 교환 군 을 구성 해 야 합 니 다.
val pa = sc.parallelize(Array(1,2,3))
pa.reduce( (x,y) => x+y )
결과:
res184: Int = 6
aggregate(zeroValue)(seqOp, combOp)
이 함수 의 작업 절 차 는 두 단계 로 볼 수 있 습 니 다. 1. RDD 의 각 파 티 션 에서 호출
seqOp
작업 을 접 습 니 다. fold 2. 호출 combOp
과 유사 합 니 다. 각 파 티 션 의 결 과 를 집합 합 니 다.그것 의 서명 은 다음 과 같다.
def aggregate[U](zeroValue: U)(
seqOp: (U, Int) => U,
combOp: (U, U) => U
)
설명 하기 가 너무 귀찮아 서...한 마디 로 각 파 티 션 에서 fold 를 한 번 더 하고 결 과 를 모 으 는 것 이다.
예:
val pa = sc.parallelize(Array(1,2,3,4))
def seqOp(x:String,y:Int) = x+y.toString
def combOp(x:String,y:String) = x+y
pa.aggregate("")(seqOp,combOp)
취 합 시 파 티 션 의 순서 가 일정 하지 않 기 때문에 위의 코드 의 실행 결과 도 확실 하지 않 습 니 다.
scala> pa.aggregate("")(seqOp,combOp)
res201: String = 2413
scala> pa.aggregate("")(seqOp,combOp)
res202: String = 1234
takeSample(withReplacement, num, [seed])
무 작위 추출
num
개 샘플, - with Replacement: 추출 후 원 소 를 되 돌려 놓 을 지 여부 - num: 추출 개수 - [seed]: 선택 가능 한 매개 변수, 무 작위 피 드val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.takeSample(false,3)
무 작위 결과:
res185: Array[Int] = Array(3, 2, 5)
takeOrdered[T](n:Int)(implicit ord: Ordering[T])
정렬 후 n 의 요 소 를 되 돌려 줍 니 다. 두 번 째 암시 적 매개 변수
ordering
는 컴 파일 러 가 스스로 찾 고 사용자 가 정의 할 수 있 습 니 다.val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.takeOrdered(3)
결과:
res193: Array[Int] = Array(1, 2, 3)
비교 기 사용자 정의 시도:
object Ord extends Ordering[Int] {
override def compare(x:Int,y:Int):Int = {
if(x1 else -1;
}
}
val pa = sc.parallelize(Array(1,2,3,4,5,6))
pa.takeOrdered(3)(Ord)
이번 결 과 는:
res195: Array[Int] = Array(6, 5, 4)
countByKey()
키 값 은 종류 RDD 에 유효 하 며, 키 마다 대응 하 는 요소 개 수 를 집계 합 니 다.
saveAsTextFile(path)
모든 요 소 는 한 줄 로 텍스트 파일 (또는 일련의 텍스트 파일) 을 기록 합 니 다.인자
path
가 지정 한 기록 디 렉 터 리 로 로 컬 파일 시스템, HDFS 및 기타 Hadoop 이 지원 하 는 파일 시스템 을 지원 합 니 다.saveAsSequenceFile(path)
자바 와 스칼라 지원) 모든 요 소 를 Hadoop SequenceFile 에 기록 하고 로 컬 파일 시스템, HDFS, Hadoop 이 지원 하 는 모든 파일 시스템 을 지원 합 니 다.
Hadoop
Writable
인 터 페 이 스 를 실현 하 는 키 값 만 이 형식의 RDD 를 지원 합 니 다.스칼라 에 서 는 암시 적 으로
Writable
으로 전환 할 수 있 는 형식 도 이 동작 을 지원 합 니 다. (Spark 는 기본 유형 인 Int, Double, String 등에 대해 암시 적 으로 변환 되 었 습 니 다)saveAsObjectFile(path)
자바 의 직렬 화 형식 직렬 화 대상 을 사용 하여 자바 와 스칼라 를 지원 합 니 다. 불 러 오 려 면 SparkContext. object File () 을 사용 하 십시오.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.