Spark Streaming - WordCount 프로그램

/**
 * Spark Streaming word count over a text socket.
 *
 * <p>Reads text lines from a socket with {@code socketTextStream}, splits each line on single
 * spaces, counts the words in every one-second batch, and prints each batch's counts.
 *
 * <p>Usage: {@code WordCount [host] [port]}. The defaults are {@code localhost 9999}, the
 * values this program previously hard-coded. For a quick local test, start a text source
 * first (for example {@code nc -lk 9999}) and type lines into it.
 */
public class WordCount {

	/** Socket host used when no host argument is supplied. */
	private static final String DEFAULT_HOST = "localhost";

	/** Socket port used when no port argument is supplied. */
	private static final int DEFAULT_PORT = 9999;

	/**
	 * Builds the streaming word-count pipeline, starts it, and blocks until it terminates.
	 *
	 * @param args optional {@code [host] [port]} of the text socket to read from
	 * @throws NumberFormatException if a port argument is given but is not an integer
	 * @throws Exception if the streaming computation fails while awaiting termination
	 */
	public static void main(String[] args) throws Exception {
		String host = args.length > 0 ? args[0] : DEFAULT_HOST;
		int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

		// Run locally with two threads. The socket receiver occupies one thread for its whole
		// lifetime, so at least two are needed for received batches to actually be processed.
		SparkConf conf = new SparkConf()
				.setMaster("local[2]")
				.setAppName("WordCount");

		// Streaming entry point, analogous to JavaSparkContext in Spark Core. With a 1-second
		// batch interval, the incoming data is cut into one RDD per second.
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
		try {
			// Input DStream: each batch RDD holds the text lines received from the socket.
			JavaReceiverInputDStream<String> lines = jssc.socketTextStream(host, port);

			// Split every line into words. Splitting on a single space means consecutive
			// spaces yield empty-string tokens, which are counted like any other word.
			// NOTE: this uses the Spark 1.x signature, where FlatMapFunction.call returns an
			// Iterable; on Spark 2.0+ call returns an Iterator, so append .iterator() there.
			JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

				private static final long serialVersionUID = 1L;

				@Override
				public Iterable<String> call(String line) throws Exception {
					return Arrays.asList(line.split(" "));
				}

			});

			// Pair every word with an initial count of 1.
			JavaPairDStream<String, Integer> pairs = words.mapToPair(
					new PairFunction<String, String, Integer>() {

						private static final long serialVersionUID = 1L;

						@Override
						public Tuple2<String, Integer> call(String word) throws Exception {
							return new Tuple2<String, Integer>(word, 1);
						}

					});

			// Sum the 1s per word. On a DStream this is applied to each batch RDD separately,
			// so counts are per batch and are not accumulated across batches.
			JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
					new Function2<Integer, Integer, Integer>() {

						private static final long serialVersionUID = 1L;

						@Override
						public Integer call(Integer v1, Integer v2) throws Exception {
							return v1 + v2;
						}

					});

			// Output operation: print the counts of every batch. Output operations must be
			// registered before start(), otherwise nothing is executed. (The former
			// Thread.sleep(5000) here only delayed startup and has been removed.)
			wordCounts.print();

			// No processing happens until start(); awaitTermination() blocks until the
			// context is stopped or the computation throws.
			jssc.start();
			jssc.awaitTermination();
		} finally {
			// Release the streaming context even if the computation failed.
			jssc.close();
		}
	}

}

좋은 웹페이지 즐겨찾기