The computation logic of several Spark transformations, with tests
Test data:
test01:
a a
b b
c c
d d
e e
f f
g g
test02:
1 1
2 2
3 3
4 4
5 5
6 6
a a
b b
c c
d d
e e
f f
1. union(otherRDD)
union() concatenates two RDDs without deduplicating, much like UNION ALL in MySQL. The resulting RDD simply keeps the partitions of both parent RDDs, so no shuffle is involved.
Test it in the spark-shell:
./spark-shell
sc  // the SparkContext provided by the shell
val t01 = sc.textFile("hdfs://user/data_spark/test01")
val t02 = sc.textFile("hdfs://user/data_spark/test02")
t01.union(t01) foreach println
Output:
a a
e e
b b
a a
f f
b b
c c
c c
g g
d d
d d
e e
f f
g g
As the output shows, union does not deduplicate or reorder anything; it just concatenates the records, and the unioned RDD contains the partitions of both parents.
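A quick way to check the partition claim in the same shell session; a minimal sketch (the actual counts depend on the HDFS input splits):
// a sketch: the unioned RDD holds the partitions of both parents
val u = t01.union(t02)
println(t01.partitions.length, t02.partitions.length, u.partitions.length)
// expect: u.partitions.length == t01.partitions.length + t02.partitions.length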
2. groupByKey(numPartitions)
This transformation applies only to key-value RDDs; it is defined in org.apache.spark.rdd.PairRDDFunctions and made available on pair RDDs through an implicit conversion.
It groups together all records that share a key, much like GROUP BY in MySQL. The grouping needs a ShuffledRDD: each of its partitions fetches the records it is responsible for from the map side. The shuffle was originally hash-based; Spark 1.1 added a sort-based shuffle. After the fetch, mapPartitions() assembles the groups, yielding a MapPartitionsRDD. That, roughly, is how groupByKey computes its result.
Within each group the values are collected into an ArrayList-like buffer (Spark's CompactBuffer), and that buffer becomes the value of the resulting key-value pair.
val wc = t01.union(t01).flatMap(l => l.split(" ")).map(w => (w, 1))  // build (word, 1) pairs
wc foreach println
Output:
(e,1)
(e,1)
(e,1)
(e,1)
(f,1)
(f,1)
(f,1)
(f,1)
(g,1)
(g,1)
(g,1)
(g,1)
(a,1)
(a,1)
(a,1)
(a,1)
(b,1)
(b,1)
(b,1)
(b,1)
(c,1)
(c,1)
(c,1)
(c,1)
(d,1)
(d,1)
(d,1)
(d,1)
wc.groupByKey foreach println
Output:
(d,CompactBuffer(1, 1, 1, 1))
(g,CompactBuffer(1, 1, 1, 1))
(c,CompactBuffer(1, 1, 1, 1))
(b,CompactBuffer(1, 1, 1, 1))
(f,CompactBuffer(1, 1, 1, 1))
(e,CompactBuffer(1, 1, 1, 1))
(a,CompactBuffer(1, 1, 1, 1))
OK: groupByKey only groups the values by key; it performs no aggregation.
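If you do want per-key counts from the grouped result, you can sum each buffer yourself; a minimal sketch (reduceByKey in the next section is the preferred way):
// a sketch: aggregate after grouping (less efficient than reduceByKey)
wc.groupByKey.mapValues(_.sum) foreach println
// each CompactBuffer(1, 1, 1, 1) collapses to (key, 4)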
3. reduceByKey(func, numPartitions)
This behaves like the reduce phase in MapReduce: all values with the same key are folded together with func. The classic wordcount, for example:
map(x=>(x,1)).reduceByKey(_+_, 5)
Unlike groupByKey, reduceByKey runs a combine on the map side first, so far less data has to cross the network during the shuffle.
wc.reduceByKey(_+_) foreach println
Output:
(d,4)
(b,4)
(f,4)
(g,4)
(c,4)
(e,4)
(a,4)
4. distinct(numPartitions)
distinct deduplicates the parent RDD, returning the result in numPartitions partitions. Deduplication needs a shuffle, and a shuffle in Spark only operates on kv pairs, so each record is first wrapped into a kv pair keyed by the record itself; internally distinct is built on reduceByKey().
wc.distinct(1) foreach println
Output:
(g,1)
(b,1)
(f,1)
(d,1)
(a,1)
(e,1)
(c,1)
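The description above can be sketched as a map + reduceByKey + map chain; a sketch of the idea, not a verbatim copy of the Spark source:
// a sketch of distinct built from map + reduceByKey
// wrap each record as a key, keep one value per key, unwrap
wc.map(x => (x, null)).reduceByKey((a, b) => a, 1).map(_._1) foreach println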
5. cogroup(otherRDD, numPartitions)
While groupByKey groups a single RDD, cogroup groups two RDDs at once by key; otherwise the logic resembles groupByKey.
As with groupByKey, the values are collected into ArrayList-like buffers, but here each key's result value is a tuple of two buffers: one holding the key's values from this RDD and one holding its values from otherRDD. A key absent from one side simply gets an empty buffer on that side.
val wc01 = t01.flatMap(l => l.split(" ")).map(w => (w, 1))  // (word, 1) pairs from test01
val wc02 = t02.flatMap(l => l.split(" ")).map(w => (w, 1))  // (word, 1) pairs from test02
wc01.cogroup(wc02,1) foreach println
Output:
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(4,(CompactBuffer(),CompactBuffer(1, 1)))
(5,(CompactBuffer(),CompactBuffer(1, 1)))
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(6,(CompactBuffer(),CompactBuffer(1, 1)))
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(2,(CompactBuffer(),CompactBuffer(1, 1)))
(3,(CompactBuffer(),CompactBuffer(1, 1)))
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
(1,(CompactBuffer(),CompactBuffer(1, 1)))
(g,(CompactBuffer(1, 1),CompactBuffer()))
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))
6. intersection(otherRDD)
Returns the elements that appear in both RDDs; it is built on top of cogroup().
To do this, each element is first wrapped into a kv pair and the two RDDs are cogrouped. A key whose two buffers are both non-empty appears in both RDDs, so exactly those keys are kept and the buffers are thrown away; duplicates are removed in the process.
wc01.intersection(wc02) foreach println
Output:
(d,1)
(e,1)
(b,1)
(f,1)
(a,1)
(c,1)
Note that here the elements of the two RDDs are themselves kv pairs, and intersection compares whole elements.
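The description above can be turned into code with cogroup directly; a sketch of the idea:
// a sketch: keep keys whose buffers are non-empty on both sides
wc01.map(v => (v, null)).cogroup(wc02.map(v => (v, null)))
  .filter { case (_, (left, right)) => left.nonEmpty && right.nonEmpty }
  .keys foreach println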
7. join(otherRDD, numPartitions)
Joins two RDD[(K, V)]s, like a join in SQL. As with intersection, the two RDDs are first cogrouped, giving a MappedValuesRDD whose values are pairs of buffers. Then for each key the Cartesian product of its Iterable[V1] and Iterable[V2] is computed and flattened, producing a FlatMappedValuesRDD.
wc01.join(wc02,1) foreach println
Output:
(d,(1,1))
(d,(1,1))
(d,(1,1))
(d,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(e,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(a,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(b,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(f,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
(c,(1,1))
join behaves like MySQL's INNER JOIN: keys that exist on only one side (g and the digit keys here) are dropped, and since each side holds two values per key, every surviving key produces 2 × 2 = 4 output pairs.
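The cogroup-then-flatten structure can be sketched directly, with the per-key Cartesian product written as a for-comprehension (a sketch of the idea):
// a sketch: join as cogroup followed by a flattened per-key cartesian product
wc01.cogroup(wc02, 1).flatMapValues { case (vs, ws) =>
  for (v <- vs.iterator; w <- ws) yield (v, w)
} foreach println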
8. sortByKey(ascending, numPartitions)
Sorts an RDD[(K, V)] by key: ascending when ascending = true, descending when false.
It is implemented with a shuffle (range partitioning), and the output records come out ordered by key.
wc01.sortByKey(true,1) foreach println
Output:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
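Passing false gives the descending order:
wc01.sortByKey(false, 1) foreach println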
9. cartesian(otherRDD)
Computes the Cartesian product of two RDDs: each partition of the resulting CartesianRDD pairs one partition of this RDD with one partition of the other, so the result has m × n partitions.
Unlike join, no key matching is involved: every element of one RDD is paired with every element of the other, so the output size is the product of the two input sizes and grows very quickly.
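No shell run was shown for this one; a minimal sketch against the same test files (counting instead of printing keeps the result small):
val words01 = t01.flatMap(_.split(" "))
val words02 = t02.flatMap(_.split(" "))
// every word from test01 paired with every word from test02
words01.cartesian(words02).count  // expect words01.count * words02.count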
10. coalesce(numPartitions, shuffle = false)
coalesce changes the number of partitions of an RDD. With the default shuffle = false no data is moved, but the partition count can then only shrink; to increase the number of partitions you must pass shuffle = true.
When merging, it tries to combine parent partitions located on the same node, trading off data locality against evenly balanced partitions.
Test:
wc01.coalesce(1) foreach println
Output:
14/10/27 17:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
14/10/27 17:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
wc01.coalesce(2) foreach println
Output:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(e,1)
(d,1)
(e,1)
(d,1)
(f,1)
(f,1)
(g,1)
(g,1)
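A quick check of the shuffle = false limitation; a sketch assuming the two input splits seen in the logs above:
println(wc01.coalesce(5).partitions.length)  // no shuffle: cannot grow, stays at 2
println(wc01.coalesce(5, shuffle = true).partitions.length)  // with shuffle: 5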
11. repartition(numPartitions)
Equivalent to coalesce(numPartitions, shuffle = true).
wc01.repartition(1) foreach println
Output:
(a,1)
(a,1)
(b,1)
(b,1)
(c,1)
(c,1)
(d,1)
(d,1)
(e,1)
(e,1)
(f,1)
(f,1)
(g,1)
(g,1)
A typical MapReduce job maps onto Spark's map + reduceByKey. One difference is that a MapReduce reducer receives all values of a key at once and can process them arbitrarily, whereas reduceByKey only folds values pairwise with the given function; for simple aggregations like the counts above, that is exactly what is needed.
So much for this round of Spark transformations.
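To close, the pieces used throughout combine into a single wordcount chain; a sketch reusing the same shell session and test file:
// end-to-end wordcount with the transformations discussed above
sc.textFile("hdfs://user/data_spark/test01")
  .flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKey(_ + _)
  .sortByKey(true, 1)
  .foreach(println)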