flume 소스 코드 분석
5833 단어 Flume
flume 홈 페이지:http://flume.apache.org/
Flume 문서:http://flume.apache.org/FlumeUserGuide.html,http://flume.apache.org/FlumeDeveloperGuide.html
설치:홈 페이지 에서 flume 를 다운로드 한 후 압축 해제
시작:nohup bin/flume-ng 에이전트--conf
Flume 은 주로 세 부분 을 포함 합 니 다.source,channel,sink.source 는 데 이 터 를 수신 하 는 데 사 용 됩 니 다.channel 은 버퍼 채널 이 고 sink 은 데 이 터 를 목적 단 으로 보 냅 니 다.source 는 여러 채널 을 설정 할 수 있 습 니 다.channel 은 channel Select 를 통 해 그 channel 로 보 낼 수 있 습 니 다.각 channel 로 보 낼 수도 있 고 매개 변 수 를 설정 할 수도 있 습 니 다.특정한 값 을 만족 시 킬 때 특정한 channel 로 보 낼 수도 있 습 니 다.채널 마다 여러 개의 sink 을 설정 할 수 있 습 니 다.sinkprocess 를 통 해 load balance 나 failover 를 만 듭 니 다.
flume-ng 명령 은 애플 리 케 이 션 의 main 함 수 를 호출 합 니 다.reloadconfigure 파일 이 필요 하 다 면 애플 리 케 이 션 을 eventBus 에 등록 하고 파일 이 변경 되 었 을 때 애플 리 케 이 션 의 handle ConfigurationEvent 방법 을 호출 합 니 다.
public static void main(String[] args) {
Application application;
if(reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(agentName,
configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName,
configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
application.start();
}
application 의 start 방법 은 슈퍼 visor.supervise()를 호출 합 니 다.이 방법 은 component 의 start 방법 을 호출 하려 고 시도 합 니 다.component 목록 에는 pollingProperties FileConfigurationProvider 대상 이 포함 되 어 있 습 니 다.이 대상 의 start 방법 은 스 레 드 를 시작 하여 파일 의 변경 을 감시 합 니 다.초기 상태 파일 은 변경 되 었 습 니 다.이 어 애플 리 케 이 션 의 handle ConfigurationEvent 방법 을 호출 합 니 다.
public synchronized void start() {
for(LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
}
...에 있다 handle ConfigurationEvent 에서 먼저 호출 Properties FileConfigurationProvider 의 getConfiguration 방법 은 설정 파일 을 통 해 source,sink,channel 을 만 들 고 각 구성 요소 의 configure 방법 을 호출 한 다음 startAllComponents 방법 을 호출 하여 channel,source,sink 을 시작 하고 Monitor 를 불 러 와 flume 을 감시 하 는 metrics 를 사용 합 니 다.
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
logger.info("Starting new configuration:{}", materializedConfiguration);
this.materializedConfiguration = materializedConfiguration;
for (Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
try{
logger.info("Starting Channel " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e){
logger.error("Error while starting {}", entry.getValue(), e);
}
}
/*
* Wait for all channels to start.
*/
for(Channel ch: materializedConfiguration.getChannels().values()){
while(ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)){
try {
logger.info("Waiting for channel: " + ch.getName() +
" to start. Sleeping for 500 ms");
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for channel to start.", e);
Throwables.propagate(e);
}
}
}
for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
.entrySet()) {
try{
logger.info("Starting Sink " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
for (Entry<String, SourceRunner> entry : materializedConfiguration
.getSourceRunners().entrySet()) {
try{
logger.info("Starting Source " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
this.loadMonitoring();
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Flume Processorsactive 상태 프로세스가 죽어야 다른 작업이 바뀔 수 있습니다.그 많은sink가 도대체 누가 먼저 일을 하는지, 권중에 따라 누구의 권중이 높은지, 누가 먼저 일을 하는지, 일반적인 고장 전이를 하면 2개sink...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.