Spark Streaming 연산 자 개발 사례

Spark Streaming 연산 자 개발 사례
transform 연산 자 개발
transform 작업 은 DStream 에 적 용 될 때 임의의 RDD 에서 RDD 로 전환 하 는 작업 을 수행 할 수 있 고 DStream API 에서 제공 되 지 않 은 작업 을 실현 할 수 있 습 니 다.예 를 들 어 DStream API 에서 하나의 DStream 중의 모든 batch 를 제공 하지 않 고 특정한 RDD 와 join 하 는 작업 을 수행 할 수 있 습 니 다.DStream 의 join 연산 자 는 join 다른 DStream 만 할 수 있 습 니 다.그러나 우 리 는 스스로 transform 조작 을 사용 하여 이 기능 을 실현 할 수 있다.
인 스 턴 스:블랙리스트 사용자 실시 간 필터

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *        
 */
object TransformDemo {
 def main(args: Array[String]): Unit = {
  //      
  Logger.getLogger("org").setLevel(Level.WARN)
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))

  //        RDD
  val blackRDD =
   ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

  //  socket nc     
  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

  /**
   *          
   * zs sb sb sb sb
   * lisi fuck fuck fuck
   * jack hello
   */
  linesDStream
   .map(x => {
    val info = x.split(" ")
    (info(0), info.toList.tail.mkString(" "))
   })
   .transform(rdd => { //transform   RDD->RDD   ,        RDD
    /**
     *   leftouterjoin    ,       :
     * (zs,(sb sb sb sb),Some(true)))
     * (lisi,(fuck fuck fuck),some(true)))
     * (jack,(hello,None))
     */
    val joinRDD = rdd.leftOuterJoin(blackRDD)

    //   Some(true) ,         ,   None ,        ,            
    val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)

    filterRDD
   })
   .map(x=>(x._1,x._2._1)).print()

  ssc.start()
  ssc.awaitTermination()
 }
}
테스트
nc 시작,사용자 및 발언 정보 전송

프로그램 이 블랙리스트 에 있 는 사용자 의 발언 을 실시 간 으로 걸 러 내 는 것 을 볼 수 있다.

updateStateByKey 연산 자 개발
updateStateByKey 연산 자 는 임의의 상 태 를 유지 할 수 있 으 며,동시에 새로운 정 보 를 계속 업데이트 할 수 있 습 니 다.이 연산 자 는 모든 key 에 state 를 유지 하고,끊임없이 state 를 업데이트 할 수 있 습 니 다.모든 batch 에 있어 Spark 는 이전에 존 재 했 던 key 에 State 업데이트 함 수 를 한 번 씩 적용 합 니 다.이 key 가 batch 에 새로운 값 이 있 든 없 든 State 업데이트 함수 가 되 돌아 오 는 값 이 none 이면 이 key 에 대응 하 는 state 는 삭 제 됩 니 다.새로 생 긴 키 에 대해 서도 state 업데이트 함 수 를 실행 합 니 다.
이 계산 자 를 사용 하려 면 반드시 두 가지 절 차 를 진행 해 야 한다.
  • state 를 정의 하 는 것 은 임의의 데이터 형식 일 수 있 습 니 다
  • state 업데이트 함 수 를 정의 합 니 다.하나의 함수 로 이전의 상 태 를 어떻게 사용 하 는 지 지정 하고 입력 흐름 에서 새로운 값 업데이트 상 태 를 가 져 옵 니 다
  • 메모:updateStateByKey 작업,Checkpoint 메커니즘 을 켜 야 합 니 다.
    인 스 턴 스:캐 시 기반 실시 간 WordCount
    
    package StreamingDemo
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     *        WordCount,              
     */
    object UpdateStateByKeyDemo {
     def main(args: Array[String]): Unit = {
      //      
      Logger.getLogger("org").setLevel(Level.WARN)
    
      /**
       *              Kerberos      null,    HADOOP_USER_NAME    ,
       *        Hadoop      hadoop username
       *                     ,       ,           
       */
      //System.setProperty("HADOOP_USER_NAME","Setsuna")
    
      val conf = new SparkConf()
       .setAppName(this.getClass.getSimpleName)
       .setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(2))
    
      //  Checkpoint     
      ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")
    
      //    DStream
      val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
      val wordDStream = lineDStream.flatMap(_.split(" "))
      val pairsDStream = wordDStream.map((_, 1))
    
      /**
       * state:        
       * values:    batch key   values 
       */
      val resultDStream =
       pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
    
        // state none,            ,   0     count
        var count = state.getOrElse(0)
    
        //  values,         value 
        for (value <- values) {
         count += value
        }
    
        //  key    state,        
        Option(count)
       })
    
      //      
      resultDStream.print()
    
      ssc.start()
      ssc.awaitTermination()
     }
    }
    테스트
    nc 오픈,단어 입력

    콘 솔 실시 간 출력 결과

    window 슬라이딩 창 연산 자 개발
    Spark Streaming 은 미끄럼 창 동작 을 지원 합 니 다.미끄럼 창 에 있 는 데 이 터 를 계산 할 수 있 습 니 다.
    미끄럼 창 에는 일괄 처리 간격,창 간격,미끄럼 간격 이 포함 되 어 있 습 니 다.
    창 작업 의 경우 창 내부 에 N 개의 일괄 처리 데이터 가 있 습 니 다4.567917.일괄 처리 데이터 의 크기 는 창 간격 에 의 해 결정 되 고 창 간격 은 창의 지속 시간,즉 창의 길 이 를 말 합 니 다4.567917.미끄럼 시간 간격 은 얼마나 걸 려 서 창 이 한 번 미 끄 러 지고 새로운 창 이 형성 되 는 지 를 말 합 니 다.미끄럼 간격 은 기본 적 인 상황 에서 일괄 처리 시간 간격 과 같 습 니 다메모:미끄럼 시간 간격 과 창 시간 간격의 크기 는 일괄 처리 간격의 정수 배로 설정 해 야 합 니 다
    공식 적 인 그림 으로 설명 하 다.

    일괄 처리 간격 은 1 시간 단위 이 고 창 간격 은 3 시간 단위 이 며 미끄럼 간격 은 2 시간 단위 입 니 다.초기 창 time1-time 3 에 대해 서 는 창 간격 이 만족 해야만 데 이 터 를 처리 할 수 있 습 니 다.따라서 미끄럼 창 작업 은 두 개의 인자,창 길이 와 미끄럼 시간 간격 을 지정 해 야 합 니 다.Spark Streaming 에서 미끄럼 창 에 대한 지원 은 Storm 보다 더 완벽 합 니 다.
    Window 슬라이딩 연산 자 조작
    계산 하 다
    묘사 하 다.
    window()
    모든 슬라이딩 창의 데이터 에 대해 사용자 정의 계산 을 실행 합 니 다.
    countByWindow()
    모든 미끄럼 창 데이터 에 count 작업 을 수행 합 니 다.
    reduceByWindow()
    모든 슬라이딩 창의 데이터 에 대해 reduce 작업 을 실행 합 니 다.
    reduceByKeyAndWindow()
    모든 슬라이딩 창의 데이터 에 대해 reduceByKey 동작 을 실행 합 니 다.
    countByValueAndWindow()
    모든 슬라이딩 창의 데이터 에 대해 countByValue 작업 을 실행 합 니 다.
    reduceByKeyAndWindow 연산 자 개발
    인 스 턴 스:온라인 핫 이 슈 검색 어 실시 간 미끄럼 통계
    2 초 간격 으로 최근 5 초 간 검색 어 중 가장 높 은 3 개 검색 어 와 출현 횟수 를 집계 한다.
    
    package StreamingDemo
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     *   :  2  ,    5             3          
     */
    object ReduceByKeyAndWindowDemo {
     def main(args: Array[String]): Unit = {
    
      //      
      Logger.getLogger("org").setLevel(Level.WARN)
      //    
      val conf = new SparkConf()
       .setAppName(this.getClass.getSimpleName)
       .setMaster("local[2]")
    
      //        1s
      val ssc = new StreamingContext(conf, Seconds(1))
    
      val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
      linesDStream
       .flatMap(_.split(" ")) //        
       .map((_, 1)) //  (word,1)
       .reduceByKeyAndWindow(
        //           
        //x          ,y      Key            
        (x: Int, y: Int) => x + y,
        //     5 
        Seconds(5),
        //       2 
        Seconds(2)
       )
       .transform(rdd => { //transform   rdd   ,      rdd
        //  Key          ,      ,      3    
        val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
        // Array   resultRDD
        val resultRDD = ssc.sparkContext.parallelize(info)
        resultRDD
       })
       .map(x => s"${x._1}      :${x._2}")
       .print()
    
      ssc.start()
      ssc.awaitTermination()
    
     }
    }
    테스트 결과

    DStream 출력 조작 개관
    Spark Streaming 은 DStream 의 데 이 터 를 외부 시스템 으로 출력 할 수 있 습 니 다.DSteram 의 모든 계산 은 output 작업 에 의 해 실 행 됩 니 다.foreachRDD 출력 작업 도 안에서 RDD 에 action 작업 을 수행 해 야 모든 batch 에 대한 계산 논 리 를 촉발 할 수 있 습 니 다.
    바꾸다
    묘사 하 다.
    print()
    Driver 에서 DStream 의 데이터 의 10 개 요 소 를 출력 합 니 다.주로 테스트 에 사용 되 거나 output 작업 이 필요 없 을 때 job 를 간단하게 터치 하 는 데 사 용 됩 니 다.
    saveAsTextFiles(prefix,
    [suffix])
    DStream 의 내용 을 텍스트 파일 로 저장 합 니 다.그 중에서 일괄 처리 간격 에서 발생 하 는 파일 은 prefix-TIME 입 니 다.IN_MS[.suffix]방식 으로 명명 합 니 다.
    saveAsObjectFiles(prefix
    , [suffix])
    DStream 의 내용 을 대상 에 따라 정렬 하고 SequenceFile 형식 으로 저장 합 니 다.그 중 매번 일괄 처리 간격 으로 생 성 되 는 파일 은 prefix-TIMEIN_MS[.suffix]방식 으로 명명 합 니 다.
    saveAsHadoopFiles(pref
    ix, [suffix])
    DStream 의 내용 을 텍스트 로 Hadoop 파일 로 저장 합 니 다.일괄 처리 간격 에서 발생 하 는 파일 입 니 다.
    prefix-TIME 로IN_MS[.suffix]방식 으로 명명 합 니 다.
    foreachRDD(func)
    가장 기본 적 인 출력 작업 은 func 함 수 를 DStream 의 RDD 에 적용 합 니 다.이 작업 은 외부 시스템 으로 데 이 터 를 출력 합 니 다.
    예 를 들 어 RDD 를 파일 이나 네트워크 데이터베이스 에 저장 하 는 등 입 니 다.주의해 야 할 것 은 func 함수 가 이 streaming 을 실행 하고 있 습 니 다.
    응용 드라이버 프로 세 스에 서 실 행 됩 니 다.
    foreachRDD 연산 자 개발
    foreachRDD 는 가장 자주 사용 하 는 output 작업 으로 DStream 에서 발생 하 는 모든 RDD 를 옮 겨 다 니 며 처리 한 다음 에 모든 RDD 의 데 이 터 를 외부 저장 소 에 기록 할 수 있 습 니 다.예 를 들 어 파일,데이터 베이스,캐 시 등 은 보통 RDD 에 대해 action 작업 을 수행 합 니 다.예 를 들 어 foreach.
    foreachRDD 작업 데이터베이스 사용
    일반적으로 foreachRDD 에 서 는 JDBC Connection 과 같은 Connection 을 만 든 다음 Connection 을 통 해 데 이 터 를 외부 저장 소 에 기록 합 니 다.
    오류 1:RDD 의 foreach 작업 외부 에 Connection 만 들 기
    
    dstream.foreachRDD { rdd =>
      val connection=createNewConnection()
      rdd.foreach { record => connection.send(record)
      }
    }
    이러한 방식 은 잘못된 것 입 니 다.이러한 방식 은 Connection 대상 이 직렬 화 된 후에 모든 task 에 전 송 될 수 있 습 니 다.그러나 Connection 대상 은 직렬 화 를 지원 하지 않 기 때문에 전송 할 수 없습니다.
    오류 2:RDD 의 foreach 작업 내부 에 Connection 만 들 기
    
    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }
    이 방식 은 가능 하지만 실행 효율 이 낮 습 니 다.RDD 의 모든 데이터 에 대해 Connection 대상 을 만 들 수 있 기 때문에 보통 Connection 대상 의 생 성 은 성능 을 소모 합 니 다.
    합 리 적 인 방식
  • 첫 번 째:RDD 의 foreachPartition 작업 을 사용 하고 이 작업 내부 에 Connection 대상 을 만 드 는 것 은 RDD 의 모든 partition 에 Connection 대상 을 만 드 는 것 과 같 아서 많은 자원 을 절약 했다
  • 두 번 째:정적 연결 풀 을 수 동 으로 밀봉 하고 RDD 의 foreachPartition 작업 을 사용 하 며 이 작업 내부 에서 정적 연결 풀 에서 정적 방법 으로 연결 을 얻 고 연결 이 끝 난 후에 연결 풀 에 다시 넣 습 니 다.이렇게 하면 여러 RDD 의 partition 사이 에 다시 연결 할 수 있 습 니 다
  • 인 스 턴 스:실시 간 으로 WordCount 를 집계 하고 결 과 를 MySQL 데이터베이스 에 저장 합 니 다.
    MySQL 데이터베이스 작성 표 문 구 는 다음 과 같 습 니 다.
    
    CREATE TABLE wordcount (
      word varchar(100) CHARACTER SET utf8 NOT NULL,
      count int(10) NOT NULL,
      PRIMARY KEY (word)
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
    IDEA 에 mysql-connector-java-5.1.40-bin.jar 추가

    코드 는 다음 과 같다.
    연못 을 연결 하 는 코드 는 처음에 정적 블록 으로 연못 을 써 서 직접 얻 으 려 고 했 지만 연못 의 너비 가 부족 한 문 제 를 고려 하면 이런 방식 이 더 좋 았 습 니 다.처음에 연결 풀 을 예화 하고 호출 되 어 연결 을 얻 었 습 니 다.연결 이 모두 가 져 왔 을 때 연못 이 비 었 으 면 연못 을 다시 예화 해서 나 왔 습 니 다.
    
    package StreamingDemo
    
    import java.sql.{Connection, DriverManager, SQLException}
    import java.util
    
    object JDBCManager {
     var connectionQue: java.util.LinkedList[Connection] = null
    
     /**
      *               
      * @return
      */
     def getConnection(): Connection = {
      synchronized({
       try {
        //        ,        Connection     
        if (connectionQue == null) {
         connectionQue = new util.LinkedList[Connection]()
         for (i <- 0 until (10)) {
          //  10   ,       
          val connection = DriverManager.getConnection(
           "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",
           "root",
           "root")
          //   push    
          connectionQue.push(connection)
         }
        }
       } catch {
        //       
        case e: SQLException => e.printStackTrace()
       }
       //        ,       ,         
       return connectionQue.poll()
      })
     }
    
     /**
      *         ,            
      * @param connection
      */
     def returnConnection(connection: Connection) = {
      //    
      connectionQue.push(connection)
     }
    
     def main(args: Array[String]): Unit = {
      //main    
      getConnection()
      println(connectionQue.size())
     }
    }
    wordcount 코드
    
    package StreamingDemo
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, streaming}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object ForeachRDDDemo {
     def main(args: Array[String]): Unit = {
      //      ,  INFO    
      Logger.getLogger("org").setLevel(Level.WARN)
    
      //  Hadoop   ,     
      System.setProperty("HADOOP_USER_NAME", "Setsuna")
    
      //Spark    
      val conf = new SparkConf()
       .setAppName(this.getClass.getSimpleName)
       .setMaster("local[2]")
      val ssc = new StreamingContext(conf, streaming.Seconds(2))
    
      //     updateStateByKey,      checkpoint
      ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")
    
      //  socket, nc     
      val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
      val wordCountDStream = linesDStream
       .flatMap(_.split(" "))   //       
       .map((_, 1)) //  (word,1)
       .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        //        
        var count = state.getOrElse(0)
        for (value <- values) {
         count += value
        }
        Option(count)
       })
    
      wordCountDStream.foreachRDD(rdd => {
       if (!rdd.isEmpty()) {
        rdd.foreachPartition(part => {
         //         
         val connection = JDBCManager.getConnection()
         part.foreach(data => {
          val sql = // wordcount    wordcount  ,on duplicate key update           
           s"insert into wordcount (word,count) " +
            s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}"
          //  prepareStatement   sql  
          val pstmt = connection.prepareStatement(sql)
          pstmt.executeUpdate()
         })
         //          ,        
         JDBCManager.returnConnection(connection)
        })
       }
      })
    
      ssc.start()
      ssc.awaitTermination()
     }
    }
    nc 를 열 고 데 이 터 를 입력 하 십시오.

    다른 터미널 에서 wordcount 의 결 과 를 조회 하면 실시 간 으로 변화 가 발생 한 것 을 발견 할 수 있다

    이상 이 본문의 전부 입 니 다.여러분 의 학습 에 도움 이 되 기 를 바 랍 니 다.그리고 여러분 의 많은 성원 을 바 랍 니 다.\#niming{position:fixed; bottom:0; z-index:9999;right:0;width:100%;background:#e5e5e5;}.fengyu{width:99%;padding:0.5%;text-align:center;}a{text-decoration: none;color:#000;}.tuiguangbiaoji{font-size:10px;margin:0px 10px;}

    좋은 웹페이지 즐겨찾기