spark streaming sql demo
6585 단어 hadoop
package com.hxzq.sjzx.realdata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.json.JSONObject;
import scala.Tuple2;
public class KafkaStreamingManager {
private static final Pattern SPACE = Pattern.compile(" ");
private KafkaStreamingManager() {
}
public static void main(String[] args) throws Exception {
// SparkContext sc = new SparkContext("local[2]","testsparkcontext");
// final SQLContext sqlCtx = new SQLContext(sc);
JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
"appname", new Duration(10000));
final SQLContext sqlCtx = new SQLContext(jssc.sparkContext());
Map map = new HashMap();
map.put("message", 5);
JavaPairReceiverInputDStream lines = KafkaUtils
.createStream(jssc, "192.168.20.237:2181", "message", map);
// lines.foreachRDD(foreachFunc);
JavaDStream newline = lines
.map(new Function, String>() {
public String call(Tuple2 tuple2) {
return tuple2._2;
}
});
lines.foreachRDD(new VoidFunction2, Time>() {
public void call(JavaPairRDD paramT1, Time paramT2)
throws Exception {
// TODO Auto-generated method stub
System.out.println("======" + paramT2);
JavaRDD rowRDD = paramT1
.map(new Function, ServerRecord>() {
public ServerRecord call(
Tuple2 paramT1)
throws Exception {
// TODO Auto-generated method stub
JSONObject ret = new JSONObject(paramT1._2);
String serverid = ret.getString("serverid");
String requestId = ((JSONObject) ret
.get("traceinfo")).get("requestid")
.toString();
String uuid = ((JSONObject) ret
.get("traceinfo")).get("uuid")
.toString();
java.util.Random random = new java.util.Random();
String testData = String.valueOf(random
.nextInt(10));
ServerRecord sr = new ServerRecord();
sr.setAllTime(testData);
sr.setErrorCode(testData);
sr.setSeverId(testData);
sr.setStatus(testData);
sr.setNum(testData);
return sr;
}
});
DataFrame sServerFrame = sqlCtx.createDataFrame(rowRDD,
ServerRecord.class);
sServerFrame.registerTempTable("test000");
// private String severId;
// private String status;
// private String errorCode;
// private String allTime;
// private String num;
DataFrame teenagers = sqlCtx
.sql("SELECT count(*) from test000 group by severId,status,errorCode,allTime,num");
List list = teenagers.collectAsList();
if (list != null && list.size() > 0) {
for (int i = 0; i < list.size(); i++) {
Row row = list.get(i);
System.out.println("lllllll========" + list.get(i));
}
}
}
});
// lines.print();
// lines.print();
// lineLengths.print();
jssc.start();
jssc.awaitTermination();
}
}
package com.hxzq.sjzx.realdata;
public class ServerRecord {
private String severId;
private String status;
private String errorCode;
private String allTime;
private String num;
public String getSeverId() {
return severId;
}
public void setSeverId(String severId) {
this.severId = severId;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getErrorCode() {
return errorCode;
}
public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
}
public String getAllTime() {
return allTime;
}
public void setAllTime(String allTime) {
this.allTime = allTime;
}
public String getNum() {
return num;
}
public void setNum(String num) {
this.num = num;
}
}
4.0.0
com.hxzq.sjzx
common
0.0.1-SNAPSHOT
org.apache.kafka
kafka-clients
0.10.1.0
com.googlecode.json-simple
json-simple
1.1.1
org.json
json
20160810
org.apache.spark
spark-streaming_2.10
1.6.2
org.apache.spark
spark-streaming-kafka_2.10
1.6.2
org.apache.spark
spark-sql_2.10
1.6.2
org.apache.spark
spark-core_2.10
1.6.2
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Azure HDInsight + Microsoft R Server에서 연산 처리 분산Microsoft Azure HDInsight는 Microsoft가 제공하는 Hadoop의 PaaS 서비스로 인프라 주변의 구축 노하우를 몰라도 훌륭한 Hadoop 클러스터를 구축할 수 있는 훌륭한 서비스입니다. 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.