Spark Streaming 통합 Kafka 의 Offset 관리 [데이터 제로 잃 어 버 린 MySQL 관리 Offset]

22962 단어 spark
앞 에 쓰기:
Spark Streaming 을 사용 하여 Kafka 0.8 버 전 을 통합 할 때 spark - streaming - kafka - 0 - 8 은 offset 관 리 를 제공 하지 않 습 니 다.데이터 의 제로 손실 을 확보 하기 위해 서 우 리 는 스스로 이 오프셋 을 관리 해 야 한다.
참조:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
저 희 는 MySQL 에 편 이 량 을 저장 하여 관리 합 니 다.
빠 른 입문 은 scalikejdbc 를 사용 하여 MySQL 을 조작 합 니 다:
1. 가 져 오기 의존
      
      
          org.scalikejdbc
          scalikejdbc-config_2.11
          2.5.0
      

2. resource 파일 에 새 application. conf 파일 을 만 듭 니 다. 설정 은 다음 과 같 습 니 다.
# MySQL example    
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="123456"

3.Scalikejdbc Demo
package com.csylh.logAnalysis.scalikejdbc

import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs


/**
  * scala   mysql,  ScaLikeJdbc           
  */
object ScaLikeJdbcApp {

  def main(args: Array[String]): Unit = {

    //  application.conf   
    DBs.setup()
    //DBs.setupAll()
    DB.autoCommit {
        implicit session =>
        SQL("insert into people(name,age,fv) values(?,?,?)")
          .bind("  ", 22, 88)
          .update().apply()
        }
    }

    def delete() = {
      DB.autoCommit {
        implicit session =>
        SQL("delete from people where name = ?")
          .bind("  ")
          .update().apply()
      }
    }


    def update() {
      DB.autoCommit { implicit session =>
        SQL("update people set age = ? where name = ?")
          .bind(18, "  ")
          .update().apply()
      }
    }

  /**
    * select            rs    ,                
    */
  def select() = {
      DB.readOnly {
        implicit session =>
        val sql = SQL("select * from people ").map(rs =>
          (rs.string("name"), rs.int("age"))
        ).toList().apply()
      }
    }

}

제로 손실 데이터 솔 루 션 코드 는 다음 과 같 습 니 다.
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.config.DBs

/**
  * Description: Spark Streaming   Kafka        
  *           MySQL/zk/kafka/hbase/redis ...       offset      
  *         MySQL + scalikejdbc
  *
  * @Author:   36
  * @Date: 2019/8/8 11:24
  */
object OffsetApp {
  def main(args:Array[String]){

    val conf= new SparkConf().setAppName("OffsetApp").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list"->"192.168.1.116:9092",
      "group.id"->"liuge.group.id" ,//                
      "auto.offset.reset" -> "smallest"
    )
    val topics = "test".split(",").toSet
    //     :   offset
    import scalikejdbc._
    /**
      * tuple   map
      * ().list.apply().toMap
      */
    DBs.setup()
    val fromOffsets =
      DB.readOnly {
        implicit session => {
          SQL("select * from offsets_storage ").map(rs =>
            (TopicAndPartition(rs.string("topic"), rs.int("partitions")),rs.long("offset"))
          ).list().apply()
        }
    }.toMap

    for (ele <- fromOffsets){
      println("  MySQL       : " + ele._1.topic + ":" + ele._1.partition +":"+ele._2)
    }

    //    : Direct     Kafka ,  InputDStream
    val stream = if (fromOffsets.isEmpty){
      //      ,    Kafka ==> stream
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)

    }else{ //     ,    Kafka ==> stream
      val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(), mm.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

    }
    //    :      +   offset
    stream.foreachRDD(rdd => {

      // TODO.. 3.1           ,  count()  
      println("         :" + rdd.count())


      // 3.2     offset     
      var  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println("          :"+s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

        DB.autoCommit{
          implicit session => {
            SQL("replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)")
              .bind(o.topic, "liuge.group.id", o.partition, o.untilOffset).update().apply()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

여기까지 만 하면 우 리 는 카 프 카 의 데 이 터 를 비교적 잘 소비 할 수 있 을 것 이다.
어떤 잘못된 점 이 있 으 면 지 증 을 환영 합 니 다!감사합니다 ~ ~
KafkaProducer [자바 버 전] 시 뮬 레이 션 데이터

좋은 웹페이지 즐겨찾기