Flume-ng ThriftSource 원리 분석
6819 단어 Flume
Thrift IDL
Flume Thrift IDL은 클라이언트 패키지에서 다음과 같이 정의됩니다.
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list events),
}
: 이벤트는 C#에서 키워드이기 때문에 Thrift 컴파일러를 이용하여 클라이언트의 인터페이스를 생성할 때 모든 이벤트 키워드를 다른 이벤트로 바꾸어야 합니다.Thrift Service
Flume의 Source는
SinkRunner를 통한 PollableSource 인터페이스 관리 SourceEventDrivenSource 인터페이스를 사용하면 데이터를 직접 받아서 채널로 전송할 수 있습니다.예를 들어 ThriftSourceFlume Thrift Service의 구현 클래스는 코어 패키지에 있습니다.
public class ThriftSource extends AbstractSource implements Configurable,
EventDrivenSource {
public static final String CONFIG_THREADS = "threads";
public static final String CONFIG_BIND = "bind";
public static final String CONFIG_PORT = "port";
private Integer port;
private String bindAddress;
private int maxThreads = 0;
private SourceCounter sourceCounter;
private TServer server;
private TServerTransport serverTransport;
private ExecutorService servingExecutor;
public void start() {
//
...
args.protocolFactory(new TCompactProtocol.Factory());
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
//ThriftSourceProtocol Flume Thrift Service
args.processor(new ThriftSourceProtocol
.Processor(new ThriftSourceHandler()));
/**
* Start serving.
*/
servingExecutor.submit(new Runnable() {
@Override
public void run() {
server.serve();
}
});
...
}
Flume Thrift Service의 진정한 구현 클래스는 내부 클래스 ThriftSourceHandler
private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
@Override
public Status append(ThriftFlumeEvent event) throws TException {
Event flumeEvent = EventBuilder.withBody(event.getBody(),
event.getHeaders());
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
try {
// channel
getChannelProcessor().processEvent(flumeEvent);
} catch (ChannelException ex) {
logger.warn("Thrift source " + getName() + " could not append events " +
"to the channel.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
@Override
public Status appendBatch(List events) throws TException {
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
List flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
try {
getChannelProcessor().processEventBatch(flumeEvents);
} catch (ChannelException ex) {
logger.warn("Thrift source %s could not append events to the " +
"channel.", getName());
return Status.FAILED;
}
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
return Status.OK;
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Flume Processorsactive 상태 프로세스가 죽어야 다른 작업이 바뀔 수 있습니다.그 많은sink가 도대체 누가 먼저 일을 하는지, 권중에 따라 누구의 권중이 높은지, 누가 먼저 일을 하는지, 일반적인 고장 전이를 하면 2개sink...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.