[spark 87] Given a Driver Program, how to determine which code runs on the Driver and which code is executed on the Workers

The Driver Program is an application written by the user and submitted to the Spark cluster for execution. It consists of two parts:

  • The driver role itself: the Driver cooperates with the Master and the Workers to start the application's processes, divide the DAG, encapsulate computing tasks, distribute those tasks to the compute nodes (Workers), allocate computing resources, and so on.
  • The computation logic itself: when a task is executed on a Worker, this logic runs to complete the application's computation.

  • The question, then, is: given a driver program, which parts are executed in the Driver process as "driver code", and which parts are "task logic code" that is wrapped into tasks and distributed to the compute nodes for computation?
     

    1. Basic Spark driver application


    package spark.examples.databricks.reference.apps.loganalysis
    
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.SparkContext._
    
    
    object LogAnalyzer {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]")
        val sc = new SparkContext(sparkConf)
    
    
        //logFile path is provided by the program args
    
    
    
        val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log"
    
        //Transform each line of the logFile into an ApacheAccessLog object,
        //yielding an RDD[ApacheAccessLog].
        //Because it will be used more than once, cache it.
        val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()
    
        // Calculate statistics based on the content size.
        //Retrieve the contentSize column and cache it
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        //reduce, count, min, and max are actions; each triggers a Job
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
    
        // Compute Response Code to Count.
        //Take first 100 responseCode, no sort here
        //take action
        val responseCodeToCount = accessLogs
          .map(log => (log.responseCode, 1))
          .reduceByKey(_ + _)
          .take(100)
        println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")
    
        // Any IPAddress that has accessed the server more than 10 times.
        //take action
        val ipAddresses = accessLogs
          .map(log => (log.ipAddress, 1))
          .reduceByKey(_ + _)
          .filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times
          .map(_._1) //Map to the IP Address column
          .take(100)
        println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")
    
        // Top Endpoints.
        //top action
        val topEndpoints = accessLogs
          .map(log => (log.endpoint, 1))
          .reduceByKey(_ + _)
          .top(10)(OrderingUtils.SecondValueOrdering)
        println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")
    
        sc.stop()
      }
    }
    
     
    The Spark application starts by running the main function on the Driver. The computation logic begins by reading a data source (such as a file in HDFS) and creating an RDD. RDD operations fall into two categories: transformations and actions. Transformations are executed lazily, while an action triggers the submission of a Job.
    Submitting a Job means dividing the DAG into stages, encapsulating computing tasks, distributing them to the compute nodes (Workers), allocating computing resources, and so on. This is where task distribution really begins. Therefore, the first thing to identify in the code is the actions.
    The next question is how to identify the logic that is executed while a task runs.
    The application is executed on the Driver; when an RDD action is encountered, a Job is submitted, and subsequent Jobs are submitted one after another as execution proceeds. That is, although a Spark application can have multiple Jobs (each action corresponds to one Job), these Jobs run sequentially: Job(X+1) is executed only after Job(X) has completed.
     
    Take contentSizes.reduce(_ + _) above as an example: the computation logic is the function _ + _, a summation. When the task logic is executed, the RDD lineage is traced back from the action, the data of the partition the task belongs to is produced through the intermediate transformations, and then reduce(_ + _) is applied to it.
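     
    To make the split concrete, the sketch below annotates a minimal, hypothetical word-count program (not part of the original example) with where each statement runs, assuming a standard cluster deployment: statements in main run in the Driver process, while the closures passed to transformations and actions are serialized and executed by the tasks on the Workers.

    import org.apache.spark.{SparkConf, SparkContext}
    
    object DriverVsWorkerSketch {
      def main(args: Array[String]) {
        // Runs on the Driver: build the configuration and the SparkContext.
        val sc = new SparkContext(new SparkConf().setAppName("DriverVsWorkerSketch"))
    
        // Runs on the Driver: only the lineage is recorded, no data is read yet (lazy).
        val lines = sc.textFile("hdfs:///tmp/input.txt") // hypothetical input path
    
        // The closures passed to filter/map are shipped to the Workers and executed
        // there once an action submits a Job.
        val lengths = lines.filter(_.nonEmpty).map(_.length)
    
        // reduce is an action: it submits a Job. The function (_ + _) runs on the
        // Workers for each partition; the partial results are merged on the Driver.
        val total = lengths.reduce(_ + _)
    
        // Runs on the Driver: total is an ordinary local value here.
        println(s"Total characters: $total")
    
        sc.stop()
      }
    }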
     

    2. Spark Streaming program


     
    package spark.examples.streaming
    
    import java.sql.{PreparedStatement, Connection, DriverManager}
    import java.util.concurrent.atomic.AtomicInteger
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    
    //No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?
    
    
    object SparkStreamingForPartition {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("NetCatWordCount")
        conf.setMaster("local[3]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val dstream = ssc.socketTextStream("192.168.26.140", 9999)
        //foreachRDD is a DStream output operation: a Job is submitted for each batch's RDD.
        dstream.foreachRDD(rdd => {
          //embedded function
          def func(records: Iterator[String]) {
            var conn: Connection = null
            var stmt: PreparedStatement = null
            try {
              val url = "jdbc:mysql://192.168.26.140:3306/person"
              val user = "root"
              val password = ""
              conn = DriverManager.getConnection(url, user, password)
              //Prepare the statement once and reuse it for every word
              val sql = "insert into TBL_WORDS(word) values (?)"
              stmt = conn.prepareStatement(sql)
              records.flatMap(_.split(" ")).foreach(word => {
                stmt.setString(1, word)
                stmt.executeUpdate()
              })
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              if (stmt != null) {
                stmt.close()
              }
              if (conn != null) {
                conn.close()
              }
            }
          }
          //Repartition the RDD into 3 partitions
          val repartitionedRDD = rdd.repartition(3)
          //foreachPartition invokes func once per partition; func receives an Iterator over that partition's records
          repartitionedRDD.foreachPartition(func)
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
     
    The foreachRDD of DStream is an output operation, analogous to an RDD action. Note, however, that the function passed to foreachRDD itself runs on the Driver, once per batch; it is the closures it hands to RDD operations such as foreachPartition that are serialized and executed on the Workers. The func here is a function defined inside the foreachRDD parameter function, so it is shipped to the Workers for execution. If func were defined at an outer level, for example directly inside main, could it still be serialized from the Driver to the Workers? I think so: a Scala function is just an ordinary object with no state. The MySQL connection, on the other hand, cannot be serialized and shipped, which is why it is created inside func, on the Worker, rather than on the Driver.
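     
    A minimal sketch of the same design choice, assuming a hypothetical saveWords helper and the TBL_WORDS table above: the JDBC connection is opened inside foreachPartition, so it is created on the Worker that processes the partition and is never serialized and shipped from the Driver.

    import java.sql.DriverManager
    
    import org.apache.spark.rdd.RDD
    
    object ConnectionPerPartition {
      //Hypothetical helper: writes one RDD of words into MySQL, one connection per partition.
      def saveWords(words: RDD[String]): Unit = {
        words.foreachPartition { iter =>
          //Created here, on the Worker that executes this partition.
          //A java.sql.Connection is not serializable, so it must not be created
          //on the Driver and captured by the closure.
          val conn = DriverManager.getConnection(
            "jdbc:mysql://192.168.26.140:3306/person", "root", "")
          try {
            val stmt = conn.prepareStatement("insert into TBL_WORDS(word) values (?)")
            iter.foreach { word =>
              stmt.setString(1, word)
              stmt.executeUpdate()
            }
            stmt.close()
          } finally {
            conn.close()
          }
        }
      }
    }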
     

    3. Calling further action operators on the RDD inside foreachRDD


     
    package spark.examples.streaming
    
    import java.sql.{PreparedStatement, Connection, DriverManager}
    import java.util.concurrent.atomic.AtomicInteger
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._
    
    object SparkStreamingForPartition2 {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("NetCatWordCount")
        conf.setMaster("local[3]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val dstream = ssc.socketTextStream("192.168.26.140", 9999)
        dstream.foreachRDD(rdd => {
          //Check whether the RDD is empty before running flatMap/reduce on it
          if (!rdd.isEmpty()) {
            //Number of records in this batch's RDD
            val recordCount = rdd.count()
            //Number of words in this batch's RDD
            val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _)
            println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount)
          } else {
            println("Empty RDD, No Data")
          }
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
     
    Note that for an RDD with no elements (RDD.isEmpty returns true), the flatMap/reduce chain above cannot be executed: the transformations themselves are harmless, but reduce throws an UnsupportedOperationException("empty collection"). Receiving an empty RDD is a common situation in Spark Streaming whenever a batch interval delivers no data, which is why the isEmpty check is there. The emptiness check can be seen in the source of RDD.reduce:
     
      def reduce(f: (T, T) => T): T = {
        val cleanF = sc.clean(f)
        val reducePartition: Iterator[T] => Option[T] = iter => {
          if (iter.hasNext) {
            Some(iter.reduceLeft(cleanF))
          } else {
            None
          }
        }
        var jobResult: Option[T] = None
        val mergeResult = (index: Int, taskResult: Option[T]) => {
          if (taskResult.isDefined) {
            jobResult = jobResult match {
              case Some(value) => Some(f(value, taskResult.get))
              case None => taskResult
            }
          }
        }
        sc.runJob(this, reducePartition, mergeResult)
        // Get the final result out of our Option, or throw an exception if the RDD was empty
        jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
      }
     
    The following, by contrast, works on an empty RDD and returns 0:
      rdd.flatMap(_.split(" ")).map(_ => 1).count()
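     
    A minimal sketch of the alternatives when a batch may deliver an empty RDD: either guard with isEmpty as in the example above, or replace reduce with fold, which takes a zero element and therefore does not throw on an empty RDD (the wordCount helpers below are hypothetical).

    import org.apache.spark.rdd.RDD
    
    object EmptyRddSafeCount {
      //Hypothetical helper: counts words without risking "empty collection".
      def wordCount(rdd: RDD[String]): Long = {
        if (rdd.isEmpty()) {
          0L
        } else {
          rdd.flatMap(_.split(" ")).map(_ => 1L).reduce(_ + _)
        }
      }
    
      //Equivalent without the guard: fold has a zero element, so it returns 0
      //instead of throwing when the RDD is empty.
      def wordCountFold(rdd: RDD[String]): Long =
        rdd.flatMap(_.split(" ")).map(_ => 1L).fold(0L)(_ + _)
    }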
     
     
