Spark에서 그룹을 나눈 후 value 값을 정렬하기 (Java)

11488 단어 spark
maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
groupsort.txt (한 줄에 "이름 점수" 레코드 하나):
spark 100
storm 90
kafka 75
hadoop 60
zookeeper 100
impala 80
hbase 65
hive 90
flume 95
elasticsearch 100
spark 80
storm 70
kafka 80
hadoop 75
zookeeper 90
impala 100
hbase 30
hive 70
flume 80
elasticsearch 90
spark 56
storm 88
kafka 44
hadoop 33
zookeeper 99
impala 88
hbase 63
hive 45
flume 89
elasticsearch 79
public class GroupSort {
    public static void main(String[] args) {
        /**
         *   spark    SparkConfspark         *     setMaster            Master URLlocal         * spark     
         */
        SparkConf conf = new SparkConf().setAppName("My first spark").setMaster("local");
        /**
         *   JavaSparkContext  
         * SparkContext spark         * SparkContextsparkspark   master         *
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        //sc.setLogLevel("OFF");
        /**
         * JavaSparkContext   RDD
         */
        JavaRDD lines = sc.textFile("E:/groupsort.txt");

        JavaPairRDD, Integer> pairs = lines.mapToPair(new PairFunction, String, Integer>() {
            public Tuple2, Integer> call(String line) throws Exception {
                String[] split = line.split(" ");
                return new Tuple2, Integer>(split[0], Integer.parseInt(split[1]));
            }
        });
        /**
         *   
         */
        JavaPairRDD, Iterable> groups = pairs.groupByKey();
        /**
         *        
         */
        JavaPairRDD, Iterable> groupsSort = groups.mapToPair(new PairFunction, Iterable>, String, Iterable>() {
            public Tuple2, Iterable> call(Tuple2, Iterable> groupData) throws Exception {
                List integers = new ArrayList();
                String name = groupData._1;
                Iterator it = groupData._2.iterator();
                while (it.hasNext()) {
                    integers.add(it.next());
                }
                integers.sort(new Comparator() {
                    public int compare(Integer o1, Integer o2) {
                        return o2 - o1;
                    }
                });
                return new Tuple2, Iterable>(name, integers);
            }
        });
        /**
         *   
         */
        groupsSort.foreach(new VoidFunction, Iterable>>() {
            public void call(Tuple2, Iterable> data) throws Exception {
                System.out.println(data._1+"  "+data._2);
            }
        });
        /**
         *   JavaSparkContext
         */
        sc.stop();
    }
}
실행 결과:
spark  [100, 80, 56]
hive  [90, 70, 45]
hadoop  [75, 60, 33]
flume  [95, 89, 80]
zookeeper  [100, 99, 90]
impala  [100, 88, 80]
storm  [90, 88, 70]
elasticsearch  [100, 90, 79]
kafka  [80, 75, 44]
hbase  [65, 63, 30]

좋은 웹페이지 즐겨찾기