Snowflake 머신러닝: Snowpak에서 RandomForest 추론 수행


본 보도의 내용

  • Snowpark
  • 사용
  • Snowflake의 컴퓨팅 리소스로 추론하고자 함
  • PML을 사용하여 Snowflake에서 아이리스를 만듭니다!
  • 참고 문장

  • Snowpark 0.6.0 - com.snowflake.snowpark
  • Setting Up Visual Studio Code for Snowpark — Snowflake Documentation
  • Creating User-Defined Functions (UDFs) for DataFrames — Snowflake Documentation
  • 데이터 프레임의 사용자정의 함수(UDFs) 생성 - UDF에서 파일 읽기
  • 컨디션

  • OS:
  • MacOS BigSur
  • VSCode:
  • Metals: 1.10.6
  • Scala:
  • scala: version 2.12.13
  • snowpark: 0.6.0
  • typesafe: 1.4.1
  • pmml4s: 0.9.11
  • Python:
  • python: 3.8.6
  • pandas: 1.2.3
  • sklearn: 0.24.2
  • sklearn2pmml: 0.73.2
  • pypmml: 0.9.11
  • 미리 준비하다

  • VScode에서 Snowpark 처리 가능
    https://docs.snowflake.com/en/developer-guide/snowpark/quickstart-vscode.html
  • iris.데이터를 다운로드하여 Snowflak에 업로드
    https://datumstudio.jp/blog/1225_snowflake07/
  • 물줄기


    이번 보도의 흐름은
    sklearn을 사용합니다.PMML 형식으로 RandomForest로 데이터를 분류하는 모델 만들기
  • Snowpark에서 PMML 파일을 사용하는 아이리스.데이터 분류에 대한 UDF 생성
  • Snowpark 실행 및 분류 모델을 Snowflake에 적용한 데이터
  • HAPPY
  • 추론용 모형을 만들다


    우선, 우리는 sklearn의 RandomForestClassifier를 사용하여 클라리넷 모형을 실현할 것이다.
    오류를 시험하면서 작업하고 싶어서 Jupter Notebook을 사용했어요.
    먼저 다운로드한 아이리스.데이터 읽기에서 시작합니다.
    Snowflak에 로드된 머리글의 열 이름과 일치합니다.
    create_clasification_model.py
    import pandas as pd
    
    headers = [
    	"SEPAL_LENGTH",
    	"SEPAL_WIDTH",
    	"PETAL_LENGTH",
    	"PETAL_WIDTH",
    	"CLASS"
    ]
    iris_df = pd.read_csv("iris.data", names=headers)
    iris_df.head()
    
    SEPAL_LENGTH	SEPAL_WIDTH	PETAL_LENGTH	PETAL_WIDTH	CLASS
    0	5.1	3.5	1.4	0.2	Iris-setosa
    1	4.9	3.0	1.4	0.2	Iris-setosa
    2	4.7	3.2	1.3	0.2	Iris-setosa
    3	4.6	3.1	1.5	0.2	Iris-setosa
    4	5.0	3.6	1.4	0.2	Iris-setosa
    
    읽기 확인은 매우 간단합니다.
    그런 다음 PML 형식으로 모델을 만듭니다.
    데이터를 특징량과 목표로 나누다
    create_clasification_model.py
    from sklearn.model_selection import train_test_split
    
    iris_X = iris_df[iris_df.columns.difference(["CLASS"])]
    iris_y = iris_df["CLASS"]
    
    X_train, X_test, y_train, y_test = train_test_split(iris_X, iris_y, test_size=0.3) # 70% 
    
    PMML 형식으로 분류 모델 만들기
    create_clustering_model.py
    from sklearn.ensemble import RandomForestClassifier
    from sklearn2pmml.pipeline import PMMLPipeline
    
    pipeline = PMMLPipeline([
        ("classifier", RandomForestClassifier(n_estimators=100))
    ])
    pipeline.fit(X_train, y_train)
    
    우선 정밀도를 확인한다.
    create_clustering_model.py
    from sklearn import metrics
    
    y_pred = pipeline.predict(X_test)
    metrics.accuracy_score(y_test, y_pred)
    
    0.9111111111111111
    
    accuracy의 분류 모델을 어느 정도 제작할 수 있다.
    다음은pmml 파일로 씁니다.
    create_clustering_model.py
    from sklearn2pmml import sklearn2pmml
    
    sklearn2pmml(pipeline, "RandomForestIris.pmml", with_repr = True)
    
    나는 현재 디렉터리 RandomForestIris.pmml 에 파일이 있을 것이라고 생각한다.
    신중하게 보기 위해서 모델이 생성되었는지 잘 확인해 주세요.
    파일 읽기, iris-df에 적용됩니다.
    create_clustering_model.py
    from pypmml import Model
    
    model = Model.fromFile("RandomForestIris.pmml")
    result = model.predict(iris_df)
    
    result.head()
    
      probability(Iris-setosa)	probability(Iris-versicolor)	probability(Iris-virginica)
    0	1.00	0.00	0.0
    1	0.99	0.01	0.0
    2	1.00	0.00	0.0
    3	1.00	0.00	0.0
    4	1.00	0.00	0.0
    
    잘 추론할 수 있을 것 같은데.

    의존 관계를 해결하기 위한 준비


    제작된 모델과 다양한 UDF 실행에 필요한 파일/프로그램 라이브러리를jar로 설정합니다.
    위치는 src/main/resouces 아래에 있습니다.

    모델


    제작된 PML 모델을jar 파일로 만듭니다.
    프로젝트 경로에 따라
    cd src/main/resouces && jar cvf iris.jar path/to/model/RandomForestIris.pmml
    

    기타


    다음 프로그램 라이브러리에 대해jar 파일을 다운로드/생성합니다
  • pmml4s
  • spray-json
  • scala-xml
    mvnrepository에서 다운로드해 왔어요.
    https://mvnrepository.com/
  • Snowpark을 사용하여 UDF를 만들어 Snowflak에 업로드


    여기서부터 VS코드에서 일합니다.
    Snowflake 공식 튜토리얼을 전제로 메탈스 설치가 완료됐고, hello-world PJ 제작이 완료됐다.
    우선build입니다.sbt에 의존 관계를 추가합니다.
    build.sbt
    # 追記
    resolvers += "OSGeo Release Repository" at "https://repo.osgeo.org/repository/release/"
    
    libraryDependencies ++= Seq(
        "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2",
        "com.snowflake" % "snowpark" % "0.6.0",
        "com.typesafe" % "config" % "1.4.1",
        "org.pmml4s" %% "pmml4s" % "0.9.11"
    )
    
    
    변경 사항을 저장하고 import change를 누르면 의존 패키지를 다운로드합니다.

    데이터에 적용해 보다


    튜토리얼에도 Snowflake 세션을 먼저 만듭니다.
    이번에는 typesafe의 ConfigFactory를 취미로 사용해 config를 제작할 예정이다.
    src/main/resources/application.conf
    snowflake {
        url = "https://{YOUR_SNOWFLAKE_ACCOUNT}.snowflakecomputing.com:443",
        user = "{USER}",
        password = "{PASSWORD}",
        role = "{ROLE}",
        warehouse = "{WAREHOUSE}",
        db = "{DATABASE}",
        schema = "{SCHEMA}"
    }
    
    
    세션 생성
    src/main/scala/Main.scala
    object Main {
      def main(args: Array[String]): Unit = {
    ...
    
    	val conf = ConfigFactory.load
    	val configs = Map(
    		"URL" -> conf.getString("snowflake.url"),
    		"USER" -> conf.getString("snowflake.user"),
    		"PASSWORD" -> conf.getString("snowflake.password"),
    		"ROLE" -> conf.getString("snowflake.role"),
    		"WAREHOUSE" -> conf.getString("snowflake.warehouse"),
    		"DB" -> conf.getString("snowflake.db"),
    		"SCHEMA" -> conf.getString("snowflake.schema")
    	)
    
    	val session = Session.builder.configs(configs).create
    
    ...
    }
    
    
    세션에 의존 관계를 포함합니다.
    이 시점의 주의 사항은 데이터 프레임의 사용자정의 함수(UDFs) 생성 - UDF에서 파일 읽기에 나와 있습니다.
    Snowpark 라이브러리는 UDFS를 서버에 업로드하여 실행합니다.UDF가 파일에서 데이터를 읽어야 하는 경우 파일이 UDF와 함께 업로드되는지 확인해야 합니다.
    이번에는 UDF에서 이미 배운 PMML 모델 파일을 읽어야 하기 때문에 모델 파일도 의존자에게 추가됩니다.
    src/main/scala/Main.scala
    object Main {
      def main(args: Array[String]): Unit = {
    ...
    
    	val libPath = new java.io.File("").getAbsolutePath
    	session.addDependency(s"$libPath/src/main/resources/pmml4s_2.12-0.9.11.jar")
    	session.addDependency(s"$libPath/src/main/resources/spray-json_2.12-1.3.6.jar")
    	session.addDependency(s"$libPath/src/main/resources/scala-xml_2.12-1.2.0.jar")
    	session.addDependency(s"$libPath/src/main/resources/iris.jar")
    
    ...
    }
    
    
    일부 데이터를 그릴 수 있는지 없는지를 시험해 보자.
    src/main/scala/Main.scala
    object Main {
      def main(args: Array[String]): Unit = {
    ...
    
    	val irisSchema = StructType(
    		StructField("sepal_length", DoubleType, nullable = true) ::
    		StructField("sepal_width", DoubleType, nullable = true) ::
    		StructField("petal_length", DoubleType, nullable = true) ::
    		StructField("petal_width", DoubleType, nullable = true) ::
    		StructField("class", StringType, nullable = true) ::
    		Nil
    	)
    	val df = session.read.schema(irisSchema).table("iris_data")
    	println(df.show())
    
    ...
    }
    
    VS코드의 경우 편집기에서 Main Object 위에 있는 행run|debug에서 확인을 시작합니다.
    ---------------------------------------------------------------------------------
    |"SEPAL_LENGTH"  |"SEPAL_WIDTH"  |"PETAL_LENGTH"  |"PETAL_WIDTH"  |"CLASS"      |
    ---------------------------------------------------------------------------------
    |5.1             |3.5            |1.4             |0.2            |Iris-setosa  |
    |4.9             |3.0            |1.4             |0.2            |Iris-setosa  |
    |4.7             |3.2            |1.3             |0.2            |Iris-setosa  |
    |4.6             |3.1            |1.5             |0.2            |Iris-setosa  |
    |5.0             |3.6            |1.4             |0.2            |Iris-setosa  |
    |5.4             |3.9            |1.7             |0.4            |Iris-setosa  |
    |4.6             |3.4            |1.4             |0.3            |Iris-setosa  |
    |5.0             |3.4            |1.5             |0.2            |Iris-setosa  |
    |4.4             |2.9            |1.4             |0.2            |Iris-setosa  |
    |4.9             |3.1            |1.5             |0.1            |Iris-setosa  |
    ---------------------------------------------------------------------------------
    
    결과가 돌아왔다!
    그런 다음 모델을 적용할 UDF를 정의합니다.
    세션에 추가된jar 파일에서 모델을 읽기 위해서는 UDF에서 정의하는 것이 좋습니다.
    var resourceName = "/RandomForestIris.pmml"
    몇 가지 주의사항이 있습니다.
  • jar 파일 내용을 펼칠 때 PMML 파일 경로를 지정합니다.
  • runmodel.predict()로 되돌아오기 때문에 수치 CAST를 수치로 처리하고 함수를 처리할 수 있습니다.
  • RandomForestIris.pmml 파일을 보았는데 schema가 정의한 열의 순서는petal->sepal의 순서입니다.(학습할 때 이러한 열 순서에 따라 지정될 수 있음)과 상응하여 모델에 전달된 열을 정렬합니다.
  • 유석에 함부로 배열하지 않을 것 같아서 조작 절차의 오류라고 생각했다
  • RandomForest는 분류하고 싶은 종류마다probability가 있기 때문에 이 수치에서 max가 된 index에서 모델 클래스 내의classes에서 실제 분류 목표를 뺀다.
  • 더 적합한 방법이 있으면 알려주세요.
  • src/main/scala/Main.scala
    class SerTestFunc extends Serializable {
      val rfFunc = (
        petal_length: Double,
        petal_width: Double,
        sepal_length: Double,
        sepal_width: Double) => {
          import java.io._
          var resourceName = "/RandomForestIris.pmml"
          var inputStream = classOf[com.snowflake.snowpark.DataFrame]
            .getResourceAsStream(resourceName)
          val model = Model.fromInputStream(inputStream)
          val v = Array[Double](petal_length, petal_width, sepal_length, sepal_width)
          val pred = model.predict(v).map(_.asInstanceOf[Double])
          model.classes(pred.indices.maxBy(pred)).toString()
        }
    }
    
    그럼 드디어 모형을 실제로 응용해 봅시다!
    Spark DataFrame과 마찬가지로 평가가 지연되기 때문에.
    println(df)
    
    그래도 데이터 프레임의 내용을 평가하고 수치를 표시하지 않는다.
    src/main/scala/Main.scala
    object Main {
      def main(args: Array[String]): Unit = {
        val df = getIrisDf(session)
    
        val transformationFunc = new SerTransformationFunc()
        val irisTransformationUDF = udf(transformationFunc.rfFunc)
      
        val dfFitted = df.withColumn(
          "label", irisTransformationUDF(
            col("petal_length"), col("petal_width"), col("sepal_length"), col("sepal_width"))
        )
        println(dfFitted.show(150))
    
    
    오---!!
    CLASS와 LABEL은 대체로 일치합니다!!다 된 것 같아!!
    요즘은 스칼라를 만나기가 어려워서 생각하면서 하고 있어요.

    좋은 웹페이지 즐겨찾기