The computation logic of several Spark transformations, with tests



Test data:
test01:
a a  
b b  
c c  
d d  
e e  
f f  
g g  

test02:
11
22
33
44
55
66
a a  
b b  
c c  
d d  
e e  
f f  


1、union(otherRDD)
union() simply concatenates two RDDs into one. Unlike UNION in MySQL it does not deduplicate, and the partitions of the resulting RDD are the partitions of the two input RDDs combined.
Testing in spark-shell:
./spark-shell
sc   // confirm the SparkContext is available
val t01 = sc.textFile("hdfs://user/data_spark/test01")
val t02 = sc.textFile("hdfs://user/data_spark/test02")
t01.union(t01) foreach println   // note: t01 unioned with itself, so every line prints twice
Output:
a a  
e e  
b b  
a a  
f f  
b b  
c c  
c c  
g g  
d d  
d d  
e e  
f f  
g g  

As the output shows, union neither deduplicates nor sorts; moreover, the element order of the unioned RDD is not deterministic, since foreach prints each partition as it is processed.
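A quick way to verify the partition behavior (a minimal sketch; the counts assume each file loads as two input splits, as the coalesce logs further below suggest):
t01.partitions.length                       // e.g. 2
t02.partitions.length                       // e.g. 2
t01.union(t02).partitions.length            // 4: the partitions of both inputs, combined
t01.union(t01).distinct() foreach println   // distinct() removes the duplicates that union keeps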

2、groupByKey(numPartitions)
This transformation is defined only for key-value RDDs; it is made available on pair RDDs through the implicit conversion to org.apache.spark.rdd.PairRDDFunctions.
Semantically, it brings all records with the same key together, like GROUP BY in MySQL: a ShuffledRDD fetches the relevant data from each upstream partition. The shuffle defaults to hash-based shuffle; Spark 1.1 introduced sort-based shuffle, which we do not cover here. After the shuffle, mapPartitions() is applied, producing a MapPartitionsRDD. Note that groupByKey performs no map-side combine. The test below exercises groupByKey.
When records with the same key are aggregated, all of the values are put into one list, so each key maps to a list holding every one of its values.
val wc = t01.union(t01).flatMap(l=>l.split(" ")).map(w=>(w,1))  
wc foreach println  
Output:
(e,1)  
(e,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
(g,1)  
(g,1)  
(a,1)  
(a,1)  
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(d,1)  
(d,1)  
wc.groupByKey foreach println  
Output:
(d,CompactBuffer(1, 1, 1, 1))  
(g,CompactBuffer(1, 1, 1, 1))  
(c,CompactBuffer(1, 1, 1, 1))  
(b,CompactBuffer(1, 1, 1, 1))  
(f,CompactBuffer(1, 1, 1, 1))  
(e,CompactBuffer(1, 1, 1, 1))  
(a,CompactBuffer(1, 1, 1, 1))  
OK, groupByKey behaves as expected: all values for each key end up grouped together.
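To go from the grouped buffers to per-word counts, each buffer can be summed (a minimal sketch; the reduceByKey in the next section is the better way to do this, since it combines on the map side):
wc.groupByKey.mapValues(_.sum) foreach println  // (d,4), (g,4), ... the same counts reduceByKey produces below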

3、reduceByKey(func, numPartitions)
This works like the reduce stage in MapReduce: all records with the same key are combined pairwise with func. The classic word count looks like:
map(x=>(x,1)).reduceByKey(_+_, 5)  
Note that reduceByKey performs a map-side combine, while groupByKey does not, so reduceByKey is usually the more efficient choice.
Test:
wc.reduceByKey(_+_) foreach println  
Output:
(d,4)  
(b,4)  
(f,4)  
(g,4)  
(c,4)  
(e,4)  
(a,4)  
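For reference, reduceByKey is implemented on top of the more general combineByKey, which spells out the map-side combine explicitly (a minimal sketch; the three functions build a combiner from the first value, fold further values in within a partition, and merge combiners across partitions):
wc.combineByKey((v: Int) => v, (acc: Int, v: Int) => acc + v, (a: Int, b: Int) => a + b) foreach println  // the same (word,4) pairs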

4、distinct(numPartitions)
Deduplicates the elements of the parent RDD; numPartitions can be given. Deduplication requires a shuffle: elements that are not KV pairs are first wrapped into KV pairs so that Spark can shuffle them by key. Internally, Spark implements distinct with reduceByKey().
wc.distinct(1) foreach println  
Output:
(g,1)  
(b,1)  
(f,1)  
(d,1)  
(a,1)  
(e,1)  
(c,1)  
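The reduceByKey-based implementation mentioned above can be approximated by hand (a minimal sketch, not Spark's exact source):
wc.map(x => (x, null)).reduceByKey((x, _) => x, 1).map(_._1) foreach println  // the same deduplicated pairs as distinct(1)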

5、cogroup(otherRDD,numPartitions)
Unlike groupByKey, which groups within a single RDD, cogroup aggregates two (or more) RDDs together; it can be seen as a multi-RDD generalization of groupByKey.
For each key, the result holds one list per input RDD: the values from each RDD are collected into their own list, so the result value is a pair of lists (printed as CompactBuffers), i.e. a list of lists.
val wc01 = t01.flatMap(l=>l.split(" ")).map(w=>(w,1))  
val wc02 = t02.flatMap(l=>l.split(" ")).map(w=>(w,1))  
wc01.cogroup(wc02,1) foreach println  
Output:
(d,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(e,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(4,(CompactBuffer(),CompactBuffer(1, 1)))  
(5,(CompactBuffer(),CompactBuffer(1, 1)))  
(a,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(6,(CompactBuffer(),CompactBuffer(1, 1)))  
(b,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(2,(CompactBuffer(),CompactBuffer(1, 1)))  
(3,(CompactBuffer(),CompactBuffer(1, 1)))  
(f,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
(1,(CompactBuffer(),CompactBuffer(1, 1)))  
(g,(CompactBuffer(1, 1),CompactBuffer()))  
(c,(CompactBuffer(1, 1),CompactBuffer(1, 1)))  
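Writing out the result type makes the list-of-lists structure explicit (a minimal sketch; the type annotation is only for illustration):
val cg: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = wc01.cogroup(wc02)
cg.filter { case (_, (l, r)) => l.isEmpty || r.isEmpty } foreach println  // keys that appear in only one of the two RDDs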


6、intersection(otherRDD)
Extracts the elements common to both RDDs, i.e. their intersection; internally it is implemented on top of cogroup().
Concretely, each element is first mapped to an (element, null) pair; after the cogroup, every key whose two lists are not both non-empty is filtered out, and taking the remaining keys yields the intersection.
wc01.intersection(wc02) foreach println  
Output:
(d,1)  
(e,1)  
(b,1)  
(f,1)  
(a,1)  
(c,1)  
Note that both RDDs here consist of KV pairs; intersection treats each whole (key, value) pair as one element.
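The cogroup-based implementation described above can be reproduced by hand (a minimal sketch):
wc01.map(v => (v, null)).
  cogroup(wc02.map(v => (v, null))).
  filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }.
  keys foreach println  // the same six pairs intersection returns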

7、join(otherRDD, numPartitions)
Joins two RDD[ K, V ]s the way JOIN does in SQL. Like intersection, it first performs a cogroup, yielding a MappedValuesRDD.
It then takes the Cartesian product of Iterable[V1] and Iterable[V2] for each key and flattens the result, producing a FlatMappedValuesRDD.
wc01.join(wc02,1) foreach println  
Output:
(d,(1,1))  
(d,(1,1))  
(d,(1,1))  
(d,(1,1))  
(e,(1,1))  
(e,(1,1))  
(e,(1,1))  
(e,(1,1))  
(a,(1,1))  
(a,(1,1))  
(a,(1,1))  
(a,(1,1))  
(b,(1,1))  
(b,(1,1))  
(b,(1,1))  
(b,(1,1))  
(f,(1,1))  
(f,(1,1))  
(f,(1,1))  
(f,(1,1))  
(c,(1,1))  
(c,(1,1))  
(c,(1,1))  
(c,(1,1))  
As the output shows, join behaves like MySQL's INNER JOIN: only keys present in both RDDs survive, and each key yields the Cartesian product of its values (2 × 2 = 4 tuples per key here).
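For the SQL outer-join variants, PairRDDFunctions also provides leftOuterJoin and rightOuterJoin, which keep unmatched keys and wrap the possibly-missing side in an Option (a minimal sketch):
wc01.leftOuterJoin(wc02, 1) foreach println  // g survives as (g,(1,None)); matched keys carry Some(1)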

8、sortByKey(ascending,numPartitions)
Sorts an RDD[ K, V ] by key; ascending = true sorts in ascending order, false in descending order.
The shuffle uses a range partitioner, so the partitions themselves are ordered and the data within each partition is sorted by key.
wc01.sortByKey(true,1) foreach println  
Output:
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
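Flipping the flag reverses the order (a minimal sketch):
wc01.sortByKey(false, 1) foreach println  // (g,1), (g,1), (f,1), ... down to (a,1)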

9、cartesian(otherRDD)
Computes the Cartesian product of two RDDs; the resulting CartesianRDD has as many partitions as the product of the two parents' partition counts.
It is effectively a SQL join without a join condition (a cross join).
No test is shown here; since the result has |A| × |B| elements, this operation is very expensive on large inputs. A sketch of such a test follows.
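A minimal sketch of what such a test could look like (the partition counts assume each file loads as two input splits):
val cp = t01.cartesian(t02)
cp.partitions.length       // the product of the parents' partition counts, e.g. 2 * 2 = 4
cp.count                   // t01.count * t02.count = 7 * 6 = 42 pairs
cp take 2 foreach println  // e.g. (a a,11) and (a a,22)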

10、coalesce(numPartitions, shuffle = false) 
Reduces the number of partitions and returns a new RDD. By default there is no shuffle; if numPartitions is larger than the current partition count, a shuffle is required (pass shuffle = true), otherwise the partition count cannot grow.
Without a shuffle, each new partition is formed by merging partitions of the parent RDD; changing the partition count this way can affect data locality and balance.
Test:
 wc01.coalesce(1) foreach println  
Output:
14/10/27 17:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:0+14
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
14/10/27 17:23:42 INFO rdd.HadoopRDD: Input split: hdfs://qunarcluster/user/data_spark/test01:14+14
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
 wc01.coalesce(2) foreach println  
Output:
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(e,1)  
(d,1)  
(e,1)  
(d,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
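The partition counts themselves can be checked directly (a minimal sketch; wc01 loads with two partitions here, as the input-split logs above show):
wc01.partitions.length                    // 2, from the two input splits
wc01.coalesce(1).partitions.length        // 1
wc01.coalesce(4).partitions.length        // still 2: the count cannot grow without a shuffle
wc01.coalesce(4, true).partitions.length  // 4: shuffle = true allows increasing partitions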

11、repartition(numPartitions)
This operation is simply equivalent to coalesce(numPartitions, shuffle = true).
wc01.repartition(1) foreach println
Output:
(a,1)  
(a,1)  
(b,1)  
(b,1)  
(c,1)  
(c,1)  
(d,1)  
(d,1)  
(e,1)  
(e,1)  
(f,1)  
(f,1)  
(g,1)  
(g,1)  
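Because repartition always shuffles, it can grow as well as shrink the partition count (a minimal sketch):
wc01.repartition(3).partitions.length  // 3: larger than the original 2 is fine
wc01.repartition(1).partitions.length  // 1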



A classic MapReduce job translates to Spark as map + reduceByKey; note, though, that the reduce side of MapReduce sorts and merges its input before reducing, while Spark's reduceByKey makes no ordering guarantee, so the behavior is similar but not identical.
In short, Spark's transformation API is considerably richer and more flexible.
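Putting the pieces together, the complete word count in spark-shell (a minimal sketch using the test01 file from above):
sc.textFile("hdfs://user/data_spark/test01").flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _) foreach println  // (a,2), (b,2), ..., (g,2)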

