Spark 10 대 상용 RDD API (소 백 에 적합)

40938 단어 spark
0. RDD 는 무엇 입 니까?
RDD 는 Spark 의 추상 적 인 데이터 구조 유형 으로 모든 데 이 터 는 Spark 에서 RDD 로 표 시 됩 니 다.프로 그래 밍 의 측면 에서 볼 때 RDD 는 간단하게 하나의 배열 로 볼 수 있다.일반 배열 과 의 차 이 는 RDD 의 데 이 터 는 파 티 션 에 저 장 된 것 으로 서로 다른 파 티 션 의 데 이 터 는 서로 다른 기계 에 분포 되 고 동시에 병행 처 리 될 수 있다 는 것 이다.따라서 Spark 응용 프로그램 은 처리 해 야 할 데 이 터 를 RDD 로 변환 한 다음 에 RDD 에 대해 일련의 변환 과 조작 을 해서 결 과 를 얻 는 것 이 아니다.
1. RDD 만 들 기 (주로 다음 두 가지 방법 으로 RDD 만 들 기)
방식 1: 일반 배열 (List 와 Array) 에서 RDD 만 들 기
collect () RDD 의 모든 요 소 를 되 돌려 줍 니 다.
scala> val a =sc.parallelize(1 to 9,3)
/*     1 9 9   ,     3    。*/
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24

/* List */

scala>val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
1,4,9,16,25,36

/* Array */

val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5}
println(r02.collect().mkString(","))
1,2,3,4

방식 2: 파일 README. md 를 읽 어서 RDD 를 만 듭 니 다. 파일 의 줄 마다 RDD 의 요소 가 있 습 니 다.
scala> val b = sc.textFile("README.md")
b: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[5] at textFile at :2

2. map 와 filter
map: 매개 변 수 는 함수 입 니 다. 함 수 는 RDD 의 모든 요소 에 적 용 됩 니 다. 반환 값 은 새로운 RDD 입 니 다.다시 말 하면 RDD 의 모든 요소 에 지정 한 함 수 를 실행 하여 새로운 RDD 를 만 드 는 것 이다.모든 원 RDD 의 요 소 는 새 RDD 에 있 고 하나의 요소 만 대응 합 니 다.
filter: 매개 변 수 는 함수 입 니 다. 함 수 는 조건 에 맞지 않 는 요 소 를 걸 러 냅 니 다. 반환 값 은 새로운 RDD 입 니 다.
eg1: a 의 모든 요소 에 2 를 곱 한 다음 정렬 합 니 다.
scala> val a = sc.parallelize(List(1,2,3,4,5))
scala> val b = a.map(_*3).collect
b: Array[Int] = Array(3, 6, 9, 12, 15)

/*true     ,false     */
scala> val c = a.map(_*2).sortBy(x =>x,true).collect
c: Array[Int] = Array(2, 4, 6, 8, 10)
scala> val d = a.map(_*2).sortBy(x =>x,false).collect
d: Array[Int] = Array(10, 8, 6, 4, 2)

ex2: 20 이상 의 수 와 짝수 걸 러 내기
scala> val a = sc.parallelize(List(1,22,100,5,56)).filter(_>20).collect
a: Array[Int] = Array(22, 100, 56)

scala> val a = sc.parallelize(List(1,22,100,5,55)).filter(_%2==0).collect
a: Array[Int] = Array(22, 100)

3. map 와 flatMap
맵 과 유사 합 니 다. 차이 점 은 원래 RDD 의 요 소 는 맵 처 리 를 거 친 후에 하나의 요소 만 생 성 할 수 있 고 원래 RDD 의 요 소 는 flatmap 처 리 를 거 친 후에 여러 개의 요 소 를 생 성하 여 새로운 RDD 를 구축 할 수 있 습 니 다.
eg3: 맵 과 flatMap 의 차 이 를 자세히 느끼 세 요.
scala> val a = sc.parallelize(Array("china","is","Strong"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at :24

/* map */

scala> a.map(_.split(" ")).collect
res3: Array[Array[String]] = Array(Array(china), Array(is), Array(Strong))

/* flatMap       */

scala> val a = sc.parallelize(Array("china","is","Strong")).flatMap(_.split(" ")).collect
a: Array[String] = Array(china, is, Strong)

/* Map */

scala> val a = sc.parallelize(List(List("china","is","Strong"),List("will","be"," better")))
scala> val b =a.map(_.map(_.split(" "))).collect
b: Array[List[Array[String]]] = Array(List(Array(china), Array(is), Array(Strong)), List(Array(will), Array(be), Array("", better)))

/* flatMap */

scala> val c =a.map(_.flatMap(_.split(" "))).collect
c: Array[List[String]] = Array(List(china, is, Strong), List(will, be, "", better))

scala> val d =a.flatMap(_.flatMap(_.split(" "))).collect
d: Array[String] = Array(china, is, Strong, will, be, "", better)

eg: 원 RDD 의 모든 요소 x 에 y 개의 요 소 를 생 성 합 니 다 (1 부터 y, y 는 요소 x 의 값 입 니 다)
scala> val a = sc.parallelize(1 to 4, 2).flatMap(x=> 1 to x).collect
a: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

4.union,intersecttion,distinct,subtract
union: Performs the standard set operation: A union B 도 + 를 사용 할 수 있 습 니 다.
eg: 병합
scala> val b =sc.parallelize(1 to 3,1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at :24

scala> val a = sc.parallelize(4 to 9,1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at :24

scala> (b ++ a).collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> a.union(b).collect
res6: Array[Int] = Array(4, 5, 6, 7, 8, 9, 1, 2, 3)

/* intersecttion():   RDD,    RDD     */ 

scala> val a = sc.parallelize(List(1,2,3,4))
scala> val b = sc.parallelize(List(1,3,5,4))
scala> val c = a.intersection(b).collect
c: Array[Int] = Array(4, 1, 3)

/* distinct:    , RDD          */

scala> val d = a.union(b).collect
d: Array[Int] = Array(1, 2, 3, 4, 1, 3, 5, 4)

scala> val e = d.distinct
e: Array[Int] = Array(1, 2, 3, 4, 5)

/* subtract()    RDD,  RDD    RDD        */

scala> val c = a.subtract(b).collect
c: Array[Int] = Array(2)

/*cartesian()   RDD,   RDD     */

scala> val c = a . cartesian(b).collect
c: Array[(Int, Int)] = Array((1,1), (1,3), (1,5), (1,4), (2,1), (2,3), (2,5), (2,4), (3,1), (3,3), (3,5), (3,4), (4,1), (4,3), (4,5), (4,4))

5.sortBy
ex6: 정렬 (오름차 순 또는 내림차 순)
scala> val a = sc.parallelize(Array(("H","1"),("A","4"),("B","3"),("L","2")))
scala> a.sortBy(x=>x._1,true).collect
res0: Array[(String, String)] = Array((A,4), (B,3), (H,1), (L,2))
scala> a.sortBy(x=>x._2,true).collect
res1: Array[(String, String)] = Array((H,1), (L,2), (B,3), (A,4))

/*            */

scala> val a = sc.parallelize(List(1,2,4,0.1,0.2))
scala> a.sortBy(x=>"x",true).collect
res18: Array[Double] = Array(1.0, 2.0, 4.0, 0.1, 0.2)
/*          */
scala> a.sortBy(x=>x+"",true).collect
res19: Array[Double] = Array(0.1, 0.2, 1.0, 2.0, 4.0)

6. groupBy,groupByKey,reduceByKey,sortByKey
ex7: groupBy, groupByKey, reduceByKey, sortByKey 구별
/* groupBy */

scala> val a = sc.parallelize(1 to 9, 3)
scala> s.groupBy(x=>{if(x%2==0)"even" else "odd"}).collect
res22: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc(a: Int) : Int =
     | {
     |   a % 2
     | }
myfunc: (a: Int)Int

scala> a.groupBy(myfunc).collect
res24: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))

groupby Key 도 모든 key 를 조작 하지만 하나의 sequence 만 생 성 합 니 다.특별한 주의 가 필요 합 니 다: sequence 에 aggregation 작업 이 필요 하 다 면 (주의, groupbyKey 자체 가 조작 함 수 를 사용자 정의 할 수 없습니다) reduceByKey / aggregateByKey 를 선택 하 는 것 이 좋 습 니 다.이것 은 groupbyKey 가 함 수 를 사용자 정의 할 수 없 기 때문에 우 리 는 먼저 groupbyKey 로 RDD 를 생 성 한 후에 야 이 RDD 가 map 를 통 해 사용자 정의 함수 조작 을 할 수 있 습 니 다.
/*groupByKey*/

scala> val a = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
scala> val b = a.groupByKey().collect
rdd2: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(50, 90)), (class2,CompactBuffer(80, 70)))

/*reduceByKey*/

scala> val a = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
scala> val b = a.reduceByKey(_+_).collect /* key    */
rdd2: Array[(String, Int)] = Array((class1,140), (class2,150))

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect/* key    */
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

/*sortByKey*/

scala> val a = sc.parallelize(Array(("cat",2),("dog","3"),("cat",5),("dog","5")) )
scala> val b = a.sortByKey().collect
b: Array[(String, Any)] = Array((cat,2), (cat,5), (dog,3), (dog,5))

scala> b.foreach(score => println(score._1+":"+score._2))
cat:2
cat:5
dog:3
dog:5

7. join Join 은 SQL 과 유사 한 inner join 작업 입 니 다. 결 과 는 앞 과 뒤 집합 에서 짝 짓 기 에 성 공 했 습 니 다. 연결 되 지 않 은 것 을 걸 러 냅 니 다.
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

/* leftOuterJoin   SQL      left outer join,        RDD  ,          */

scala> b.leftOuterJoin(d).collect
res15: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

/*rightOuterJoin   SQL      right outer join,             RDD  ,         */

scala> b.rightOuterJoin(d).collect
res16: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

/* union :   RDD,      RDD      RDD*/

scala> b.union(d).collect
res19: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant), (3,dog), (3,cat), (3,gnu), (6,salmon), (6,rabbit), (6,turkey), (4,wolf), (4,bear), (3,bee))

8.reduce
reduce (): 모든 RDD 데 이 터 를 병렬 통합 합 니 다. 예 를 들 어 구 와 조작.
val a = sc.parallelize(1 to 100, 3)

/* 1 100  */
a.reduce(_ + _)
res41: Int = 5050

/*  a      */
scala> a.count
res23: Long = 100

/*      2 */
scala> a.top(2)
res24: Array[Int] = Array(100, 99)

/*  2 */
scala> a.take(2)
res25: Array[Int] = Array(1, 2)

/*    */
scala> a.first()
res26: Int = 1

/*  5     */
scala> a.takeOrdered(5)
res27: Array[Int] = Array(1, 2, 3, 4, 5)

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

/*reduceByKey*/

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

9.cogroup
cogroup: 여러 RDD 에서 같은 Key 에 대응 하 는 Value 를 조합 합 니 다.최대 네 개의 RDD 를 조합 할 수 있 습 니 다.
scala> val a = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
scala> val b = sc.parallelize(Array((1,100),(2,65),(3,99)))
scala> a.cogroup(b).collect
res33: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(100))), (3,(CompactBuffer(c),CompactBuffer(99))), (2,(CompactBuffer(b),CompactBuffer(65))))


val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)

val d = a.map((_, "d"))
b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)

scala> val a = sc.parallelize(List((1,"cat"),(3,"dog")))
scala> val b = sc.parallelize(List((1,"fish"),(3,"apple")))
scala> a.cogroup(b).collect
res40: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(cat),CompactBuffer(fish))), (3,(CompactBuffer(dog),CompactBuffer(apple))))

10.foreach(func)
foreach (func): RDD 요소 마다 특정 함 수 를 사용 합 니 다.
scala> val a =sc.parallelize(List("cat","dog"))
scala> a.foreach(x=>println(x+"sare yummy"))
catsare yummy
dogsare yummy

val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
b.foreachPartition(x => println(x.reduce(_ + _)))
6
15
24

val a = sc.parallelize(1 to 9, 3)
a.foreachWith(i => i)((x,i) => if (x % 2 == 1 && i % 2 == 0) println(x) )
1
3
7
9

좋은 웹페이지 즐겨찾기