flume 소스 코드 분석

5833 단어 Flume
flume 는 신뢰성 이 높 은 분포 식 큰 파일 수집 시스템 이다.그것 은 데 이 터 를 잃 어 버 리 지 않도록 transaction 을 제공 했다.
flume 홈 페이지:http://flume.apache.org/
Flume 문서:http://flume.apache.org/FlumeUserGuide.html,http://flume.apache.org/FlumeDeveloperGuide.html
 
설치:홈 페이지 에서 flume 를 다운로드 한 후 압축 해제
시작:nohup bin/flume-ng 에이전트--conf --conf-file --name -Dflume.root.logger=DEBUG,console &
 
Flume 은 주로 세 부분 을 포함 합 니 다.source,channel,sink.source 는 데 이 터 를 수신 하 는 데 사 용 됩 니 다.channel 은 버퍼 채널 이 고 sink 은 데 이 터 를 목적 단 으로 보 냅 니 다.source 는 여러 채널 을 설정 할 수 있 습 니 다.channel 은 channel Select 를 통 해 그 channel 로 보 낼 수 있 습 니 다.각 channel 로 보 낼 수도 있 고 매개 변 수 를 설정 할 수도 있 습 니 다.특정한 값 을 만족 시 킬 때 특정한 channel 로 보 낼 수도 있 습 니 다.채널 마다 여러 개의 sink 을 설정 할 수 있 습 니 다.sinkprocess 를 통 해 load balance 나 failover 를 만 듭 니 다.
 
flume-ng 명령 은 애플 리 케 이 션 의 main 함 수 를 호출 합 니 다.reloadconfigure 파일 이 필요 하 다 면 애플 리 케 이 션 을 eventBus 에 등록 하고 파일 이 변경 되 었 을 때 애플 리 케 이 션 의 handle ConfigurationEvent 방법 을 호출 합 니 다.
 public static void main(String[] args) {
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
      application.start();
  }

    application 의 start 방법 은 슈퍼 visor.supervise()를 호출 합 니 다.이 방법 은 component 의 start 방법 을 호출 하려 고 시도 합 니 다.component 목록 에는 pollingProperties FileConfigurationProvider 대상 이 포함 되 어 있 습 니 다.이 대상 의 start 방법 은 스 레 드 를 시작 하여 파일 의 변경 을 감시 합 니 다.초기 상태 파일 은 변경 되 었 습 니 다.이 어 애플 리 케 이 션 의 handle ConfigurationEvent 방법 을 호출 합 니 다.
  public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

  ...에 있다 handle ConfigurationEvent 에서 먼저 호출 Properties FileConfigurationProvider 의 getConfiguration 방법 은 설정 파일 을 통 해 source,sink,channel 을 만 들 고 각 구성 요소 의 configure 방법 을 호출 한 다음  startAllComponents 방법 을 호출 하여 channel,source,sink 을 시작 하고 Monitor 를 불 러 와 flume 을 감시 하 는 metrics 를 사용 합 니 다.
 private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;

    for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try{
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e){
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    /*
     * Wait for all channels to start.
     */
    for(Channel ch: materializedConfiguration.getChannels().values()){
      while(ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)){
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }

    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
        .entrySet()) {
      try{
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    for (Entry<String, SourceRunner> entry : materializedConfiguration
        .getSourceRunners().entrySet()) {
      try{
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();
  }

 

좋은 웹페이지 즐겨찾기