Spark 0.9.0 머 신 러 닝 팩 MLlib - Classification 코드 읽 기

43990 단어 spark
이 장 에 서 는 주로 MLlib 가방 안의 분류 알고리즘 실현 을 다 루 고 있 으 며, 현재 구현 되 고 있 는 것 은 LogisticRegression, SVM, NaiveBayes 이다. ,앞의 두 가지 알고리즘 은 각자 의 목표 최적화 함수 와 정규 항목 에 대해 Optimization 모듈 에서 의 랜 덤 경사도 최적화 를 호출 했 습 니 다. 병행 실현 하 는 전략 은 주로 랜 덤 경사도 의 계산 에 있 습 니 다. 베 이 루스 의 병행 전략 은 주로 유형의 선험 확률 과 특징 을 계산 하 는 항목 확률 에 있 습 니 다. 상세 한 상황 은 다음 과 같 습 니 다.
LogisticRegression. scala 파일
제1 부분 물류 회귀 모델 종류
 1 /**

 2 

 3  * Classification model trained using Logistic Regression.  4 

 5  *  6 

 7  * @param weights Weights computed for every feature.  8 

 9  * @param intercept Intercept computed for this model. 10 

11  */

12 

13 class LogisticRegressionModel( 14 

15  override val weights: Array[Double], 16 

17  override val intercept: Double) 18 

19   extends GeneralizedLinearModel(weights, intercept) 20 

21  with ClassificationModel with Serializable { 22 

23  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, 24 

25       intercept: Double) = { 26 

27     val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept 28 

29     round(1.0/ (1.0 + math.exp(margin * -1))) 30 

31  } 32 

33 }

       논리 회귀 의 predictPoint 함수, 함수 입력: 예측 할 데이터 샘플, 회귀 계수 weights, intercept 거리 항목, 논리 회귀 의 판별 함수 f = 1 / (1 + exp (- wx) 로 인해 코드 에서 margin = - wx, 마지막 으로 1 / (1 + exp (- wx) 값 의 반올림, 즉 예측 라벨 을 되 돌려 줍 니 다.
제2 부분 LogisticRegressionWithSGD 종류
 1 class LogisticRegressionWithSGD private (  2 

 3  var stepSize: Double,  4 

 5  var numIterations: Int,  6 

 7  var regParam: Double,  8 

 9  var miniBatchFraction: Double) 10 

11   extends GeneralizedLinearAlgorithm[LogisticRegressionModel] 12 

13  with Serializable { 14 

15   val gradient = new LogisticGradient() 16 

17   val updater = new SimpleUpdater() 18 

19   override val optimizer = new GradientDescent(gradient, updater) 20 

21  .setStepSize(stepSize) 22 

23  .setNumIterations(numIterations) 24 

25  .setRegParam(regParam) 26 

27  .setMiniBatchFraction(miniBatchFraction) 28 

29   override val validators = List(DataValidators.classificationLabels) 30 

31   /**

32 

33  * Construct a LogisticRegression object with default parameters 34 

35    */

36 

37   def this() = this(1.0, 100, 0.0, 1.0) 38 

39   def createModel(weights: Array[Double], intercept: Double) = { 40 

41     new LogisticRegressionModel(weights, intercept) 42 

43  } 44 

45 }

       소스 코드 는 먼저 gradient, updater 인 스 턴 스 (optimization 파일 아래) 를 정 의 했 습 니 다. 그 중에서 손실 함 수 는 log - loss 를 사 용 했 고 정규 항목 인 자 를 사용 하지 않 았 습 니 다. 이어서 optimizer 를 다시 씁 니 다. 연산 자 를 최적화 하고 마지막 으로 이러한 구성원 변수 stepSize, numIterations, regParam, miniBatchFraction 에 기본 값 을 설정 합 니 다.
 
제3 부분 
LogisticRegression WithSGD 상부 인터페이스
 1 object LogisticRegressionWithSGD {  2 

 3  def train(  4 

 5  input: RDD[LabeledPoint],  6 

 7  numIterations: Int,  8 

 9  stepSize: Double,  10 

 11  miniBatchFraction: Double,  12 

 13  initialWeights: Array[Double])  14 

 15     : LogisticRegressionModel =

 16 

 17  {  18 

 19     new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(  20 

 21  input, initialWeights)  22 

 23  }  24 

 25  def train(  26 

 27  input: RDD[LabeledPoint],  28 

 29  numIterations: Int,  30 

 31  stepSize: Double,  32 

 33  miniBatchFraction: Double)  34 

 35     : LogisticRegressionModel =

 36 

 37  {  38 

 39     new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(  40 

 41  input)  42 

 43  }  44 

 45  def train(  46 

 47  input: RDD[LabeledPoint],  48 

 49  numIterations: Int,  50 

 51  stepSize: Double)  52 

 53     : LogisticRegressionModel =

 54 

 55  {  56 

 57     train(input, numIterations, stepSize, 1.0)  58 

 59  }  60 



 61  def train(  62 

 63  input: RDD[LabeledPoint],  64 

 65  numIterations: Int)  66 

 67     : LogisticRegressionModel =

 68 

 69  {  70 

 71     train(input, numIterations, 1.0, 1.0)  72 

 73  }  74 

 75  def main(args: Array[String]) {  76 

 77     if (args.length != 4) {  78 

 79       println("Usage: LogisticRegression <master> <input_dir> <step_size> " +

 80 

 81         "<niters>")  82 

 83       System.exit(1)  84 

 85  }  86 

 87     val sc = new SparkContext(args(0), "LogisticRegression")  88 

 89     val data = MLUtils.loadLabeledData(sc, args(1))  90 

 91     val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)  92 

 93     println("Weights: " + model.weights.mkString("[", ", ", "]"))  94 

 95     println("Intercept: " + model.intercept)  96 

 97  sc.stop()  98 

 99  } 100 

101 }

     코드 에 서 는 입력 에 따라 4 가지 train 방식 을 정 의 했 습 니 다. main 함수 에 서 는 MLUtils. loadLabeledData (sc, args (1) 를 사 용 했 습 니 다. 이 함 수 는 파일 을 < 태그 >, < 특징 1 >, < 특징 2 >... 정 의 된 RDD [LabeledPoint] 형식 으로 입력 했 습 니 다.이 어 LR 을 호출 하여 훈련 을 하고 마지막 으로 회귀 계수 와 거리 항목 을 인쇄 한다.
 
SVM. scala 파일
제1 부분 SVMModel 종류
 1 class SVMModel(  2 

 3  override val weights: Array[Double],  4 

 5  override val intercept: Double)  6 

 7   extends GeneralizedLinearModel(weights, intercept)  8 

 9  with ClassificationModel with Serializable { 10 

11  

12 

13  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, 14 

15       intercept: Double) = { 16 

17     val margin = dataMatrix.dot(weightMatrix) + intercept 18 

19     if (margin < 0) 0.0 else 1.0

20 

21  } 22 

23 }

 
LR 과 유사 하지만 이 안의 margin 은 WX + b 형식 으로 바 뀌 었 습 니 다.
 
제2 부분 SVMWithSGD  종류
 1 class SVMWithSGD private (  2 

 3  var stepSize: Double,  4 

 5  var numIterations: Int,  6 

 7  var regParam: Double,  8 

 9  var miniBatchFraction: Double) 10 

11   extends GeneralizedLinearAlgorithm[SVMModel] with Serializable { 12 

13  

14 

15   val gradient = new HingeGradient() 16 

17   val updater = new SquaredL2Updater() 18 

19   override val optimizer = new GradientDescent(gradient, updater) 20 

21  .setStepSize(stepSize) 22 

23  .setNumIterations(numIterations) 24 

25  .setRegParam(regParam) 26 

27  .setMiniBatchFraction(miniBatchFraction) 28 

29   override val validators = List(DataValidators.classificationLabels) 30 

31   def this() = this(1.0, 100, 1.0, 1.0) 32 

33   def createModel(weights: Array[Double], intercept: Double) = { 34 

35     new SVMModel(weights, intercept) 36 

37  } 38 

39 }

LR 과 유사 하 다 hinge - loss 에 대한 경사도 로 바 뀌 었 고, updater 는 L2 정규 로 바 뀌 었 다. 
 
제3 부분 
SVMWithSGD   
상부 인터페이스
 1 object SVMWithSGD {  2 

 3  def train(  4 

 5  input: RDD[LabeledPoint],  6 

 7  numIterations: Int,  8 

 9  stepSize: Double,  10 

 11  regParam: Double,  12 

 13  miniBatchFraction: Double,  14 

 15  initialWeights: Array[Double])  16 

 17     : SVMModel =

 18 

 19  {  20 

 21     new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,  22 

 23  initialWeights)  24 

 25  }  26 

 27  def train(  28 

 29  input: RDD[LabeledPoint],  30 

 31  numIterations: Int,  32 

 33  stepSize: Double,  34 

 35  regParam: Double,  36 

 37  miniBatchFraction: Double)  38 

 39     : SVMModel =

 40 

 41  {  42 

 43     new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)  44 

 45  }  46 

 47  

 48 

 49  def train(  50 

 51  input: RDD[LabeledPoint],  52 

 53  numIterations: Int,  54 

 55  stepSize: Double,  56 

 57  regParam: Double)  58 

 59     : SVMModel =

 60 

 61  {  62 

 63     train(input, numIterations, stepSize, regParam, 1.0)  64 

 65  }  66 

 67  

 68 

 69  def train(  70 

 71  input: RDD[LabeledPoint],  72 

 73  numIterations: Int)  74 

 75     : SVMModel =

 76 

 77  {  78 

 79     train(input, numIterations, 1.0, 1.0, 1.0)  80 

 81  }  82 

 83  

 84 

 85  def main(args: Array[String]) {  86 

 87     if (args.length != 5) {  88 

 89       println("Usage: SVM <master> <input_dir> <step_size> <regularization_parameter> <niters>")  90 

 91       System.exit(1)  92 

 93  }  94 

 95     val sc = new SparkContext(args(0), "SVM")  96 

 97     val data = MLUtils.loadLabeledData(sc, args(1))  98 

 99     val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) 100 

101     println("Weights: " + model.weights.mkString("[", ", ", "]")) 102 

103     println("Intercept: " + model.intercept) 104 

105  

106 

107  sc.stop() 108 

109  } 110 

111 }

 
LR 이랑 비슷 해 요.
 
NaiveBayes. scala 파일
제1 부분 NaiveBayesModel 종류
 1 class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])  2 

 3   extends ClassificationModel with Serializable {  4 

 5  

 6 

 7   // Create a column vector that can be used for predictions

 8 

 9   private val _pi = new DoubleMatrix(pi.length, 1, pi: _*) 10 

11   private val _theta = new DoubleMatrix(theta) 12 

13  

14 

15   def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) 16 

17  

18 

19   def predict(testData: Array[Double]): Double = { 20 

21     val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) 22 

23     val result = _pi.add(_theta.mmul(dataMatrix)) 24 

25  result.argmax() 26 

27  } 28 

29 }

        소박 한 베 이 루스 분류 기, NaiveBayes Model 의 입력 은 훈련 후 얻 은 태그 유형 선험 확률 pi (P (y = 0), P (y = 1),..., P (y = K), 특징 속성 이 지 정 된 유형 에서 나타 나 는 조건 확률 theta (P (x = 1 / y), 특징 이 TF - IDF 형식 으로 바 뀌 면 텍스트 분류 에 사용 할 수 있 으 며 특징 이 0 - 1 인 코딩 으로 바 뀔 때베 르 누 리 모델 을 바탕 으로 분류 할 수 있 습 니 다. 첫 번 째 predict 함수 의 입력 은 테스트 데이터 세트 이 고 두 번 째 predict 함수 의 입력 은 하나의 테스트 샘플 입 니 다.원래 의 베 이 루스 정 리 는 P (y | x) ~ P (x | y) P (y) 에 따라 이 루어 졌 습 니 다. 여기 서 이 루어 졌 을 때 양쪽 에 대 수 를 취 했 습 니 다. 덧셈 의 계산 효율 은 곱셈 보다 높 았 습 니 다. 마지막 으로 result. argmax (), 즉 후 험 확률 이 가장 높 은 유형 으로 되 돌 아 왔 습 니 다.
 
제2 부분 
NaiveBayes  
종류
 1 class NaiveBayes private (var lambda: Double)  2 

 3   extends Serializable with Logging  4 

 5 {  6 

 7   def this() = this(1.0)  8 

 9   /** Set the smoothing parameter. Default: 1.0. */

10 

11   def setLambda(lambda: Double): NaiveBayes = { 12 

13     this.lambda = lambda 14 

15     this

16 

17  } 18 

19  

20 

21   def run(data: RDD[LabeledPoint]) = { 22 

23     val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)] 24 

25     val aggregated = data.aggregate(zeroCombiner)({(combiner, point) =>

26 

27  point match { 28 

29         case LabeledPoint(label, features) =>

30 

31           val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1))) 32 

33           val fs = new DoubleMatrix(features.length, 1, features: _*) 34 

35           combiner += label.toInt -> (count + 1, featuresSum.addi(fs)) 36 

37  } 38 

39     }, { (lhs, rhs) =>

40 

41       for ((label, (c, fs)) <- rhs) { 42 

43         val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1))) 44 

45         lhs(label) = (count + c, featuresSum.addi(fs)) 46 

47  } 48 

49  lhs 50 

51  }) 52 

53     // Kinds of label

54 

55     val C = aggregated.size 56 

57     // Total sample count

58 

59     val N = aggregated.values.map(_._1).sum 60 

61  

62 

63     val pi = new Array[Double](C) 64 

65     val theta = new Array[Array[Double]](C) 66 

67     val piLogDenom = math.log(N + C * lambda) 68 

69  

70 

71     for ((label, (count, fs)) <- aggregated) { 72 

73       val thetaLogDenom = math.log(fs.sum() + fs.length * lambda) 74 

75       pi(label) = math.log(count + lambda) - piLogDenom 76 

77       theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom) 78 

79  } 80 

81     new NaiveBayesModel(pi, theta) 82 

83  } 84 

85 }

      이 클래스 는 베 이 루스 알고리즘 을 실현 하 는 것 입 니 다. lambda 인 자 는 P (X | Y) = 0 의 어색 함 을 피 하 는 데 사 용 됩 니 다.DoubleMatrix 는 각 특징 이 이 유형 에서 의 조건 확률 을 나타 낸다.
 
제3 부분 
NaiveBayes  
호출 인터페이스
 1 object NaiveBayes {  2 

 3   def train(input: RDD[LabeledPoint]): NaiveBayesModel = {  4 

 5     new NaiveBayes().run(input)  6 

 7  }  8 

 9   def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { 10 

11     new NaiveBayes(lambda).run(input) 12 

13  } 14 

15  

16 

17  def main(args: Array[String]) { 18 

19     if (args.length != 2 && args.length != 3) { 20 

21       println("Usage: NaiveBayes <master> <input_dir> [<lambda>]") 22 

23       System.exit(1) 24 

25  } 26 

27     val sc = new SparkContext(args(0), "NaiveBayes") 28 

29     val data = MLUtils.loadLabeledData(sc, args(1)) 30 

31     val model = if (args.length == 2) { 32 

33  NaiveBayes.train(data) 34 

35     } else { 36 

37       NaiveBayes.train(data, args(2).toDouble) 38 

39  } 40 

41     println("Pi: " + model.pi.mkString("[", ", ", "]")) 42 

43     println("Theta:
" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "
", "]")) 44 45 46 47 sc.stop() 48 49 } 50 51 }

       베 이 루스 훈련 방식 은 lambda 파라미터 가 있 는 지 없 는 지 로 나 뉜 다. main 함 수 는 먼저 Spark Context 를 정의 한 다음 에 데이터 세트 를 RDD [LabelPoint] 유형 으로 바 꾸 고 훈련 을 통 해 pi 와 theta 를 인쇄 한 다음 에 이 알고리즘 은 Intel 에서 일 하 는 것 이다. 웨 이 보 는 영혼 기계 대신 이 쓴 것 으로 그의 github 사 이 트 를 따라 갈 수 있다.https://github.com/soulmachine
 
 
 

좋은 웹페이지 즐겨찾기