Spark에서combinebyKey와reduceByKey의 입력 함수 매개 변수의 차이를 가르쳐 주시겠습니까?
6711 단어 Spark
Spark에서combinebyKey와reduceByKey의 입력 함수 매개 변수의 차이를 가르쳐 주시겠습니까?
val testData = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
val testDataCombine = testData.combineByKey(x=>x,(x:Int,y:Int)=>x+y,(x:Int,y:Int)=>x+y)
val testDataReduce = testData.reduceByKey((x,y)=>x+y)
combineByKey
x ,y Int , "+" ,
reduceByKey 。
저자: 연성
링크:https://www.zhihu.com/question/45420080/answer/99044117
알다
저작권은 작자에게 귀속된다.상업 전재는 작가에게 연락하여 권한을 부여받고, 비상업 전재는 출처를 밝혀 주십시오.
테마 예시 코드에서 테스트 데이터라는 RDD의 유형은 RDD[(String, Int)]로 확정된 다음에 RDDD를 통해rddTorddpairFunctions라는 은밀한 형식은PairRddFunctions[string, Int]로 변환하여 ReduceByKey와combineByKey 두 개의 methods를 획득합니다.그런 다음 두 함수의 서명을 비교합니다.class PairRDDFunctions[K, V](...) {
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
}
ReduceByKey의func 매개 변수의 유형은PairRDDFunction의 형식 매개 변수 V에만 의존하는 것을 볼 수 있습니다. 이 예에서 Int입니다.따라서 func의 유형은 (Int, Int)=> Int으로 결정되었기 때문에 형식을 추가로 표시할 필요가 없습니다.
한편,combineByKey는 ReduceByKey보다 더욱 통용된다. 각partition은shuffle 전에 local reduce를 먼저 해서 C의 중간값을 얻고shuffle를 한 후에 각 키에 대응하는 C를 합칠 수 있다.균일치를 예로 들면, 우리는 모든 파티션이 하나의 파티션 안의 키에 대응하는 모든 정수와sum 및 개수count를 구한 다음pair(sum,count)를 되돌려줄 수 있다.shuffle 후 키에 대응하는 모든sum와count를 누적하여 평균값을 나눈다.val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
(_: Int) => (0, 0L),
(pair: (Int, Long), value: Int) =>
(pair._1 + value, pair._2 + 1L),
(pair1: (Int, Long), pair2: (Int, Long)) =>
(pair1._1 + part2._1, pair2._2 + pair2._2)
)
val averages: RDD[String, Double] = sumCountPairs.mapValues {
case (sum, 0L) => 0D
case (sum, count) => sum.toDouble / count
}
C라는 유형의 매개 변수는 임의이기 때문에testData의 유형에서 직접 유도할 수 없기 때문에 명확하게 지정해야 한다.단지 제목의 예는 가장 간단하게 Reduce By Key로 해결할 수 있는 상황이다. 즉 V와 C가 완전히 같기 때문에 차이를 볼 수 없다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수
1. socket 단어 수 통계
TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다.
2. maven 설정
3. 프로그래밍 코드
입력 내용
결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.
val testData = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
val testDataCombine = testData.combineByKey(x=>x,(x:Int,y:Int)=>x+y,(x:Int,y:Int)=>x+y)
val testDataReduce = testData.reduceByKey((x,y)=>x+y)
combineByKey
x ,y Int , "+" ,
reduceByKey 。
class PairRDDFunctions[K, V](...) {
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
}
val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
(_: Int) => (0, 0L),
(pair: (Int, Long), value: Int) =>
(pair._1 + value, pair._2 + 1L),
(pair1: (Int, Long), pair2: (Int, Long)) =>
(pair1._1 + part2._1, pair2._2 + pair2._2)
)
val averages: RDD[String, Double] = sumCountPairs.mapValues {
case (sum, 0L) => 0D
case (sum, count) => sum.toDouble / count
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.