Spark Streaming: Consuming from Kafka and Writing to Elasticsearch


1. Versions


    JDK            1.8
    Spark          2.3
    Elasticsearch  7.4
    Scala          2.11
    Kafka          0.10
 

2. pom dependencies

   
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>7.4.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc_2.11</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>7.4.0</version>
    </dependency>

3. Code


(This code reads data from Kafka, writes it to Elasticsearch, and stores the Kafka offsets in MySQL.)
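The code loads its runtime settings from an application.conf file via Typesafe Config (ConfigFactory.load). The original post does not show that file; a minimal sketch containing the keys the code reads might look like the following, with all values as placeholders:

# application.conf -- minimal sketch, all values are placeholders
batchDuration = 10                  # streaming batch interval, in seconds
maxRatePerPartition = "1000"        # spark.streaming.kafka.maxRatePerPartition

kafka {
  topic   = "my_topic"
  brokers = "broker1:9092,broker2:9092"
  group   = "my_consumer_group"
}

jdbc {
  driver   = "com.mysql.jdbc.Driver"
  url      = "jdbc:mysql://xx.xx.xx.xx:3306/mydb"
  user     = "db_user"
  password = "db_password"
  table    = "kafka_offset"         # table holding the saved offsets
}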
import java.time.LocalDate

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.elasticsearch.spark.rdd.EsSpark
import org.slf4j.LoggerFactory

object DataToElasticsearch {
    private val log = LoggerFactory.getLogger(this.getClass)
    def main(args: Array[String]): Unit = {
        val ssc = run_task()
        ssc.start()
        ssc.awaitTermination()
    }

    def run_task(): StreamingContext = {
        val conf = ConfigFactory.load("application.conf")
        val dt: String = LocalDate.now.toString
        val spark = SparkSession.builder().appName("xxxxxxx")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.streaming.stopGracefullyOnShutdown", "true")
                .config("spark.streaming.kafka.maxRatePerPartition", conf.getString("maxRatePerPartition"))
                .config("es.index.auto.create", "true")
                .config("es.nodes", "xx.xx.xx.xx")
                .config("es.port", "9200")
                .getOrCreate()

        val sc = spark.sparkContext
        val batchDuration = conf.getInt("batchDuration")
        val ssc = new StreamingContext(sc, Seconds(batchDuration))
       
        // Kafka settings
        val topic = conf.getString("kafka.topic")
        val brokers = conf.getString("kafka.brokers")
        val group = conf.getString("kafka.group")
        val topics = Array(topic)

        // JDBC (MySQL) settings used for offset storage
        val jdbcDriver = conf.getString("jdbc.driver")
        val jdbcUrl = conf.getString("jdbc.url")
        val jdbcUser = conf.getString("jdbc.user")
        val jdbcPassword = conf.getString("jdbc.password")
        val jdbcTable = conf.getString("jdbc.table")

        OffsetDetails(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)

        // load previously saved offsets (empty on the first run)
        val fromOffsets: java.util.Map[TopicPartition, java.lang.Long] = get_offset(topic, jdbcTable)
        var flag = false

        // Kafka consumer parameters
        val kafkaParams = get_kafkaParams(brokers, group)

        // create the direct Kafka DStream
        val kafkaStream = create_kafkaStream(ssc, kafkaParams, fromOffsets, topics, fromOffsets.keySet())

        // process each micro-batch
        kafkaStream.foreachRDD(rdd => {
            try {
                if (!rdd.isEmpty()) {
                    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                    val lines = rdd.map(_.value)
                    // debug: print each record's key
                    rdd.map(_.key()).foreach(key => println("key: " + key))
                    // write the JSON payloads to Elasticsearch (resource "spark/docs")
                    EsSpark.saveJsonToEs(lines, "spark/docs")
                    if (!fromOffsets.isEmpty || flag) {
                        // offsets already stored in MySQL: update them
                        save_offset(offsetRanges, jdbcTable)
                    } else {
                        // first batch: insert the initial offset rows
                        insert_offset(offsetRanges, jdbcTable)
                        flag = true
                    }
                }

            } catch {
                case e: Throwable =>
                    log.error("Error while processing batch", e)
            }
        })

        ssc
    }

}
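The helper methods called above (OffsetDetails, get_offset, get_kafkaParams, create_kafkaStream, insert_offset, and save_offset) are not included in the original post. The following is a minimal sketch of how they could be implemented as additional methods of DataToElasticsearch, using ScalikeJDBC for the MySQL offset table and the spark-streaming-kafka-0-10 API. The offset table name and its columns (topic, partition_id, offset) are assumptions, not part of the original.

// additional imports needed on top of those already in the file
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import scalikejdbc.{ConnectionPool, DB, SQL}
import scala.collection.JavaConverters._

// register the JDBC driver and initialize the ScalikeJDBC connection pool
def OffsetDetails(driver: String, url: String, user: String, password: String): Unit = {
    Class.forName(driver)
    ConnectionPool.singleton(url, user, password)
}

// standard consumer parameters for the Kafka 0.10 direct stream
def get_kafkaParams(brokers: String, group: String): Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> group,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

// read the stored offsets for a topic; returns an empty map on the first run
// (assumed table schema: topic VARCHAR, partition_id INT, offset BIGINT)
def get_offset(topic: String, table: String): java.util.Map[TopicPartition, java.lang.Long] = {
    val result = new java.util.HashMap[TopicPartition, java.lang.Long]()
    DB.readOnly { implicit session =>
        SQL(s"select partition_id, `offset` from $table where topic = ?")
            .bind(topic)
            .map(rs => (rs.int("partition_id"), rs.long("offset")))
            .list.apply()
    }.foreach { case (p, off) => result.put(new TopicPartition(topic, p), off) }
    result
}

// create the direct stream: resume from the stored offsets if any exist,
// otherwise subscribe and start from auto.offset.reset
def create_kafkaStream(ssc: StreamingContext,
                       kafkaParams: Map[String, Object],
                       fromOffsets: java.util.Map[TopicPartition, java.lang.Long],
                       topics: Array[String],
                       partitions: java.util.Set[TopicPartition]): InputDStream[ConsumerRecord[String, String]] = {
    if (fromOffsets.isEmpty) {
        KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    } else {
        val offsets = fromOffsets.asScala.map { case (tp, off) => tp -> off.longValue() }.toMap
        KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Assign[String, String](partitions.asScala, kafkaParams, offsets))
    }
}

// first run: insert one offset row per partition
def insert_offset(offsetRanges: Array[OffsetRange], table: String): Unit = DB.localTx { implicit session =>
    offsetRanges.foreach { o =>
        SQL(s"insert into $table (topic, partition_id, `offset`) values (?, ?, ?)")
            .bind(o.topic, o.partition, o.untilOffset).update.apply()
    }
}

// later runs: advance the stored offset for each partition
def save_offset(offsetRanges: Array[OffsetRange], table: String): Unit = DB.localTx { implicit session =>
    offsetRanges.foreach { o =>
        SQL(s"update $table set `offset` = ? where topic = ? and partition_id = ?")
            .bind(o.untilOffset, o.topic, o.partition).update.apply()
    }
}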
