JSON 데이터를 객체로 매핑하는 처리 방법

3965 단어 spark
    /**
     * One request record; instances are produced by extracting a parsed JSON line
     * into this case class.
     *
     * The field names (including the upper-case `IP` and the snake_case names) are
     * kept exactly as they are because extraction matches them against the JSON keys.
     *
     * The serialization version is pinned with the `@SerialVersionUID` annotation.
     * A `val serialVersionUID` declared in the class body becomes an instance field,
     * which Java serialization ignores (it only honours a static final field), so the
     * annotation is the only way to make the declared UID take effect in Scala.
     *
     * @param channel        channel value of the request
     * @param IP             IP value of the request
     * @param mid            mid value of the request
     * @param user_id_temp   temporary user id value of the request
     * @param user_id        user id value of the request
     * @param request_type   request type value
     * @param request_method request method value
     * @param access_time    access time value, kept as the raw string
     */
    @SerialVersionUID(7247714666080613254L)
    case class Person(channel: String,
                      IP: String,
                      mid: String,
                      user_id_temp: String,
                      user_id: String,
                      request_type: String,
                      request_method: String,
                      access_time: String) extends Serializable
val conf = new SparkConf()
    // Kryo serialization is currently switched off; uncomment both lines to enable it.
    //    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //    conf.registerKryoClasses(Array(classOf[Person]))

    // Local session that uses every available core.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("sql")
      .config(conf)
      .getOrCreate()

    // Raw request log read as text, one JSON document per line, with 100 as the minimum partition count.
    val input5 = spark.sparkContext.textFile("file:///C:/.../Request", 100)

    // Parse each line as JSON and extract it into a Person; the resulting RDD is persisted.
    val input = input5.map { line =>
      // json4s formats required implicitly by `extract`
      implicit val formats = Serialization.formats(ShortTypeHints(List()))
      parse(line).extract[Person]
    }.persist()

    // Publishes every parsed record to Kafka as a JSON string.
    // The producer is created inside the closure, so each partition builds its own.
    input.foreachPartition(s => {
      val topic = "xx"
      val brokers = "xx"
      val props = new Properties()
      props.put("metadata.broker.list", brokers)
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val kafkaConfig = new ProducerConfig(props)
      // NOTE(review): no producer.close() is visible in this excerpt (the closure is cut off
      // before its end) — confirm the producer is released after the loop.
      val producer = new Producer[String, Object](kafkaConfig)
      // The same JSONObject instance is reused for every record of the partition;
      // the put calls below set the same eight keys on each iteration.
      val event = new JSONObject()
      while(s.hasNext){
        val x = s.next()
        event
          .put("access_time", x.access_time)
          .put("channel", x.channel) //
          .put("ip", x.IP) //
          .put("mid", x.mid) //
          .put("user_id", x.user_id)
          .put("user_id_temp", x.user_id_temp)
          .put("request_type", x.request_type)
          .put("request_method", x.request_method)
        // Print the outgoing message to stdout before sending it.
        println(event)
        // The message payload is the JSON text of the event; no key is supplied.
        producer.send(new KeyedMessage[String, Object](topic, event.toString))
        // Sleep 100 ms after each message.
        Thread.sleep(100)
      }

좋은 웹페이지 즐겨찾기