Spark 사례 실습

14670 단어
사례 실습
Spark Shell은 프로그램을 테스트하고 검증하는 데만 많이 사용되며, 프로덕션 환경에서는 일반적으로 IDE
에서 프로그램을 작성한 다음jar 패키지로 만들어서 그룹에 제출합니다. 가장 자주 사용하는 것은 Maven 프로젝트를 만들고 이용하는 것입니다.
Maven은jar 패키지의 의존을 관리합니다.
 
 
1 WordCount 프로그램 작성
 
1) Maven 프로젝트 WordCount 작성 및 종속 가져오기


         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    4.0.0

    com.lxl
    spark02
    pom
    1.0-SNAPSHOT
    
        sparkCore
    

    
        
            org.apache.spark
            spark-core_2.11
            2.1.1
        
    
    
        
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
            
        
    

 
 
 
2) 코드 작성
package com.lxl

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {

    /*
    1.      
     */
    val conf = new SparkConf().setAppName("wc")


    /*
    2.  sparkcontext
     */
    val sc = new SparkContext(conf)



    /*
    3.  
     */
    //    
    val lines = sc.textFile(args(0)) //    

    //   flatMap
    val words = lines.flatMap(_.split(" "))

    //map(word,1)
    val k2v = words.map((_, 1))

    //resuceBykey(word, x)
    val result = k2v.reduceByKey(_ + _)

    ////result.collect()
    
    //       
    result.saveAsTextFile(args(1)) //          


    //    
    sc.stop()



  }

}

 
 
 
 
3) 패키지 플러그 인
 


         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
        spark01
        com.atlxl
        1.0-SNAPSHOT
    
    4.0.0

    sparkCore
    
        
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.0.0
                
                    
                        
                            WordCount
                        
                    
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    


 
 
 
4) 클러스터 테스트에 패키지
 
먼저jar 패키지를spark의 집 디렉터리에 복사해서wordcount로 변경합니다.
[lxl@hadoop102 spark]$ mv sparkCore-1.0-SNAPSHOT.jar wordcount.jar

 
[lxl@hadoop102 spark]$ bin/spark-submit \
--class com.lxl.WordCount \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./wordcount.jar \
hdfs://hadoop102:9000/fruit.tsv \
hdfs://hadoop102:9000/out

 
 
 
 
 
 
 
2 로컬 디버그
로컬 Spark 프로그램 디버깅은 local 제출 모드를 사용해야 하며, 본 컴퓨터를 실행 링으로 삼을 것입니다
경계, 마스터와 워커는 모두 이 컴퓨터입니다.실행할 때 인터럽트 디버깅을 하면 됩니다.다음과 같습니다.
SparkConf를 만들 때 로컬 실행을 나타내는 추가 속성을 설정합니다.
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")

 
 
전체 코드: (WordCount 2단계 코드를 조금만 수정하면 됩니다.)
package com.lxl

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    /*
    1.      
     */
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")


    /*
    2.  sparkcontext
     */
    val sc = new SparkContext(conf)



    /*
    3.  
     */
    //    
    val lines = sc.textFile("C:\\Users\\67001\\Desktop\\word.txt") //    

    //   flatMap
    val words = lines.flatMap(_.split(" "))

    //map(word,1)
    val k2v = words.map((_, 1))

    //resuceBykey(word, x)
    val result = k2v.reduceByKey(_ + _)

    ////    result.collect()

    //    
//    result.saveAsTextFile(args(1))

    //      
    result.foreach(println)


    //    
    sc.stop()
  }

}

 
 
 
 
 
3 원격 디버깅
IDEA를 통한 원격 디버깅은 주로 IDEA를 Driver로 응용 프로그램을 제출하고,
구성 프로세스는 다음과 같습니다.
sparkConf를 수정하여 최종 실행 중인 Jar 패키지, Driver 프로그램의 주소를 추가합니다.
Master의 커밋 주소를 설정합니다.
val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
.setJars(List("D:\\Workspace\\IDEA_work\\Spark_Work\\spark02\\sparkCore\\target\\sparkCore-1.0-SNAPSHOT.jar"))

 
 
전체 코드:
package com.lxl

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    /*
    1.      
     */
    val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
      .setJars(List("D:\\Workspace\\IDEA_work\\Spark_Work\\spark02\\sparkCore\\target\\sparkCore-1.0-SNAPSHOT.jar"))


    /*
    2.  sparkcontext
     */
    val sc = new SparkContext(conf)



    /*
    3.  
     */
    //    
    val lines = sc.textFile("hdfs://hadoop102:9000/fruit.tsv") //HDFS  

    //   flatMap
    val words = lines.flatMap(_.split(" "))

    //map(word,1)
    val k2v = words.map((_, 1))

    //resuceBykey(word, x)
    val result = k2v.reduceByKey(_ + _)

    ////    result.collect()

    //    
    result.saveAsTextFile("hdfs://hadoop102:9000/out1") //   HDFS   

    //      
//    result.foreach(println)


    //    
    sc.stop()
  }

}

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
전재 대상:https://www.cnblogs.com/LXL616/p/11139436.html

좋은 웹페이지 즐겨찾기