Spark UDF 처리 Array < struct > 구조
975 단어 spark
긴 말 하지 말고 바로 코드 를 올 려 라
데이터 구조:
root
|-- id: string (nullable = true)
|-- offset: long (nullable = true)
|-- tags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- tagId: integer (nullable = true)
| | |-- weight: double (nullable = true)
/ / udf 함수 정의, 열 추가
val computeScoreFun:mutable.WrappedArray[Row]=>Double ={tags=>
val scoreMap = tags.map{row =>
val tagId = row.getInt(row.fieldIndex("tagId"))
val weight = row.getDouble(row.fieldIndex("weight"))
tagId -> weight
}.toMap
val addScore =newAdds.map(id =>scoreMap.getOrElse(id,0.0)).reduce(_ + _)
val minusScore = newMinus.map(id =>scoreMap.getOrElse(id,0.0)).reduce(_ + _)
addScore - minusScore
}
val scoreUDF =udf(computeScoreFun)
// score
spark.read.parquet(input).withColumn("score",scoreUDF($"tags"))
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.