SparkStreaming+Kafka는 캐시 기반의 실시간 wordcount 프로그램을 실현합니다
5483 단어 sparkkafkaSparkStreamingSpark
전언
본고는 SparkStreaming과 Kafka를 이용하여 캐시 기반의 실시간 wordcount 프로그램을 실현하고자 한다. 무슨 뜻인가. 일반적인 SparkStreaming의 wordcount 프로그램, 예를 들어 홈페이지의 경우 최신 시간 간격 내의 모든 단어의 수량만 통계할 수 있을 뿐 역사를 누적할 수 없기 때문이다. 본고는 강좌를 본 후에 스스로 Kafka의 프로그램을 실현하여 여기에 기록하고자 한다.사실 어렵지 않아요. 업데이트 StateByKey 산수 하나만 쓰면 이루어질 수 있어요. 이 산수를 처음 써봐서 딱 배워볼게요.
1. 데이터
데이터는 내가 랜덤으로 카프카에서 생산한 몇 가지로 단어는 빈칸으로 구분된다
2、kafka topic
우선 kafka에서 topic: Update StateBykeyWordCount에 사용할 프로그램을 만듭니다
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic UpdateStateBykeyWordCount
3. checkpoint의hdfs 디렉터리 만들기
내 디렉토리:/spark/dkl/kafka/wordcountcheckpoint
hadoop fs -mkdir -p /spark/dkl/kafka/wordcount_checkpoint
4. 스파크 코드
다음 프로그램 시작하기
package com.dkl.leanring.spark.kafka
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object UpdateStateBykeyWordCount {
def main(args: Array[String]): Unit = {
// , SparkSession
val spark = SparkSession.builder().appName("sskt").master("local[2]").enableHiveSupport().getOrCreate()
// , sparkContext
val sc = spark.sparkContext
// , StreamingContext,batchDuration 1
val ssc = new StreamingContext(sc, Seconds(5))
// checkpoint
ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/wordcount_checkpoint")
//kafka
val server = "ambari.master.com:6667"
//
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> server, //kafka
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "UpdateStateBykeyWordCount", //
"auto.offset.reset" -> "latest", //latest earliest 、none
"enable.auto.commit" -> (false: java.lang.Boolean)) // true,
val topics = Array("UpdateStateBykeyWordCount") //
// Direct DStream
val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
// WordCount
// , (word,1)
val words = stream.flatMap(_.value().split(" ")).map((_, 1))
val wordCounts = words.updateStateByKey(
// batch
// key , , (hello,1)(hello,1), values (1,1)
// key
(values: Seq[Int], state: Option[Int]) => {
var newValue = state.getOrElse(0)
values.foreach(newValue += _)
Option(newValue)
})
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
5. 몇 가지 데이터 생산
아무렇게나 몇 개 쓰면 된다
bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic UpdateStateBykeyWordCount
6. 결과
결과에 의하면 역사의 단어도 통계적으로 인쇄되었다
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.