Flink용 Process Function(하위 작업)
ProcessFunction
는 저차원 흐름 처리 작업으로 모든 (링이 없는) 흐름 프로그램의 기초 구축 모듈을 되돌려줍니다: 1. 이벤트(event) (이벤트) 2, 상태(state) (용착성, 일치성, keyed stream에만 있음) 3, 타이머(timers) (event time와processing time, keyed stream에만 있음)ProcessFunction
keyedstate와timers에 접근할 수 있다고 볼 수 있는 FlatMapFunction
로, 입력 흐름에서 수신된 모든 이벤트를 호출하여 처리합니다.용량 오류 상태에 대해 ProcessFunction은 RuntimeContext를 통해 Flink의 keyed state에 접근할 수 있으며, 다른 상태 함수가 keyed state에 접근하는 것과 같다.
타이머는 프로그램이
processing time
와 event time
의 변화에 반응할 수 있도록 하고 processElement(...)
의 호출마다 Context
대상을 받는다. 이 대상은 요소 이벤트 시간의 시간 스탬프와 TimeServer
에 접근할 수 있도록 한다.TimeServer
아직 발생하지 않은 이벤트-time나processing-time에 리셋을 등록할 수 있으며, 타이머의 시간이 도착하면onTimer(...)방법이 호출될 것이다.이 호출 기간 동안 모든 상태는 타이머를 만드는 키로 한정되며, 타이머가 키 제어 상태 (keyed states) 를 제어할 수 있도록 허용합니다.주의: 키 제어 상태 (keyed state) 와 타이머에 접근하려면 키 제어 흐름 (keyed stream) 에 Process Function을 적용해야 합니다.
stream.keyBy(...).process(new MyProcessFunction())
낮은 수준의 Join(Low-Level Joins)
두 입력 흐름에서 저차원 조작을 실현하기 위해 응용 프로그램은
CoProcessFunction
를 사용할 수 있다. 이 함수는 두 개의 서로 다른 입력 흐름을 연결하고 각각 processElement1(...)
와 processElement2(...)
를 호출하여 두 개의 서로 다른 입력 흐름의 기록을 얻을 수 있다.낮은 차원의join을 실현하려면 보통 아래의 모델에 따라 진행한다. 1. 하나의 입력원(또는 둘 다)을 위한 상태(state)2, 입력원에 있는 요소를 수신할 때 상태(state)3, 다른 입력원에 있는 요소를 수신할 때 상태를 탐색하고 연결 결과를 생성할 수 있다. 예를 들어 고객 데이터를 금융 거래에 연결하고 고객 정보를 저장하는 상태,만약에 무질서한 이벤트 상황에서 완전하고 확실한 연결에 관심이 있다면 고객 데이터 흐름의 수인이 거래를 통과했을 때, 당신은 타이머를 사용하여 거래의 연결을 평가하고 발표할 수 있습니다.
예.
다음 예는 키마다 계수를 유지하고 키를 업데이트하지 않은 키/count 쌍을 1분마다 보냅니다.1. count, 키와 last-modification-timestamp은ValueState에 저장되며 이ValueState는 키가 은근히 제한합니다.2. 모든 기록에 대해 Process Function은counter의 계수를 증가하고 마지막 업데이트 시간 스탬프를 설정합니다. 이 함수는 앞으로 1분 안에 리셋(event time 기반)4를 설정합니다. 리셋할 때마다 저장된count의 마지막 업데이트 시간에 따라callback의 이벤트 시간을 검사하고 일치하면 키/count를 보냅니다.
주의: 이 간단한 예는session window를 통해 실현할 수 있습니다. Process Function을 사용하는 것은 기본 모델을 설명하기 위해서입니다.Java 코드:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
//
DataStream> stream = ...;
// process function (keyed stream)
DataStream> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
/**
* The data type stored in the state
* state
*/
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
* ProcessFunction ,
*/
public class CountWithTimeoutFunction extends ProcessFunction, Tuple2> {
/** The state that is maintained by this process function */
/** process function */
private ValueState state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(Tuple2 value, Context ctx, Collector> out)
throws Exception {
// retrieve the current count
// count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
// state count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
// state
current.lastModified = ctx.timestamp();
// write the state back
//
state.update(current);
// schedule the next timer 60 seconds from the current event time
// 60
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector> out)
throws Exception {
// get the state for the key that scheduled the timer
// key
CountWithTimestamp result = state.value();
//
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2(result.key, result.count));
}
}
}
Scala 코드:
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector
//
val stream: DataStream[Tuple2[String, String]] = ...
// process function (keyed stream)
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
/**
* state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* ProcessFunction ,
*/
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
/** process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
// /
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
//
state.update(current)
// 60
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.