scikit-learn, Spark.ml, TensorFlow에서 선형 회귀 ~ (3) spark.ml

15192 단어 스파크기계 학습
Spark의 기계 학습 라이브러리에는 RDD 기반 API의 mllib와 DataFrame 기반 API의 ml가 있습니다.
scikit-learn의 선형 회귀는 매우 간단했지만 Spark는 모두 DataFrame을 통해 조금 번거로웠습니다.

3.1 학습 데이터 준비



Spark의 데이터 프레임은 spark의 라이브러리 함수로 csv 파일을 읽으면 만들 수 있습니다. 여기는 간단하다고 말하면 간단합니다.

spLR.py
# read data                                                                          
dataFile = 'sampleLR.csv'
input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)

데이터 로딩에 대해서는 이하의 것도 알았습니다.
  • option("inferSchema", "true") 는 데이터가 숫자이면 숫자로 변환하여 읽습니다. 디폴트는 false 이므로, 이 옵션 없음이면 수치는 캐릭터 라인 데이터가 됩니다.
  • tsv를 로드하는 경우 구분 기호 옵션을 추가합니다. spark.read.format("com.databricks.spark.csv").option("delimiter", "\t").option("inferSchema", "true").load(dataFile)
  • option("header", "true") 를 사용하면 첫 번째 행을 데이터의 열 이름으로 인식합니다. 옵션 지정이 없는 경우는 디폴트의 컬럼명 '_c0', '_c1' ... 가 설정됩니다.

  • spark.ml 의 학습 데이터는 x1, x2 ... 를 벡터화한 features 라는 열과 y 에 대한 label 이라는 열이 있는 데이터 프레임이어야 합니다 (컬럼 이름은 모델을 만들 때 선택적으로 지정 가능). 열을 벡터화하려면 VectorAssembler를 사용하면 간단합니다.

    spLR.py
    # カラム名を設定する
    data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'label')
    
    # x をベクトル化して、featuresカラムにする                                                                    
    from pyspark.ml.feature import VectorAssembler
    vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:1], outputCol="features")
    featurized_DF = vecAssembler.transform(data_DF)
    

    3.2 모델 만들기



    Spark.ml의 선형 회귀 라이브러리는 GeneralizedLinearRegression입니다. features, label 의 컬럼명을 가지는 데이터 프레임을 모델에 건네주어 fit() 를 실시하면(자) 선형 회귀가 행해집니다. 계산 결과의 a1, a2 ... 는 model.coefficients, b 는 model.intercept 로 참조할 수 있습니다.

    spLR.py
    # build the model                                                                    
    from pyspark.ml.regression import GeneralizedLinearRegression
    glr = GeneralizedLinearRegression()
    
    # training                                                                           
    model = glr.fit(featurized_DF)
    
    # model parameters
    a = model.coefficients[0]
    b = model.intercept
    

    3.3 모델을 이용한 예측



    fit() 으로 작성한 model 에 transform() 를 실시하면 예측 데이터가 돌아옵니다. transform() 가 돌려주는 것은 prediction 칼럼이 추가된 데이터 프레임으로, 예측치는 여기에 담겨져 있습니다.
  • spark 데이터 프레임을 나열하려면 .collect()를 사용합니다.
  • # prediction                                                                         
    results = model.transform(featurized_DF)
    elms = results.collect()
    x0 = [float(elm.features[0]) for elm in elms]
    y0 = [float(elm.label) for elm in elms]
    p0 = [float(elm.prediction) for elm in elms]
    x = np.array(x0)
    y = np.array(y0)
    p = np.array(p0)
    
    # output                                                                             
    from plotLR import plotLR
    title = 'predLR_with_Spark'
    plotLR(title, x, y, p, 0.4, 0.8, a, b)
    

    plotLR은 scikit-learn의 선형 회귀에서 사용한 것과 같습니다. Spark.ml의 출력은 다음과 같으며 a, b는 모두 scikit-learn과 동일합니다.


    3.4 spark-submit



    spark python API를 사용하려면 일반 python 대신 pyspark를 사용하십시오. 그러나 ver 2.1 현재 pyspark는 대화식으로 사용할 수 있지만 명령 형식으로 실행할 수는 없습니다.
    # pyspark は成功
    % pyspark
    Python 3.5.2 (default, Dec 15 2016, 00:35:09)
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
          /_/
    
    Using Python version 3.5.2 (default, Dec 15 2016 00:35:09)
    SparkSession available as 'spark'.
    >>> 
    
    # pyspark x.py は失敗
    pyspark spLR.py
    Running python applications through 'pyspark' is not supported as of Spark 2.0.
    Use ./bin/spark-submit <python file>
    

    명령 형식으로 실행하고 싶은 경우는 위의 메세지대로 spark-submit 를 사용합니다만, 이 때, pyspark로 디폴트로 존재하는 spark 와 sc 객체를 스스로 만들어 둘 필요가 있습니다. spark-submit에서 작동하는 코드는 다음과 같습니다.
    plotLR.py는 scikit-learn 편에 코드가 있습니다.

    spLR.py
    import numpy as np
    import pyspark
    
    # spark-submit で実行する場合は spark, sc を生成する                                 
    on_spark_submit = 1
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    if on_spark_submit:
        master = 'local'
        appName = 'Linear Regeession'
        sc = SparkContext(master, appName)
        spark = SparkSession.builder.master(master).appName(appName).config("spark.some.config.option", "some-value").getOrCreate()
    
    # read data                                                                          
    dataFile = 'sampleLR.csv'
    input_DF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(dataFile)
    data_DF = input_DF.withColumnRenamed('_c0', 'x').withColumnRenamed('_c1', 'label')
    
    # transform data                                                                     
    from pyspark.ml.feature import VectorAssembler
    vecAssembler = VectorAssembler(inputCols=data_DF.columns[0:1], outputCol="features")
    featurized_DF = vecAssembler.transform(data_DF)
    
    # build the model                                                                    
    from pyspark.ml.regression import GeneralizedLinearRegression
    glr = GeneralizedLinearRegression()
    
    # training                                                                           
    model = glr.fit(featurized_DF)
    
    # model parametersa                                                                  
    a = model.coefficients[0]
    b = model.intercept
    
    # prediction                                                                         
    results = model.transform(featurized_DF)
    elms = results.collect()
    x0 = [float(elm.features[0]) for elm in elms]
    y0 = [float(elm.label) for elm in elms]
    p0 = [float(elm.prediction) for elm in elms]
    x = np.array(x0)
    y = np.array(y0)
    p = np.array(p0)
    
    # output                                                                             
    from plotLR import plotLR
    title = 'predLR_with_Spark'
    plotLR(title, x, y, p, 0.4, 0.8, a, b)
    

    좋은 웹페이지 즐겨찾기