SparkStreaming+Kafka는 캐시 기반의 실시간 wordcount 프로그램을 실현합니다

내 오리지널 주소:https://dongkelun.com/2018/06/14/updateStateBykeyWordCount/
전언
본고는 SparkStreaming과 Kafka를 이용하여 캐시 기반의 실시간 wordcount 프로그램을 실현하고자 한다. 무슨 뜻인가. 일반적인 SparkStreaming의 wordcount 프로그램, 예를 들어 홈페이지의 경우 최신 시간 간격 내의 모든 단어의 수량만 통계할 수 있을 뿐 역사를 누적할 수 없기 때문이다. 본고는 강좌를 본 후에 스스로 Kafka의 프로그램을 실현하여 여기에 기록하고자 한다.사실 어렵지 않아요. 업데이트 StateByKey 산수 하나만 쓰면 이루어질 수 있어요. 이 산수를 처음 써봐서 딱 배워볼게요.
1. 데이터
데이터는 내가 랜덤으로 카프카에서 생산한 몇 가지로 단어는 빈칸으로 구분된다
2、kafka topic
우선 kafka에서 topic: Update StateBykeyWordCount에 사용할 프로그램을 만듭니다
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic UpdateStateBykeyWordCount

3. checkpoint의hdfs 디렉터리 만들기
내 디렉토리:/spark/dkl/kafka/wordcountcheckpoint
hadoop fs -mkdir -p /spark/dkl/kafka/wordcount_checkpoint

4. 스파크 코드
다음 프로그램 시작하기
package com.dkl.leanring.spark.kafka

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object UpdateStateBykeyWordCount {

  def main(args: Array[String]): Unit = {
    //   ,  SparkSession
    val spark = SparkSession.builder().appName("sskt").master("local[2]").enableHiveSupport().getOrCreate()
    //   ,  sparkContext
    val sc = spark.sparkContext
    //   ,  StreamingContext,batchDuration 1 
    val ssc = new StreamingContext(sc, Seconds(5))

    //  checkpoint  
    ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/wordcount_checkpoint")

    //kafka    
    val server = "ambari.master.com:6667"

    //     
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> server, //kafka    
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "UpdateStateBykeyWordCount", //     
      "auto.offset.reset" -> "latest", //latest                 earliest 、none
      "enable.auto.commit" -> (false: java.lang.Boolean)) //   true,                  
    val topics = Array("UpdateStateBykeyWordCount") //    

    //  Direct    DStream
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    //    WordCount  

    //           ,     (word,1)  
    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
    val wordCounts = words.updateStateByKey(
      //      batch             
      //        key      ,     ,  (hello,1)(hello,1),  values (1,1)
      //        key        
      (values: Seq[Int], state: Option[Int]) => {

        var newValue = state.getOrElse(0)
        values.foreach(newValue += _)
        Option(newValue)

      })
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }

}

5. 몇 가지 데이터 생산
아무렇게나 몇 개 쓰면 된다
bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic UpdateStateBykeyWordCount 

6. 결과
결과에 의하면 역사의 단어도 통계적으로 인쇄되었다

좋은 웹페이지 즐겨찾기