Flume-ng ThriftSource: How It Works


Thrift IDL


The Flume Thrift IDL is defined in the client package as follows:
namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}
Note: event is a keyword in C#, so before using the Thrift compiler to generate a C# client interface, rename every identifier called event in the IDL to something else.

Thrift Service


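Flume sources come in two kinds. A PollableSource is managed by a SourceRunner, which calls it repeatedly to pull data. A source that implements the EventDrivenSource interface receives data on its own and sends it straight to the channel. ThriftSource, the class that implements the Flume Thrift service, is of the second kind and lives in the core package.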
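
The difference between the two kinds of source can be sketched in plain Java. This is only an illustration and does not use Flume's classes: PolledSource, PushedSource, drain, and the Consumer<String> that stands in for the channel are hypothetical names introduced here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; these are not Flume's interfaces.
class SourceKindsDemo {

  /** Pull style: a runner must call process() repeatedly to move data. */
  static class PolledSource {
    private final Queue<String> backlog = new ArrayDeque<>();
    private final Consumer<String> channel;

    PolledSource(List<String> data, Consumer<String> channel) {
      backlog.addAll(data);
      this.channel = channel;
    }

    /** Delivers at most one item; returns false when nothing was ready. */
    boolean process() {
      String next = backlog.poll();
      if (next == null) {
        return false;
      }
      channel.accept(next);
      return true;
    }
  }

  /** Push style: the source delivers as soon as data arrives, e.g. from an RPC. */
  static class PushedSource {
    private final Consumer<String> channel;

    PushedSource(Consumer<String> channel) {
      this.channel = channel;
    }

    /** Called by whatever receives the data, such as a server handler. */
    void onData(String item) {
      channel.accept(item);
    }
  }

  /** The runner's loop for a polled source: keep calling until it is idle. */
  static int drain(PolledSource source) {
    int delivered = 0;
    while (source.process()) {
      delivered++;
    }
    return delivered;
  }

  public static void main(String[] args) {
    List<String> channel = new ArrayList<>();

    PolledSource polled = new PolledSource(Arrays.asList("a", "b"), channel::add);
    int delivered = drain(polled);

    PushedSource pushed = new PushedSource(channel::add);
    pushed.onData("c");

    System.out.println(delivered + " polled, channel = " + channel);
  }
}
```

In the pushed case nothing loops over the source: the call to onData plays the role that an incoming Thrift RPC plays for ThriftSource. The actual ThriftSource class is shown next.
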
public class ThriftSource extends AbstractSource implements Configurable,
  EventDrivenSource {
  public static final String CONFIG_THREADS = "threads";
  public static final String CONFIG_BIND = "bind";
  public static final String CONFIG_PORT = "port";
  private Integer port;
  private String bindAddress;
  private int maxThreads = 0;
  private SourceCounter sourceCounter;
  private TServer server;
  private TServerTransport serverTransport;
  private ExecutorService servingExecutor;
  public void start() {
      // Setup of the server arguments (args) is elided here.
      ...
      args.protocolFactory(new TCompactProtocol.Factory());
      args.inputTransportFactory(new TFastFramedTransport.Factory());
      args.outputTransportFactory(new TFastFramedTransport.Factory());

      // Register the handler that implements ThriftSourceProtocol, the Flume Thrift service.
      args.processor(new ThriftSourceProtocol
        .Processor(new ThriftSourceHandler()));
     /**
     * Start serving.
     */
    servingExecutor.submit(new Runnable() {
      @Override
      public void run() {
        server.serve();
      }
    });
    ...
  }
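
The start() method above wraps both directions in TFastFramedTransport. A framed Thrift transport sends each message as a 4-byte big-endian length followed by the message bytes. The sketch below imitates only that framing with the JDK's ByteBuffer. FrameDemo, frame, and unframe are hypothetical names, and nothing here uses Thrift's classes or reproduces the TCompactProtocol encoding of the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that imitates length-prefixed framing; not Thrift's API.
class FrameDemo {

  /** Prefixes the payload with its length as a 4-byte big-endian integer. */
  static byte[] frame(byte[] payload) {
    return ByteBuffer.allocate(4 + payload.length) // ByteBuffer is big-endian by default
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  /** Reads one frame from the buffer: the length first, then that many bytes. */
  static byte[] unframe(ByteBuffer wire) {
    int length = wire.getInt();
    byte[] payload = new byte[length];
    wire.get(payload);
    return payload;
  }

  public static void main(String[] args) {
    byte[] first = frame("first".getBytes(StandardCharsets.UTF_8));
    byte[] second = frame("second message".getBytes(StandardCharsets.UTF_8));

    // Two frames written back to back, as they would travel on one connection.
    ByteBuffer wire = ByteBuffer.allocate(first.length + second.length);
    wire.put(first).put(second);
    wire.flip();

    // The length prefix tells the reader where each message ends.
    System.out.println(new String(unframe(wire), StandardCharsets.UTF_8));
    System.out.println(new String(unframe(wire), StandardCharsets.UTF_8));
  }
}
```

Because the reader knows the full message size up front, it can buffer a complete request before decoding it, which is why client and server must agree on the transport as well as on the protocol.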

The actual implementation of the Flume Thrift service is the inner class ThriftSourceHandler:
  private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {

    @Override
    public Status append(ThriftFlumeEvent event) throws TException {
      Event flumeEvent = EventBuilder.withBody(event.getBody(),
        event.getHeaders());

      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      try {
        // Hand the event to the channel through the channel processor.
        getChannelProcessor().processEvent(flumeEvent);
      } catch (ChannelException ex) {
        logger.warn("Thrift source " + getName() + " could not append events " +
          "to the channel.", ex);
        return Status.FAILED;
      }
      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();
      return Status.OK;
    }

    @Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

      try {
        getChannelProcessor().processEventBatch(flumeEvents);
      } catch (ChannelException ex) {
        logger.warn("Thrift source %s could not append events to the " +
          "channel.", getName());
        return Status.FAILED;
      }

      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
      return Status.OK;
    }
  }
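
The control flow of the handler (count what was received, hand the events to the channel, report FAILED if the channel refuses them, count what was accepted) can be tried out without a running agent. The following is a minimal model in plain Java, not Flume code: HandlerModel, BoundedChannel, and ChannelFullException are hypothetical stand-ins, events are reduced to strings, and the rule that a batch is rejected as a whole when it does not fit is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the handler's control flow; none of these are Flume classes.
class HandlerModel {
  enum Status { OK, FAILED }

  /** Stand-in for ChannelException. */
  static class ChannelFullException extends RuntimeException {
    ChannelFullException(String message) {
      super(message);
    }
  }

  /** Stand-in for the channel: accepts a batch only if it fits entirely (assumption). */
  static class BoundedChannel {
    private final int capacity;
    private final List<String> events = new ArrayList<>();

    BoundedChannel(int capacity) {
      this.capacity = capacity;
    }

    void putBatch(List<String> batch) {
      if (events.size() + batch.size() > capacity) {
        throw new ChannelFullException("channel full");
      }
      events.addAll(batch);
    }
  }

  private final BoundedChannel channel;
  long received; // incremented before the channel is touched
  long accepted; // incremented only after the channel took the batch

  HandlerModel(BoundedChannel channel) {
    this.channel = channel;
  }

  Status appendBatch(List<String> batch) {
    received += batch.size();
    try {
      channel.putBatch(batch);
    } catch (ChannelFullException ex) {
      return Status.FAILED; // the caller decides whether to retry
    }
    accepted += batch.size();
    return Status.OK;
  }

  public static void main(String[] args) {
    HandlerModel handler = new HandlerModel(new BoundedChannel(3));
    System.out.println(handler.appendBatch(Arrays.asList("a", "b"))); // fits: OK
    System.out.println(handler.appendBatch(Arrays.asList("c", "d"))); // would exceed 3: FAILED
    System.out.println("received=" + handler.received + " accepted=" + handler.accepted);
  }
}
```

The gap between the received and accepted counters is what makes rejected batches visible in monitoring, and the FAILED status is the only signal the client gets that it should resend.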
