34-storm 학습-storm 튜토리얼:WordCount 프로그램을 맨손으로 두드리기

storm 핵심의 기본 원리는 지난 편에서 대충 알아봤습니다.
33-storm 학습-역사상 가장 통속적이고 알기 쉬운 Storm 강좌: 대백화 소개 Storm
이제 우리는 코드를 쓰고storm의 프로그램이 어떻게 개발되었는지 체험해 봅시다. 코드를 이해한 후에 전에 설명한 기본 원리를 돌아보면 알 수 있습니다.
단어 카운터를 만들다.
너는storm이 끊임없이 문장을 받아들인 후에 문장 중의 단어의 출현 횟수를 실시간으로 통계해야 한다고 생각할 수 있다
(1) 건설 공사 환경



  4.0.0
  storm-wordcount
  jar

  storm-wordcount

  
    UTF-8
  

  
    
      org.apache.storm
      storm-core
      1.1.0
    
    
      commons-collections
      commons-collections
      3.2.1
    
    
      com.google.guava
      guava
    
  

  
    src/main/java
    test/main/java

    
        
            org.apache.maven.plugins
            maven-shade-plugin
            
                true
                
                    
                        *:*
                        
                            META-INF/*.SF
                            META-INF/*.sf
                            META-INF/*.DSA
                            META-INF/*.dsa
                            META-INF/*.RSA
                            META-INF/*.rsa
                            META-INF/*.EC
                            META-INF/*.ec
                            META-INF/MSFTSIG.SF
                            META-INF/MSFTSIG.RSA
                        
                    
                
            
            
                
                    package
                    
                        shade
                    
                    
                        
                            
                            
                            
                        
                    
                
            
        

      
        org.codehaus.mojo
        exec-maven-plugin
        1.2.1
        
          
            
              exec
            
          
        
        
          java
          true
          false
          compile
          
        
      
    
  

(2) 코드 작성
/**
 *       
 */
public class WordCountTopology {

    /**
     * Spout -      BaseRichSpout,          ,    kafka     
     */
    public static class RandomSentenceSpout extends BaseRichSpout {

      SpoutOutputCollector _collector;
      Random _rand;

      /**
       * open    Spout       
       *        、      、    httpclient
       *  open        ,     SpoutOutputCollector,            。
       */
      @Override
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        //          
        _rand = new Random();
      }

      /**
       *   Spout    ,      task ,  worker     excuter       task
       *   task         nextTuple()  
       *                ,       Stream
       */
      @Override
      public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
                sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
        //  sentences.length            
        final String sentence = sentences[_rand.nextInt(sentences.length)];
        //     
        _collector.emit(new Values(sentence));
      }

      protected String sentence(String input) {
        return input;
      }

      @Override
      public void ack(Object id) {
      }

      @Override
      public void fail(Object id) {
      }

      /**
       * declareOutputFields  :           tuple    field   
       */
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
      }

    }

  /**
   * bolt : spout     bolt,
   *   bolt        worker     excuter       task
   *
   */
  public static class SplitSentence implements IRichBolt {

    private static final long serialVersionUID = 6604009953652729483L;
        
        private OutputCollector collector;
        
        /**
         *   bolt  ,     ,  prepare  ,      
         *    OutputCollector,    Bolt   tuple    
         */
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        /**
         * execute  
         *    ,          ,      executor     
         * 
         */
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence"); 
            String[] words = sentence.split(" "); 
            for(String word : words) {
                collector.emit(new Values(word)); 
            }
        }

        /**
         *        tuple,  field   
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));   
        }
  }

  
    public static class WordCount extends BaseRichBolt {

        private static final long serialVersionUID = 7208077706057284643L;
        
        private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);

        private OutputCollector collector;
        private Map wordCounts = new HashMap();
        
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            
            Long count = wordCounts.get(word);
            if(count == null) {
                count = 0L;
            }
            count++;
            
            wordCounts.put(word, count);
            
            LOGGER.info("【    】" + word + "      " + count);  
            
            collector.emit(new Values(word, count));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));    
        }
        
    }


    public static void main(String[] args) {
        //1、      
        //Storm       , JAVA         ,    TopologyBuilder    
        //  main   ,   spout bolts    ,       
        TopologyBuilder builder = new TopologyBuilder();

    
        //            ,     spout      
        //         ,      spout   
        //         ,    spout executor   
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        builder.setBolt("SplitSentence", new SplitSentence(), 5)
                .setNumTasks(10)
                .shuffleGrouping("RandomSentence");
        //      ,   ,     , SplitSentence     ,
                           task 
        //      ,               
        //        ,hello,  task1   3 hello,task2   2 hello
        //    5 hello,        task,       5 hello。
        builder.setBolt("WordCount", new WordCount(), 10)
                .setNumTasks(20)
                .fieldsGrouping("SplitSentence", new Fields("word"));  
        
        //2、    
        //    ?      ?
        Config config = new Config();
    
        //          ,     storm    
        if(args != null && args.length > 0) {
            //                       topology
            config.setNumWorkers(3);  
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());  
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            //     eclipse      
            config.setMaxTaskParallelism(20);  
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("WordCountTopology", config, builder.createTopology());  
            
            Utils.sleep(60000); 
            
            cluster.shutdown();
        }
    }
}

로컬 부팅에서도 효과를 볼 수 있습니다.

좋은 웹페이지 즐겨찾기