Spark의 RDD 일반 작업
67470 단어 Spark
1. RDD란 무엇인가?
Resilient Distributed Datasets(resilient의 분산 데이터 세트)
RDDs are immutable, fault-tolerant,parallel data structures that let users explicitly persist intermediate results in memory,control their partitioning to optimize data placement, and manipulate them using a rich set of operators. (개인적으로는 영어를 아주 분명하게 썼으니 번역할 필요가 없다)
2. RDD의 특징
3. RDD 작업
transformation vs action ?
In short, RDDs are immutable, RDD transformations are lazily evaluated, and RDD actions are eagerly evaluated and trigger the computation of your data processing logic.(개인적으로는 영어를 아주 분명하게 썼으니 번역할 필요가 없다).
4. RDD 생성 방법
전반적으로 세 가지가 있어요.
def main(args: Array[String]): Unit = {
val conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("create RDD by Array Object");
conf.set("spark.testing.memory", "2147480000");
val str_list = Array[String]("How to create RDD", "funny game","Spark is cool!");
val sc = new SparkContext(conf);
val RDD_str = sc.parallelize(str_list,2);
print(RDD_str.count());
}
사례 2.HDFS에서 RDD 가져오기
def main(args: Array[String]): Unit = {
// conf
val sparkConf = new SparkConf()
.setAppName("WordCount sample")
// .setMaster("192.168.1.10:4040")
.setMaster("local[2]")
.set("spark.testing.memory", "2147480000");
val sc = new SparkContext(sparkConf);
val rdd = sc.textFile("/user/hadoop/worddir/word.txt");
val tupleRDD = rdd.flatMap(line => {line.split(" ")
.toList.map(word => (word.trim,1))
});
val resultRDD :RDD[(String,Int)] =tupleRDD.reduceByKey((a,b)=> a + b);
resultRDD.foreach(elm => println(elm._1+"="+elm._2));
Thread.sleep(10000);
sc.stop();
}
사례 3.다른 RDD에서 RDD 가져오기
def main(args: Array[String]): Unit = {
val conf = new SparkConf();
conf.setAppName("Get RDD from existed RDD");
conf.setMaster("local[2]");
conf.set("spark.testing.memory","2147480000");
//conf.set("","")
val str_array = Array("one","two","three");
val sc = new SparkContext(conf);
val RDD_list = sc.parallelize(str_array,2);
val RDD_list1 = RDD_list.map(l=>{l+" ******"});
RDD_list1.foreach(elm=>{println(elm)});
}
5. RDD transformation 작업
일반적으로 사용되는 몇 가지 RDD 작업은 다음과 같습니다.
이름:
묘사
map(func)
this applies the provided function to each row as iterating through the rows in the dataset. the returned rDD will contain whatever the provided func returns
flatMap(func)
similar to map(func), the func should return a collection rather than a single element, and this method will flatten out the returned collection. this allows an input item to map to zero or more output items.
filter(func)
Only the elements that the func function returns true will be collected in the returned rDD. in other words, collect only the rows that meet the condition defined in the given func function.
mapPartitions(func)
similar to map(func), but this applies at the partition (chunk) level. this requires the func function to take the input as an iterator to iterate through each row in the partition.
mapParitionsWithIndex(func)
this is similar to mapPartitions, but an additional partition index number is provided to the func function.
mapParitionsWithIndex(func)
this is similar to mapPartitions, but an additional partition index number is provided to the func function.
union(otherRDD)
this is similar to mapPartitions, but an additional partition index number is provided to the func function.
intersection(otherRDD)
Only the rows that exist in both the source rDD and Only the rows that exist in both the source rDD and
substract(otherRDD)
this subtracts the rows in otherRDD from the source rDD.
distinct([numTasks])
this removes duplicate rows from the source rDD.
sample(withReplace, fraction,seed)
this is usually used to reduce a large dataset to a smaller one by randomly selecting a fraction of rows using the given seed and with or without replacements.
사례 1.map (func): 집합의 줄마다 교체합니다. 되돌아오는 RDD 형식은func의 논리를 포함합니다.
object RDDTest08 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("RDD map transformation");
conf.set("spark.testing.memory","2147480000");
// define class Person
case class Person(id:Int,name:String,phoneNum:String);
val personArray = Array("1#Jason#1233242432","2#Mike#1232131312");
val sc = new SparkContext(conf);
val personRDD = sc.parallelize(personArray);
val personObjectRDD = personRDD.map(person=>{
val personInfo = person.split("#");
Person(personInfo(0).toInt,personInfo(1),personInfo(2));
})
personObjectRDD.collect.foreach(println);
}
결과 내보내기
Person(1,Jason,1233242432)
Person(2,Mike,1232131312)
사례 2.flatMap(func): 맵과 같은 동작으로 줄마다 되돌아오는 대상을 다시 편평하게 합니다
***map vs flatMap
예를 들어string 형식의collection array("this is one", "this is two") 역시 공백으로 문자열을 분할하여 되돌아오는 결과가 다르다
val strArray = Array("this is one","this is two");
val sc = new SparkContext(conf);
val strRDD = sc.parallelize(strArray);
val resultRDD = strRDD.map(line=>{
line.split(" ")
})
val array:Array[Array[String]] = resultRDD.collect();
array.foreach(print);
flatMap이 반환하는 유형은 Array[string]입니다.
val strArray = Array("this is one","this is two");
val sc = new SparkContext(conf);
val strRDD = sc.parallelize(strArray);
val resultRDD = strRDD.flatMap(line=>{
line.split(" ")
})
val array:Array[String] = resultRDD.collect();
array.foreach(print);
사례 3.filter(func): 행 기반 필터링, RDD 구조로 반환
val strArray = Array("this is one","this is two");
val sc = new SparkContext(conf);
val strRDD = sc.parallelize(strArray);
val resultRDD = strRDD.filter(line=>{line.contains("two")});
resultRDD.foreach(print);
사례 4.mapPartitions(func):맵을 파티션 처리
case class Person(id:Int,name:String,phoneNum:String);
val personArray = Array("1#Jason#1233242432","2#Mike#1232131312","3#James#01902992888","4#Tom#1231232222");
val sc = new SparkContext(conf);
val personRDD = sc.parallelize(personArray,2);
val personObjectRDD = personRDD.mapPartitions((iter: Iterator[String])=> {
iter.map(person=>{
val personInfo = person.split("#");
Person(personInfo(0).toInt,personInfo(1),personInfo(2))
});
});
personObjectRDD.collect.foreach(println);
사례 5.mapPartitionsWithIndex: 맵을 섹션 처리하기 (섹션 키 포함)
case class Person(id:Int,name:String,phoneNum:String,key:Int);
val personArray = Array("1#Jason#1233242432","2#Mike#1232131312","3#James#01902992888","4#Tom#1231232222");
val sc = new SparkContext(conf);
val personRDD = sc.parallelize(personArray,2);
val personObjectRDD = personRDD.mapPartitionsWithIndex((idx:Int,iter: Iterator[String])=> {
iter.map(person=>{
val personInfo = person.split("#");
Person(personInfo(0).toInt,personInfo(1),personInfo(2),idx);
});
});
personObjectRDD.collect.foreach(println);
4개의 개인 개체에 대한 partition 조회: Jason과 Mike는 partiton 0, James와 Tom은 partiton 1
Person(1,Jason,1233242432,0)
Person(2,Mike,1232131312,0)
Person(3,James,01902992888,1)
Person(4,Tom,1231232222,1)
사례 6.union(otherRDD): 통합 작업기존 SQL의 union과 달리 이 작업은 무겁지 않습니다.
val intArray1 = Array(0,1,3,5,7,9);
val intArray2 = Array(0,2,4,6,8,10);
val sc = new SparkContext(conf);
val intRDD1 = sc.parallelize(intArray1);
val intRDD2 = sc.parallelize(intArray2);
val unionRDD = intRDD1.union(intRDD2);
println(unionRDD.collect().toList);
결과 내보내기
List(0, 1, 3, 5, 7, 9, 0, 2, 4, 6, 8, 10)
사례 6.intersection(otherRDD):교차 작업
val sc = new SparkContext(conf);
val strRDD1 = sc.parallelize(Array("one","two","three"));
val strRDD2 = sc.parallelize(Array("two","three"));
val intersectionRDD = strRDD1.intersection(strRDD2);
println(intersectionRDD.collect().toList);
결과 내보내기
List(two, three)
사례 7.substract(otherRDD):차집합 작업
val sc = new SparkContext(conf);
val strRDD1 = sc.parallelize(Array("one","two","three"));
val strRDD2 = sc.parallelize(Array("two","three"));
val subtractRDD = strRDD1.subtract(strRDD2);
println(subtractRDD.collect().toList);
결과 내보내기
List(one)
사례 8.distinct () 중복된 값 제거
val sc = new SparkContext(conf);
val duplicatedRDD = sc.parallelize(List("one",1,"two",2,"three",3,"four","four"));
print(duplicatedRDD.distinct().collect().toList);
결과 내보내기
List(two, one, three, four, 2, 1, 3)
사례 9.sample(withReplacement, fraction, seed)
이 transform 은 분석할 데이터의 통계 정보를 나타냅니다.
withReplacement 필수 옵션:true 는 중복 값을 사용할 수 있음을 나타내고false 는 중복 값을 사용할 수 없음을 나타냅니다.
fraction 필수 옵션: 0-1 사이에서 Sample의 백분율을 얻습니다
seed 옵션: 무작위 값을 추출하는 함수를 정의합니다. 참고하지 않으면 기본적으로 하나를 지정합니다.
val sc = new SparkContext(conf);
val intArray= sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10));
print(intArray.sample(false,0.2).collect().toList);
결과 내보내기
List(1, 4)
6. RDD action 작업
사례 1.collect (): RDD를 집합으로 변환
*** 만약 당신의 결과가 너무 크면 out-of-memory를 일으킬 수 있으니 주의하십시오
***결과 집합 세척 여과 후 변환하는 것이 좋습니다
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).collect().toList);
결과 내보내기
List(1, 2, 3, 4, 5)
사례 2.count(): 카운트
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).count());
결과 내보내기
5
사례 3.first (): 첫 번째
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).first());
결과 내보내기
1
사례 4.take(n): N 요소 가져오기
val sc = new SparkContext(conf);
sc.parallelize(Array(1,2,3,4,5)).take(3).foreach(print);
결과 내보내기
123
사례 6.reduce(func): 데이터 처리 병합
val sc = new SparkContext(conf);
val sumRDD = sc.parallelize(Array(1,2,3,4,5)).reduce((a1:Int,a2:Int)=>{a1+a2});
print(sumRDD.toInt);
결과 내보내기
15
사례 7.takeSample(withReplacement, n, [seed])
transform의 Sample과 유사
val sc = new SparkContext(conf);
print(sc.parallelize(Array(1,2,3,4,5)).takeSample(false,2).toList);
결과 내보내기
List(1, 5)
사례 8.takeOrdered(n, [ordering])
출력 정렬
val sc = new SparkContext(conf);
sc.setLogLevel("ERROR");
println(sc.parallelize(Array(1,2,3,4,5)).takeOrdered(3) (Ordering[Int].reverse).toList);
println(sc.parallelize(Array(1,2,3,4,5)).takeOrdered(3).toList);
결과 내보내기
List(5, 4, 3)
List(1, 2, 3)
사례 9.top(n, [ordering])
top N 요소 내보내기
val sc = new SparkContext(conf);
sc.setLogLevel("ERROR");
println(sc.parallelize(Array(1,2,3,4,5)).top(3).toList);
println(sc.parallelize(Array(1,2,3,4,5)).top(3)(Ordering[Int]).toList);
결과 내보내기
List(5, 4, 3)
List(5, 4, 3)
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.