Spark Advanced Data Analysis: Network Traffic Anomaly Detection (Hands-On, Upgraded)

In the previous post I only covered the KMeans clustering part of this project. This time I spent a long while writing the code out in full and running a complete test. The post is fairly long because it combines what I wrote earlier with the finished anomaly-detection part. Without further ado, here is the hands-on code.
package internet

import org.apache.spark.mllib.clustering.{KMeansModel, KMeans}
import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created on 2016/7/24.
  */
object CheckAll {

  def main(args: Array[String]) {
    // Spark configuration, context, and input path on HDFS
    val conf = new SparkConf().setAppName("CheckAll").setMaster("local")
    val sc = new SparkContext(conf)
    val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
    val rawData = sc.textFile(HDFS_DATA_PATH)

    /** Basic clustering run **/
//    clusteringTake1(rawData)
    /** Choosing k, with normalization and one-hot encoding of the categorical features **/
//    clusteringTake2(rawData)
//    clusteringTake3(rawData)
//    clusteringTake4(rawData)
//    clusteringTake5(rawData)
    /** Export a clustered sample to HDFS for visualization in R (visualizationInR) **/
    /** Anomaly detection **/
    val beg = System.currentTimeMillis()
    anomalies(rawData)
    val end = System.currentTimeMillis()
    println("Elapsed time: " + (end - beg) / 1000 + "s")
  }



  // Clustering take 1: basic K-means on the numeric features
  def clusteringTake1(rawData: RDD[String]) = {
    // Count of each label in the raw data, in descending order:
    //    rawData.map(_.split(",").last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)


    val labelsAndData = rawData.map {
      line =>
        // Split the CSV line into a mutable buffer
        val buffer = line.split(",").toBuffer
        // Drop the three categorical columns starting at index 1
        buffer.remove(1, 3)
        // The last column is the label
        val label = buffer.remove(buffer.length - 1)
        // Convert the remaining columns to Double and build a dense vector
        val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
        // Return the (label, feature vector) pair
        (label, vector)
    }

    /**
      * Why go from labelsAndData to data?
      * 1. K-means only works on the numeric feature vectors; the labels play no part in clustering.
      * 2. The data RDD is traversed repeatedly by K-means, so it is worth caching.
      * 3. Taking .values of the pair RDD keeps only the vectors, which are then cached.
      */
    // Keep only the feature vectors and cache them
    val data = labelsAndData.values.cache()

    // Instantiate K-means with default parameters
    val kmeans = new KMeans()
    // Train the KMeansModel
    val model = kmeans.run(data)
    // Print the cluster centers
    model.clusterCenters.foreach(println)

    val clusterLabelCount = labelsAndData.map {
      case (label, datum) =>
        // Predict the cluster that datum belongs to
        val cluster = model.predict(datum)
        // Emit a (cluster, label) pair
        (cluster, label)
    }.countByValue()

    // Print the count of each (cluster, label) combination, sorted
    clusterLabelCount.toSeq.sorted.foreach {
      case ((cluster, label), count) =>
        println(f"$cluster%1s$label%18s$count%8s")
    }
    data.unpersist()
  }

  /**
    * Euclidean distance between two vectors:
    * a.toArray.zip(b.toArray)  pairs up corresponding components
    * map(p => p._1 - p._2)     takes the component-wise differences
    * map(d => d * d).sum       sums the squared differences
    * math.sqrt()               takes the square root of that sum
    * @param a first vector
    * @param b second vector
    * @return the Euclidean distance between a and b
    */
  def distance(a: Vector, b: Vector) =
    math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
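
  // Worked example (illustration only): distance(Vectors.dense(0.0, 0.0), Vectors.dense(3.0, 4.0))
  // evaluates to math.sqrt((0 - 3) * (0 - 3) + (0 - 4) * (0 - 4)) = 5.0, the ordinary Euclidean distance.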

  /**
    * Distance from a data point to the center of the cluster it is assigned to.
    * KMeansModel.predict uses the same findClosest logic that KMeans applies internally.
    * @param datum the data point
    * @param model the trained KMeansModel
    * @return the distance from datum to its cluster center
    */
  def distToCenter(datum: Vector, model: KMeansModel) = {
    // Cluster that datum is assigned to
    val cluster = model.predict(datum)
    // Center of that cluster
    val center = model.clusterCenters(cluster)
    // Distance from datum to the center
    distance(center, datum)
  }

  /**
    * Clustering score for a given k: the mean distance from each point to its cluster center.
    * @param data feature vectors
    * @param k number of clusters
    * @return the mean distance to the nearest cluster center
    */
  def clusteringScore(data: RDD[Vector], k: Int): Double = {
    val kmeans = new KMeans()
    // Set the number of clusters
    kmeans.setK(k)
    // Train the KMeansModel
    val model = kmeans.run(data)
    // Mean distance to the cluster center under this model
    data.map(datum => distToCenter(datum, model)).mean()
  }

  /**
    * Clustering score with an explicit run count and convergence threshold.
    * @param data feature vectors
    * @param k number of clusters
    * @param run number of times K-means is run for each k
    * @param epsilon convergence threshold
    * @return the mean distance to the nearest cluster center
    */
  def clusteringScore2(data: RDD[Vector], k: Int, run: Int, epsilon: Double): Double = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    // Run K-means several times for each k and keep the best model
    kmeans.setRuns(run)
    // Convergence threshold
    kmeans.setEpsilon(epsilon)
    val model = kmeans.run(data)
    data.map(datum => distToCenter(datum, model)).mean()
  }

  // Clustering take 2: choosing k
  def clusteringTake2(rawData: RDD[String]): Unit ={
    val data = rawData.map {
      line =>
        val buffer = line.split(",").toBuffer
        buffer.remove(1, 3)
        buffer.remove(buffer.length - 1)
        Vectors.dense(buffer.map(_.toDouble).toArray)
    }.cache()

    val run = 10
    val epsilon = 1.0e-4
    // Evaluate k from 5 to 30 in steps of 5 with the basic score
    (5 to 30 by 5).map(k => (k, clusteringScore(data, k))).foreach(println)
    // Evaluate k from 30 to 100 in steps of 10, in parallel, with the improved score
    (30 to 100 by 10).par.map(k => (k, clusteringScore2(data, k, run, epsilon))).foreach(println)

    data.unpersist()
  }


  /**
    * Save a clustered sample to HDFS for visualization in R.
    * @param rawData raw input lines
    * @param k number of clusters
    * @param run number of K-means runs
    * @param epsilon convergence threshold
    */
  def visualizationInR(rawData: RDD[String], k: Int, run: Int, epsilon: Double): Unit ={
    val data = rawData.map {
      line =>
        val buffer = line.split(",").toBuffer
        buffer.remove(1, 3)
        buffer.remove(buffer.length - 1)
        Vectors.dense(buffer.map(_.toDouble).toArray)
    }.cache()

    val kmeans = new KMeans()
    kmeans.setK(k)
    kmeans.setRuns(run)
    kmeans.setEpsilon(epsilon)
    val model = kmeans.run(data)

    val sample = data.map(
      datum =>
        model.predict(datum) + "," + datum.toArray.mkString(",")
    ).sample(false, 0.05)   // Sample 5% of the data without replacement

    sample.saveAsTextFile("hdfs://node1:9000/user/spark/R/sample")
    data.unpersist()
  }

  /**
    * Build a function that standardizes each feature to zero mean and unit variance.
    * @param data feature vectors used to compute per-column means and standard deviations
    * @return a function that normalizes a single vector
    */
  def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
    // Convert each vector to an Array[Double]
    val dataAsArray = data.map(_.toArray)
    // Number of columns (features)
    val numCols = dataAsArray.first().length
    // Number of data points
    val n = dataAsArray.count()
    // Column-wise sums
    val sums = dataAsArray.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
    // Column-wise sums of squares, accumulated with aggregate
    val sumSquares = dataAsArray.aggregate(new Array[Double](numCols))(
      (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
      (a, b) => a.zip(b).map(t => t._1 + t._2)
    )

    /** zip pairs each column's sum of squares with its sum, so both are available
      * together when computing that column's standard deviation using the identity
      * stdev = sqrt(n * sumSq - sum * sum) / n.
      */
    val stdevs = sumSquares.zip(sums).map {
      case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
    }
    val means = sums.map(_ / n)

    (datum : Vector) => {
      val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
        (value, mean, stdev) =>
          if (stdev <= 0) (value - mean) else (value - mean) / stdev
      )
      Vectors.dense(normalizedArray)
    }
  }
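
  // Worked example (illustration only): for a column with mean 10.0 and standard
  // deviation 2.0, a raw value of 14.0 is mapped to (14.0 - 10.0) / 2.0 = 2.0;
  // zero-variance columns are only mean-centered, which avoids dividing by zero.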


  // Clustering take 3: choosing k on standardized features
  def clusteringTake3(rawData: RDD[String]): Unit ={
    val data = rawData.map { line =>
      val buffer = line.split(',').toBuffer
      buffer.remove(1, 3)
      buffer.remove(buffer.length - 1)
      Vectors.dense(buffer.map(_.toDouble).toArray)
    }

    val run = 10
    val epsilon = 1.0e-4

    val normalizedData = data.map(buildNormalizationFunction(data)).cache()

    (60 to 120 by 10).par.map(
      k => (k, clusteringScore2(normalizedData, k, run, epsilon))
    ).toList.foreach(println)

    normalizedData.unpersist()
  }

  /**
    * Build a parse function that one-hot encodes the three categorical columns
    * (protocol, service, TCP state) and splits off the label.
    * @param rawData raw input lines
    * @return a function from a raw line to its (label, feature vector) pair
    */
  def buildCategoricalAndLabelFunction(rawData: RDD[String]): (String => (String, Vector))  = {
    val splitData = rawData.map(_.split(","))
    // Map each distinct categorical value to an index, one map per column
    val protocols = splitData.map(_(1)).distinct().collect().zipWithIndex.toMap   // e.g. one-hot position 1,0,0
    val services = splitData.map(_(2)).distinct().collect().zipWithIndex.toMap    // e.g. one-hot position 0,1,0
    val tcpStates = splitData.map(_(3)).distinct().collect().zipWithIndex.toMap   // e.g. one-hot position 0,0,1
    // The returned parse function
    (line: String) => {
      val buffer = line.split(",").toBuffer
      val protocol = buffer.remove(1)
      val service = buffer.remove(1)
      val tcpState = buffer.remove(1)
      val label = buffer.remove(buffer.length - 1)
      val vector = buffer.map(_.toDouble)

      val newProtocolFeatures = new Array[Double](protocols.size)
      newProtocolFeatures(protocols(protocol)) = 1.0
      val newServiceFeatures = new Array[Double](services.size)
      newServiceFeatures(services(service)) = 1.0
      val newTcpStateFeatures = new Array[Double](tcpStates.size)
      newTcpStateFeatures(tcpStates(tcpState)) = 1.0

      vector.insertAll(1, newTcpStateFeatures)
      vector.insertAll(1, newServiceFeatures)
      vector.insertAll(1, newProtocolFeatures)

      (label, Vectors.dense(vector.toArray))
    }
  }
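
  // Worked example (illustration only; the actual index assignment depends on the data):
  // if protocols == Map("tcp" -> 0, "udp" -> 1, "icmp" -> 2), a "udp" record gets the three
  // protocol columns [0.0, 1.0, 0.0]; the service and TCP-state columns work the same way.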


  // Clustering take 4: one-hot encoded categorical features plus standardization
  def clusteringTake4(rawData: RDD[String]): Unit = {
    val parseFunction = buildCategoricalAndLabelFunction(rawData)
    val data = rawData.map(parseFunction).values
    val normalizedData = data.map(buildNormalizationFunction(data)).cache()

    val run = 10
    val epsilon = 1.0e-4

    (80 to 160 by 10).map(
      k=> (k, clusteringScore2(normalizedData, k, run, epsilon))
    ).toList.foreach(println)

    normalizedData.unpersist()
  }


  // Clustering take 5: label-entropy-based evaluation
  /**
    * Entropy of a collection of label counts.
    * @param counts counts of each label within a cluster
    * @return the entropy of the label distribution
    */
  def entropy(counts: Iterable[Int]) = {
    val values = counts.filter(_ > 0)
    val n: Double = values.sum
    values.map {
      v =>
        val p = v / n
        -p * math.log(p)
    }.sum
  }
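
  // Worked example (illustration only): entropy(Seq(5, 5)) == math.log(2), about 0.693,
  // for two equally frequent labels, while entropy(Seq(10)) == 0.0 for a perfectly pure cluster.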

  /**
    * Entropy-weighted clustering score: the average label entropy per cluster,
    * weighted by cluster size (lower is better).
    * @param normalizedLabelsAndData (label, normalized vector) pairs
    * @param k number of clusters
    * @param run number of K-means runs
    * @param epsilon convergence threshold
    * @return the weighted average entropy
    */
  def clusteringScore3(normalizedLabelsAndData: RDD[(String, Vector)], k: Int, run: Int, epsilon: Double) = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    kmeans.setRuns(run)
    kmeans.setEpsilon(epsilon)

    // Train the KMeansModel on the feature vectors only
    val model = kmeans.run(normalizedLabelsAndData.values)
    // Predict the cluster for each data point, keeping the label as the key
    val labelAndClusters = normalizedLabelsAndData.mapValues(model.predict)
    // RDD[(String, Int)] => RDD[(Int, String)]: swap keys and values to (cluster, label)
    val clustersAndLabels = labelAndClusters.map(_.swap)
    // Collect the labels that fall into each cluster
    val labelsInCluster = clustersAndLabels.groupByKey().values
    // Count the occurrences of each label within each cluster
    val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
    // Total number of data points
    val n = normalizedLabelsAndData.count()
    // Weighted average of the per-cluster entropies
    labelCounts.map(m => m.sum * entropy(m)).sum() / n
  }
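
  // Worked example (illustration only): with two clusters of sizes 90 and 10 whose label
  // entropies are 0.1 and 0.7, the score is (90 * 0.1 + 10 * 0.7) / 100 = 0.16.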


  def clusteringTake5(rawData: RDD[String]): Unit ={
    val parseFunction = buildCategoricalAndLabelFunction(rawData)
    val labelAndData = rawData.map(parseFunction)
    val normalizedLabelsAndData = labelAndData.mapValues(buildNormalizationFunction(labelAndData.values)).cache()

    val run = 10
    val epsilon = 1.0e-4

    (80 to 160 by 10).map(
      k => (k, clusteringScore3(normalizedLabelsAndData, k, run, epsilon))
    ).toList.foreach(println)

    normalizedLabelsAndData.unpersist()
  }


  // Detect anomalies: build a detector that flags points far from their cluster center
  def buildAnomalyDetector(data: RDD[Vector], normalizeFunction: (Vector => Vector)): (Vector => Boolean) = {
    val normalizedData = data.map(normalizeFunction)
    normalizedData.cache()

    val kmeans = new KMeans()
    kmeans.setK(150)
    kmeans.setRuns(10)
    kmeans.setEpsilon(1.0e-6)
    val model = kmeans.run(normalizedData)

    normalizedData.unpersist()

    // Distance from each point to the center of its assigned cluster
    val distances = normalizedData.map(datum => distToCenter(datum, model))
    // Use the distance of the 100th-farthest point as the anomaly threshold
    val threshold = distances.top(100).last

    // Return the detector: a point is anomalous if its distance to its cluster center exceeds the threshold
    (datum: Vector) => distToCenter(normalizeFunction(datum), model) > threshold
  }

  /**
    * Anomaly detection entry point: parse, normalize, build the detector, and report anomalies.
    * @param rawData raw input lines
    */
  def anomalies(rawData: RDD[String]) = {
    val parseFunction = buildCategoricalAndLabelFunction(rawData)
    val originalAndData = rawData.map(line => (line, parseFunction(line)._2))
    val data = originalAndData.values
    val normalizeFunction = buildNormalizationFunction(data)
    val anomalyDetector = buildAnomalyDetector(data, normalizeFunction)
    val anomalies = originalAndData.filter {
      case (original, datum) => anomalyDetector(datum)
    }.keys
    // Print the first 10 anomalous records
    anomalies.take(10).foreach(println)
  }
}
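
For readers who want to reuse the detector beyond the batch job above, here is a minimal sketch (my own addition, not part of the original project) of how the closure returned by buildAnomalyDetector, together with the parse function built in anomalies(), could be applied to a fresh batch of raw connection lines. The object and method names here are hypothetical.

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical helper for illustration: keep only the raw lines whose parsed
// feature vectors the detector flags as anomalous.
object DetectorUsage {
  def flagNewRecords(newRecords: RDD[String],
                     parseFunction: String => (String, Vector),
                     detector: Vector => Boolean): RDD[String] =
    newRecords.filter(line => detector(parseFunction(line)._2))
}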

The code is a bit messy, but I wrote and assembled it all myself and it runs fine. Please pay attention to the comments; I hope they are useful for reference and study.
This was exhausting. Maybe my machine just isn't powerful enough, but processing roughly 1 GB of data took a long time. Here is the output of the anomaly-detection step:
16/07/24 22:48:18 INFO Executor: Running task 0.0 in stage 65.0 (TID 385)
16/07/24 22:48:18 INFO HadoopRDD: Input split: hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data:0+134217728
16/07/24 22:48:30 INFO Executor: Finished task 0.0 in stage 65.0 (TID 385). 3611 bytes result sent to driver
16/07/24 22:48:30 INFO TaskSetManager: Finished task 0.0 in stage 65.0 (TID 385) in 11049 ms on localhost (1/1)
9,tcp,telnet,SF,307,2374,0,0,1,0,0,1,0,1,0,1,3,1,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,69,4,0.03,0.04,0.01,0.75,0.00,0.00,0.00,0.00,normal.
16/07/24 22:48:30 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
16/07/24 22:48:30 INFO DAGScheduler: ResultStage 65 (take at CheckAll.scala:413) finished in 11.049 s
16/07/24 22:48:30 INFO DAGScheduler: Job 41 finished: take at CheckAll.scala:413, took 11.052917 s
0,tcp,http,S1,299,26280,0,0,0,1,0,1,0,1,0,0,0,0,0,0,0,0,15,16,0.07,0.06,0.00,0.00,1.00,0.00,0.12,231,255,1.00,0.00,0.00,0.01,0.01,0.01,0.00,0.00,normal.
0,tcp,telnet,S1,2895,14208,0,0,0,0,0,1,0,0,0,0,13,0,0,0,0,0,1,1,1.00,1.00,0.00,0.00,1.00,0.00,0.00,21,2,0.10,0.10,0.05,0.00,0.05,0.50,0.00,0.00,normal.
23,tcp,telnet,SF,104,276,0,0,0,0,5,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,2,1.00,0.00,1.00,1.00,0.00,0.00,0.00,0.00,guess_passwd.
13,tcp,telnet,SF,246,11938,0,0,0,0,4,1,0,0,0,0,2,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,89,2,0.02,0.04,0.01,0.00,0.00,0.00,0.00,0.00,normal.
12249,tcp,telnet,SF,3043,44466,0,0,0,1,0,1,13,1,0,0,12,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,61,8,0.13,0.05,0.02,0.00,0.00,0.00,0.00,0.00,normal.
60,tcp,telnet,S3,125,179,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,1,1,1.00,1.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,1.00,1.00,0.00,0.00,guess_passwd.
60,tcp,telnet,S3,126,179,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,2,2,0.50,0.50,0.50,0.50,1.00,0.00,0.00,23,23,1.00,0.00,0.04,0.00,0.09,0.09,0.91,0.91,guess_passwd.
583,tcp,telnet,SF,848,25323,0,0,0,1,0,1,107,1,1,100,1,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.
11447,tcp,telnet,SF,3131,45415,0,0,0,1,0,1,0,1,0,0,15,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,100,10,0.09,0.72,0.01,0.20,0.01,0.10,0.69,0.20,
Elapsed time: 4602s
16/07/24 22:48:30 INFO SparkContext: Invoking stop() from shutdown hook
16/07/24 22:48:30 INFO SparkUI: Stopped Spark web UI at http://192.168.1.102:4040
16/07/24 22:48:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/07/24 22:48:30 INFO MemoryStore: MemoryStore cleared
16/07/24 22:48:30 INFO BlockManager: BlockManager stopped
16/07/24 22:48:30 INFO BlockManagerMaster: BlockManagerMaster stopped
16/07/24 22:48:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/07/24 22:48:30 INFO SparkContext: Successfully stopped SparkContext
16/07/24 22:48:30 INFO ShutdownHookManager: Shutdown hook called
16/07/24 22:48:30 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-1ab0ec11-672d-4778-9ae8-2050f44a5f91
16/07/24 22:48:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/07/24 22:48:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
Process finished with exit code 0
I have highlighted the ten anomalous records in the run output above, so please take note of them. I was worried the whole run would take more than an hour.
