2.Spark1.5에서 SparkStreaming 개발 [데이터 집계편]
이번 목표
이번에는 SparkStreaming에 SparkSQL, DataFrame을 조합하여 실용적인 집계를 구현합니다.
SparkSQL, DataFrame 정보
SparkSQL과 DataFrame은 Spark에서 구조 데이터를 다루는 모듈로 분산 처리 쿼리 엔진으로 동작한다.
Hive 데이터를 받아서 조작할 수도 있다.
※ 참조 원 htp : /// spark. 아파치. rg/sql/
준비
SparkStreaming 개발 환경 설정은 이전 게시물을 참조하십시오.
h tp : 소 m/과 z3284/있어 MS/72DC7483872C412b6화 7
코드 작성
SparkSQL과 DataFrame은 Spark에서 구조 데이터를 다루는 모듈로 분산 처리 쿼리 엔진으로 동작한다.
Hive 데이터를 받아서 조작할 수도 있다.
※ 참조 원 htp : /// spark. 아파치. rg/sql/
준비
SparkStreaming 개발 환경 설정은 이전 게시물을 참조하십시오.
h tp : 소 m/과 z3284/있어 MS/72DC7483872C412b6화 7
코드 작성
ss1.scala
package org.apache.spark.streaming
import org.apache.spark.sql.SQLContext
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
/**
* `$ nc -lk 9999`で入力データを設定する。
*/
object NetworkWordCountSQL extends Logging {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("NetworkWordCountSQL")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create a socket stream on target ip:port and count the
val windowData = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
windowData.foreachRDD{jsonRdd =>
if (jsonRdd.count() > 0) {
val sqlContext = SQLContext.getOrCreate(jsonRdd.sparkContext)
val rowDf = sqlContext.read.json(jsonRdd)
println("Summary by DataFrame")
rowDf.groupBy("a_id","b_id").sum("count").show()
println("Summary by SQL")
rowDf.registerTempTable("json_data")
val sumAdf = sqlContext.sql("SELECT a_id, b_id, sum(count) AS SUM FROM json_data GROUP BY a_id, b_id")
sumAdf.show()
}
}
ssc.start()
ssc.awaitTermination()
}
}
htps : // 기주 b. 이 m/또는 z3284/4 굳어진/bぉb/마s r/s파 rkst 레아민 g/s2/src/마이/s인가/s2. s?
build하다
cd /Users/kaz3284/github/4qiita/sparkstreaming/ss1 #sbtプロジェクトのホーム
sbt assembly
실행하다
준비
pre-build판을 DL하여 실행환경을 만든다. ※손으로 spark-submit를 움직일 수 있도록 하기 위해.
htp : // s park. 아파치. 오 rg / 도w 응아 ds. HTML
Mac에서 실행하는 예
export SPARK_HOME=/Users/kaz3284/develop/spark/spark-1.5.2-bin-hadoop2.6
export SS_SRC=/Users/kaz3284/github/4qiita/sparkstreaming/ss1
${SPARK_HOME}/bin/spark-submit ${SS_SRC}/target/scala-2.11/ss1-assembly-1.0.jar
nc -lk 9999
{"a_id":1,"b_id":10,"c_id":100,"count":1000}
{"a_id":1,"b_id":10,"c_id":100,"count":1001}
{"a_id":1,"b_id":10,"c_id":101,"count":1011}
{"a_id":1,"b_id":10,"c_id":101,"count":1012}
{"a_id":1,"b_id":10,"c_id":101,"count":1013}
{"a_id":1,"b_id":10,"c_id":110,"count":1100}
{"a_id":1,"b_id":12,"c_id":120,"count":1200}
{"a_id":2,"b_id":20,"c_id":200,"count":2000}
{"a_id":2,"b_id":20,"c_id":210,"count":2100}
cp ${SPARK_HOME}/conf/log4j.properties.template ${SPARK_HOME}/conf/log4j.properties
vi ${SPARK_HOME}/conf/log4j.properties
log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
마지막으로
이번에는 Streaming과 DataFrame, SQL 구조적인 데이터를 캡처하여 집계를 구현했습니다.
DataFrame으로 가져와 버리면 집계의 기본인 「GroupBy」와 같은 처리를 간단하게 걸 수 있는 것을 실감할 수 있습니다.
다음은 보다 실용적인 분산 환경에서의 개발 및 구현을 소개합니다.
이번 개발의 참고까지
스파크 git
spark docs
Reference
이 문제에 관하여(2.Spark1.5에서 SparkStreaming 개발 [데이터 집계편]), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/kaz3284/items/ec44243fb79f4d8d9501
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Reference
이 문제에 관하여(2.Spark1.5에서 SparkStreaming 개발 [데이터 집계편]), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/kaz3284/items/ec44243fb79f4d8d9501텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)