Flume-NG 의 Channel 과 Transaction 관계 (오리지널)

4453 단어 transaction
sink와source에서 (내장이든 사용자 정의든) 기본적으로 다음과 같은 코드가 있습니다. 이 코드는 sink의process 방법에서 사용되지만,source에서는 스스로 쓸 필요가 없습니다. source에서getChannelProcessor ().processEventBatch(events) 메서드에는 다음과 같은 항목이 자동으로 생성됩니다.
    ...

    Channel channel = getChannel();

    Transaction transaction = channel.getTransaction();

    Event event = null;

    Status result = Status.READY;

    transaction.begin();

    ...

    event = channel.take();//getChannelProcessor().processEvent(event);,    sink    source

    ...

    transaction.commit();

    transaction.rollback()

    transaction.close();

    ...

그럼 어떤 분들은 물어보시겠어요?위 코드에서 채널만 가져오면 될 것 같습니다. 데이터를 가져올 때 이벤트 = 채널만 필요하기 때문입니다.take () 또는
getChannelProcessor().processEvent(event)?    ?     transaction  ,        ,  !

그럼 왜요?이것은 확실히 좀 의심스럽지만, 사실은 채널이다.take () 작업은transaction입니다.doTake().즉, 실제put과take 등의 조작은transaction에서 진행되기 때문에 채널을 사용하려면 반드시transcation을 만들어야 사용할 수 있다.그리고 채널.getTransaction () 방법은 가져오거나 만들거나transcation을 만드는 것입니다. BasicChannelSemantics의 대응 코드는 다음과 같습니다.
@Override

  public Transaction getTransaction() {



    if (!initialized) {

      synchronized (this) {

        if (!initialized) {

          initialize();

          initialized = true;

        }

      }

    }



    BasicTransactionSemantics transaction = currentTransaction.get();//  transcation

    if (transaction == null || transaction.getState().equals(//  transaction           

            BasicTransactionSemantics.State.CLOSED)) {

      transaction = createTransaction();//  

      currentTransaction.set(transaction);//   currentTransaction

    }

    return transaction;

  }

이 방법은 모든 채널의 부류인 BasicChannel Semantics에서 구체적인 채널 클래스에서 보호된 abstract BasicTransaction Semantics createTransaction () 이라는 추상적인 방법을 실현하여 해당하는transaction 대상을 얻어야 한다.BasicChannel Semantics는transaction을take ()와transaction.put(event) 방법은 더 나아가take()와put(event) 방법으로 봉인한다. 이 두 가지 방법은 sink나source에 노출된 채널이다.take () 및 channel.put 방법.
 @Override

  public void put(Event event) throws ChannelException {

    BasicTransactionSemantics transaction = currentTransaction.get();

    Preconditions.checkState(transaction != null,

        "No transaction exists for this thread");

    transaction.put(event);

  }



  @Override

  public Event take() throws ChannelException {

    BasicTransactionSemantics transaction = currentTransaction.get();

    Preconditions.checkState(transaction != null,

        "No transaction exists for this thread");

    return transaction.take();

  }

이로써 업무 스케줄을 알 수 있겠지!
  Transaction transaction = channel.getTransaction();          ,           transcation     currentTransaction ,        transaction 。

좋은 웹페이지 즐겨찾기