Flume 구조 와 소스 코드 분석-핵심 구성 요소 분석-2

13971 단어 Flume

4.전체 절차
위 부분 에서 알 수 있 듯 이 Source 든 Sink 이 든 모두 Channel 에 의존 합 니 다.그러면 시작 할 때 채널 을 먼저 시작 한 다음 에 Source 나 Sink 을 시작 하면 됩 니 다.
 
Flume 은 두 가지 시작 방식 이 있 습 니 다.Embeddedagent 를 사용 하여 자바 응용 프로그램 에 끼 워 넣 거나 응용 프로그램 을 사용 하여 하나의 프로 세 스 를 단독으로 시작 합 니 다.여 기 는 응용 프로그램 분석 을 위주 로 합 니 다.
 
먼저 org.apache.flume.node.Application 의 main 방법 으로 시작 합 니 다.
//1、         、       
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);

option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);

//2、         
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");

if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  isZkConfigured = true;
}

if (isZkConfigured) {
    //3、     ZooKeeper  ,   ZooKeeper    ,    ,         
} else {
  //4、      ,          
  File configurationFile = new File(commandLine.getOptionValue('f'));
  if (!configurationFile.exists()) {
         throw new ParseException(
        "The specified configuration file does not exist: " + path);
  }
  List<LifecycleAware> components = Lists.newArrayList();

  if (reload) { //5、      reload    ,      
    //5.1、    Guava       
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    //5.2、      ,          ,  30s    
    PollingPropertiesFileConfigurationProvider configurationProvider =
        new PollingPropertiesFileConfigurationProvider(
          agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components); //5.3、 Application    
    //5.4、          ,EventBus     Application   @Subscribe     
    eventBus.register(application);

  } else { //5、         reload
    PropertiesFileConfigurationProvider configurationProvider =
        new PropertiesFileConfigurationProvider(
          agentName, configurationFile);
    application = new Application();
    //6.2、           Flume  
    application.handleConfigurationEvent(configurationProvider
      .getConfiguration());
  }
}
//7、  Flume  
application.start();

//8、         ,         Application stop      
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});

 
상기 절 차 는 핵심 코드 중의 일부분 만 추출 했다.예 를 들 어 ZK 의 실현 은 무시 되 었 고 Flume 시작 은 대체적으로 다음 과 같다.
1.명령 행 인자 읽 기;
2.설정 파일 읽 기;
3.reload 가 필요 한 지 여부 에 따라 다른 정책 으로 Flume 을 초기 화 합 니 다.reload 가 필요 하 다 면 Guava 의 이벤트 버스 를 사용 하여 이 루어 집 니 다.Application 의 handle ConfigurationEvent 는 이벤트 구독 자 이 고,PollingProperties FileConfigurationProvider 는 이벤트 게시 자 이 며,정기 적 으로 교대 훈련 을 통 해 파일 변경 여 부 를 확인 합 니 다.변경 되면 프로필 을 다시 읽 고 프로필 이벤트 변경 을 발표 합 니 다.handle ConfigurationEvent 는 이 설정 변경 을 받 고 초기 화 됩 니 다.
4.애플 리 케 이 션 을 시작 하고 가상 컴퓨터 를 등록 하여 갈 고 리 를 닫 습 니 다.
 
handle ConfigurationEvent 방법 은 간단 합 니 다.먼저 stopAllComponents 를 호출 하여 모든 구성 요 소 를 중단 한 다음 startAllComponents 를 호출 하여 설정 파일 을 사용 하여 모든 구성 요 소 를 초기 화 합 니 다. 
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
} 

Materialized Configuration 은 Flume 이 실 행 될 때 필요 한 구성 요 소 를 저장 합 니 다.Source,Channel,Sink,SourceRunner,SinkRunner 등 은 ConfigurationProvider 를 통 해 초기 화 됩 니 다.예 를 들 어 PollingProperties FileConfigurationProvider 는 설정 파일 을 읽 고 구성 요 소 를 초기 화 합 니 다.
 
startAllComponents 의 실현 은 대체로 다음 과 같다. 
//1、    Channel
supervisor.supervise(Channels,
      new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//2、    Channel      
for(Channel ch: materializedConfiguration.getChannels().values()){
  while(ch.getLifecycleState() != LifecycleState.START
      && !supervisor.isComponentInErrorState(ch)){
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
        Throwables.propagate(e);
    }
  }
}
//3、  SinkRunner
supervisor.supervise(SinkRunners,  
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//4、  SourceRunner
supervisor.supervise(SourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//5、       
this.loadMonitoring(); 

다음 코드 에서 볼 수 있 듯 이 먼저 채널 을 준비 해 야 합 니 다.Source 와 Sink 이 이 를 조작 하기 때문에 채널 을 초기 화 하 는 데 실패 하면 전체 프로 세 스 가 실패 합 니 다.그리고 싱 크 러 너 를 시작 하여 먼저 소비 자 를 준비 합 니 다.이 어 SourceRunner 를 시작 하여 채집 로 그 를 시작 합 니 다.여기 서 우 리 는 두 개의 단독 구성 요소 인 Lifecycle Supervisor 와 Monitor Service 가 있 는 것 을 발견 했다.하 나 는 구성 요소 수호 보초병 이 고 하 나 는 모니터링 서비스 이다.수호 보초병 은 이 구성 요소 들 을 수호 합 니 다.문제 가 생 겼 다 고 가정 하면 기본 정책 은 이 구성 요 소 를 자동 으로 다시 시작 합 니 다.
 
stopAllComponents 의 실현 은 대체로 다음 과 같다.
//1、    SourceRunner
supervisor.unsupervise(SourceRunners);
//2、    SinkRunner
supervisor.unsupervise(SinkRunners);
//3、    Channel
supervisor.unsupervise(Channels);
//4、    MonitorService
monitorServer.stop(); 

여기 서 알 수 있 듯 이 정지 순 서 는 소스,싱 크,채널,즉 생산 을 먼저 중단 한 다음 에 소 비 를 중단 하고 마지막 에 파 이 프 를 중단 하 는 것 이다.
 
응용 프로그램의 start 방법 코드 는 다음 과 같 습 니 다.
public synchronized void start() {
  for(LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
} 

애플 리 케 이 션 에 등 록 된 구성 요 소 를 순환 한 다음 에 보초병 을 지 키 는 것 입 니 다.기본 정책 은 문제 가 발생 하면 자동 으로 구성 요 소 를 다시 시작 합 니 다.만약 에 우리 가 reload 설정 파일 을 지원 한다 고 가정 하면 이전에 애플 리 케 이 션 을 시작 할 때 PollingProperties FileConfigurationProvider 구성 요 소 를 등록 한 적 이 있 습 니 다.즉,이 구성 요 소 는 보초병 에 의 해 지 켜 지고 문제 가 발생 하면 기본 정책 이 자동 으로
 
애플 리 케 이 션 이 닫 히 면 다음 동작 이 실 행 됩 니 다.    
public synchronized void stop() {
  supervisor.stop();
  if(monitorServer != null) {
    monitorServer.stop();
  }
} 

수호 초병 과 감시 서 비 스 를 폐쇄 하 는 것 이다.
 
이 기본 적 인 애플 리 케 이 션 분석 이 끝 났 습 니 다.보초병 이 어떻게 이 루어 졌 는 지 의문 이 많 습 니 다. 
 
전체 절 차 는 다음 과 같이 요약 할 수 있다.
1.먼저 명령 행 설정 을 초기 화 합 니 다.
2.설정 파일 을 읽 기;
3.reload 가 필요 한 지 여부 에 따라 설정 파일 의 구성 요 소 를 초기 화 합 니 다.reload 가 필요 하 다 면 Guava 이벤트 버스 를 사용 하여 구독 변 화 를 발표 합 니 다.
4.응용 프로그램 을 만 들 고 수호 보초병 을 만 들 며 모든 구성 요 소 를 중단 한 다음 에 모든 구성 요 소 를 시작 합 니 다.시작 순서:Channel,SinkRunner,SourceRunner,그리고 이 구성 요 소 를 수호 보초병 에 등록 하고 모니터링 서 비 스 를 초기 화 합 니 다.정지 순서:SourceRunner,SinkRunner,Channel;
5.설정 파일 이 정기 적 으로 reload 가 필요 하 다 면 Polling**ConfigurationProvider 를 수호 보초병 에 등록 해 야 합 니 다.
6.마지막 으로 가상 컴퓨터 를 등록 하여 갈 고 리 를 끄 고 보초병 과 감시 서 비 스 를 정지 합 니 다.
 
 
윤 훈 으로 이 루어 진 SourceRunner. 싱 크 러 너 와 스 레 드 를 만들어 작업 을 진행 할 것 입 니 다.전에 작업 방식 을 소 개 했 습 니 다.이제 보초병 의 실현 을 지 켜 보 자.
 
우선 LifecycleSupervisor 를 만 듭 니 다.
  //1、          
  supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
  //2、            
  monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
  //3、         
  monitorService = new ScheduledThreadPoolExecutor(10,
      new ThreadFactoryBuilder().setNameFormat(
          "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
          .build());
  monitorService.setMaximumPoolSize(20);
  monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
  //4、          
  purger = new Purger();
  //4.1、       
  needToPurge = false; 

LifecycleSupervisor 가 시 작 될 때 다음 과 같은 동작 을 합 니 다.
public synchronized void start() {
  monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
  lifecycleState = LifecycleState.START;
} 

우선 두 시간 간격 으로 청소 구성 요 소 를 실행 한 다음 상 태 를 시작 으로 바 꿉 니 다.LifecycleSupervisor 가 정지 되 었 을 때 모니터링 서 비 스 를 중단 하고 데 몬 구성 요소 상 태 를 STOP 로 업데이트 합 니 다.
  //1、          
  if (monitorService != null) {
    monitorService.shutdown();
    try {
      monitorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      logger.error("Interrupted while waiting for monitor service to stop");
    }
  }
  //2、           STOP,      stop      
  for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
    if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
      entry.getValue().status.desiredState = LifecycleState.STOP;
      entry.getKey().stop();
    }
  }
  //3、       
  if (lifecycleState.equals(LifecycleState.START)) {
    lifecycleState = LifecycleState.STOP;
  }
  //4、     
  supervisedProcesses.clear();
  monitorFutures.clear(); 

 
다음은 슈퍼 바 이 스 를 호출 하여 구성 요소 데 몬 을 진행 합 니 다:
  if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
  || this.monitorService.isTerminating()){
    //1、            ,            
  }
  //2、       
  Supervisoree process = new Supervisoree();
  process.status = new Status();
  //2.1、         
  process.policy = policy;
  //2.2、         ,        START
  process.status.desiredState = desiredState;
  process.status.error = false;
  //3、     ,             ,        
  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;

  supervisedProcesses.put(lifecycleAware, process);
  //4、           ,        ,        
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}

 
지 킬 필요 가 없다 면 unsupervise 를 호출 해 야 합 니 다:
public synchronized void unsupervise(LifecycleAware lifecycleAware) {
  synchronized (lifecycleAware) {
    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
    //1.1、             
    supervisoree.status.discard = true;
    //1.2、                STOP
    this.setDesiredState(lifecycleAware, LifecycleState.STOP);
    //1.3、    
    lifecycleAware.stop();
  }
  //2、        
  supervisedProcesses.remove(lifecycleAware);
  //3、          
  monitorFutures.get(lifecycleAware).cancel(false);
  //3.1、  Purger      ,Purger      cancel   
  needToPurge = true;
  monitorFutures.remove(lifecycleAware);
}

 
다음은 MonitorRunnable 의 실현 을 살 펴 보 겠 습 니 다.구성 요소 상태 이전 이나 구성 요소 고장 복 구 를 담당 합 니 다.
public void run() {
  long now = System.currentTimeMillis();
  try {
    if (supervisoree.status.firstSeen == null) {
        supervisoree.status.firstSeen = now; //1、           
    }
    supervisoree.status.lastSeen = now; //2、            
    synchronized (lifecycleAware) {
        //3、             ,     
        if (supervisoree.status.discard || supervisoree.status.error) {
          return;
        }
        //4、            
        supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
        //5、                    ,           ,       
        if (!lifecycleAware.getLifecycleState().equals(
            supervisoree.status.desiredState)) {
          switch (supervisoree.status.desiredState) { 
            case START: //6、       ,     
             try {
                lifecycleAware.start();
              } catch (Throwable e) {
                if (e instanceof Error) {
                  supervisoree.status.desiredState = LifecycleState.STOP;
                  try {
                    lifecycleAware.stop();
                  } catch (Throwable e1) {
                    supervisoree.status.error = true;
                    if (e1 instanceof Error) {
                      throw (Error) e1;
                    }
                  }
                }
                supervisoree.status.failures++;
              }
              break;
            case STOP: //7、       ,     
              try {
                lifecycleAware.stop();
              } catch (Throwable e) {
                if (e instanceof Error) {
                  throw (Error) e;
                }
                supervisoree.status.failures++;
              }
              break;
            default:
          }
    } catch(Throwable t) {
    }
  }
}

 
위의 코드 가 간소화 되 었 을 때 전체 논 리 는 정시 에 구성 요소 의 상 태 를 수집 하 는 것 입 니 다.데 몬 구성 요소 와 구성 요소 의 상태 가 일치 하지 않 는 것 을 발견 하면 시작 하거나 중지 해 야 할 수도 있 습 니 다.즉,데 몬 모니터 는 구성 요소 가 실패 하면 자동 으로 시작 할 수 있 도록 보장 합 니 다.기본 정책 은 항상 실패 한 후에 다시 시작 합 니 다.또 하나의 정책 은 한 번 만 시작 하 는 것 입 니 다. 
  
 

좋은 웹페이지 즐겨찾기