Flume 1.7 소스 분석 (3) 프로그램 입구
11832 단어 Flume빅데이터 학습의 길Flume 소스 분석
4 프로그램 입구
Flume를 시작하는 과정은 간단하게 2단계로 나눌 수 있다. 1.관련 프로필을 가져옵니다. (일반적으로 flume-conf.properties)2. 각 구성 요소를 시작합니다.특별히 설명하지 않아도 본고의 구성 요소는LifecycleAware 인터페이스를 실현한 클래스의 대상을 가리키는데 일반적으로 Source,Channel,Sink 등 3가지 대상이다.
4.1 시작 구성 가져오기
4.1.1 Main 함수
Flume를 시작하는 Main 함수는 flume-ng-node 모듈의 org에 있습니다.apache.flume.node.Application.이 함수의 기능은 다음과 같은 세 단계로 간단하게 나눌 수 있다.commons 사용cli 클래스 명령행 매개 변수 가져오기 (시작할 때 들어오는 매개 변수) 2.시작 파라미터에 따라 설정된 읽기 방식을 읽습니다.설정을 읽는 방식은 총 4가지로 구성이 zookeeper에 저장되는지 로컬properties 파일에 저장되는지,reload (자동 재부팅 프로필) 여부에 따라 4가지로 나뉜다.3. 상응하는 설정에 따라 프로그램을 시작하고 갈고리를 닫는 것을 등록한다.다음은 properties 파일을 다시 불러오지 않는 방식으로 예를 들면 주요 코드는 다음과 같다.
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
// Application , (components), LifecycleSupervisor。
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
//start , 。
application.start();
// , kill 。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
public void run() {
appReference.stop();
}
});
위의 코드는 두 가지가 비교적 관건적이다.
4
4.1.2 물적 배치
configurationProvider.getConfiguration () 방법은 주로 다음과 같은 두 가지를 했다.구성 파일(flume-conf.properties)을 읽고 AgentConfiguration 객체에 저장합니다.
public static class AgentConfiguration {
private final String agentName;
private String sources;
private String sinks;
private String channels;
private String sinkgroups;
private final Map<String, ComponentConfiguration> sourceConfigMap;
private final Map<String, ComponentConfiguration> sinkConfigMap;
private final Map<String, ComponentConfiguration> channelConfigMap;
private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
private Map<String, Context> sourceContextMap;
private Map<String, Context> sinkContextMap;
private Map<String, Context> channelContextMap;
private Map<String, Context> sinkGroupContextMap;
private Set<String> sinkSet;
private Set<String> sourceSet;
private Set<String> channelSet;
private Set<String> sinkgroupSet;
}
이 단계는 단지 분류된 텍스트 형식의 설정 항목만 완성했을 뿐이다.2. 구성에서 각 어셈블리 인스턴스를 생성하여 MaterializedConfiguration 인스턴스에 추가합니다.
public interface MaterializedConfiguration {
public void addSourceRunner(String name, SourceRunner sourceRunner);
public void addSinkRunner(String name, SinkRunner sinkRunner);
public void addChannel(String name, Channel channel);
public ImmutableMap getSourceRunners();
public ImmutableMap getSinkRunners();
public ImmutableMap getChannels();
}
이 실례에서 설정 파일에 설정된 모든source,channel,sink를 얻을 수 있으며, 또한 '물화' 이며, 관련 구성 요소의 실례를 직접 얻을 수 있다.
4.2 모든 구성 요소 시작
4.2.1 새 구성으로 재시작
위의 MaterializedConfiguration 실례가 있으면 구성 요소를 시작할 수 있습니다.handleConfiguration Event 메서드에서는 먼저 모든 구성 요소를 중지한 다음 모든 구성 요소를 시작합니다.
stopAllComponents();
startAllComponents(conf); // conf MaterializedConfiguration。
startAllComponents 방법에서는 구성 요소 목록(SourceRunners,SinkRunners,Channels)을 옮겨다니며 각각 슈퍼vise 방법을 호출합니다.Channel의 경우
for (Entry entry :
materializedConfiguration.getChannels().entrySet()) {
try {
logger.info("Starting Channel " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
이 슈퍼vise 방법은 간단하게 말하면 해당 구성 요소의 상태를 기대하는 상태로 바꾸는 것이다.예를 들어 위 코드의 LifecycleState.START는 바로 기대하는 상태입니다.
4.2.2 LifecycleSupervisor
이전 섹션의 Supervisor는 Lifecycle Supervisor 객체입니다.앞에서 말했듯이 Application을 만들 때Lifecycle Supervisor 대상을 초기화했습니다. 바로 이곳의 슈퍼visor입니다.이 대상은 각 구성 요소의 생명주기 관리자로 이해되며 모든 구성 요소의 상태를 실시간으로 감시하고 원하는 상태(desiredState)가 아니면 상태 전환을 합니다.
이전 코드에서 슈퍼바이저를 호출했습니다.Supervise 방법, 다음은 Supervise 방법을 분석해 보겠습니다.
public synchronized void supervise(LifecycleAware lifecycleAware,
SupervisorPolicy policy, LifecycleState desiredState) {
//
Supervisoree process = new Supervisoree();
process.status = new Status();
process.policy = policy;
process.status.desiredState = desiredState;
process.status.error = false;
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
}
모든 구성 요소가 Lifecycle Aware 인터페이스를 실현했기 때문에, 이곳의 슈퍼vise 방법은 Lifecycle Aware 인터페이스의 대상을 전송합니다.
Supervisoree 대상을 만들었습니다. 말 그대로 감시된 대상입니다. 이 대상은 다음과 같은 몇 가지 상태가 있습니다. IDLE, START, STOP, ERROR입니다.scheduleWithFixedDelay는 3초 간격으로 모니터링 작업을 터치합니다.
4.2.3 MonitorRunnable
MonitorRunnable에서는 주로 구성 요소의 상태를 검사하고lifecycleState에서desiredState로의 전환을 실현한다.
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
} catch (Throwable e) { }
break;
case STOP:
try {
lifecycleAware.stop();
} catch (Throwable e) { }
break;
default:
logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState);
}
여기까지 모니터링 프로세스를 볼 수 있습니다. 구성 요소 자체의 start와 stop 방법을 사용해서 시작하고 정지합니다.앞서 언급한 3가지 유형의 구성 요소는 SourceRunner, Channel, SinkRunner이고 Channel의 start는 초기화 계수기만 만들었기 때문에 실질적인 내용이 없기 때문에 다음은 SourceRunner의 시작(Source에서 데이터를 쓰고 Channel까지)과 SinkRunner의 시작(Channel에서 데이터를 가져와서 Sink에 쓰기)에서 설명을 전개한다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Flume Processorsactive 상태 프로세스가 죽어야 다른 작업이 바뀔 수 있습니다.그 많은sink가 도대체 누가 먼저 일을 하는지, 권중에 따라 누구의 권중이 높은지, 누가 먼저 일을 하는지, 일반적인 고장 전이를 하면 2개sink...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.