Spring boot 통합 kafka 및 storm

31720 단어 spring-boot
전재 하 다.
머리말
업무 수요 로 인해 Strom 과 kafka 를 spring boot 프로젝트 에 통합 시 켜 다른 서비스 출력 로 그 를 kafka 구독 화제 로 실현 해 야 합 니 다. storm 은 이 화 제 를 실시 간 으로 처리 하여 데이터 모니터링 과 다른 데이터 통 계 를 완성 해 야 합 니 다. 그러나 인터넷 튜 토리 얼 이 비교적 적 습 니 다. 오늘 쓰 고 싶 은 것 은 storm + kafka 를 spring boot 까지 어떻게 정리 하 는 지 하 는 것 입 니 다. 그리고 제 가 만난 구 덩이 를 말씀 드 리 겠 습 니 다.
사용 도구 및 환경 설정
1. 자바 버 전 jdk - 1.8
2. 컴 파일 도구 사용 IDEA - 2017
3. maven 프로젝트 관리
​ 4.spring boot-1.5.8.RELEASE
수요 구현
1. 왜 spring boot 에 통합 해 야 합 니까?
spring boot 를 사용 하여 각종 마이크로 서 비 스 를 통일 적 으로 관리 하고 여러 개의 분산 설정 을 피 할 수 있 습 니 다.
2. 구체 적 인 사고 와 통합 원인
spring boot 를 사용 하여 kafka, storm, redis 등 필요 한 bean 을 통일 적 으로 관리 하고 다른 서비스 로 그 를 통 해 Kafka 에 수집 하 며 KafKa 는 실시 간 으로 로 그 를 storm 에 보 내 고 strom bolt 시 해당 하 는 처리 작업 을 합 니 다.
닥 친 문제
1. spring boot 를 사용 하면 통합 storm 이 없습니다.
2. spring boot 시작 방식 으로 제출 Topolgy 를 어떻게 터치 하 는 지 모 르 겠 습 니 다.
3. 토폴로지 제출 시 numbis not client localhost 문제 발생
4. Storm bolt 에 서 는 주 해 를 통 해 실례 화 된 bean 을 얻 을 수 없습니다.
해결 방향
통합 하기 전에 우 리 는 해당 하 는 spring boot 의 시작 방식 과 설정 을 알 아야 합 니 다. (본문 을 읽 을 때 기본적으로 storm, kafka 및 spring boot 에 대해 알 고 사용 합 니 다)
  • spring boot 가 storm 을 통합 하 는 예 는 인터넷 에서 매우 적 지만 해당 하 는 수요 가 있 기 때문에 통합 이 필요 합 니 다. 먼저 필요 한 jar 가방 을 가 져 옵 니 다.
    
            org.apache.kafka
            kafka-clients
            0.10.1.1
        
    
        
            org.springframework.cloud
            spring-cloud-starter-stream-kafka
            
                
                    zookeeper
                    org.apache.zookeeper
                
                
                    spring-boot-actuator
                    org.springframework.boot
                
                
                    kafka-clients
                    org.apache.kafka
                
            
        
    
        
            org.springframework.kafka
            spring-kafka
            
                
                    kafka-clients
                    org.apache.kafka
                
            
        
    
    
        
            org.springframework.data
            spring-data-hadoop
            2.5.0.RELEASE
            
                
                    org.slf4j
                    slf4j-log4j12
                
                
                    commons-logging
                    commons-logging
                
                
                    netty
                    io.netty
                
                
                    jackson-core-asl
                    org.codehaus.jackson
                
                
                    curator-client
                    org.apache.curator
                
                
                    jettison
                    org.codehaus.jettison
                
                
                    jackson-mapper-asl
                    org.codehaus.jackson
                
                
                    jackson-jaxrs
                    org.codehaus.jackson
                
                
                    snappy-java
                    org.xerial.snappy
                
                
                    jackson-xc
                    org.codehaus.jackson
                
                
                    guava
                    com.google.guava
                
                
                    hadoop-mapreduce-client-core
                    org.apache.hadoop
                
                
                    zookeeper
                    org.apache.zookeeper
                
                
                    servlet-api
                    javax.servlet
                
    
            
        
        
            org.apache.zookeeper
            zookeeper
            3.4.10
            
                
                    slf4j-log4j12
                    org.slf4j
                
            
        
        
            org.apache.hbase
            hbase-client
            1.2.4
            
                
                    log4j
                    log4j
                
                
                    zookeeper
                    org.apache.zookeeper
                
                
                    netty
                    io.netty
                
                
                    hadoop-common
                    org.apache.hadoop
                
                
                    guava
                    com.google.guava
                
                
                    hadoop-annotations
                    org.apache.hadoop
                
                
                    hadoop-yarn-common
                    org.apache.hadoop
                
                
                    slf4j-log4j12
                    org.slf4j
                
            
        
        
            org.apache.hadoop
            hadoop-common
            2.7.3
            
                
                    commons-logging
                    commons-logging
                
                
                    curator-client
                    org.apache.curator
                
                
                    jackson-mapper-asl
                    org.codehaus.jackson
                
                
                    jackson-core-asl
                    org.codehaus.jackson
                
                
                    log4j
                    log4j
                
                
                    snappy-java
                    org.xerial.snappy
                
                
                    zookeeper
                    org.apache.zookeeper
                
                
                    guava
                    com.google.guava
                
                
                    hadoop-auth
                    org.apache.hadoop
                
                
                    commons-lang
                    commons-lang
                
                
                    slf4j-log4j12
                    org.slf4j
                
                
                    servlet-api
                    javax.servlet
                
            
        
        
            org.apache.hadoop
            hadoop-mapreduce-examples
            2.7.3
            
                
                    commons-logging
                    commons-logging
                
                
                    netty
                    io.netty
                
                
                    guava
                    com.google.guava
                
                
                    log4j
                    log4j
                
                
                    servlet-api
                    javax.servlet
                
            
        
    
        
        
            org.apache.storm
            storm-core
            ${storm.version}
            ${provided.scope}
            
                
                    org.apache.logging.log4j
                    log4j-slf4j-impl
                
                
                    servlet-api
                    javax.servlet
                
            
        
    
        
            org.apache.storm
            storm-kafka
            1.1.1
            
                
                    kafka-clients
                    org.apache.kafka
                
            
        
    그 중에서 jar 가방 을 제거 하 는 것 은 프로젝트 구축 의존 이 얼마나 되 는 지 에 대한 질문 입 니 다. storm 버 전 은 1.1.0 spring boot 관련 의존
    
          
              org.springframework.boot
              spring-boot-starter
              
                  
                      org.springframework.boot
                      spring-boot-starter-logging
                  
              
          
          
              org.springframework.boot
              spring-boot-starter-web
          
          
              org.springframework.boot
              spring-boot-starter-aop
          
          
              org.springframework.boot
              spring-boot-starter-test
              test
          
          
              org.springframework.boot
              spring-boot-starter-log4j2
          
          
              org.mybatis.spring.boot
              mybatis-spring-boot-starter
              ${mybatis-spring.version}
          
          
              org.springframework.boot
              spring-boot-configuration-processor
              true
          
    입 니 다.ps: maven 의 jar 패 키 지 는 프로젝트 사용 수요 때문에 가장 간단 하지 않 습 니 다. 참고 하 시기 바 랍 니 다. 프로젝트 구조: config - 서로 다른 환경 설정 파일 저장
  • java - config 저장 소 구축 spring boot 관련 구현 클래스 기타 구성 이름
    spring boot 를 시작 할 때 저희 가 발견 할 수 있 습 니 다.
  • 사실 통합 을 시작 하기 전에 storm 에 대해 아 는 것 이 적 고 처음에 접촉 하지 않 은 것 에 속 합 니 다. 나중에 참고 한 결과 spring boot 에 통합 되 어 spring boot 를 시작 한 후에 해당 하 는 방식 으로 Topolgy 를 제출 하 는 함 수 를 촉발 하지 않 았 기 때문에 spring boot 을 시작 한 후에 끝 난 줄 알 았 는데 30 분 동안 아무 일 도 일어나 지 않 았 습 니 다. -트리거 제출 함수 가 실현 되 지 않 았 음 을 알 게 되 었 습 니 다. 이 문 제 를 해결 하기 위해 서 제 생각 은 spring boot - > kafka 감청 Topic 을 만 들 고 Topolgy 를 시작 하 는 것 입 니 다. 그러나 이러한 문 제 를 해결 하기 위해 kafka 감청 이라는 주 제 는 반복 적 으로 Topolgy 를 촉발 합 니 다. 이것 은 우리 가 원 하 는 것 이 아 닙 니 다. 잠시 보 니 spring 에 관련 된 시작 이 완 료 된 후에 특정한 시간 을 실행 하 는 방법 이 있 습 니 다.이것 은 나 에 게 있어 서 그야말로 구원 의 신 이다. 그래서 지금 Topolgy 를 촉발 하 는 사고방식 은 스프링 boot 를 시작 하 는 것 - > 실행 촉발 방법 - > 상응하는 촉발 조건 구축 방법 을 완성 하 는 것 은:
    /**
    * @author Leezer
    * @date  2017/12/28
    * spring          Topology
    **/
    @Configuration
    @Component
    public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {
    
     private static String BROKERZKSTR;
     private static String TOPIC;
     private static String HOST;
     private static String PORT;
     public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
                     @Value("${zookeeper.host}") String host,
                     @Value("${zookeeper.port}") String port,
                     @Value("${kafka.default-topic}") String topic
     ){
         BROKERZKSTR = brokerZkstr;
         HOST= host;
         TOPIC= topic;
         PORT= port;
     }
    
     @Override
     public void onApplicationEvent(ContextRefreshedEvent event) {
         try {
             //   topologyBuilder 。
             TopologyBuilder topologyBuilder = new TopologyBuilder();
             //            ,                   。
             BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
             //   Kafka   Topic,  zookeeper          
             SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
             spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
             spoutConfig.zkServers = Collections.singletonList(HOST);
             spoutConfig.zkPort = Integer.parseInt(PORT);
             // Kafka        
             spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
             KafkaSpout receiver = new KafkaSpout(spoutConfig);
             topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
             topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
             Config config = new Config();
             config.setDebug(false);
             /*   topology storm         slot ,  slot   supervisor      worker  ,      spot              worker    ,        ,              topology      2 worker  ,         4    topology  ,    topology                   。    kill   topology      slot     topology        。
             */
             config.setNumWorkers(1);
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
    }
  • 주:
  • 프로젝트 를 시작 할 때 tomcat 를 내장 하여 시작 하기 때문에 다음 과 같은 오류 가 발생 할 수 있 습 니 다
  • [Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
    java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
        at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
        at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
        at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
        at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
        at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
        at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

    이 는 가 져 온 jar 패키지 가 servlet - api 버 전 을 내장 버 전보 다 낮 게 도 입 했 기 때 문 입 니 다. 우리 가 해 야 할 일 은 Maven 의존 을 열 어 제거 하 는 것 입 니 다.
    
       servlet-api
       javax.servlet
    

    그리고 다시 시작 하면 돼.
  • 시작 과정 에서 보고 할 수 있 습 니 다.
    org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90
    이 문 제 를 오래 생각 했 는데 인터넷 의 해석 이 모두 storm 설정 문제 로 인해 잘못 되 었 다 는 것 을 알 게 되 었 습 니 다. 그러나 제 storm 은 서버 에 배 치 된 것 입 니 다. 관련 설정 이 없고 이치 에 따라 서비스 기 에서 관련 설정 을 읽 어야 합 니 다. 그러나 결 과 는 그렇지 않 습 니 다.마지막 으로 몇 가지 방법 을 시 도 했 는데 모두 틀 렸 다 는 것 을 알 게 되 었 습 니 다. 클 러 스 터 를 구축 할 때 storm 은 해당 하 는 로 컬 클 러 스 터
    LocalCluster cluster = new LocalCluster();
    를 제공 하여 로 컬 테스트 를 했 습 니 다. 로 컬 테스트 에서 배치 테스트 를 하면 서버 에 배치 하려 면 다음 과 같은 작업 을 제출 해 야 합 니 다.
    cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
    //   :
    StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
    위 에서 언급 한 문제 1 - 3 문 제 를 해결 하 였 습 니 다. 4: bolt 에서 관련 bean 인 스 턴 스 를 사용 하 는 것 입 니 다. 저 는 @ Component 를 사용 하여 spring 에 가입 해도 인 스 턴 스 를 얻 을 수 없다 는 것 을 알 게 되 었 습 니 다. 제 추측 은 우리 가 Topolgy 를 구축 할 때
    topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");
    bolt 를 실행 하 는 것 입 니 다.
    @Override
       public void prepare(Map stormConf, TopologyContext context,
                           OutputCollector collector) {
           this.collector = collector;
           StormLauncher stormLauncher = StormLauncher.getStormLauncher();
           dataRepositorys =(AlarmDataRepositorys)                 stormLauncher.getBean("alarmdataRepositorys");
       }
    bolt 를 예화 하지 않 고 스 레 드 가 다 르 기 때문에 spring 을 얻 지 못 할 것 입 니 다.(여기 서 저도 잘 모 르 겠 습 니 다. 만약 에 큰 사람 이 한 파 를 공유 할 수 있다 는 것 을 알 게 된다 면) 우리 가 spring boot 를 사용 하 는 의 미 는 바로 이런 복잡 한 대상 을 얻 는 데 있 습 니 다. 이 문 제 는 저 를 오랫동안 괴 롭 혔 습 니 다. 마지막 으로 우 리 는 문맥 getbean 을 통 해 인 스 턴 스 를 얻 을 수 있 을 지 모 르 겠 습 니 다. 그리고 저 는 정 의 를 내 렸 습 니 다. 예 를 들 어 저 는 bolt 에서 서 비 스 를 사용 해 야 합 니 다.여기 서 제 가 이 bean 의 이름 을 지정 하면 bolt 에서 prepare 를 실행 할 때 getbean 방법 으로 관련 bean 을 얻 으 면 해당 하 는 작업 을 완성 할 수 있 습 니 다.
  • 그리고 kafka 구독 테 마 를 bolt 에 보 내 서 관련 처 리 를 합 니 다. 여기 서 getbean 의 방법 은 bootmain 함수 정 의 를 시작 하 는 것 입 니 다.
    /**
    * @author Leezer
    * @date 2017/12/27
    *         
    **/
    @Service("alarmdataRepositorys")
    public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {
       private static final String ERRO = "erro";
    
       /**
        * @param type   
        * @param key key 
        * @return     
        **/
       @Override
       public String getErrNumFromRedis(String type,String key) {
           if(type==null || key == null){
               return null;
           }else {
               ValueOperations valueOper = primaryStringRedisTemplate.opsForValue();
               return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));
           }
    
       }
    
    
       /**
        * @param type     
        * @param key key 
        * @param value    
        **/
       @Override
       public void setErrNumToRedis(String type, String key,String value) {
           try {
               ValueOperations valueOper = primaryStringRedisTemplate.opsForValue();
               valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
           }catch (Exception e){
               logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key %s  redis  ",key));
           }
       }
    
    

    이 통합 storm 과 kafka 에서 spring boot 까지 끝 났 습 니 다. kafka 및 기타 설정 은 github 에 넣 겠 습 니 다.
    참, 여기 카 프 카 클 라 이언 트 구덩이 가 하나 더 있어 요.
    @SpringBootApplication
    @EnableTransactionManagement
    @ComponentScan({"service","storm"})
    @EnableMongoRepositories(basePackages = {"storm"})
    @PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})
    @ImportResource(locations = {
            "classpath:/configs/spring-hadoop.xml",
            "classpath:/configs/spring-hbase.xml"})
    public class StormLauncher extends SpringBootServletInitializer {
    
        //       launcher  
        private volatile static StormLauncher stormLauncher;
        //     
        private ApplicationContext context;
    
        public static void main(String[] args) {
    
            SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
         // application.web(false).run(args);    spring boot  web    
            application.run(args);
            StormLauncher s = new StormLauncher();
            s.setApplicationContext(application.context());
            setStormLauncher(s);
        }
    
        private static void setStormLauncher(StormLauncher stormLauncher) {
            StormLauncher.stormLauncher = stormLauncher;
        }
        public static StormLauncher getStormLauncher() {
            return stormLauncher;
        }
    
        @Override
        protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
            return application.sources(StormLauncher.class);
        }
    
    
        /**
         *      
         *
         * @return the application context
         */
        public ApplicationContext getApplicationContext() {
            return context;
        }
    
        /**
         *      .
         *
         * @param appContext    
         */
        private void setApplicationContext(ApplicationContext appContext) {
            this.context = appContext;
        }
    
        /**
         *      name      Bean.
         *
         * @param name the name
         * @return the bean
         */
        public Object getBean(String name) {
            return context.getBean(name);
        }
    
        /**
         *   class  Bean.
         *
         * @param    the type parameter
         * @param clazz the clazz
         * @return the bean
         */
        public  T getBean(Class clazz) {
            return context.getBean(clazz);
        }
    
        /**
         *   name,  Clazz     Bean
         *
         * @param    the type parameter
         * @param name  the name
         * @param clazz the clazz
         * @return the bean
         */
        public  T getBean(String name, Class clazz) {
            return context.getBean(name, clazz);
        }

    프로젝트 는 kafka client 문 제 를 보고 할 것 입 니 다. 이것 은 storm - kafka 에서 kafka 는 0.8 버 전 을 사 용 했 고 NetworkSend 는 0.9 이상 의 버 전 입 니 다. 여기 서 집성 은 당신 이 집성 한 kafka 관련 버 전과 일치 해 야 합 니 다.
    집적 은 비교적 간단 하지만 참고 가 비교적 적다. 게다가 처음에 storm 을 접 했 기 때문에 생각 이 비교적 많 고 여기에 기록 해 보 자.
    프로젝트 주소 - github
    참고 문헌: springboot - storm - integration

    좋은 웹페이지 즐겨찾기