Socket word count with Spark Streaming


1. Socket word count


Count the words in text data received from a data server over a TCP socket.
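Before looking at the Spark version, the per-batch logic can be sketched in plain Java. This is a minimal sketch, assuming one batch is simply a list of text lines; the class and method names (`BatchWordCount`, `countWords`) are hypothetical and only mirror the split, filter, (word, 1) and sum-per-word steps used by the Spark program in section 3.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: counts the words in one "batch" of lines on a single
// machine, the same way the Spark job counts each micro-batch.
public class BatchWordCount {

    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                // like flatMap: split each line on runs of whitespace
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                // like filter: drop the empty token produced by leading whitespace
                .filter(word -> !word.isEmpty())
                // like mapToPair + reduceByKey: (word, 1) pairs summed per word
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("Tom        Lucy", "Tom        Jack", "  Jone Lucy");
        // prints Jack:1, Jone:1, Lucy:2, Tom:2 (one per line, sorted by the TreeMap)
        countWords(batch).forEach((word, count) -> System.out.println(word + ":" + count));
    }
}
```

The `TreeMap` is only there to make the sketch's output order deterministic; Spark's `reduceByKey` gives no such ordering.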
 

2. Maven configuration

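<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mk</groupId>
  <artifactId>spark-test</artifactId>
  <version>1.0</version>

  <name>spark-test</name>
  <url>http://spark.mk.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.1</scala.version>
    <spark.version>2.4.4</spark.version>
    <!-- property name assumed; this version is not referenced elsewhere in the POM -->
    <hadoop.version>2.6.0</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.10</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>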

 

3. Program code

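import java.util.Arrays;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class SocketApp implements SparkConfInfo {

    public static void main(String[] args) throws InterruptedException {
        // 5-second micro-batches; the master URL is chosen in SparkConfInfo.getSparkConf()
        JavaStreamingContext streamingContext = new SocketApp().getStreamingContext("SocketApp", 5);
        // Receive newline-delimited text from a TCP server on localhost:8891
        JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("localhost", 8891);
        // Split each line on whitespace and drop empty tokens
        JavaDStream<String> words = lines.flatMap(x -> Arrays.stream(x.split("\\s+"))
                .filter(v -> v.length() > 0).iterator());
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        // Sum the counts per word within each batch
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);
        wordCounts.foreachRDD(rdd -> {
            // collect() brings the (small) batch result to the driver, so it is printed
            // on the driver console even when running on the cluster
            rdd.collect().forEach(t -> System.out.println(t._1() + ":" + t._2()));
            System.out.println("---------------------------");
        });
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}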


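import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public interface SparkConfInfo {

    // Streaming context with a batch interval of `second` seconds
    default JavaStreamingContext getStreamingContext(String appName, int second) {
        SparkConf sparkConf = getSparkConf();
        sparkConf.setAppName(appName);
        return new JavaStreamingContext(sparkConf, Durations.seconds(second));
    }

    default SparkSession getSparkSession(String appName) {
        SparkConf sparkConf = getSparkConf();
        return SparkSession.builder().appName(appName).config(sparkConf).getOrCreate();
    }

    default SparkConf getSparkConf() {
        SparkConf sparkConf = new SparkConf();
        if (System.getProperty("os.name").toLowerCase().contains("win")) {
            // On Windows, run in local mode. The socket receiver occupies one thread,
            // so local mode needs more than one thread (here 4).
            sparkConf.setMaster("local[4]");
            System.out.println("Running Spark in local mode");
        } else {
            // Otherwise use the standalone cluster; several masters are listed for failover
            sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");
            // IP of the driver machine, which the executors connect back to
            sparkConf.set("spark.driver.host", "192.168.150.1");
            // Application jar shipped to the executors (build it before running)
            sparkConf.setJars(new String[] {"./out/artifacts/spark_test/spark-test.jar"});
        }
        return sparkConf;
    }
}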

 
Input data
Tom        Lucy
Tom        Jack
Jone        Lucy
Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
Philip        Alma
Mark        Terry
Mark        Alma
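The post does not say how this input was sent to port 8891; a common choice is netcat (`nc -lk 8891`), then pasting the lines. As a JDK-only alternative, here is a minimal sketch of a line server. The class name `LineServer`, the method `serveOnce`, and the serve-one-client-then-close behavior are assumptions for illustration, not part of the original program.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical test-data server: waits for one client (the Spark socket receiver),
// sends every line terminated by '\n', then closes the connection.
public class LineServer {

    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line + "\n"); // socketTextStream splits the stream on newlines
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = Arrays.asList(
                "Tom        Lucy", "Tom        Jack", "Jone        Lucy", "Jone        Jack");
        // Port 8891 matches socketTextStream("localhost", 8891) in SocketApp
        try (ServerSocket server = new ServerSocket(8891)) {
            serveOnce(server, lines);
        }
    }
}
```

Start the server first, then SocketApp; the receiver connects, reads the lines, and they are counted in the next batch. Unlike `nc -lk`, this sketch stops after one connection.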

Output
Mark:2
Tom:2
Jesse:2
Philip:2
Alice:2
Jone:2
Terry:4
Alma:2
Ben:1
Lucy:4
Mary:1
Jack:4
---------------------------
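A quick sanity check on this result: the input has 14 lines with two names each, so, assuming all lines arrived in the same 5-second batch, the counts should add up to 28. The sketch below parses the printed `word:count` lines and sums them; the class name `OutputCheck` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical checker: parses the "word:count" lines printed by SocketApp
// and returns the total number of words in the batch.
public class OutputCheck {

    static int totalWords(List<String> printed) {
        return printed.stream()
                .filter(line -> line.contains(":")) // skip the "-----" separator line
                .mapToInt(line -> Integer.parseInt(line.substring(line.lastIndexOf(':') + 1)))
                .sum();
    }

    public static void main(String[] args) {
        List<String> printed = Arrays.asList(
                "Mark:2", "Tom:2", "Jesse:2", "Philip:2", "Alice:2", "Jone:2",
                "Terry:4", "Alma:2", "Ben:1", "Lucy:4", "Mary:1", "Jack:4",
                "---------------------------");
        // 14 input lines x 2 names per line = 28
        System.out.println(totalWords(printed)); // prints 28
    }
}
```

The lines are not in alphabetical order because `reduceByKey` hash-partitions the keys, so the print order should not be relied on.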
