json 데이터를 대상으로 비추는 처리 방법
                                            
 3965 단어  spark
                    
    case class Person(channel: String,
                      IP: String,
                      mid: String,
                      user_id_temp: String,
                      user_id: String,
                      request_type: String,
                      request_method: String,
                      access_time: String) extends Serializable {
      private val serialVersionUID = 7247714666080613254L
    }
val conf = new SparkConf()
    //    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //    conf.registerKryoClasses(Array(classOf[Person]))
    val spark = SparkSession.builder().master("local[*]").appName("sql").config(conf).getOrCreate()
    val input5 = spark.sparkContext.textFile("file:///C:/.../Request", 100)
    val input = input5.flatMap(line => {
      implicit val formats = Serialization.formats(ShortTypeHints(List()))  // 
      val data = parse(line)      // json 
      val eventa = data.extract[Person]
      Some(eventa)
    }).persist()
    input.foreachPartition(s => {
      val topic = "xx"
      val brokers = "xx"
      val props = new Properties()
      props.put("metadata.broker.list", brokers)
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val kafkaConfig = new ProducerConfig(props)
      val producer = new Producer[String, Object](kafkaConfig)
      val event = new JSONObject()
      while(s.hasNext){
        val x = s.next()
        event
          .put("access_time", x.access_time)
          .put("channel", x.channel) //
          .put("ip", x.IP) //
          .put("mid", x.mid) //
          .put("user_id", x.user_id)
          .put("user_id_temp", x.user_id_temp)
          .put("request_type", x.request_type)
          .put("request_method", x.request_method)
        println(event)
        producer.send(new KeyedMessage[String, Object](topic, event.toString))
        Thread.sleep(100)
      }
                이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.