Sparkstreaming 소비 Kafka Elasticsearch 쓰기
1. 버전 설명
jdk 1.8
spark 2.3
elasticsearch 7.4
scala 2.11
Kafka 0.10
2.pom 의존
org.apache.kafka
kafka_2.11
1.0.0
org.apache.spark
spark-core_${scala.version}
${spark.version}
provided
org.apache.spark
spark-streaming_${scala.version}
${spark.version}
provided
org.apache.spark
spark-sql_2.11
${spark.version}
org.apache.spark
spark-streaming-kafka-0-10_${scala.version}
${spark.version}
org.elasticsearch
elasticsearch-hadoop
7.4.0
org.scalikejdbc
scalikejdbc_2.11
2.2.1
mysql
mysql-connector-java
5.1.38
org.elasticsearch.client
elasticsearch-rest-high-level-client
7.4.0
3. 코드
(이 코드는 kafka에서 데이터를 읽고 Es에 쓰고 kafka offset을 MySQL에 저장합니다.)
object DataToElasticsearch {
private val log = LoggerFactory.getLogger(RunApplication.getClass)
def main(args: Array[String]): Unit = {
val ssc = run_task()
ssc.start()
ssc.awaitTermination()
}
def run_task(): StreamingContext = {
val conf = ConfigFactory.load("application.conf")
val dt: String = LocalDate.now.toString
val spark = SparkSession.builder().appName("xxxxxxx")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.streaming.kafka.maxRatePerPartition", conf.getString("maxRatePerPartition"))
.config("es.index.auto.create", "true")
.config("es.nodes", "xx.xx.xx.xx")
.config("es.port", "9200")
.getOrCreate()
val sc = spark.sparkContext
val batchDuration = conf.getInt("batchDuration")
val ssc = new StreamingContext(sc, Seconds(batchDuration))
//kafka
val topic = conf.getString("kafka.topic")
val brokers = conf.getString("kafka.brokers")
val group = conf.getString("kafka.group")
val topics = Array(topic)
// JDBC
val jdbcDriver = conf.getString("jdbc.driver")
val jdbcUrl = conf.getString("jdbc.url")
val jdbcUser = conf.getString("jdbc.user")
val jdbcPassword = conf.getString("jdbc.password")
val jdbcTable = conf.getString("jdbc.table")
OffsetDetails(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
// offset
val fromOffsets: java.util.Map[TopicPartition, java.lang.Long] = get_offset(topic, jdbcTable)
var flag = false
// Kafka
val kafkaParams = get_kafkaParams(brokers, group)
// DStream
val kafkaStream = create_kafkaStream(ssc, kafkaParams, fromOffsets, topics, fromOffsets.keySet())
//
kafkaStream.foreachRDD(rdd => {
try {
if (!rdd.isEmpty()) {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val lines = rdd.map(_.value)
rdd.map(_.key()).foreach(key=>println("+key++++++++++++++"+key))
EsSpark.saveJsonToEs(lines, "spark/docs")
if ((!fromOffsets.isEmpty) | flag) {
//update offset
save_offset(offsetRanges, jdbcTable)
}
else {
//init offset
insert_offset(offsetRanges, jdbcTable)
flag = true
}
}
} catch {
case e: Throwable =>
System.out.println("Error" + e.printStackTrace())
}
})
ssc
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.