2.Spark1.5에서 SparkStreaming 개발 [데이터 집계편]

8635 단어 스파크Scala

이번 목표



이번에는 SparkStreaming에 SparkSQL, DataFrame을 조합하여 실용적인 집계를 구현합니다.

SparkSQL, DataFrame 정보



SparkSQL과 DataFrame은 Spark에서 구조 데이터를 다루는 모듈로 분산 처리 쿼리 엔진으로 동작한다.
Hive 데이터를 받아서 조작할 수도 있다.

※ 참조 원 htp : /// spark. 아파치. rg/sql/

준비



SparkStreaming 개발 환경 설정은 이전 게시물을 참조하십시오.
h tp : 소 m/과 z3284/있어 MS/72DC7483872C412b6화 7

코드 작성


  • 본체 : sbt 프로젝트의 src > main > scala로 만든다.

  • ss1.scala
    package org.apache.spark.streaming
    
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{Logging, SparkConf}
    import org.apache.spark.storage.StorageLevel
    
    /**
     *  `$ nc -lk 9999`で入力データを設定する。
     */
    object NetworkWordCountSQL extends Logging {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("NetworkWordCountSQL")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
    
        // Create a socket stream on target ip:port and count the
        val windowData = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    
        windowData.foreachRDD{jsonRdd =>
          if (jsonRdd.count() > 0) {
            val sqlContext = SQLContext.getOrCreate(jsonRdd.sparkContext)
            val rowDf = sqlContext.read.json(jsonRdd)
            println("Summary by DataFrame")
            rowDf.groupBy("a_id","b_id").sum("count").show()
    
            println("Summary by SQL")
            rowDf.registerTempTable("json_data")
            val sumAdf = sqlContext.sql("SELECT a_id, b_id, sum(count) AS SUM FROM json_data GROUP BY a_id, b_id")
            sumAdf.show()
          }
        }
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
  • 이번 코드의 github는 이하
    htps : // 기주 b. 이 m/또는 z3284/4 굳어진/bぉb/마s r/s파 rkst 레아민 g/s2/src/마이/s인가/s2. s?

  • build하다


    cd /Users/kaz3284/github/4qiita/sparkstreaming/ss1 #sbtプロジェクトのホーム
    sbt assembly
    

    실행하다



  • 준비
    pre-build판을 DL하여 실행환경을 만든다. ※손으로 spark-submit를 움직일 수 있도록 하기 위해.
  • 아래의 링크처에서 「1.5.2」「Pre-built for Hadoop 2.6 and later」를 선택해 DL, 해동해 ${SPARK_HOME}로 한다.
    htp : // s park. 아파치. 오 rg / 도w 응아 ds. HTML

  • sparkstreaming 시작
    Mac에서 실행하는 예
  • export SPARK_HOME=/Users/kaz3284/develop/spark/spark-1.5.2-bin-hadoop2.6
    export SS_SRC=/Users/kaz3284/github/4qiita/sparkstreaming/ss1
    
    ${SPARK_HOME}/bin/spark-submit ${SS_SRC}/target/scala-2.11/ss1-assembly-1.0.jar
    
  • nc로 메시지 보내기
  • nc -lk 9999
    {"a_id":1,"b_id":10,"c_id":100,"count":1000}
    {"a_id":1,"b_id":10,"c_id":100,"count":1001}
    {"a_id":1,"b_id":10,"c_id":101,"count":1011}
    {"a_id":1,"b_id":10,"c_id":101,"count":1012}
    {"a_id":1,"b_id":10,"c_id":101,"count":1013}
    {"a_id":1,"b_id":10,"c_id":110,"count":1100}
    {"a_id":1,"b_id":12,"c_id":120,"count":1200}
    {"a_id":2,"b_id":20,"c_id":200,"count":2000}
    {"a_id":2,"b_id":20,"c_id":210,"count":2100}
    
  • 아래와 같은 표준 출력이 나오면 이번 프로그램 완성! !
  • 보충:개발시에 불필요한 메시지를 지우는 경우는 이하를 실시한다.
  • cp ${SPARK_HOME}/conf/log4j.properties.template ${SPARK_HOME}/conf/log4j.properties
    vi ${SPARK_HOME}/conf/log4j.properties
    
  • 다음과 같이 표준 출력 부분을 INFO => WARN으로

  • log4j.properties
    # Set everything to be logged to the console
    log4j.rootCategory=WARN, console
    

    마지막으로



    이번에는 Streaming과 DataFrame, SQL 구조적인 데이터를 캡처하여 집계를 구현했습니다.
    DataFrame으로 가져와 버리면 집계의 기본인 「GroupBy」와 같은 처리를 간단하게 걸 수 있는 것을 실감할 수 있습니다.
    다음은 보다 실용적인 분산 환경에서의 개발 및 구현을 소개합니다.

    이번 개발의 참고까지



    스파크 git



    spark docs

    좋은 웹페이지 즐겨찾기