Flink 에서 window Trigger 의 이해

6528 단어 빅 데이터
Flink 를 처음 알 았 습 니 다. window 와 관련 된 몇 가지 개념 에 대해 잘 모 르 겠 습 니 다. 먼저 몇 가지 관건 적 인 문 제 를 던 집 니 다!
  • window 는 어떻게 구분 합 니까?
  • 워 터 마크 생 성 방식 은?얼마나 자주 발생 합 니까?
  • window 트리거 계산 조건?
  • 지연 데 이 터 는 어떻게 처리 합 니까?

  • 1. window 는 어떻게 구분 합 니까?
    window 의 구분 은 데이터 자체 와 상 관 없 이 시스템 에 의 해 정 의 됩 니 다.
    dataStreamSource.flatMap(new MyFlatMapFunction())
    				.keyBy("")
    				.timeWindow(Time.seconds(10))
                    .allowedLateness(Time.seconds(12))	//       
    

    Time. seconds (10) 는 10 초 동안 창 을 나 누 는 것 을 표시 합 니 다. 시스템 은 창 을 이렇게 나 눕 니 다.
    [00:00:00,00:00:10)
    [00:00:10,00:00:20)
    ...
    [00:00:50,00:01:00)
    

    어떤 데이터 가 도 착 했 을 때, 그것 이 어느 창 에 속 하 는 지 이미 결정 되 었 다.
    2. 워 터 마크 생 성 방식 은?얼마나 자주 발생 합 니까?
    1. 워 터 마크 를 만 드 는 방식 은 두 가지 가 있다.
  • Periodic: 일정한 시간 간격 이나 일정한 기록 항목 에 도달 하면 워 터 마크 가 생 길 수 있 습 니 다.
  • Punctuated: 이벤트 타임 을 바탕 으로 일정한 논 리 를 통 해 워 터 마크 를 만 듭 니 다. 예 를 들 어 데 이 터 를 받 으 면 워 터 마크 가 생 깁 니 다.

  • 2. 워 터 마크 간격 생 성
  • 워 터 마크 가 생 성 하 는 시간 간격 (n 밀리초 당) 은 ExecutionConfig. setAutoWatermarkInterval () 을 통 해 정 의 됩 니 다. 기본 값 은 0 입 니 다.
  • public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks {
        private final long maxOutofOrderness = 5000;
        private long currentMaxTimestamp;
        
        @Override
        public long extractTimestamp(PacketDescriptor element, long previousElementTimestamp) {
            long timestamp = element.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
        
        @Override
        //        getCurrentWatermark()   ,     watermark        watermark,      watermark。
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutofOrderness);
        }
    }
    

    3. window 트리거 계산 조건?
    Flink 에서 이벤트 - time 모드 를 사용 할 때 기본적으로 제공 하 는 window 는 TumblingEventTimeWindows, SlidingEventTimeWindows, EventTimeSessionWindow 등 이 있 습 니 다.이 세 가지 기본 적 인 window operator 에 서 는 기본 적 인 trigger 를 제공 합 니 다. 우 리 는 이 세 가지 방법 을 사용 할 때 trigger 를 쓰 지 않 고 window process 를 직접 씁 니 다. 예 를 들 어 reduce ().이 세 개의 window 에 있 는 getDefault Trigger () 방법 은 EventTimeTrigger 를 사용 하기 때 문 입 니 다. 즉, 이것 은 우리 에 게 기본 적 인 trigger 를 제공 하기 때 문 입 니 다.
    EventTimeTrigger.java
    @Override
    //       window     
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    	if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    		// if the watermark is already past the window fire immediately
    		return TriggerResult.FIRE;
    	} else {
    		ctx.registerEventTimeTimer(window.maxTimestamp());
    		return TriggerResult.CONTINUE;
    	}
    }
    
    @Override
    //trigger          
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    	return time == window.maxTimestamp() ?
    		TriggerResult.FIRE :
    		TriggerResult.CONTINUE;
    }
    
    @Override
    public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {
    	return EventTimeTrigger.create();
    }
    

    window size = 10s 첫 번 째 데이터 (2019 - 06 - 03 17: 00: 02) 가 도 착 했 을 때 이 데이터 가 속 한 창 은 [2019 - 06 - 03 17: 00: 00, 2019 - 06 - 03 17: 00: 10) 입 니 다.
  • MyTimestamps AndWatermarks 의 getCurrentWatermark () 방법 을 호출 하여 watermark 를 2019 - 06 - 03 16: 59: 57
  • 로 계산 합 니 다.
  • onElement () 방법 을 호출 합 니 다. window. max Timestamp () 는 2019 - 06 - 03 17: 00: 10 ctx. getCurrent Watermark () 는 2019 - 06 - 03 16: 59: 57 if 조건 이 만족 하지 않 으 면 else 에 trigger 를 등록 합 니 다. 시간 은 2019 - 06 - 03 17: 00: 10 입 니 다. 바 텀 은 set 이 고 같은 window 와 같은 시간의 trigger 입 니 다. 하나만 등록 합 니 다
  • 두 번 째 데이터 (2019 - 06 - 03 17: 00: 11) 가 도 착 했 을 때 이 데이터 가 속 한 창 은 [2019 - 06 - 03 17: 00: 10, 2019 - 06 - 03 17: 00: 20] 입 니 다.
  • MyTimestamps AndWatermarks 의 getCurrentWatermark () 방법 으로 watermark 를 계산 합 니 다: 2019 - 06 - 03 17: 00: 06
  • onElement () 방법 을 호출 합 니 다. 이전 창 window. max Timestamp () 는 2019 - 06 - 03 17: 00: 10 ctx. getCurrentWatermark () 는 2019 - 06 - 03 17: 00: 06 if 조건 이 만족 하지 않 으 면 else 에 trigger 를 등록 합 니 다. 시간 은 2019 - 06 - 03 17: 00: 20
  • 입 니 다.
    세 번 째 데이터 (2019 - 06 - 03 17: 00: 15) 가 도 착 했 을 때 이 데이터 가 속 한 창 은 [2019 - 06 - 03 17: 00: 10, 2019 - 06 - 03 17: 00: 20] 입 니 다.
  • MyTimestamps AndWatermarks 의 getCurrentWatermark () 방법 으로 watermark 를 계산 합 니 다: 2019 - 06 - 03 17: 00: 10
  • onElement () 방법 을 호출 합 니 다. 이전 창 window. max Timestamp () 는 2019 - 06 - 03 17: 00: 10 ctx. getCurrentWatermark () 는 2019 - 06 - 03 17: 00: 10 if 조건 을 만족 시 키 고 이 watermaker 와 같은 trigger 를 촉발 합 니 다. window 는 계산 을 시작 하고 이 window 의 trigger 등록 시간 을 삭제 합 니 다.
  • 네 번 째 데이터 (2019 - 06 - 03 17: 00: 05) 가 도 착 했 을 때 이때 워 터 마크 는 2019 - 06 - 03 17: 00: 30 이 라 고 가정 합 니 다.
  • onElement () 방법 을 호출 합 니 다. 이전 창 window. max Timestamp () 는 2019 - 06 - 03 17: 00: 10 ctx. getCurrentWatermark () 는 2019 - 06 - 03 17: 00: 30 window. max Timestamp () + allowed Lateness > watermark 입 니 다. 이 창 은 삭제 되 지 않 았 습 니 다. 발 창 에 직접 접촉 하여 계산 하고 현재 데 이 터 를 이전 창 에 계산 한 결과 와 연산 합 니 다. window. max Timestamp ()+ allowedLateness < = watermark, 이 창 이 삭제 되 었 다 고 판단 하면 데 이 터 를 버 립 니 다
  • window 트리거 계산 조건
  • watermark >= endTime
  • window 에 원소
  • 가 있 습 니 다.
  • 트리거 윈도 계산 (trigger 등록 시간 이 워 터 마크 와 같은 동작 보다 작 음)
  • 4. 지연 데 이 터 는 어떻게 처리 합 니까?
    Window Operator. java 에서 processElement () 방법
    for (W window: elementWindows) {
    	 // drop if the window is already late
    	 if (isWindowLate(window)) {
    	     continue;
    	 }
    	 isSkippedElement = false;
    	
    	 windowState.setCurrentNamespace(window);
    	 windowState.add(element.getValue());
    	
    	 triggerContext.key = key;
    	 triggerContext.window = window;
    	
    	 TriggerResult triggerResult = triggerContext.onElement(element);
    	
    	 if (triggerResult.isFire()) {
    	     ACC contents = windowState.get();
    	     if (contents == null) {
    	         continue;
    	     }
    	     emitWindowContents(window, contents);
    	 }
    	
    	 if (triggerResult.isPurge()) {
    	     windowState.clear();
    	 }
    	 registerCleanupTimer(window);
    }
    

    지각 이 너무 많은 데이터 에 대해 서 는 isWindow Late (window) 방법, 즉 window. max Timestamp () + allowed Latenes < = watermark 가 작 으 면 window 가 만 료 되 고 window 대상 을 삭제 하 며 window 상 태 를 삭제 해 야 합 니 다. 크 면 이 창 이 삭제 되 지 않 았 습 니 다. 발 창 에 직접 접촉 하여 계산 하고 현재 데 이 터 를 이전 창 에서 계산 한 결과 와 연산 하여 merge 작업 을 해 야 합 니 다.
    참고:https://www.jianshu.com/p/c8c789ff5570 https://blog.csdn.net/lmalds/article/details/52704170 https://www.jianshu.com/p/9db56f81fa2a

    좋은 웹페이지 즐겨찾기