spark 는 그룹 을 나 눈 후 value 값 을 정렬 합 니 다 (JAVA)
11488 단어 spark
org.apache.spark
spark-core_2.10
1.6.0
groupsort.txt: spark 100 storm 90 kafka 75 hadoop 60 zookeeper 100 impala 80 hbase 65 hive 90 flume 95 elasticsearch 100 spark 80 storm 70 kafka 80 hadoop 75 zookeeper 90 impala 100 hbase 30 hive 70 flume 80 elasticsearch 90 spark 56 storm 88 kafka 44 hadoop 33 zookeeper 99 impala 88 hbase 63 hive 45 flume 89 elasticsearch 79
public class GroupSort {
public static void main(String[] args) {
/**
* spark SparkConf, spark ,
* setMaster Master URL, local,
* spark
*/
SparkConf conf = new SparkConf().setAppName("My first spark").setMaster("local");
/**
* JavaSparkContext
* SparkContext spark ,
* SparkContext , spark , spark master 。
*
*/
JavaSparkContext sc = new JavaSparkContext(conf);
//sc.setLogLevel("OFF");
/**
* , JavaSparkContext RDD
*/
JavaRDD lines = sc.textFile("E:/groupsort.txt");
JavaPairRDD, Integer> pairs = lines.mapToPair(new PairFunction, String, Integer>() {
public Tuple2, Integer> call(String line) throws Exception {
String[] split = line.split(" ");
return new Tuple2, Integer>(split[0], Integer.parseInt(split[1]));
}
});
/**
*
*/
JavaPairRDD, Iterable> groups = pairs.groupByKey();
/**
*
*/
JavaPairRDD, Iterable> groupsSort = groups.mapToPair(new PairFunction, Iterable>, String, Iterable>() {
public Tuple2, Iterable> call(Tuple2, Iterable> groupData) throws Exception {
List integers = new ArrayList();
String name = groupData._1;
Iterator it = groupData._2.iterator();
while (it.hasNext()) {
integers.add(it.next());
}
integers.sort(new Comparator() {
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});
return new Tuple2, Iterable>(name, integers);
}
});
/**
*
*/
groupsSort.foreach(new VoidFunction, Iterable>>() {
public void call(Tuple2, Iterable> data) throws Exception {
System.out.println(data._1+" "+data._2);
}
});
/**
* JavaSparkContext
*/
sc.stop();
}
}
운행 결과:spark [100, 80, 56] hive [90, 70, 45] hadoop [75, 60, 33] flume [95, 89, 80] zookeeper [100, 99, 90] impala [100, 88, 80] storm [90, 88, 70] elasticsearch [100, 90, 79] kafka [80, 75, 44] hbase [65, 63, 30]
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.