Flume 구조 와 소스 코드 분석-핵심 구성 요소 분석-2
13971 단어 Flume
4.전체 절차
위 부분 에서 알 수 있 듯 이 Source 든 Sink 이 든 모두 Channel 에 의존 합 니 다.그러면 시작 할 때 채널 을 먼저 시작 한 다음 에 Source 나 Sink 을 시작 하면 됩 니 다.
Flume 은 두 가지 시작 방식 이 있 습 니 다.Embeddedagent 를 사용 하여 자바 응용 프로그램 에 끼 워 넣 거나 응용 프로그램 을 사용 하여 하나의 프로 세 스 를 단독으로 시작 합 니 다.여 기 는 응용 프로그램 분석 을 위주 로 합 니 다.
먼저 org.apache.flume.node.Application 의 main 방법 으로 시작 합 니 다.
//1、 、
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
//2、
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
isZkConfigured = true;
}
if (isZkConfigured) {
//3、 ZooKeeper , ZooKeeper , ,
} else {
//4、 ,
File configurationFile = new File(commandLine.getOptionValue('f'));
if (!configurationFile.exists()) {
throw new ParseException(
"The specified configuration file does not exist: " + path);
}
List<LifecycleAware> components = Lists.newArrayList();
if (reload) { //5、 reload ,
//5.1、 Guava
EventBus eventBus = new EventBus(agentName + "-event-bus");
//5.2、 , , 30s
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components); //5.3、 Application
//5.4、 ,EventBus Application @Subscribe
eventBus.register(application);
} else { //5、 reload
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(
agentName, configurationFile);
application = new Application();
//6.2、 Flume
application.handleConfigurationEvent(configurationProvider
.getConfiguration());
}
}
//7、 Flume
application.start();
//8、 , Application stop
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
상기 절 차 는 핵심 코드 중의 일부분 만 추출 했다.예 를 들 어 ZK 의 실현 은 무시 되 었 고 Flume 시작 은 대체적으로 다음 과 같다.
1.명령 행 인자 읽 기;
2.설정 파일 읽 기;
3.reload 가 필요 한 지 여부 에 따라 다른 정책 으로 Flume 을 초기 화 합 니 다.reload 가 필요 하 다 면 Guava 의 이벤트 버스 를 사용 하여 이 루어 집 니 다.Application 의 handle ConfigurationEvent 는 이벤트 구독 자 이 고,PollingProperties FileConfigurationProvider 는 이벤트 게시 자 이 며,정기 적 으로 교대 훈련 을 통 해 파일 변경 여 부 를 확인 합 니 다.변경 되면 프로필 을 다시 읽 고 프로필 이벤트 변경 을 발표 합 니 다.handle ConfigurationEvent 는 이 설정 변경 을 받 고 초기 화 됩 니 다.
4.애플 리 케 이 션 을 시작 하고 가상 컴퓨터 를 등록 하여 갈 고 리 를 닫 습 니 다.
handle ConfigurationEvent 방법 은 간단 합 니 다.먼저 stopAllComponents 를 호출 하여 모든 구성 요 소 를 중단 한 다음 startAllComponents 를 호출 하여 설정 파일 을 사용 하여 모든 구성 요 소 를 초기 화 합 니 다.
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
stopAllComponents();
startAllComponents(conf);
}
Materialized Configuration 은 Flume 이 실 행 될 때 필요 한 구성 요 소 를 저장 합 니 다.Source,Channel,Sink,SourceRunner,SinkRunner 등 은 ConfigurationProvider 를 통 해 초기 화 됩 니 다.예 를 들 어 PollingProperties FileConfigurationProvider 는 설정 파일 을 읽 고 구성 요 소 를 초기 화 합 니 다.
startAllComponents 의 실현 은 대체로 다음 과 같다.
//1、 Channel
supervisor.supervise(Channels,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//2、 Channel
for(Channel ch: materializedConfiguration.getChannels().values()){
while(ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Throwables.propagate(e);
}
}
}
//3、 SinkRunner
supervisor.supervise(SinkRunners,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//4、 SourceRunner
supervisor.supervise(SourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//5、
this.loadMonitoring();
다음 코드 에서 볼 수 있 듯 이 먼저 채널 을 준비 해 야 합 니 다.Source 와 Sink 이 이 를 조작 하기 때문에 채널 을 초기 화 하 는 데 실패 하면 전체 프로 세 스 가 실패 합 니 다.그리고 싱 크 러 너 를 시작 하여 먼저 소비 자 를 준비 합 니 다.이 어 SourceRunner 를 시작 하여 채집 로 그 를 시작 합 니 다.여기 서 우 리 는 두 개의 단독 구성 요소 인 Lifecycle Supervisor 와 Monitor Service 가 있 는 것 을 발견 했다.하 나 는 구성 요소 수호 보초병 이 고 하 나 는 모니터링 서비스 이다.수호 보초병 은 이 구성 요소 들 을 수호 합 니 다.문제 가 생 겼 다 고 가정 하면 기본 정책 은 이 구성 요 소 를 자동 으로 다시 시작 합 니 다.
stopAllComponents 의 실현 은 대체로 다음 과 같다.
//1、 SourceRunner
supervisor.unsupervise(SourceRunners);
//2、 SinkRunner
supervisor.unsupervise(SinkRunners);
//3、 Channel
supervisor.unsupervise(Channels);
//4、 MonitorService
monitorServer.stop();
여기 서 알 수 있 듯 이 정지 순 서 는 소스,싱 크,채널,즉 생산 을 먼저 중단 한 다음 에 소 비 를 중단 하고 마지막 에 파 이 프 를 중단 하 는 것 이다.
응용 프로그램의 start 방법 코드 는 다음 과 같 습 니 다.
public synchronized void start() {
for(LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
}
애플 리 케 이 션 에 등 록 된 구성 요 소 를 순환 한 다음 에 보초병 을 지 키 는 것 입 니 다.기본 정책 은 문제 가 발생 하면 자동 으로 구성 요 소 를 다시 시작 합 니 다.만약 에 우리 가 reload 설정 파일 을 지원 한다 고 가정 하면 이전에 애플 리 케 이 션 을 시작 할 때 PollingProperties FileConfigurationProvider 구성 요 소 를 등록 한 적 이 있 습 니 다.즉,이 구성 요 소 는 보초병 에 의 해 지 켜 지고 문제 가 발생 하면 기본 정책 이 자동 으로
애플 리 케 이 션 이 닫 히 면 다음 동작 이 실 행 됩 니 다.
public synchronized void stop() {
supervisor.stop();
if(monitorServer != null) {
monitorServer.stop();
}
}
수호 초병 과 감시 서 비 스 를 폐쇄 하 는 것 이다.
이 기본 적 인 애플 리 케 이 션 분석 이 끝 났 습 니 다.보초병 이 어떻게 이 루어 졌 는 지 의문 이 많 습 니 다.
전체 절 차 는 다음 과 같이 요약 할 수 있다.
1.먼저 명령 행 설정 을 초기 화 합 니 다.
2.설정 파일 을 읽 기;
3.reload 가 필요 한 지 여부 에 따라 설정 파일 의 구성 요 소 를 초기 화 합 니 다.reload 가 필요 하 다 면 Guava 이벤트 버스 를 사용 하여 구독 변 화 를 발표 합 니 다.
4.응용 프로그램 을 만 들 고 수호 보초병 을 만 들 며 모든 구성 요 소 를 중단 한 다음 에 모든 구성 요 소 를 시작 합 니 다.시작 순서:Channel,SinkRunner,SourceRunner,그리고 이 구성 요 소 를 수호 보초병 에 등록 하고 모니터링 서 비 스 를 초기 화 합 니 다.정지 순서:SourceRunner,SinkRunner,Channel;
5.설정 파일 이 정기 적 으로 reload 가 필요 하 다 면 Polling**ConfigurationProvider 를 수호 보초병 에 등록 해 야 합 니 다.
6.마지막 으로 가상 컴퓨터 를 등록 하여 갈 고 리 를 끄 고 보초병 과 감시 서 비 스 를 정지 합 니 다.
윤 훈 으로 이 루어 진 SourceRunner. 싱 크 러 너 와 스 레 드 를 만들어 작업 을 진행 할 것 입 니 다.전에 작업 방식 을 소 개 했 습 니 다.이제 보초병 의 실현 을 지 켜 보 자.
우선 LifecycleSupervisor 를 만 듭 니 다.
//1、
supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
//2、
monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
//3、
monitorService = new ScheduledThreadPoolExecutor(10,
new ThreadFactoryBuilder().setNameFormat(
"lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
.build());
monitorService.setMaximumPoolSize(20);
monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
//4、
purger = new Purger();
//4.1、
needToPurge = false;
LifecycleSupervisor 가 시 작 될 때 다음 과 같은 동작 을 합 니 다.
public synchronized void start() {
monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
lifecycleState = LifecycleState.START;
}
우선 두 시간 간격 으로 청소 구성 요 소 를 실행 한 다음 상 태 를 시작 으로 바 꿉 니 다.LifecycleSupervisor 가 정지 되 었 을 때 모니터링 서 비 스 를 중단 하고 데 몬 구성 요소 상 태 를 STOP 로 업데이트 합 니 다.
//1、
if (monitorService != null) {
monitorService.shutdown();
try {
monitorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for monitor service to stop");
}
}
//2、 STOP, stop
for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses.entrySet()) {
if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
entry.getValue().status.desiredState = LifecycleState.STOP;
entry.getKey().stop();
}
}
//3、
if (lifecycleState.equals(LifecycleState.START)) {
lifecycleState = LifecycleState.STOP;
}
//4、
supervisedProcesses.clear();
monitorFutures.clear();
다음은 슈퍼 바 이 스 를 호출 하여 구성 요소 데 몬 을 진행 합 니 다:
if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
|| this.monitorService.isTerminating()){
//1、 ,
}
//2、
Supervisoree process = new Supervisoree();
process.status = new Status();
//2.1、
process.policy = policy;
//2.2、 , START
process.status.desiredState = desiredState;
process.status.error = false;
//3、 , ,
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
//4、 , ,
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
}
지 킬 필요 가 없다 면 unsupervise 를 호출 해 야 합 니 다:
public synchronized void unsupervise(LifecycleAware lifecycleAware) {
synchronized (lifecycleAware) {
Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
//1.1、
supervisoree.status.discard = true;
//1.2、 STOP
this.setDesiredState(lifecycleAware, LifecycleState.STOP);
//1.3、
lifecycleAware.stop();
}
//2、
supervisedProcesses.remove(lifecycleAware);
//3、
monitorFutures.get(lifecycleAware).cancel(false);
//3.1、 Purger ,Purger cancel
needToPurge = true;
monitorFutures.remove(lifecycleAware);
}
다음은 MonitorRunnable 의 실현 을 살 펴 보 겠 습 니 다.구성 요소 상태 이전 이나 구성 요소 고장 복 구 를 담당 합 니 다.
public void run() {
long now = System.currentTimeMillis();
try {
if (supervisoree.status.firstSeen == null) {
supervisoree.status.firstSeen = now; //1、
}
supervisoree.status.lastSeen = now; //2、
synchronized (lifecycleAware) {
//3、 ,
if (supervisoree.status.discard || supervisoree.status.error) {
return;
}
//4、
supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
//5、 , ,
if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {
switch (supervisoree.status.desiredState) {
case START: //6、 ,
try {
lifecycleAware.start();
} catch (Throwable e) {
if (e instanceof Error) {
supervisoree.status.desiredState = LifecycleState.STOP;
try {
lifecycleAware.stop();
} catch (Throwable e1) {
supervisoree.status.error = true;
if (e1 instanceof Error) {
throw (Error) e1;
}
}
}
supervisoree.status.failures++;
}
break;
case STOP: //7、 ,
try {
lifecycleAware.stop();
} catch (Throwable e) {
if (e instanceof Error) {
throw (Error) e;
}
supervisoree.status.failures++;
}
break;
default:
}
} catch(Throwable t) {
}
}
}
위의 코드 가 간소화 되 었 을 때 전체 논 리 는 정시 에 구성 요소 의 상 태 를 수집 하 는 것 입 니 다.데 몬 구성 요소 와 구성 요소 의 상태 가 일치 하지 않 는 것 을 발견 하면 시작 하거나 중지 해 야 할 수도 있 습 니 다.즉,데 몬 모니터 는 구성 요소 가 실패 하면 자동 으로 시작 할 수 있 도록 보장 합 니 다.기본 정책 은 항상 실패 한 후에 다시 시작 합 니 다.또 하나의 정책 은 한 번 만 시작 하 는 것 입 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Flume Processorsactive 상태 프로세스가 죽어야 다른 작업이 바뀔 수 있습니다.그 많은sink가 도대체 누가 먼저 일을 하는지, 권중에 따라 누구의 권중이 높은지, 누가 먼저 일을 하는지, 일반적인 고장 전이를 하면 2개sink...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.