Flink 에서 window Trigger 의 이해
6528 단어 빅 데이터
1. window 는 어떻게 구분 합 니까?
window 의 구분 은 데이터 자체 와 상 관 없 이 시스템 에 의 해 정 의 됩 니 다.
dataStreamSource.flatMap(new MyFlatMapFunction())
.keyBy("")
.timeWindow(Time.seconds(10))
.allowedLateness(Time.seconds(12)) //
Time. seconds (10) 는 10 초 동안 창 을 나 누 는 것 을 표시 합 니 다. 시스템 은 창 을 이렇게 나 눕 니 다.
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
어떤 데이터 가 도 착 했 을 때, 그것 이 어느 창 에 속 하 는 지 이미 결정 되 었 다.
2. 워 터 마크 생 성 방식 은?얼마나 자주 발생 합 니까?
1. 워 터 마크 를 만 드 는 방식 은 두 가지 가 있다.
2. 워 터 마크 간격 생 성
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks {
private final long maxOutofOrderness = 5000;
private long currentMaxTimestamp;
@Override
public long extractTimestamp(PacketDescriptor element, long previousElementTimestamp) {
long timestamp = element.getTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
// getCurrentWatermark() , watermark watermark, watermark。
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutofOrderness);
}
}
3. window 트리거 계산 조건?
Flink 에서 이벤트 - time 모드 를 사용 할 때 기본적으로 제공 하 는 window 는 TumblingEventTimeWindows, SlidingEventTimeWindows, EventTimeSessionWindow 등 이 있 습 니 다.이 세 가지 기본 적 인 window operator 에 서 는 기본 적 인 trigger 를 제공 합 니 다. 우 리 는 이 세 가지 방법 을 사용 할 때 trigger 를 쓰 지 않 고 window process 를 직접 씁 니 다. 예 를 들 어 reduce ().이 세 개의 window 에 있 는 getDefault Trigger () 방법 은 EventTimeTrigger 를 사용 하기 때 문 입 니 다. 즉, 이것 은 우리 에 게 기본 적 인 trigger 를 제공 하기 때 문 입 니 다.
EventTimeTrigger.java
@Override
// window
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
//trigger
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public Trigger
window size = 10s 첫 번 째 데이터 (2019 - 06 - 03 17: 00: 02) 가 도 착 했 을 때 이 데이터 가 속 한 창 은 [2019 - 06 - 03 17: 00: 00, 2019 - 06 - 03 17: 00: 10) 입 니 다.
세 번 째 데이터 (2019 - 06 - 03 17: 00: 15) 가 도 착 했 을 때 이 데이터 가 속 한 창 은 [2019 - 06 - 03 17: 00: 10, 2019 - 06 - 03 17: 00: 20] 입 니 다.
Window Operator. java 에서 processElement () 방법
for (W window: elementWindows) {
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
지각 이 너무 많은 데이터 에 대해 서 는 isWindow Late (window) 방법, 즉 window. max Timestamp () + allowed Latenes < = watermark 가 작 으 면 window 가 만 료 되 고 window 대상 을 삭제 하 며 window 상 태 를 삭제 해 야 합 니 다. 크 면 이 창 이 삭제 되 지 않 았 습 니 다. 발 창 에 직접 접촉 하여 계산 하고 현재 데 이 터 를 이전 창 에서 계산 한 결과 와 연산 하여 merge 작업 을 해 야 합 니 다.
참고:https://www.jianshu.com/p/c8c789ff5570 https://blog.csdn.net/lmalds/article/details/52704170 https://www.jianshu.com/p/9db56f81fa2a
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.