[spark 87] Given a Driver Program, how to determine which code runs on the Driver and which code is executed on the Workers
Given a driver program, which parts run in the Driver process as "driver code", and which parts are "task logic code" that gets wrapped into tasks and distributed to the compute nodes?
1. Basic Spark driver application
package spark.examples.databricks.reference.apps.loganalysis
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object LogAnalyzer {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]")
val sc = new SparkContext(sparkConf)
//logFile path is provided by the program args
val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log"
//Transform each line of the log file into an ApacheAccessLog object,
//giving an RDD[ApacheAccessLog].
//The RDD is used more than once, so cache it.
val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()
// Calculate statistics based on the content size.
//Retrieve the contentSize column and cache it
val contentSizes = accessLogs.map(log => log.contentSize).cache()
//reduce, count, min and max below are all actions
println("Content Size Avg: %s, Min: %s, Max: %s".format(
contentSizes.reduce(_ + _) / contentSizes.count,
contentSizes.min,
contentSizes.max))
// Compute Response Code to Count.
//Take first 100 responseCode, no sort here
//take action
val responseCodeToCount = accessLogs
.map(log => (log.responseCode, 1))
.reduceByKey(_ + _)
.take(100)
println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")
// Any IPAddress that has accessed the server more than 10 times.
//take action
val ipAddresses = accessLogs
.map(log => (log.ipAddress, 1))
.reduceByKey(_ + _)
.filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times
.map(_._1) //Map to the IP Address column
.take(100)
println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")
// Top Endpoints.
//top action
val topEndpoints = accessLogs
.map(log => (log.endpoint, 1))
.reduceByKey(_ + _)
.top(10)(OrderingUtils.SecondValueOrdering)
println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")
sc.stop()
}
}
A Spark application starts by running its main function on the Driver. The computation begins by reading a data source (such as a file in HDFS) to create an RDD. RDD operations fall into two kinds: transformations and actions. Transformations are lazy; an action triggers the submission of a Job.
Submitting a Job involves dividing the DAG into stages, wrapping the computation into tasks, allocating computing resources and distributing the tasks to the compute nodes (Workers). This is where task distribution really starts. So the first thing to identify in the code is which operations are actions. In the example above these are reduce, count, min, max, take and top.
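The laziness of transformations can be observed directly. The following is a minimal sketch, not part of the original program: the object name LazyTransformDemo and the accumulator name are made up for illustration, and it assumes Spark 2.0 or later (for sc.longAccumulator) running in local mode. The accumulator is incremented inside the function passed to map, so it only changes once an action makes the tasks run.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; assumes Spark 2.0+ on the classpath.
object LazyTransformDemo {
  // Returns the accumulator value before and after the first action.
  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("LazyTransformDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val seen = sc.longAccumulator("elements seen by map")
      // Transformation only: this records the lineage, no task runs yet.
      val doubled = sc.parallelize(1 to 10, 2).map { x => seen.add(1); x * 2 }
      val beforeAction: Long = seen.value
      // Action: a Job is submitted and the map function runs inside its tasks.
      doubled.count()
      val afterAction: Long = seen.value
      (beforeAction, afterAction)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"before action: $before, after action: $after")
  }
}
```

In a local run without task retries, the first value should be 0 and the second 10, one increment per element processed by the Job that count triggered.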
The next question is how to identify the logic that runs inside the tasks.
The application itself runs on the Driver. When it reaches an action on an RDD, it submits a Job and waits for that Job to finish before moving on. So although a Spark application can contain many Jobs (each action corresponds to a Job, and some actions such as take may run more than one), a driver that submits them from a single thread, as this one does, runs them sequentially: Job(X+1) starts only after Job(X) has completed.
For a Job such as contentSizes.reduce(_ + _) above, the computation logic is the function _ + _, a summation. When a task runs, Spark traces the RDD lineage back from the action, computes the data of the partition the task is responsible for by applying the intermediate transformations (here map(ApacheAccessLog.parseLogLine) and map(log => log.contentSize)), and then applies reduce(_ + _) to it. The functions passed to these RDD operations are the code that runs on the Workers; everything else in main runs on the Driver.
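To make the split concrete, here is a small simulation in plain Scala with no Spark involved. It is only a sketch under simplifying assumptions: partitions are ordinary in-memory sequences, and the names ReduceSimulation, LogRecord and runJob are hypothetical. The "worker" part applies the transformation pipeline and the reduce function to one partition; the "driver" part merges the per-partition results.

```scala
// Plain-Scala simulation of how a reduce Job is split between tasks and the driver.
// All names here are hypothetical; nothing in this block is Spark API.
object ReduceSimulation {
  // Stand-in for a parsed log line.
  final case class LogRecord(contentSize: Long)

  // "Worker" side: reduce the data of one partition, or None if it is empty.
  def reducePartition[T](f: (T, T) => T)(iter: Iterator[T]): Option[T] =
    if (iter.hasNext) Some(iter.reduceLeft(f)) else None

  // "Driver" side: merge the results returned by the tasks.
  def mergeResults[T](f: (T, T) => T)(taskResults: Seq[Option[T]]): Option[T] =
    taskResults.flatten.reduceOption(f)

  // One task per partition: apply the transformation pipeline, then reduce.
  def runJob[A, T](partitions: Seq[Seq[A]])(pipeline: Iterator[A] => Iterator[T])(f: (T, T) => T): T = {
    val taskResults = partitions.map(p => reducePartition(f)(pipeline(p.iterator)))
    mergeResults(f)(taskResults)
      .getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(LogRecord(10L), LogRecord(20L)),
      Seq.empty[LogRecord],
      Seq(LogRecord(5L)))
    // The pipeline plays the role of accessLogs.map(log => log.contentSize).
    println(runJob(partitions)(_.map(_.contentSize))(_ + _))
  }
}
```

The pipeline and the function f are the only pieces that would travel to the Workers in a real Job; the merging of task results happens on the Driver.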
2. Spark Streaming program
package spark.examples.streaming
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?
object SparkStreamingForPartition {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("NetCatWordCount")
conf.setMaster("local[3]")
val ssc = new StreamingContext(conf, Seconds(5))
val dstream = ssc.socketTextStream("192.168.26.140", 9999)
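//foreachRDD is an output operation on the DStream; it triggers a Job that processes the RDD of each batch interval. If an action is then called on that RDD, is another Job triggered?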
dstream.foreachRDD(rdd => {
//embedded function
def func(records: Iterator[String]) {
var conn: Connection = null
var stmt: PreparedStatement = null
try {
val url = "jdbc:mysql://192.168.26.140:3306/person";
val user = "root";
val password = ""
conn = DriverManager.getConnection(url, user, password)
//Prepare the statement once per partition and reuse it for every word
val sql = "insert into TBL_WORDS(word) values (?)"
stmt = conn.prepareStatement(sql)
records.flatMap(_.split(" ")).foreach(word => {
stmt.setString(1, word)
stmt.executeUpdate()
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) {
stmt.close()
}
if (conn != null) {
conn.close()
}
}
}
//Repartition the RDD into 3 partitions
val repartitionedRDD = rdd.repartition(3)
//Apply func to each partition; func receives the partition's records as an Iterator
repartitionedRDD.foreachPartition(func)
})
ssc.start()
ssc.awaitTermination()
}
}
foreachRDD on a DStream is an output operation, similar to an action on an RDD. Note, however, that the function passed to foreachRDD itself runs on the Driver, once per batch interval; only the functions passed to RDD operations inside it run on the Workers. Here func is passed to foreachPartition, so it is serialized and sent to the Workers for execution. If func were instead defined at the outermost level, for example as a local function directly inside main, could it still be serialized from the Driver to the Workers? I think so. func has no state and captures nothing from its surroundings, so in Scala it is just an ordinary function object. The MySQL connection is not serialized along with it: the connection is created inside func, when func runs on the Worker.
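The serialization question can be explored without a cluster. The sketch below uses plain Java serialization as a rough stand-in for what happens to a task function on its way from the Driver to a Worker; Spark's real mechanism involves more steps. ClosureSerializationSketch and FakeConnection are hypothetical names, and FakeConnection is a deliberately non-serializable substitute for a JDBC connection.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical sketch; plain Java serialization approximates shipping a task function.
object ClosureSerializationSketch {
  // Stand-in for a JDBC connection: deliberately NOT Serializable.
  final class FakeConnection {
    def insert(word: String): String = s"inserted $word"
  }

  // Serializes and deserializes a value, returning the copy.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Stateless function: the connection is created when the function runs.
  val createsInside: Iterator[String] => List[String] = records => {
    val conn = new FakeConnection
    records.flatMap(_.split(" ")).map(conn.insert).toList
  }

  // Function that captures a connection created beforehand on the "driver".
  def capturesOutside(): Iterator[String] => List[String] = {
    val conn = new FakeConnection
    records => records.flatMap(_.split(" ")).map(conn.insert).toList
  }

  def isSerializable(value: AnyRef): Boolean =
    try { roundTrip(value); true } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(s"createsInside serializable: ${isSerializable(createsInside)}")
    println(s"capturesOutside serializable: ${isSerializable(capturesOutside())}")
  }
}
```

The first function has the same shape as func in the listing: it carries no state, so it survives the round trip, and the connection only comes into existence where the function is finally called. The second function fails to serialize because it drags the connection object along.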
3. Calling action operators on the rdd inside foreachRDD
package spark.examples.streaming
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
object SparkStreamingForPartition2 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("NetCatWordCount")
conf.setMaster("local[3]")
val ssc = new StreamingContext(conf, Seconds(5))
val dstream = ssc.socketTextStream("192.168.26.140", 9999)
dstream.foreachRDD(rdd => {
//Skip empty batches: reduce below throws on an empty RDD
if (!rdd.isEmpty()) {
//Number of records in the RDD
val recordCount = rdd.count()
//Number of words in the RDD
val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _)
println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount)
} else {
println("Empty RDD, No Data")
}
})
ssc.start()
ssc.awaitTermination()
}
}
Note the check for an empty RDD (rdd.isEmpty is true). Empty RDDs are common in Spark Streaming, because any batch interval in which no data arrives produces one. Transformations such as flatMap and map run fine on an empty RDD; it is the reduce action that fails, because reduce itself checks for an empty result and throws an exception:
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
By contrast, the following works on an empty RDD and returns 0:
rdd.flatMap(_.split(" ")).map(_ => 1).count()
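The difference between the operators can be checked in local mode. This is a minimal sketch rather than code from the original post: the object name EmptyRddSketch is made up, and fold with a zero value of 0 is offered as one possible alternative to guarding reduce with isEmpty.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

// Hypothetical demo object; compares count, reduce and fold on an empty RDD.
object EmptyRddSketch {
  // Returns (count result, whether reduce failed, fold result).
  def run(): (Long, Boolean, Int) = {
    val conf = new SparkConf().setAppName("EmptyRddSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val empty = sc.parallelize(Seq.empty[String], 2)
      // Transformations are fine on an empty RDD.
      val ones = empty.flatMap(_.split(" ")).map(_ => 1)
      val counted = ones.count()
      // reduce has no value to return for an empty RDD, so it throws.
      val reduceFailed = Try(ones.reduce(_ + _)).isFailure
      // fold starts from the supplied zero value instead.
      val folded = ones.fold(0)(_ + _)
      (counted, reduceFailed, folded)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With fold, the word count in section 3 could be computed without the isEmpty check, at the cost of still running a Job for each empty batch.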