자바 spark 구현 (1)
데 이 터 는 매우 간단 하 다. 시간, 전화번호, 상행 데이터 와 하행 데 이 터 를 포함 하여 미리 처 리 된 로그 파일 이다.하나의 행 위 는 하나의 기록 으로 서로 다른 데이터 간 에 탭 문자 로 분리 된다.
견본 류
샘플 클래스 는 로그 파일 의 기록 을 봉인 하기 위 한 것 입 니 다.
package com.icesun.java.accessLog;
import java.io.Serializable;
public class LogInfo implements Serializable {
private static final long serialVersionUID = 5749943279909593929L;
private long timeStamp;
private String phoneNo;
private long down;
private long up;
LogInfo(){}
LogInfo(long timeStamp, String phoneNo, long down, long up){
this.timeStamp = timeStamp;
this.phoneNo = phoneNo;
this.down = down;
this.up = up;
}
public long getTimeStamp() {
return timeStamp;
}
public String getPhoneNo() {
return phoneNo;
}
public long getDown() {
return down;
}
public long getUp() {
return up;
}
}
Spark Core API
package com.icesun.java.accessLog;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class LogSpark {
public static void main(String [] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("AccessLog");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel("WARN");
String path = "files/access.log";
JavaRDD<String> lines = sc.textFile(path);
JavaPairRDD<String,LogInfo> logPairRDD = RDD2RDDPair(lines);
JavaPairRDD<String,LogInfo> reduceByKey = aggregateByDeviceID(logPairRDD);
reduceByKey.foreach(x -> System.out.println(x._2.getDown()));
System.out.println(reduceByKey.count());
sc.close();
}
// strRDD LogInfo RDD K,LogInfor V
private static JavaPairRDD<String, LogInfo> RDD2RDDPair(JavaRDD<String> accessLogRDD){
return accessLogRDD.mapToPair((PairFunction<String, String, LogInfo>) line -> {
String[] logInfo = line.split("\t");
long timeStamp = Long.valueOf(logInfo[0]);
String phone = logInfo[1];
long down = Long.valueOf(logInfo[2]);
long up = Long.valueOf(logInfo[2]);
LogInfo log = new LogInfo(timeStamp, phone, down, up);
return new Tuple2<String, LogInfo>(phone, log);
});
}
// reduceByKey K,
private static JavaPairRDD<String, LogInfo> aggregateByDeviceID(JavaPairRDD<String, LogInfo> pairRDD){
return pairRDD.reduceByKey((Function2<LogInfo, LogInfo, LogInfo>)(v1, v2) -> {
//
long timeStamp = v1.getTimeStamp() < v2.getTimeStamp() ? v1.getTimeStamp(): v2.getTimeStamp();
// add
long up = v1.getUp() + v2.getUp();
long down = v1.getDown() + v2.getDown();
String phone = v1.getPhoneNo();
return new LogInfo(timeStamp, phone, up, down);
}
);
}
}
SparkSQL
package com.icesun.java.accessLog;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class LogSparkSQL {
public static void main(String[] args){
SparkConf conf = new SparkConf().setAppName("SparkSQL").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// HiveConf hiveConf = new HiveConf(sc);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("files/access.log");
// LogInfoRDD
JavaRDD<LogInfo> logInfo = lines.map( line ->{
String[] str = line.split("\t");
long timeStamp = Long.valueOf(str[0]);
String phone = str[1];
long down = Long.valueOf(str[2]);
long up = Long.valueOf(str[3]);
LogInfo log = new LogInfo(timeStamp, phone, down, up);
return log;
});
// RDD DataSet
Dataset<Row> df = sqlContext.createDataFrame(logInfo, LogInfo.class);
// dataset
df.select("phoneNo", "down").where("up > 50000").show();
// df , SQL
df.createOrReplaceTempView("log");
Dataset<Row> seleRs = sqlContext.sql("select * from log where up > 50000 and down < 10000");
seleRs.toJavaRDD().foreach(row -> System.out.println(row.get(1)));
}
}
주소
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.