Spark Streaming SQL demo

6585 단어 hadoop
demo
package com.hxzq.sjzx.realdata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.json.JSONObject;
import scala.Tuple2;

public class KafkaStreamingManager {

	private static final Pattern SPACE = Pattern.compile(" ");

	private KafkaStreamingManager() {
	}

	public static void main(String[] args) throws Exception {

		// SparkContext sc = new SparkContext("local[2]","testsparkcontext");

		// final SQLContext sqlCtx = new SQLContext(sc);

		JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
				"appname", new Duration(10000));
		final SQLContext sqlCtx = new SQLContext(jssc.sparkContext());

		Map map = new HashMap();
		map.put("message", 5);

		JavaPairReceiverInputDStream lines = KafkaUtils
				.createStream(jssc, "192.168.20.237:2181", "message", map);
		// lines.foreachRDD(foreachFunc);

		JavaDStream newline = lines
				.map(new Function, String>() {
					public String call(Tuple2 tuple2) {
						return tuple2._2;

					}
				});

		lines.foreachRDD(new VoidFunction2, Time>() {

			public void call(JavaPairRDD paramT1, Time paramT2)
					throws Exception {
				// TODO Auto-generated method stub
				System.out.println("======" + paramT2);

				JavaRDD rowRDD = paramT1
						.map(new Function, ServerRecord>() {
							public ServerRecord call(
									Tuple2 paramT1)
									throws Exception {
								// TODO Auto-generated method stub
								JSONObject ret = new JSONObject(paramT1._2);
								String serverid = ret.getString("serverid");
								String requestId = ((JSONObject) ret
										.get("traceinfo")).get("requestid")
										.toString();

								String uuid = ((JSONObject) ret
										.get("traceinfo")).get("uuid")
										.toString();

								java.util.Random random = new java.util.Random();
								String testData = String.valueOf(random
										.nextInt(10));
								ServerRecord sr = new ServerRecord();
								sr.setAllTime(testData);
								sr.setErrorCode(testData);
								sr.setSeverId(testData);
								sr.setStatus(testData);
								sr.setNum(testData);
								return sr;
							}
						});

				DataFrame sServerFrame = sqlCtx.createDataFrame(rowRDD,
						ServerRecord.class);
				sServerFrame.registerTempTable("test000");

				// private String severId;
				// private String status;
				// private String errorCode;
				// private String allTime;
				// private String num;
				DataFrame teenagers = sqlCtx
						.sql("SELECT count(*) from test000 group by severId,status,errorCode,allTime,num");

				List list = teenagers.collectAsList();

				if (list != null && list.size() > 0) {

					for (int i = 0; i < list.size(); i++) {
						Row row = list.get(i);

						System.out.println("lllllll========" + list.get(i));
					}
				}

			}
		});

		// lines.print();
		// lines.print();
		// lineLengths.print();
		jssc.start();
		jssc.awaitTermination();

	}
}


 
  


package com.hxzq.sjzx.realdata;

/**
 * Mutable JavaBean describing one server event row.
 *
 * <p>A DataFrame is built from this class by bean introspection, so each
 * getter/setter pair becomes a column. The property names, including the
 * spelling {@code severId}, are referenced from SQL and must not be renamed.
 */
public class ServerRecord {

	/** Server identifier (column {@code severId}). */
	private String severId;

	/** Status value (column {@code status}). */
	private String status;

	/** Error code (column {@code errorCode}). */
	private String errorCode;

	/** Elapsed/total time value (column {@code allTime}). */
	private String allTime;

	/** Count/number value (column {@code num}). */
	private String num;

	/** @return the server identifier, or {@code null} if unset */
	public String getSeverId() {
		return this.severId;
	}

	/** @param value the new server identifier */
	public void setSeverId(String value) {
		severId = value;
	}

	/** @return the status, or {@code null} if unset */
	public String getStatus() {
		return this.status;
	}

	/** @param value the new status */
	public void setStatus(String value) {
		status = value;
	}

	/** @return the error code, or {@code null} if unset */
	public String getErrorCode() {
		return this.errorCode;
	}

	/** @param value the new error code */
	public void setErrorCode(String value) {
		errorCode = value;
	}

	/** @return the time value, or {@code null} if unset */
	public String getAllTime() {
		return this.allTime;
	}

	/** @param value the new time value */
	public void setAllTime(String value) {
		allTime = value;
	}

	/** @return the number value, or {@code null} if unset */
	public String getNum() {
		return this.num;
	}

	/** @param value the new number value */
	public void setNum(String value) {
		num = value;
	}
}



 
  

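<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.hxzq.sjzx</groupId>
	<artifactId>common</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>
		<dependency>
			<groupId>com.googlecode.json-simple</groupId>
			<artifactId>json-simple</artifactId>
			<version>1.1.1</version>
		</dependency>

		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20160810</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.2</version>
		</dependency>
	</dependencies>
</project>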



좋은 웹페이지 즐겨찾기