Flink용 Process Function(하위 작업)

6995 단어
Process Function(프로세스 함수)ProcessFunction는 저차원 흐름 처리 작업으로 모든 (링이 없는) 흐름 프로그램의 기초 구축 모듈을 되돌려줍니다: 1. 이벤트(event) (이벤트) 2, 상태(state) (용착성, 일치성, keyed stream에만 있음) 3, 타이머(timers) (event time와processing time, keyed stream에만 있음)ProcessFunctionkeyedstate와timers에 접근할 수 있다고 볼 수 있는 FlatMapFunction로, 입력 흐름에서 수신된 모든 이벤트를 호출하여 처리합니다.
용량 오류 상태에 대해 ProcessFunction은 RuntimeContext를 통해 Flink의 keyed state에 접근할 수 있으며, 다른 상태 함수가 keyed state에 접근하는 것과 같다.
타이머는 프로그램이 processing timeevent time의 변화에 반응할 수 있도록 하고 processElement(...)의 호출마다 Context 대상을 받는다. 이 대상은 요소 이벤트 시간의 시간 스탬프와 TimeServer에 접근할 수 있도록 한다.TimeServer 아직 발생하지 않은 이벤트-time나processing-time에 리셋을 등록할 수 있으며, 타이머의 시간이 도착하면onTimer(...)방법이 호출될 것이다.이 호출 기간 동안 모든 상태는 타이머를 만드는 키로 한정되며, 타이머가 키 제어 상태 (keyed states) 를 제어할 수 있도록 허용합니다.
주의: 키 제어 상태 (keyed state) 와 타이머에 접근하려면 키 제어 흐름 (keyed stream) 에 Process Function을 적용해야 합니다.
stream.keyBy(...).process(new MyProcessFunction())

낮은 수준의 Join(Low-Level Joins)
두 입력 흐름에서 저차원 조작을 실현하기 위해 응용 프로그램은 CoProcessFunction를 사용할 수 있다. 이 함수는 두 개의 서로 다른 입력 흐름을 연결하고 각각 processElement1(...)processElement2(...)를 호출하여 두 개의 서로 다른 입력 흐름의 기록을 얻을 수 있다.
낮은 차원의join을 실현하려면 보통 아래의 모델에 따라 진행한다. 1. 하나의 입력원(또는 둘 다)을 위한 상태(state)2, 입력원에 있는 요소를 수신할 때 상태(state)3, 다른 입력원에 있는 요소를 수신할 때 상태를 탐색하고 연결 결과를 생성할 수 있다. 예를 들어 고객 데이터를 금융 거래에 연결하고 고객 정보를 저장하는 상태,만약에 무질서한 이벤트 상황에서 완전하고 확실한 연결에 관심이 있다면 고객 데이터 흐름의 수인이 거래를 통과했을 때, 당신은 타이머를 사용하여 거래의 연결을 평가하고 발표할 수 있습니다.
예.
다음 예는 키마다 계수를 유지하고 키를 업데이트하지 않은 키/count 쌍을 1분마다 보냅니다.1. count, 키와 last-modification-timestamp은ValueState에 저장되며 이ValueState는 키가 은근히 제한합니다.2. 모든 기록에 대해 Process Function은counter의 계수를 증가하고 마지막 업데이트 시간 스탬프를 설정합니다. 이 함수는 앞으로 1분 안에 리셋(event time 기반)4를 설정합니다. 리셋할 때마다 저장된count의 마지막 업데이트 시간에 따라callback의 이벤트 시간을 검사하고 일치하면 키/count를 보냅니다.
주의: 이 간단한 예는session window를 통해 실현할 수 있습니다. Process Function을 사용하는 것은 기본 모델을 설명하기 위해서입니다.Java 코드:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;


//       
DataStream> stream = ...;

//   process function         (keyed stream) 
DataStream> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 * state        
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 * ProcessFunction   ,         
 */
public class CountWithTimeoutFunction extends ProcessFunction, Tuple2> {

    /** The state that is maintained by this process function */
    /** process function      */
    private ValueState state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2 value, Context ctx, Collector> out)
            throws Exception {

        // retrieve the current count
        //      count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
         //    state   count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        //  state                 
        current.lastModified = ctx.timestamp();

        // write the state back
        //      
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        //               60     
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        //        key   
        CountWithTimestamp result = state.value();

        //                   
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2(result.key, result.count));
        }
    }
}

Scala 코드:
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector

//        
val stream: DataStream[Tuple2[String, String]] = ...

//   process function         (keyed stream) 
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  *  state        
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * ProcessFunction   ,         
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** process function       */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    //        /    
 
    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, lastModified) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    //      
    state.update(current)

    //               60     
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}

좋은 웹페이지 즐겨찾기