Scala-005DataFrame 에서 UDF 사용
14967 단어 ★★★Scala#★★Scala 응용
구조 데이터
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{
Vector, Vectors}
import org.apache.spark.sql.{
DataFrame, Row, SparkSession}
Intitializing Scala interpreter ...
Spark Web UI available at 11111111111:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1598929668275)
SparkSession available as 'spark'
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
val builder = SparkSession
.builder()
.appName("learningScala")
.config("spark.executor.heartbeatInterval","60s")
.config("spark.network.timeout","120s")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer.max","512m")
.config("spark.dynamicAllocation.enabled", false)
.config("spark.sql.inMemoryColumnarStorage.compressed", true)
.config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
.config("spark.sql.broadcastTimeout", 600)
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.crossJoin.enabled", true)
.master("local[*]")
val spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@64837d8
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@542c0943
var df = Seq(
("A", 1, 4,7),
("B", 2, 5,8),
("C", 3 ,6,9)).toDF("id", "x", "y","z")
df.show(truncate=false)
+---+---+---+---+
|id |x |y |z |
+---+---+---+---+
|A |1 |4 |7 |
|B |2 |5 |8 |
|C |3 |6 |9 |
+---+---+---+---+
df: org.apache.spark.sql.DataFrame = [id: string, x: int ... 2 more fields]
df.printSchema()
root
|-- id: string (nullable = true)
|-- x: integer (nullable = false)
|-- y: integer (nullable = false)
|-- z: integer (nullable = false)
방법 1
이 방법 은 외부 에서 볼 수 있 으 며 DataFrame 에서 직접 사용 할 수 있 으 나 spark.sql 에서 사용 할 수 없습니다.
def add_one(useCol1:Int,useCol2:Int)={
useCol1+useCol2
}
add_one: (useCol1: Int, useCol2: Int)Int
import org.apache.spark.sql.functions.{
udf,col}
val add_one_udf = udf(add_one(_:Int,_:Int))
import org.apache.spark.sql.functions.{udf, col}
add_one_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,IntegerType,Some(List(IntegerType, IntegerType)))
df.withColumn("sum",add_one_udf(col("y"),col("z"))).show(truncate=false)
+---+---+---+---+---+
|id |x |y |z |sum|
+---+---+---+---+---+
|A |1 |4 |7 |11 |
|B |2 |5 |8 |13 |
|C |3 |6 |9 |15 |
+---+---+---+---+---+
방법 2
이 방법 은 spark.sql 에서 사용 해 야 하 는데,callUDF 를 통 해 DataFrame 에서 도 사용 할 수 있 습 니 다.
spark.udf.register("add_one_udf2", add_one _)
res16: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(,IntegerType,Some(List(IntegerType, IntegerType)))
import org.apache.spark.sql.functions
df.withColumn("sum", functions.callUDF("add_one_udf2", col("y"),col("z"))).show(truncate=false)
+---+---+---+---+---+
|id |x |y |z |sum|
+---+---+---+---+---+
|A |1 |4 |7 |11 |
|B |2 |5 |8 |13 |
|C |3 |6 |9 |15 |
+---+---+---+---+---+
import org.apache.spark.sql.functions
df.createOrReplaceTempView("df")
spark.sql("select *,add_one_udf2(y,z) AS sum from df").show()
+---+---+---+---+---+
| id| x| y| z|sum|
+---+---+---+---+---+
| A| 1| 4| 7| 11|
| B| 2| 5| 8| 13|
| C| 3| 6| 9| 15|
+---+---+---+---+---+
고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 고 난 징 시 강녕 구 구 룡 호
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Git에서 개발 환경을 정리해 보았습니다.로컬에 작업 디렉토리 만들기 mkdir [ワーキングディレクトリ名] 작업 디렉토리로 이동 cd [ワーキングディレクトリ名] 작업 디렉토리 초기화 git init git로 연결할 원격 리포지토리를 만듭니다. 이 때 REA...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.