Spark UDF 처리 Array < struct > 구조

975 단어 spark
UDF 처리 Array 구조
긴 말 하지 말고 바로 코드 를 올 려 라
데이터 구조:
root
 |-- id: string (nullable = true)
 |-- offset: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tagId: integer (nullable = true)
 |    |    |-- weight: double (nullable = true)
 
/ / udf 함수 정의, 열 추가
val computeScoreFun:mutable.WrappedArray[Row]=>Double ={tags=>
  val scoreMap = tags.map{row =>
    val tagId = row.getInt(row.fieldIndex("tagId"))
    val weight = row.getDouble(row.fieldIndex("weight"))
    tagId -> weight
  }.toMap
  val addScore =newAdds.map(id =>scoreMap.getOrElse(id,0.0)).reduce(_ + _)
  val minusScore = newMinus.map(id =>scoreMap.getOrElse(id,0.0)).reduce(_ + _)
  addScore - minusScore
}

val scoreUDF =udf(computeScoreFun)
//    score
spark.read.parquet(input).withColumn("score",scoreUDF($"tags"))

좋은 웹페이지 즐겨찾기