Spark 핵심 프로 그래 밍 - RDD 변환 작업
21423 단어 빅 데이터/스파크/스파크Core
public static void mapTest(JavaSparkContext sc) {
List words = Arrays.asList("hello", "world");
JavaRDD wordsRDD = sc.parallelize(words);
// map
JavaRDD> wordCountRDD = wordsRDD
.map(new Function>() {
private static final long serialVersionUID = 4883828149185152684L;
public Tuple2 call(String v1) throws Exception {
return new Tuple2(v1, 1);
}
});
// ,
wordCountRDD.foreach(new VoidFunction>() {
private static final long serialVersionUID = 4892545561975184834L;
public void call(Tuple2 t) throws Exception {
System.out.println(" :" + t._1 + ", :" + t._2);
}
});
}
public static void distinctTest(JavaSparkContext sc) {
List nums = Arrays.asList(1, 2, 2, 3, 5);
JavaRDD numsRDD = sc.parallelize(nums);
// distinct
JavaRDD distinceNumsRDD = numsRDD.distinct();
distinceNumsRDD.foreach(new VoidFunction() {
private static final long serialVersionUID = 647204360041943265L;
public void call(Integer t) throws Exception {
System.out.println(t);
}
});
}
public static void flatMapTest(JavaSparkContext sc) {
List words = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD wordsRDD = sc.parallelize(words, 3);
JavaRDD splitedRDD = wordsRDD.flatMap(new FlatMapFunction() {
private static final long serialVersionUID = 840597214907231645L;
public Iterable call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
splitedRDD.foreach(new VoidFunction() {
private static final long serialVersionUID = 4032309929532415386L;
public void call(String t) throws Exception {
System.out.println(" :"+t);
}
});
}
public static void repartitionTest(JavaSparkContext sc) {
List words = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD wordsRDD = sc.parallelize(words, 3);
System.out.println(" :" + wordsRDD.partitions().size());
JavaRDD coalesce1RDD = wordsRDD.coalesce(2);
System.out.println(" 2 :"+coalesce1RDD.partitions().size());
JavaRDD coalesce2RDD = wordsRDD.coalesce(4);
System.out.println(" 4 shuffle :"+coalesce2RDD.partitions().size());
JavaRDD coalesce3RDD = wordsRDD.coalesce(4,true);
System.out.println(" 4 shuffle :"+coalesce3RDD.partitions().size());
JavaRDD repartitionRDD = wordsRDD.repartition(4);
System.out.println("repartition :" + repartitionRDD.partitions().size());
}
public static void randomSplit(JavaSparkContext sc) {
List nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD numsRDD = sc.parallelize(nums, 10);
// 1
double[] weights = new double[] {0.1,0.2,0.3,0.4};
JavaRDD[] RDDs = numsRDD.randomSplit(weights);
int index = 1;
for (JavaRDD rdd : RDDs) {
System.out.println(" "+(index++)+" RDD");
rdd.foreach(new VoidFunction() {
private static final long serialVersionUID = 510345733961440792L;
public void call(Integer t) throws Exception {
System.out.print(t+" ");
}
});
}
}
public static void glomTest(JavaSparkContext sc) {
List nums = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD numsRDD = sc.parallelize(nums, 3);
JavaRDD> glomRDD = numsRDD.glom();
glomRDD.foreach(new VoidFunction>() {
private static final long serialVersionUID = -3717022904720402200L;
public void call(List t) throws Exception {
System.out.println(t.toString());
}
});
}
public static void unionTest(JavaSparkContext sc) {
List num1s = Arrays.asList(1,2,3,4,5);
List num2s = Arrays.asList(2,3,4,6,7);
JavaRDD num1sRDD = sc.parallelize(num1s);
JavaRDD num2sRDD = sc.parallelize(num2s);
JavaRDD unionRDD = num1sRDD.union(num2sRDD);
System.out.println("union( ) :"+unionRDD.collect().toArray());
JavaRDD intersectionRDD = num1sRDD.intersection(num2sRDD);
System.out.println("intersection( ) :"+intersectionRDD.collect().toArray());
JavaRDD subtractRDD = num1sRDD.subtract(num2sRDD);
System.out.println("subtract( ) :"+subtractRDD.collect().toArray());
}
맵 작업 과 유사 합 니 다. 맵 의 매개 변 수 는 RDD 의 모든 요소 에서 RDD 의 모든 파 티 션 의 교체 기 가 되 었 습 니 다. 그 중에서 preservers Partitions 는 부모 RDD 의 paritions 파 티 션 정 보 를 보존 할 지 여 부 를 표시 합 니 다.맵 과정 에서 자주 생 성 되 는 추가 대상 이 필요 하 다 면 맵 파 티 션 스 를 사용 하 는 것 이 맵 을 조작 하 는 것 보다 효율 적 입 니 다. 예 를 들 어 RDD 의 모든 데 이 터 는 JDBC 를 통 해 데이터 베 이 스 를 기록 합 니 다. 맵 함 수 를 사용 하면 모든 요소 에 연결 을 만 들 수 있 고 맵 파 티 션 스 를 사용 하면 각 파 티 션 에 연결 만 만들어 야 합 니 다.그러나 mapPartitions 는 큰 대상 에 적용 되 지 않 습 니 다. 메모리 에 한꺼번에 불 러 오 면 메모리 가 넘 치기 쉽 기 때 문 입 니 다.
public static void mapPartitionsTest(JavaSparkContext sc) {
List nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD numsRDD = sc.parallelize(nums, 3);
//
JavaRDD totalRDD = numsRDD
.mapPartitions(new FlatMapFunction, Integer>() {
private static final long serialVersionUID = 8064196197615893270L;
public Iterable call(Iterator t) throws Exception {
Integer total = 0;
while (t.hasNext()) {
total = total + t.next();
}
return Arrays.asList(total);
}
});
System.out.println(totalRDD.collect().toArray());
}
mapPartitions 와 유사 합 니 다. 파 라 메 터 를 입력 하면 파 티 션 색인 이 하나 더 생 깁 니 다.
public static void mapPartitionsWithIndexTest(JavaSparkContext sc) {
List nums = Arrays.asList("1", "2", "3", "4", "5");
JavaRDD numsRDD = sc.parallelize(nums, 2);
JavaRDD totalRDD = numsRDD
.mapPartitionsWithIndex(new Function2, Iterator>() {
private static final long serialVersionUID = 2818512306761968511L;
public Iterator call(Integer index, Iterator nums) throws Exception {
StringBuilder builder = new StringBuilder();
while (nums.hasNext()) {
builder.append(nums.next() + "、");
}
return Arrays.asList(" :" + index + ", :" + builder.toString()).iterator();
}
}, false);
System.out.println(totalRDD.collect());
}
val rdd1 = sc.makeRDD(1 to 5,2)
val rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd1.zip(rdd2).collect
//
res14: Array[(Int, String)] = Array((1,A), (2,B),(3,C), (4,D), (5,E))
val rdd3 = sc.makeRDD(Seq("A","B","C"),3)
rdd1.zip(rdd3).collect
// RDD , 。(Can't zip RDDs with unequal numbers of partitions:List(2,3))
public static void zipWithIndexTest(JavaSparkContext sc) {
List strs = Arrays.asList("a", "b", "c", "d", "e");
JavaRDD strsRDD = sc.parallelize(strs, 2);
JavaPairRDD pairRDD = strsRDD.zipWithIndex();
System.out.println(pairRDD.collect());
}
이 유일한 ID 생 성 알고리즘 은 다음 과 같 습 니 다. (1) 각 파 티 션 의 첫 번 째 요소 의 유일한 ID 값 은: 이 파 티 션 색인 번호 입 니 다.(2) 각 파 티 션 에서 N 번 째 요소 의 유일한 ID 값 은: 이전 요소 의 유일한 ID 값 + 이 RDD 의 전체 파 티 션 수 입 니 다.
그 중에서 zipWith Index 는 각 구역 의 시작 색인 번 호 를 계산 하기 위해 Spark 작업 을 시작 해 야 하 며, zipWith UniqueId 는 필요 하지 않 습 니 다.
public static void zipWithUniqueIdTest(JavaSparkContext sc) {
List strs = Arrays.asList("a", "b", "c", "d", "e");
JavaRDD strsRDD = sc.parallelize(strs, 2);
JavaPairRDD pairRDD = strsRDD.zipWithUniqueId();
System.out.println(pairRDD.collect());
}
2. 키 값 변환 작업
public static void mapValuesTest(JavaSparkContext sc) {
List strs = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD strsRDD = sc.parallelize(strs);
JavaRDD splitedRDD = strsRDD.flatMap(new FlatMapFunction() {
private static final long serialVersionUID = -984130321206766818L;
public Iterable call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
JavaPairRDD pairRDD = splitedRDD
.mapToPair(new PairFunction() {
private static final long serialVersionUID = 2043541493697396334L;
public Tuple2 call(String t) throws Exception {
return new Tuple2(1, t);
}
});
JavaPairRDD resultRDD = pairRDD.mapValues(new Function() {
private static final long serialVersionUID = 924538234523756151L;
public String call(String value) throws Exception {
return value.toUpperCase();
}
});
System.out.println(resultRDD.collect());
}
reduceByKey: RDD [K, V] 의 모든 K 에 대응 하 는 V 값 을 매 핑 함수 에 따라 계산 하 는 데 사 용 됩 니 다. 솔직히 같은 key 의 value 를 reduce 작업 을 합 니 다. 내부 에서 사실은 combineByKey 를 호출 하고 numPartitions 는 지정 한 파 티 션 에 사 용 됩 니 다.reduceByKey Locally 는 연산 결 과 를 RDD 가 아 닌 맵 에 표시 합 니 다.
public static void reduceByKeyTest(JavaSparkContext sc) {
List strs = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD strsRDD = sc.parallelize(strs);
JavaRDD splitedRDD = strsRDD.flatMap(new FlatMapFunction() {
private static final long serialVersionUID = -984130321206766818L;
public Iterable call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
JavaPairRDD pairRDD = splitedRDD
.mapToPair(new PairFunction() {
private static final long serialVersionUID = 2043541493697396334L;
public Tuple2 call(String t) throws Exception {
return new Tuple2(t, 1);
}
});
JavaPairRDD resultRDD = pairRDD
.reduceByKey(new Function2() {
private static final long serialVersionUID = 4852162726837426718L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(resultRDD.collect());
}
groupbyKey: RDD [K, V] 의 각 K 에 대응 하 는 V 값 을 하나의 집합 Iterable [V] 에 합 치 는 데 사용 합 니 다. 즉, key 에 따라 그룹 을 나 누 는 것 입 니 다.
public static void groupByKeyTest(JavaSparkContext sc) {
List strs = Arrays.asList("spark core", "spark sql", "spark streaming");
JavaRDD strsRDD = sc.parallelize(strs);
JavaRDD splitedRDD = strsRDD.flatMap(new FlatMapFunction() {
private static final long serialVersionUID = -984130321206766818L;
public Iterable call(String t) throws Exception {
return Arrays.asList(t.split(" "));
}
});
JavaPairRDD pairRDD = splitedRDD
.mapToPair(new PairFunction() {
private static final long serialVersionUID = 2043541493697396334L;
public Tuple2 call(String t) throws Exception {
return new Tuple2(t, 1);
}
});
JavaPairRDD> resultRDD = pairRDD.groupByKey();
System.out.println(resultRDD.collect());
}
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A",1),("B",2),("C",3)),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("D","d")),2)
val rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2)
val rdd4 = rdd1.cogroup(rdd2,rdd3).collect
/**
* Array[(String, (Iterable[Int],Iterable[String], Iterable[String]))]
* =Array((B,(CompactBuffer(2),CompactBuffer(b),CompactBuffer())),
*(D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())),
* (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))),
*(C,(CompactBuffer(3),CompactBuffer(),CompactBuffer())),
*(E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))
*/
내부 연결 은 cogroup 을 기반 으로 이 루어 집 니 다. 두 RDD 사이 의 똑 같은 key 를 연결 하고 서로 다른 것 을 버 립 니 다.
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.join(rdd2).collect
//Array[(String, (String, String))] = Array((B,(2,b)), (A,(1,a)))
왼쪽 외부 연결 은 cogroup 기반 으로 이 루어 지 며 왼쪽 RDD 의 key 를 기준 으로 연결 되 며 다른 RDD 가 없 으 면 None 입 니 다.
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.leftOuterJoin(rdd2).collect
// Array[(String,(String, Option[String]))] = Array((B,(2,Some(b))), (A,(1,Some(a))),(C,(3,None)))
오른쪽 외부 연결 은 cogroup 기반 으로 이 루어 지 며 오른쪽 RDD 의 key 를 기준 으로 연결 되 며 다른 RDD 가 없 으 면 None 입 니 다.
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.rightOuterJoin(rdd2).collect
//Array[(String, (Option[String], String))] = Array((B,(Some(2),b)),(F,(None,f)), (A,(Some(1),a)))
모든 연결 은 cogroup 을 기반 으로 이 루어 집 니 다. 두 RDD 의 모든 키 값 쌍 은 연결 이 필요 합 니 다. 다른 쪽 이 없 으 면 None 입 니 다.
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.fullOuterJoin(rdd2).collect
//Array[(String, (Option[String], Option[String]))] =Array((B,(Some(2),Some(b))),
// (F,(None,Some(f))),(A,(Some(1),Some(a))), (C,(Some(3),None)))
첫 번 째 RDD 와 두 번 째 RDD 의 차 집, 즉 첫 번 째 RDD 가 두 번 째 RDD 에 존재 하지 않 는 요소, 예 를 들 어 {1, 2, 3, 5} 과 {1, 2, 4} 은 1, 2 가 두 번 째 집합 에 있 기 때문에 되 돌아 오지 않 습 니 다. 3 과 5 는 두 번 째 집합 에 없어 서 돌아 갑 니 다.
val sparkConf = new SparkConf().setAppName("Client Main").
setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("F","f")),2)
rdd1.subtractByKey(rdd2).collect
//Array[(String, String)] = Array((C,3))
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming - OrdCount 프로그램텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.