Spark 0.9.0 머 신 러 닝 팩 MLlib - Classification 코드 읽 기
43990 단어 spark
LogisticRegression. scala 파일
제1 부분 물류 회귀 모델 종류
1 /**
2
3 * Classification model trained using Logistic Regression. 4
5 * 6
7 * @param weights Weights computed for every feature. 8
9 * @param intercept Intercept computed for this model. 10
11 */
12
13 class LogisticRegressionModel( 14
15 override val weights: Array[Double], 16
17 override val intercept: Double) 18
19 extends GeneralizedLinearModel(weights, intercept) 20
21 with ClassificationModel with Serializable { 22
23 override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, 24
25 intercept: Double) = { 26
27 val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept 28
29 round(1.0/ (1.0 + math.exp(margin * -1))) 30
31 } 32
33 }
논리 회귀 의 predictPoint 함수, 함수 입력: 예측 할 데이터 샘플, 회귀 계수 weights, intercept 거리 항목, 논리 회귀 의 판별 함수 f = 1 / (1 + exp (- wx) 로 인해 코드 에서 margin = - wx, 마지막 으로 1 / (1 + exp (- wx) 값 의 반올림, 즉 예측 라벨 을 되 돌려 줍 니 다.
제2 부분 LogisticRegressionWithSGD 종류
1 class LogisticRegressionWithSGD private ( 2
3 var stepSize: Double, 4
5 var numIterations: Int, 6
7 var regParam: Double, 8
9 var miniBatchFraction: Double) 10
11 extends GeneralizedLinearAlgorithm[LogisticRegressionModel] 12
13 with Serializable { 14
15 val gradient = new LogisticGradient() 16
17 val updater = new SimpleUpdater() 18
19 override val optimizer = new GradientDescent(gradient, updater) 20
21 .setStepSize(stepSize) 22
23 .setNumIterations(numIterations) 24
25 .setRegParam(regParam) 26
27 .setMiniBatchFraction(miniBatchFraction) 28
29 override val validators = List(DataValidators.classificationLabels) 30
31 /**
32
33 * Construct a LogisticRegression object with default parameters 34
35 */
36
37 def this() = this(1.0, 100, 0.0, 1.0) 38
39 def createModel(weights: Array[Double], intercept: Double) = { 40
41 new LogisticRegressionModel(weights, intercept) 42
43 } 44
45 }
소스 코드 는 먼저 gradient, updater 인 스 턴 스 (optimization 파일 아래) 를 정 의 했 습 니 다. 그 중에서 손실 함 수 는 log - loss 를 사 용 했 고 정규 항목 인 자 를 사용 하지 않 았 습 니 다. 이어서 optimizer 를 다시 씁 니 다. 연산 자 를 최적화 하고 마지막 으로 이러한 구성원 변수 stepSize, numIterations, regParam, miniBatchFraction 에 기본 값 을 설정 합 니 다.
제3 부분
LogisticRegression WithSGD 상부 인터페이스
1 object LogisticRegressionWithSGD { 2
3 def train( 4
5 input: RDD[LabeledPoint], 6
7 numIterations: Int, 8
9 stepSize: Double, 10
11 miniBatchFraction: Double, 12
13 initialWeights: Array[Double]) 14
15 : LogisticRegressionModel =
16
17 { 18
19 new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run( 20
21 input, initialWeights) 22
23 } 24
25 def train( 26
27 input: RDD[LabeledPoint], 28
29 numIterations: Int, 30
31 stepSize: Double, 32
33 miniBatchFraction: Double) 34
35 : LogisticRegressionModel =
36
37 { 38
39 new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run( 40
41 input) 42
43 } 44
45 def train( 46
47 input: RDD[LabeledPoint], 48
49 numIterations: Int, 50
51 stepSize: Double) 52
53 : LogisticRegressionModel =
54
55 { 56
57 train(input, numIterations, stepSize, 1.0) 58
59 } 60
61 def train( 62
63 input: RDD[LabeledPoint], 64
65 numIterations: Int) 66
67 : LogisticRegressionModel =
68
69 { 70
71 train(input, numIterations, 1.0, 1.0) 72
73 } 74
75 def main(args: Array[String]) { 76
77 if (args.length != 4) { 78
79 println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
80
81 "<niters>") 82
83 System.exit(1) 84
85 } 86
87 val sc = new SparkContext(args(0), "LogisticRegression") 88
89 val data = MLUtils.loadLabeledData(sc, args(1)) 90
91 val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble) 92
93 println("Weights: " + model.weights.mkString("[", ", ", "]")) 94
95 println("Intercept: " + model.intercept) 96
97 sc.stop() 98
99 } 100
101 }
코드 에 서 는 입력 에 따라 4 가지 train 방식 을 정 의 했 습 니 다. main 함수 에 서 는 MLUtils. loadLabeledData (sc, args (1) 를 사 용 했 습 니 다. 이 함 수 는 파일 을 < 태그 >, < 특징 1 >, < 특징 2 >... 정 의 된 RDD [LabeledPoint] 형식 으로 입력 했 습 니 다.이 어 LR 을 호출 하여 훈련 을 하고 마지막 으로 회귀 계수 와 거리 항목 을 인쇄 한다.
SVM. scala 파일
제1 부분 SVMModel 종류
1 class SVMModel( 2
3 override val weights: Array[Double], 4
5 override val intercept: Double) 6
7 extends GeneralizedLinearModel(weights, intercept) 8
9 with ClassificationModel with Serializable { 10
11
12
13 override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix, 14
15 intercept: Double) = { 16
17 val margin = dataMatrix.dot(weightMatrix) + intercept 18
19 if (margin < 0) 0.0 else 1.0
20
21 } 22
23 }
LR 과 유사 하지만 이 안의 margin 은 WX + b 형식 으로 바 뀌 었 습 니 다.
제2 부분 SVMWithSGD 종류
1 class SVMWithSGD private ( 2
3 var stepSize: Double, 4
5 var numIterations: Int, 6
7 var regParam: Double, 8
9 var miniBatchFraction: Double) 10
11 extends GeneralizedLinearAlgorithm[SVMModel] with Serializable { 12
13
14
15 val gradient = new HingeGradient() 16
17 val updater = new SquaredL2Updater() 18
19 override val optimizer = new GradientDescent(gradient, updater) 20
21 .setStepSize(stepSize) 22
23 .setNumIterations(numIterations) 24
25 .setRegParam(regParam) 26
27 .setMiniBatchFraction(miniBatchFraction) 28
29 override val validators = List(DataValidators.classificationLabels) 30
31 def this() = this(1.0, 100, 1.0, 1.0) 32
33 def createModel(weights: Array[Double], intercept: Double) = { 34
35 new SVMModel(weights, intercept) 36
37 } 38
39 }
LR 과 유사 하 다 hinge - loss 에 대한 경사도 로 바 뀌 었 고, updater 는 L2 정규 로 바 뀌 었 다.
제3 부분
SVMWithSGD
상부 인터페이스
1 object SVMWithSGD { 2
3 def train( 4
5 input: RDD[LabeledPoint], 6
7 numIterations: Int, 8
9 stepSize: Double, 10
11 regParam: Double, 12
13 miniBatchFraction: Double, 14
15 initialWeights: Array[Double]) 16
17 : SVMModel =
18
19 { 20
21 new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input, 22
23 initialWeights) 24
25 } 26
27 def train( 28
29 input: RDD[LabeledPoint], 30
31 numIterations: Int, 32
33 stepSize: Double, 34
35 regParam: Double, 36
37 miniBatchFraction: Double) 38
39 : SVMModel =
40
41 { 42
43 new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input) 44
45 } 46
47
48
49 def train( 50
51 input: RDD[LabeledPoint], 52
53 numIterations: Int, 54
55 stepSize: Double, 56
57 regParam: Double) 58
59 : SVMModel =
60
61 { 62
63 train(input, numIterations, stepSize, regParam, 1.0) 64
65 } 66
67
68
69 def train( 70
71 input: RDD[LabeledPoint], 72
73 numIterations: Int) 74
75 : SVMModel =
76
77 { 78
79 train(input, numIterations, 1.0, 1.0, 1.0) 80
81 } 82
83
84
85 def main(args: Array[String]) { 86
87 if (args.length != 5) { 88
89 println("Usage: SVM <master> <input_dir> <step_size> <regularization_parameter> <niters>") 90
91 System.exit(1) 92
93 } 94
95 val sc = new SparkContext(args(0), "SVM") 96
97 val data = MLUtils.loadLabeledData(sc, args(1)) 98
99 val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble) 100
101 println("Weights: " + model.weights.mkString("[", ", ", "]")) 102
103 println("Intercept: " + model.intercept) 104
105
106
107 sc.stop() 108
109 } 110
111 }
LR 이랑 비슷 해 요.
NaiveBayes. scala 파일
제1 부분 NaiveBayesModel 종류
1 class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]]) 2
3 extends ClassificationModel with Serializable { 4
5
6
7 // Create a column vector that can be used for predictions
8
9 private val _pi = new DoubleMatrix(pi.length, 1, pi: _*) 10
11 private val _theta = new DoubleMatrix(theta) 12
13
14
15 def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict) 16
17
18
19 def predict(testData: Array[Double]): Double = { 20
21 val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*) 22
23 val result = _pi.add(_theta.mmul(dataMatrix)) 24
25 result.argmax() 26
27 } 28
29 }
소박 한 베 이 루스 분류 기, NaiveBayes Model 의 입력 은 훈련 후 얻 은 태그 유형 선험 확률 pi (P (y = 0), P (y = 1),..., P (y = K), 특징 속성 이 지 정 된 유형 에서 나타 나 는 조건 확률 theta (P (x = 1 / y), 특징 이 TF - IDF 형식 으로 바 뀌 면 텍스트 분류 에 사용 할 수 있 으 며 특징 이 0 - 1 인 코딩 으로 바 뀔 때베 르 누 리 모델 을 바탕 으로 분류 할 수 있 습 니 다. 첫 번 째 predict 함수 의 입력 은 테스트 데이터 세트 이 고 두 번 째 predict 함수 의 입력 은 하나의 테스트 샘플 입 니 다.원래 의 베 이 루스 정 리 는 P (y | x) ~ P (x | y) P (y) 에 따라 이 루어 졌 습 니 다. 여기 서 이 루어 졌 을 때 양쪽 에 대 수 를 취 했 습 니 다. 덧셈 의 계산 효율 은 곱셈 보다 높 았 습 니 다. 마지막 으로 result. argmax (), 즉 후 험 확률 이 가장 높 은 유형 으로 되 돌 아 왔 습 니 다.
제2 부분
NaiveBayes
종류
1 class NaiveBayes private (var lambda: Double) 2
3 extends Serializable with Logging 4
5 { 6
7 def this() = this(1.0) 8
9 /** Set the smoothing parameter. Default: 1.0. */
10
11 def setLambda(lambda: Double): NaiveBayes = { 12
13 this.lambda = lambda 14
15 this
16
17 } 18
19
20
21 def run(data: RDD[LabeledPoint]) = { 22
23 val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)] 24
25 val aggregated = data.aggregate(zeroCombiner)({(combiner, point) =>
26
27 point match { 28
29 case LabeledPoint(label, features) =>
30
31 val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1))) 32
33 val fs = new DoubleMatrix(features.length, 1, features: _*) 34
35 combiner += label.toInt -> (count + 1, featuresSum.addi(fs)) 36
37 } 38
39 }, { (lhs, rhs) =>
40
41 for ((label, (c, fs)) <- rhs) { 42
43 val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1))) 44
45 lhs(label) = (count + c, featuresSum.addi(fs)) 46
47 } 48
49 lhs 50
51 }) 52
53 // Kinds of label
54
55 val C = aggregated.size 56
57 // Total sample count
58
59 val N = aggregated.values.map(_._1).sum 60
61
62
63 val pi = new Array[Double](C) 64
65 val theta = new Array[Array[Double]](C) 66
67 val piLogDenom = math.log(N + C * lambda) 68
69
70
71 for ((label, (count, fs)) <- aggregated) { 72
73 val thetaLogDenom = math.log(fs.sum() + fs.length * lambda) 74
75 pi(label) = math.log(count + lambda) - piLogDenom 76
77 theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom) 78
79 } 80
81 new NaiveBayesModel(pi, theta) 82
83 } 84
85 }
이 클래스 는 베 이 루스 알고리즘 을 실현 하 는 것 입 니 다. lambda 인 자 는 P (X | Y) = 0 의 어색 함 을 피 하 는 데 사 용 됩 니 다.DoubleMatrix 는 각 특징 이 이 유형 에서 의 조건 확률 을 나타 낸다.
제3 부분
NaiveBayes
호출 인터페이스
1 object NaiveBayes { 2
3 def train(input: RDD[LabeledPoint]): NaiveBayesModel = { 4
5 new NaiveBayes().run(input) 6
7 } 8
9 def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = { 10
11 new NaiveBayes(lambda).run(input) 12
13 } 14
15
16
17 def main(args: Array[String]) { 18
19 if (args.length != 2 && args.length != 3) { 20
21 println("Usage: NaiveBayes <master> <input_dir> [<lambda>]") 22
23 System.exit(1) 24
25 } 26
27 val sc = new SparkContext(args(0), "NaiveBayes") 28
29 val data = MLUtils.loadLabeledData(sc, args(1)) 30
31 val model = if (args.length == 2) { 32
33 NaiveBayes.train(data) 34
35 } else { 36
37 NaiveBayes.train(data, args(2).toDouble) 38
39 } 40
41 println("Pi: " + model.pi.mkString("[", ", ", "]")) 42
43 println("Theta:
" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "
", "]")) 44
45
46
47 sc.stop() 48
49 } 50
51 }
베 이 루스 훈련 방식 은 lambda 파라미터 가 있 는 지 없 는 지 로 나 뉜 다. main 함 수 는 먼저 Spark Context 를 정의 한 다음 에 데이터 세트 를 RDD [LabelPoint] 유형 으로 바 꾸 고 훈련 을 통 해 pi 와 theta 를 인쇄 한 다음 에 이 알고리즘 은 Intel 에서 일 하 는 것 이다. 웨 이 보 는 영혼 기계 대신 이 쓴 것 으로 그의 github 사 이 트 를 따라 갈 수 있다.https://github.com/soulmachine
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 프로그래밍 기본 사항(Python 버전)참조 웹사이트: Hadoop 환경이 있어야 합니다. 내 다른 블로그를 읽을 수 있습니다. 2.Spark 환경 변수 파일 수정 spark env SH 파일(vi ./conf/spark-env.sh)을 편집하고 첫 번째...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.