【 Spark 88 】 Spark 스 트 리밍 누산기 조작 (updateStateByKey)
5628 단어 Spark
예 를 들 어 Nginx 의 access. log 에 대해 실시 간 모니터링 요청 404 를 할 때 특정한 시간 간격 으로 나타 난 횟수 를 통계 해 야 하 는 것 을 제외 하고 하루 에 몇 번 이나 나타 난 404 를 통계 해 야 한다. 즉, 404 모니터링 이 여러 시간 간격 을 가로 지 르 는 것 이다.
Spark Streaming 의 솔 루 션 은 누적 기 입 니 다. 작업 원 리 는 전역 적 으로 업데이트 가능 한 변 수 를 정의 하 는 것 입 니 다. 모든 시간 창 에서 얻 은 통계 값 을 이전 시간 창 에서 얻 은 값 으로 누적 합 니 다. 그러면 이 누적 값 은 여러 시간 간격 을 가로 지 르 는 것 입 니 다.
package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
/**
* Counts words cumulatively in UTF8 encoded, '
' delimited text received from the network every
* second starting with initial value of word count.
* Usage: StatefulNetworkWordCount
* and describe the TCP server that Spark Streaming would connect to receive
* data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example
* org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
/// , Some(Int),
/// Key value , ,
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
/// , Key、 Key Value 、
///newUpdateFunc iterator[(String,Int)]
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
/// Key updateFunc ( Key Value 、 )
/// Key
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[3]")
// Create the context with a 5 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint(".")
// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of
delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream("192.168.26.140", 9999)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the words)
// updateStateByKey ,
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
위의 핵심 작업 시 DStream 의 updateStateByKey 함수 작업 은 네 개의 인 자 를 받 아들 입 니 다.
2. DStream. updateStateByKey 분석
방법 설명:
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. Note, that this function may generate a different
* tuple with a different key than the input key. Therefore keys may be removed
* or added in this way. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
* DStream
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @param initialRDD initial state value of each key.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean,
initialRDD: RDD[(K, S)]
): DStream[(K, S)] = {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
rememberPartitioner, Some(initialRDD))
}
입 참: 3 원 그룹 교체 기, 3 원 그룹 에서 K 는 Key 를 표시 합 니 다. Seq [V] 는 한 시간 간격 에서 발생 하 는 Key 에 대응 하 는 Value 집합 (Seq 형식, 이 집합 정의 누적 함수 논 리 를 누적 해 야 합 니 다) 을 표시 합 니 다. option [S] 는 이전 시간 간격의 누적 값 (이 Key 의 이전 시간 점 상 태 를 표시 합 니 다) 을 표시 합 니 다.
출 참: 이원 그룹 교체 기, 이원 그룹 에서 K 는 Key 를 표시 하고 S 는 현재 시간 대 실행 이 끝 난 후에 얻 은 누적 값 (즉 최신 상태) 을 표시 합 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark Streaming의 통계 소켓 단어 수1. socket 단어 수 통계 TCP 소켓의 데이터 서버에서 수신한 텍스트 데이터의 단어 수입니다. 2. maven 설정 3. 프로그래밍 코드 입력 내용 결과 내보내기...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.