【 Spark 88 】 Spark 스 트 리밍 누산기 조작 (updateStateByKey)

5628 단어 Spark
실시 간 으로 계 산 된 실제 응용 에 서 는 한 시간 간격 내의 데이터 에 관심 을 가 져 야 할 뿐만 아니 라 전체 실시 간 으로 계 산 된 모든 시간 간격 에서 발생 하 는 관련 데 이 터 를 통계 할 수도 있다.
예 를 들 어 Nginx 의 access. log 에 대해 실시 간 모니터링 요청 404 를 할 때 특정한 시간 간격 으로 나타 난 횟수 를 통계 해 야 하 는 것 을 제외 하고 하루 에 몇 번 이나 나타 난 404 를 통계 해 야 한다. 즉, 404 모니터링 이 여러 시간 간격 을 가로 지 르 는 것 이다.
 
Spark Streaming 의 솔 루 션 은 누적 기 입 니 다. 작업 원 리 는 전역 적 으로 업데이트 가능 한 변 수 를 정의 하 는 것 입 니 다. 모든 시간 창 에서 얻 은 통계 값 을 이전 시간 창 에서 얻 은 값 으로 누적 합 니 다. 그러면 이 누적 값 은 여러 시간 간격 을 가로 지 르 는 것 입 니 다.
 
package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

/**
 * Counts words cumulatively in UTF8 encoded, '
' delimited text received from the network every * second starting with initial value of word count. * Usage: StatefulNetworkWordCount * and describe the TCP server that Spark Streaming would connect to receive * data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */ object StatefulNetworkWordCount { def main(args: Array[String]) { /// , Some(Int), /// Key value , , val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } /// , Key、 Key Value 、 ///newUpdateFunc iterator[(String,Int)] val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { /// Key updateFunc ( Key Value 、 ) /// Key iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[3]") // Create the context with a 5 second batch size val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint(".") // Initial RDD input to updateStateByKey val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of
delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream("192.168.26.140", 9999) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using updateStateByKey // This will give a Dstream made of state (which is the cumulative count of the words) // updateStateByKey , val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) stateDstream.print() ssc.start() ssc.awaitTermination() } }

 
위의 핵심 작업 시 DStream 의 updateStateByKey 함수 작업 은 네 개의 인 자 를 받 아들 입 니 다.
 
2. DStream. updateStateByKey 분석
방법 설명:
 /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. Note, that this function may generate a different
   *                   tuple with a different key than the input key. Therefore keys may be removed
   *                   or added in this way. It is up to the developer to decide whether to
   *                   remember the  partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
   * @param initialRDD initial state value of each key.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner, 
      rememberPartitioner: Boolean,
      initialRDD: RDD[(K, S)]
    ): DStream[(K, S)] = {
     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
       rememberPartitioner, Some(initialRDD))
  }
  • initialRDD 는 (K, S) 유형의 RDD 로 Key 의 초기 상 태 를 나타 내 고 각각 (K, S) 은 Key 와 이에 대응 하 는 State 상 태 를 나타 낸다.K 는 updateStateByKey 의 Key 유형 을 나타 낸다. 예 를 들 어 String, S 는 Key 가 대응 하 는 상태 (State) 유형 을 나타 낸다. 상례 에서 Int
  • 이다.
  • rememberPartitioner: 다음 Spark Streaming 실행 과정 에서 발생 하 는 RDD 가 같은 파 티 션 알고리즘 을 사용 하 는 지 여부
  • paritioner: 파 티 션 알고리즘, 상례 에서 사용 한 Hash 파 티 션 알고리즘, 파 티 션 수 는 ssc. sparkContext. default Parallelism
  • updateFunc 는 함수 상수, 유형 (Iterator [(K, Seq [V], Option [S])] = > Iterator [(K, S)], 상태 업데이트 함수
  • (Iterator [(K, Seq [V], Option [S])]) = > Iterator [(K, S)] 는 어떻게 해석 합 니까?
    입 참: 3 원 그룹 교체 기, 3 원 그룹 에서 K 는 Key 를 표시 합 니 다. Seq [V] 는 한 시간 간격 에서 발생 하 는 Key 에 대응 하 는 Value 집합 (Seq 형식, 이 집합 정의 누적 함수 논 리 를 누적 해 야 합 니 다) 을 표시 합 니 다. option [S] 는 이전 시간 간격의 누적 값 (이 Key 의 이전 시간 점 상 태 를 표시 합 니 다) 을 표시 합 니 다.
    출 참: 이원 그룹 교체 기, 이원 그룹 에서 K 는 Key 를 표시 하고 S 는 현재 시간 대 실행 이 끝 난 후에 얻 은 누적 값 (즉 최신 상태) 을 표시 합 니 다.
     
     
     
     
     
     
     
     

    좋은 웹페이지 즐겨찾기