자바 spark 구현 (1)

31057 단어 빅 데이터spark
자바 로 간단 한 스파크 예 구현.
데 이 터 는 매우 간단 하 다. 시간, 전화번호, 상행 데이터 와 하행 데 이 터 를 포함 하여 미리 처 리 된 로그 파일 이다.하나의 행 위 는 하나의 기록 으로 서로 다른 데이터 간 에 탭 문자 로 분리 된다.
견본 류
샘플 클래스 는 로그 파일 의 기록 을 봉인 하기 위 한 것 입 니 다.
package com.icesun.java.accessLog;

import java.io.Serializable;

public class LogInfo implements Serializable {
    private static final long serialVersionUID = 5749943279909593929L;
    private long timeStamp;
    private String phoneNo;
    private long down;
    private long up;


    LogInfo(){}
    LogInfo(long timeStamp, String phoneNo, long down, long up){
        this.timeStamp = timeStamp;
        this.phoneNo = phoneNo;
        this.down = down;
        this.up = up;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public String getPhoneNo() {
        return phoneNo;
    }

    public long getDown() {
        return down;
    }

    public long getUp() {
        return up;
    }
}


Spark Core API
package com.icesun.java.accessLog;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class LogSpark {
    public static void main(String [] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("AccessLog");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        String path = "files/access.log";
        JavaRDD<String> lines = sc.textFile(path);

        JavaPairRDD<String,LogInfo> logPairRDD = RDD2RDDPair(lines);
        JavaPairRDD<String,LogInfo>  reduceByKey = aggregateByDeviceID(logPairRDD);

        reduceByKey.foreach(x -> System.out.println(x._2.getDown()));
        System.out.println(reduceByKey.count());

        sc.close();
    }

    //  strRDD LogInfo RDD           K,LogInfor V
    private static JavaPairRDD<String, LogInfo> RDD2RDDPair(JavaRDD<String> accessLogRDD){
        return accessLogRDD.mapToPair((PairFunction<String, String, LogInfo>) line -> {
            String[] logInfo = line.split("\t");
            long timeStamp = Long.valueOf(logInfo[0]);
            String phone = logInfo[1];
            long down = Long.valueOf(logInfo[2]);
            long up = Long.valueOf(logInfo[2]);

            LogInfo log = new LogInfo(timeStamp, phone, down, up);
            return new Tuple2<String, LogInfo>(phone, log);
        });
    }

    //  reduceByKey     K,            
    private static JavaPairRDD<String, LogInfo> aggregateByDeviceID(JavaPairRDD<String, LogInfo> pairRDD){
        return pairRDD.reduceByKey((Function2<LogInfo, LogInfo, LogInfo>)(v1, v2) -> {

                //         
                long timeStamp = v1.getTimeStamp() < v2.getTimeStamp() ? v1.getTimeStamp(): v2.getTimeStamp();
                //           add
                long up = v1.getUp() + v2.getUp();
                long down = v1.getDown() + v2.getDown();
                String phone = v1.getPhoneNo();
                return new LogInfo(timeStamp, phone, up, down);
            }
        );
    }
}


SparkSQL
package com.icesun.java.accessLog;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class LogSparkSQL {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setAppName("SparkSQL").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

//        HiveConf hiveConf = new HiveConf(sc);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("files/access.log");

        //       LogInfoRDD
        JavaRDD<LogInfo> logInfo = lines.map( line ->{
            String[] str = line.split("\t");
            long timeStamp = Long.valueOf(str[0]);
            String phone = str[1];
            long down = Long.valueOf(str[2]);
            long up = Long.valueOf(str[3]);
            LogInfo log = new LogInfo(timeStamp, phone, down, up);
            return log;
        });

        // RDD   DataSet   
        Dataset<Row>  df = sqlContext.createDataFrame(logInfo, LogInfo.class);
        // dataset          
        df.select("phoneNo", "down").where("up > 50000").show();
        
        // df       ,     SQL         
        df.createOrReplaceTempView("log");
        Dataset<Row> seleRs = sqlContext.sql("select * from log where up > 50000 and down < 10000");
        seleRs.toJavaRDD().foreach(row -> System.out.println(row.get(1)));
        


    }
}


주소

좋은 웹페이지 즐겨찾기