오프라인 경량급 빅데이터 플랫폼 Spark의 CSV 파일 읽기 실례

2461 단어
스파크의 RDD 데이터 세트는 경량 파일을 처리하기에 적합하다. 일반적인 장면에서 excel 파일로 excel 파일을 CSV(쉼표로 구분)로 저장할 수 있고 스파크는 CSV 파일을 읽어서 RDD를 형성한다.
1. 필드 저장을 위한 정렬화 클래스 Record
package sk.sql;

import java.io.Serializable;

public class Record implements Serializable {
		String area;
		String orderid;
		String content;
		String datetime;
		// constructor , getters and setters 
		public Record(String area, String orderid, String content, String datetime) {
			// TODO Auto-generated constructor stub
			this.area=area;
			this.orderid=orderid;
			this.content=content;
			this.datetime=datetime;
		}
		public String getarea(){
			return this.area;
		}
		public void setarea(String area){
			this.area=area;
		}
		public String getorderid(){
			return this.orderid;
		}
		public void setorderid(String orderid){
			this.orderid=orderid;
		}
		public String getcontent(){
			return this.content;
		}
		public void setcontent(String content){
			this.content=content;
		}
		public String getdatetime(){
			return this.datetime;
		}
		public void setdatetime(String datetime){
			this.datetime=datetime;
		}
}

2, CSV 파일 읽기 및 RDD 코드 생성
package sk.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class CSVDemo {
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("CSVDemo");
	    JavaSparkContext sc = new JavaSparkContext(conf);
		JavaRDD<String> data = sc.textFile("/tmp/10all.csv");
		//SQLContext sqlContext = new SQLContext(sc);

		JavaRDD<Record> rdd_records = data.map(new Function<String, Record>() {
		      public Record call(String line) throws Exception {
		         // Here you can use JSON
		         // Gson gson = new Gson();
		         // gson.fromJson(line, Record.class);
		         String[] fields = line.split(",");
		         if(fields.length<7 ) return null;
		         Record sd = new Record(fields[1],fields[2],fields[4],fields[6]);
			     return sd;
		      }
		});
		List<Record> lr=rdd_records.collect();
		for(Record rd:lr){
			if(rd!=null)
				System.out.println(rd.area+"|"+rd.orderid+"|"+rd.content+"|"+rd.datetime);
		}
		//rdd_records.saveAsTextFile("/tmp/10all.txt");
		sc.stop();
	}
	
}

좋은 웹페이지 즐겨찾기