Spark Practice: Developing Transformation Operations
1. map: multiply every element in the collection by 2
1.1 Java
/**
 * map: multiply every element in the collection by 2
 */
private static void map() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("map")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the source collection
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    // Parallelize the collection into an RDD
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    // Use the map operator to multiply every element by 2.
    // map takes a function object, applies it to every element of the RDD,
    // and builds a new RDD from the returned values.
    // In Java the function passed to map is a Function; its second type
    // parameter is the return type of call(). call() receives one element of
    // the original RDD and returns the corresponding element of the new RDD.
    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        // call() is invoked once for each element 1, 2, 3, 4, 5
        // and returns 2, 4, 6, 8, 10
        @Override
        public Integer call(Integer integer) throws Exception {
            return integer * 2;
        }
    });
    // Print the elements of the new RDD
    multipleNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
1.2 Scala
def map(): Unit = {
  val conf = new SparkConf().setAppName("map").setMaster("local")
  val sc = new SparkContext(conf)
  val numbers = Array(1, 2, 3, 4, 5)
  val numberRDD = sc.parallelize(numbers, 1)
  val multipleNumberRDD = numberRDD.map(num => num * 2)
  multipleNumberRDD.foreach(num => println(num))
}
2. filter: keep only the even numbers in the collection
2.1 Java
/**
 * filter: keep only the even numbers in the collection
 */
private static void filter() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("filter")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the source collection
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    // Parallelize the collection into an RDD
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    // Apply the filter operator to the RDD.
    // Like map, filter takes a Function, but its call() must return a Boolean.
    // call() is invoked once for every element of the original RDD:
    // return true to keep the element in the new RDD, false to drop it.
    JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
        private static final long serialVersionUID = 1L;
        // call() is invoked for 1..10 and keeps only 2, 4, 6, 8, 10
        @Override
        public Boolean call(Integer integer) throws Exception {
            return integer % 2 == 0;
        }
    });
    // Print the elements of the new RDD
    evenNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
2.2 Scala
def filter(): Unit = {
  val conf = new SparkConf().setAppName("filter").setMaster("local")
  val sc = new SparkContext(conf)
  val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  val numberRDD = sc.parallelize(numbers, 1)
  val evenNumberRDD = numberRDD.filter(num => num % 2 == 0)
  evenNumberRDD.foreach(num => println(num))
}
3. flatMap: split lines of text into words
3.1 Java
/**
 * flatMap: split lines of text into individual words
 */
private static void flatMap() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("flatMap")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the source collection of lines
    List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
    // Parallelize the collection into an RDD
    JavaRDD<String> lines = sc.parallelize(lineList);
    // Apply the flatMap operator to the RDD to split each line into words.
    // In Java, flatMap takes a FlatMapFunction; its second type parameter U is
    // the element type of the new RDD. call() receives one element of the
    // original RDD and returns an Iterator over zero or more elements of type U
    // (in Spark 2.x; in Spark 1.x it returns an Iterable instead).
    // flatMap then flattens all of the returned elements into a single new RDD.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Iterator<String> call(String s) throws Exception {
            return Arrays.asList(s.split(" ")).iterator();
        }
    });
    // Print the elements of the new RDD
    words.foreach(new VoidFunction<String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(String t) throws Exception {
            System.out.println(t);
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
3.2 Scala
def flatMap(): Unit = {
  val conf = new SparkConf().setAppName("flatMap").setMaster("local")
  val sc = new SparkContext(conf)
  val lineArray = Array("hello you", "hello me", "hello world")
  val lines = sc.parallelize(lineArray, 1)
  val words = lines.flatMap(line => line.split(" "))
  words.foreach(word => println(word))
}
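The difference between map and flatMap is easiest to see side by side. A minimal sketch using the lines RDD from the example above: map produces exactly one output element per line, so the result is an RDD of arrays, while flatMap flattens those arrays into a single RDD of words.

// map keeps one element per line, so the result is an RDD of arrays rather than an RDD of words
val wordArrays = lines.map(line => line.split(" "))   // RDD[Array[String]]
// flatMap (as in the example above) flattens those arrays into a single RDD[String] of words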
4. groupByKey: group the scores by class
4.1 Java
/**
 * groupByKey: group the scores by class
 */
private static void groupByKey() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("groupByKey")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the source collection of (class, score) pairs
    List<Tuple2<String, Integer>> scoresList = Arrays.asList(
            new Tuple2<>("class1", 80),
            new Tuple2<>("class2", 88),
            new Tuple2<>("class1", 80),
            new Tuple2<>("class2", 90));
    // Parallelize the collection into a JavaPairRDD
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
    // Apply the groupByKey operator to the scores RDD.
    // groupByKey returns a JavaPairRDD whose key type is unchanged, but whose
    // value type becomes an Iterable: all values that share the same key are
    // grouped into one Iterable.
    // Here groupedScores is therefore a JavaPairRDD<String, Iterable<Integer>>.
    JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
    // Print the contents of the groupedScores RDD
    groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
        private static final long serialVersionUID = 1L;
        // Each element of the grouped RDD is a Tuple2 holding one key and the
        // Iterable of all values that belong to that key.
        @Override
        public void call(Tuple2<String, Iterable<Integer>> classScores) throws Exception {
            System.out.println("class:" + classScores._1);
            Iterator<Integer> ite = classScores._2.iterator();
            while (ite.hasNext()) {
                System.out.println(ite.next());
            }
            System.out.println("====================================");
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
4.2 Scala
def groupByKey(): Unit = {
  val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
  val sc = new SparkContext(conf)
  val scoreList = Array(new Tuple2[String, Integer]("class1", 80),
    new Tuple2[String, Integer]("class2", 88),
    new Tuple2[String, Integer]("class1", 80),
    new Tuple2[String, Integer]("class2", 90))
  val scores = sc.parallelize(scoreList, 1)
  val groupedScores = scores.groupByKey()
  groupedScores.foreach(
    score => {
      println(score._1)
      score._2.foreach(singleScore => println(singleScore))
      println("===========")
    }
  )
}
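If grouping is only a step towards an aggregate, the grouped values can be reduced further with mapValues. A minimal sketch building on the groupedScores RDD above (for a plain sum, the reduceByKey operator in the next section is the more efficient choice, since it combines values before the shuffle):

// sum the scores inside each group: yields ("class1", 160) and ("class2", 178)
val totalsFromGroups = groupedScores.mapValues(classScores => classScores.map(_.intValue).sum)
totalsFromGroups.foreach(total => println(total._1 + ":" + total._2))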
5. reduceByKey: compute the total score of each class
5.1 Java
/**
 * reduceByKey: compute the total score of each class
 */
private static void reduceByKey() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("reduceByKey")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the source collection of (class, score) pairs
    List<Tuple2<String, Integer>> scoresList = Arrays.asList(
            new Tuple2<>("class1", 80),
            new Tuple2<>("class2", 88),
            new Tuple2<>("class1", 80),
            new Tuple2<>("class2", 90));
    // Parallelize the collection into a JavaPairRDD
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoresList);
    // Apply the reduceByKey operator to the scores RDD.
    // reduceByKey takes a Function2, which has three type parameters: the first
    // two are the types of the values being combined (both the value type of
    // the RDD) and the third is the return type.
    // For every key, reduceByKey repeatedly calls call() on pairs of values
    // until a single value per key remains; the result is a new JavaPairRDD
    // whose value is that reduced value.
    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    // Print the contents of the totalScores RDD
    totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
5.2 Scala
def reduceByKey(): Unit = {
  val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
  val sc = new SparkContext(conf)
  val scoreList = Array(new Tuple2[String, Integer]("class1", 80),
    new Tuple2[String, Integer]("class2", 88),
    new Tuple2[String, Integer]("class1", 80),
    new Tuple2[String, Integer]("class2", 90))
  val scores = sc.parallelize(scoreList, 1)
  val totalScores = scores.reduceByKey(_ + _)
  totalScores.foreach(classScore => println(classScore._1 + ":" + classScore._2))
}
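The same pattern extends to aggregates other than a plain sum. A minimal sketch, building on the scores RDD above, that computes the average score per class by pairing each score with a count of 1, reducing both, and then dividing:

// (sum, count) per class, then divide: class1 -> 80.0, class2 -> 89.0
val avgScores = scores
  .mapValues(score => (score.intValue, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues(sumCount => sumCount._1.toDouble / sumCount._2)
avgScores.foreach(avg => println(avg._1 + ":" + avg._2))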
6. sortByKey: sort the student scores
6.1 Java
/**
 * sortByKey: sort the student scores by score
 */
private static void sortByKey() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("sortByKey")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the source collection of (score, name) pairs
    List<Tuple2<Integer, String>> scoresList = Arrays.asList(
            new Tuple2<>(65, "leo"),
            new Tuple2<>(60, "tom"),
            new Tuple2<>(90, "marry"),
            new Tuple2<>(88, "jack"));
    // Parallelize the collection into a JavaPairRDD
    JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoresList);
    // Apply the sortByKey operator to the scores RDD.
    // sortByKey sorts the elements by key (ascending by default) and returns a
    // new JavaPairRDD with the same element type as the original RDD; only the
    // order of the elements changes.
    JavaPairRDD<Integer, String> sortedScores = scores.sortByKey();
    // Print the contents of the sortedScores RDD
    sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer, String> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
6.2 Scala
def sortByKey(): Unit = {
  val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
  val sc = new SparkContext(conf)
  val scoreList = Array(new Tuple2[Integer, String](90, "cat"),
    new Tuple2[Integer, String](80, "leo"),
    new Tuple2[Integer, String](80, "opp"),
    new Tuple2[Integer, String](55, "lll"))
  val scores = sc.parallelize(scoreList, 1)
  val sortedScores = scores.sortByKey()
  sortedScores.foreach(studentScore => println(studentScore._1 + ":" + studentScore._2))
}
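sortByKey sorts in ascending order by default; passing false as the ascending flag sorts in descending order. A minimal sketch using the scores RDD from the example above:

// highest score first
val descendingScores = scores.sortByKey(false)
descendingScores.foreach(studentScore => println(studentScore._1 + ":" + studentScore._2))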
7. join: print every student's score
7.1 Java
/**
 * join: print every student's score
 */
private static void join() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("join")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the (student id, name) collection
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<>(1, "leo"),
            new Tuple2<>(2, "tom"),
            new Tuple2<>(3, "marry"),
            new Tuple2<>(4, "jack"));
    // Build the (student id, score) collection
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<>(1, 100),
            new Tuple2<>(2, 80),
            new Tuple2<>(3, 50),
            new Tuple2<>(4, 20));
    // Parallelize the two collections into pair RDDs
    JavaPairRDD<Integer, String> student = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    // Use join to associate the two RDDs by key.
    // join matches the elements of the two JavaPairRDDs that have the same key
    // and returns a JavaPairRDD whose key is the join key and whose value is a
    // Tuple2 holding the values from the first and second RDD respectively.
    // join produces one output pair for every matching combination: joining an
    // RDD containing (1,1)(1,2)(1,3) with an RDD containing (1,4)(2,1)(2,2)
    // yields (1,(1,4)), (1,(2,4)), (1,(3,4)).
    JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = student.join(scores);
    // Print the contents of the studentScores RDD
    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
            System.out.println("student id:" + t._1);
            System.out.println("student name:" + t._2._1);
            System.out.println("student score:" + t._2._2);
            System.out.println("==============");
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
7.2 Scala
def join(): Unit = {
  val conf = new SparkConf().setAppName("join").setMaster("local")
  val sc = new SparkContext(conf)
  // (student id, name) collection
  val studentList = Array(
    new Tuple2[Integer, String](1, "leo"),
    new Tuple2[Integer, String](2, "tom"),
    new Tuple2[Integer, String](3, "marry"))
  // (student id, score) collection
  val scoreList = Array(
    new Tuple2[Integer, Integer](1, 100), new Tuple2[Integer, Integer](2, 80),
    new Tuple2[Integer, Integer](3, 50), new Tuple2[Integer, Integer](1, 70),
    new Tuple2[Integer, Integer](2, 10), new Tuple2[Integer, Integer](3, 40))
  val student = sc.parallelize(studentList)
  val scores = sc.parallelize(scoreList)
  val studentScores = student.join(scores)
  studentScores.foreach(studentScore => {
    println("student id:" + studentScore._1)
    println("student name:" + studentScore._2._1)
    println("student score:" + studentScore._2._2)
    println("==============")
  })
}
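join is an inner join: a student id that appears in only one of the two RDDs is dropped from the result. When every student should be kept even without a score, Spark's leftOuterJoin can be used instead; a minimal sketch with the student and scores RDDs above, where the score side becomes an Option:

// keep every student; a missing score shows up as None
val allStudents = student.leftOuterJoin(scores)
allStudents.foreach(s => println(s._1 + ":" + s._2._1 + ":" + s._2._2.getOrElse("no score")))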
8. cogroup: print all of every student's scores
8.1 Java
/**
 * cogroup: print all of every student's scores
 */
private static void cogroup() {
    // Create the SparkConf
    SparkConf conf = new SparkConf()
            .setAppName("cogroup")
            .setMaster("local");
    // Create the JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Build the (student id, name) collection
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<>(1, "leo"),
            new Tuple2<>(2, "tom"),
            new Tuple2<>(3, "marry"));
    // Build the (student id, score) collection
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<>(1, 100),
            new Tuple2<>(2, 80),
            new Tuple2<>(3, 50),
            new Tuple2<>(1, 70),
            new Tuple2<>(2, 10),
            new Tuple2<>(3, 40));
    // Parallelize the two collections into pair RDDs
    JavaPairRDD<Integer, String> student = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    // cogroup is similar to join, but for each key it collects all of the
    // values from each RDD into an Iterable, so the value of the result is a
    // Tuple2 of two Iterables rather than a Tuple2 of single values.
    JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = student.cogroup(scores);
    // Print the contents of the studentScores RDD
    studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
            System.out.println("student id:" + t._1);
            System.out.println("student name:" + t._2._1);
            System.out.println("student score:" + t._2._2);
            System.out.println("==============");
        }
    });
    // Close the JavaSparkContext
    sc.close();
}
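8.2 Scala
A minimal sketch of the same example in Scala, following the pattern of the earlier sections:
def cogroup(): Unit = {
  val conf = new SparkConf().setAppName("cogroup").setMaster("local")
  val sc = new SparkContext(conf)
  val studentList = Array(
    new Tuple2[Integer, String](1, "leo"),
    new Tuple2[Integer, String](2, "tom"),
    new Tuple2[Integer, String](3, "marry"))
  val scoreList = Array(
    new Tuple2[Integer, Integer](1, 100), new Tuple2[Integer, Integer](2, 80),
    new Tuple2[Integer, Integer](3, 50), new Tuple2[Integer, Integer](1, 70),
    new Tuple2[Integer, Integer](2, 10), new Tuple2[Integer, Integer](3, 40))
  val student = sc.parallelize(studentList)
  val scores = sc.parallelize(scoreList)
  // for every student id, cogroup collects the names and the scores into two Iterables
  val studentScores = student.cogroup(scores)
  studentScores.foreach(studentScore => {
    println("student id:" + studentScore._1)
    println("student name:" + studentScore._2._1)
    println("student score:" + studentScore._2._2)
    println("==============")
  })
}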
9. The main function
9.1 Java
public static void main(String[] args) {
    //map();
    //filter();
    //flatMap();
    //groupByKey();
    //reduceByKey();
    //sortByKey();
    //join();
    cogroup();
}
9.2 Scala
def main(args: Array[String]) {
  //map()
  //filter()
  //flatMap()
  //groupByKey()
  //reduceByKey()
  //sortByKey()
  join()
}