Flink 1.12 새로운 기능 의 Flink SQL 시제 표 (Temporal Tables) 설명 및 요약

24723 단어 Flinkflink
머리말
Flink 1.12 가 정식 적 으로 발 표 된 후에 새로운 특성 을 많이 가 져 왔 습 니 다. 본 고 는 Flink 1.11 과 Flink 1. 12 에서 시제 표 의 사용 과 자신의 작은 정 리 를 중점적으로 학습 하고 정리 하 는 데 중심 을 두 었 습 니 다. 글 에 문제 가 있 으 면 댓 글로 교류 하고 토론 하 십시오. 저 는 신속하게 고 치 겠 습 니 다.
본 고 는 주로 Flink 1.12 에서 새로운 시제 표 의 새로운 개념 과 주의사항 을 어떻게 Join 에서 사용 하 는 지 에 대해 다음 글 에서 구체 적 으로 토론 할 것 이다.
Flink 의 시제 표 디자인 취지
우선, 여러분 은 명확 한 개념 을 가 져 야 합 니 다. 바로 전통 적 인 SQL 에서 표 는 일반적으로 경계 가 있 는 데 이 터 를 나타 내 는데 스 트림 컴 퓨 팅 과 같은 끊 임 없 는 데이터 에 문제 가 존재 하기 때문에 Flink SQL 에서 동적 표 라 는 개념 을 제 시 했 습 니 다. 이 점 은 홈 페이지 에 명확 한 설명 이 있 습 니 다.
자세 한 내용 은 링크 를 보십시오: 동적 테이블
하지만 여기 에는 추가 설명 이 있다.
  • 정적 표 에 비해 동적 표 는 시간 에 따라 변화 한다.SQL 조회 작용 과 동적 표를 조회 하면 조 회 는 지속 적 으로 실행 되 고 종료 되 지 않 으 며 하나 입 니 다.
  • 데이터 가 지속 적 으로 끝 이 없 기 때문에 연속 조 회 는 최종 적 으로 변 하지 않 는 결 과 를 내 놓 지 않 습 니 다. SQL
  • 연속 적 인 조 회 는 종료 되 지 않 고 입력 표 (동적 표) 의 데이터 변화 에 따라 지속 적 으로 계산 하고 변 화 를 결과 표 에 반영 합 니 다.

  • 위의 세 가지 개념 을 명 확 히 한 후에 시제 표 의 디자인 취 지 를 살 펴 보 자.
    업무 에서 우 리 는 차원 표 가 항상 업데이트 되 는 것 을 만 날 수 있다. 정상 적 으로 볼 때 우 리 는 최근 한 시간의 차원 표 데 이 터 를 얻 을 수 밖 에 없다. 그러나 업무 에서 우 리 는 특정한 시간 에 발생 할 때 이 사건 의 사건 시간 에 대응 하 는 차원 이 어떻게 되 어야 하 는 지 에 가장 관심 을 가진다. 홈 페이지 의 한 예 와 결합 하여 설명 한다.
    rowtime currency   rate
    ======= ======== ======
    09:00   US Dollar   102
    09:00   Euro        114
    09:00   Yen           1
    10:45   Euro        116
    11:15   Euro        119
    11:49   Pounds      108
    

    우 리 는 위의 데 이 터 를 보면 시간 에 따라 끊임없이 변화 하 는 환율 표를 가지 고 있다. 예 를 들 어 주문 이 9: 00 에 왔 을 때 해당 하 는 차원 의 결 과 는 다음 과 같다.
    rowtime currency   rate
    ======= ======== ======
    09:00   US Dollar   102
    09:00   Euro        114
    09:00   Yen           1
    

    그리고 주문 이 12: 00 에 왔 을 때 해당 하 는 차원 의 결 과 는 다음 과 같 아야 한다.
    rowtime currency   rate
    ======= ======== ======
    09:00   US Dollar   102
    09:00   Yen           1
    11:15   Euro        119 (Euro   )
    11:49   Pounds      108 (Pounds  )
    

    Flink SQL 1.11 때 SQL 의 DDL 에 서 는 시간의 의 미 를 처리 하 는 시제 표 join 만 지원 합 니 다. 만약 에 우리 가 사건 시간의 의미 효 과 를 얻 으 려 면 시제 표 함수 로 만 실현 할 수 있 습 니 다. 예 를 들 어:
    log.info("       ");
    tEnv.createTemporaryView("RatesHistory", ratesHistory);
    log.info("       ");
    
    //           
    //    "r_proctime"      ,   "r_currency"    
    TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
    $("rowtime"), // <==== (1)            
    $("currency")); // <==== (2)           
    log.info("         ");
    tEnv.createTemporarySystemFunction("Rates", rates);
    log.info("         ");
    
    String dml = "SELECT * FROM Orders AS o , LATERAL TABLE (Rates(o.time)) AS r WHERE r.currency = o.currency";
    

    여기 서 주의해 야 할 것 은 TemporalTableFunction 이벤트 시간 속성 을 입력 하려 면 TemporalTableFunction 을 정의 할 때 도 이벤트 시간 으로 정의 해 야 합 니 다. 그렇지 않 으 면 오류 가 발생 합 니 다. Non processing timeAttribute [TIME ATTRIBUTE (ROWTIME)] passed as the argument to TemporalTableFunction.
    한편, Flink 1.12 에 서 는 1.11 의 부족 함 을 보완 하고 DDL 에 서 는 이벤트 시간 과 처리 시간 두 가지 의 미 를 직접 지원 하 며 버 전 표 (1.12), 버 전 보기 (1.12), 일반 표 (1.12), 시제 표 함수 (1.11) 등 개념 도 도입 했다.
    Flink 1. 12 중 시제 표 의 유형
    시제 표 는 일련의 버 전의 표 스냅숏 집합 으로 나 눌 수 있다. 표 스냅숏 의 버 전 은 스냅숏 에 기 록 된 모든 유효 구간 을 대표 하고 유효 구간 의 시작 시간 과 종료 시간 은 사용자 가 지정 할 수 있 으 며 시제 표 가 자신의 역사 버 전 여 부 를 추적 할 수 있 는 지 에 따라 시제 표 는 로 나 눌 수 있다.
    버 전 목록:
  • 버 전, 버 전 표 는 무엇 입 니까?버 전 은 시간 대별 반응 표 데이터 의 한 형태 이다.예 를 들 어 우리 가 위 에서 제시 한 환율 의 예 는 9: 00 과 12: 00 이 바로 환율 표 의 두 가지 버 전이 다.버 전 표 는 표 가 서로 다른 시간 대 버 전의 집합 으로 우 리 는 그것 의 역사 버 전 을 추적 하고 방문 할 수 있다.한편, Flink 1.12 에 서 는 변경 로 그 를 기본 소스 나 형식 으로 직접 정의 하 는 표 에 대해 서 는 암시 적 으로 버 전 화 표를 정의 합 니 다.upsert Kafka 소스 와 데이터베이스 changelog 로그 형식, 예 를 들 어 debezium 과 canal 을 포함 합 니 다.상기 와 같이 유일한 부가 요 구 는 CREATE 표 문 구 는 반드시 PRIMARY KEY 과 사건 시간 속성 을 포함해 야 한 다 는 것 이다.메 인 키 제약 과 이벤트 시간 속성 을 정의 한 표 가 버 전 표 입 니 다.
  • 버 전 표를 어떻게 정의 합 니까? 홈 키 제약 과 이벤트 시간 으로 버 전 표를 정의 합 니 다. 홈 키 와 시간 이벤트 가 하나의 차원 데 이 터 를 유일 하 게 확인 할 수 있 기 때 문 입 니 다.
  • --        
    CREATE TABLE product_changelog (
      product_id STRING,
      product_name STRING,
      product_price DECIMAL(10, 4),
      update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      PRIMARY KEY(product_id) NOT ENFORCED,      -- (1)       
      WATERMARK FOR update_time AS update_time   -- (2)    watermark                     
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'localhost:9092',
      'value.format' = 'debezium-json'
    );
    

    이상 의 DML 은 (1) 에서 표 product_changelog 로 메 인 키 를 정 의 했 고 (2)update_time 를 표 product_changelog 의 이벤트 시간 으로 정 의 했 기 때문에 product_changelog 는 버 전 표 이다.METADATA FROM 'value.source.timestamp' VIRTUAL 문법 은 모든 changelog 에서 changelog 에 대응 하 는 데이터베이스 시트 에서 작 동 하 는 실행 시간 을 추출 한 다 는 뜻 이다.
  • 버 전 시트 Join 을 사용 할 때 주의해 야 할 사항:
  • 이벤트 시간 을 기반 으로 하 는 시제 표 Join 의 join key 는 시제 표 의 메 인 키 를 포함해 야 합 니 다. 예 를 들 어 표 product_changelog 의 메 인 키 P.product_id 는 join 조건 O.product_id = P.product_id 에 포함 되 어야 합 니 다.이것 은 이해 하기 쉽다. 메 인 키 와 이벤트 시간 에 유일 하 게 데 이 터 를 확정한다.
  • 워 터 마크 의 설정: 이벤트 시간 을 기반 으로 하 는 시제 표 Join 은 좌우 양쪽 워 터 마크 를 통 해 작 동 합 니 다. join 양쪽 표 에 적합 한 워 터 마크 를 설정 하 였 는 지 확인 하 십시오.사건 시간의 중요 한 개념 중 하 나 는 워 터 마크 다. 이 건 설명 할 필요 가 없다.
  • 홈 페이지 에 서 는 데이터베이스 시트 에서 작 동 하 는 실행 시간 을 이벤트 시간 으로 사용 하 는 것 을 강력 히 추천 합 니 다. 그렇지 않 으 면 시간 을 통 해 추출 한 버 전이 데이터베이스 에 있 는 버 전과 일치 하지 않 을 수 있 습 니 다.


  • 버 전 보기
  • 보기 가 무엇 입 니까? 도 표를 보 시 겠 습 니까?보 기 는 이미 컴 파일 된 SQL 문장 입 니 다. 그래프 는 이미 컴 파일 된 SQL 문장 을 통 해 만들어 진 가상 테이블 입 니 다.
  • 왜 도표 가 있어 야 합 니까?

  • 흐름 에서 우 리 는 흔히 append - only 흐름 을 얻 습 니 다. 이것 은 우리 가 정의 할 수 없다 는 것 을 의미 합 니 다 PRIMARY KEY. 그러나 우 리 는 이 표 가 버 전 표를 정의 하 는 모든 필요 한 정 보 를 가지 고 있다 는 것 을 잘 알 고 있 기 때문에 우 리 는 Flink SQL 이 제공 하 는 DISTINCT 를 통 해 재 처리 하고 재 조회 하면 질서 있 는 changelog 흐름 을 생산 할 수 있 습 니 다.
  • 시 도 표를 어떻게 정의 합 니까? 홈 키 를 추측 하고 원시 데이터 흐름 을 유지 할 수 있 는 이벤트 시간 속성 을 다시 조회 합 니 다. 다음 과 같 습 니 다.
  • SELECT * FROM RatesHistory;
    
    currency_time currency  rate
    ============= ========= ====
    09:00:00      US Dollar 102
    09:00:00      Euro      114
    09:00:00      Yen       1
    10:45:00      Euro      116
    11:15:00      Euro      119
    11:49:00      Pounds    108
    
    --        query    Flink          changelog stream,     changelog             。
    CREATE VIEW versioned_rates AS              
    SELECT currency, rate, currency_time            -- (1) `currency_time`        
      FROM (
          SELECT *,
          ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency`     query   unique key,      
             ORDER BY currency_time DESC) AS rowNum 
          FROM RatesHistory )
    WHERE rowNum = 1; 
    
    --    `versioned_rates`         changelog:
    
    (changelog kind) currency_time currency   rate
    ================ ============= =========  ====
    +(INSERT)        09:00:00      US Dollar  102
    +(INSERT)        09:00:00      Euro       114
    +(INSERT)        09:00:00      Yen        1
    +(UPDATE_AFTER)  10:45:00      Euro       116
    +(UPDATE_AFTER)  11:15:00      Euro       119
    +(INSERT)        11:49:00      Pounds     108
    
  • 사용 은 도 표를 볼 때 주의해 야 할 사항 입 니 다. 도 표를 볼 때 먼저 append 가 필요 합 니 다.only 흐름, 이렇게 하면 우 리 는 DISTINCT 작업 을 사용 하여 사건 의 변 화 를 통 해 changlog 흐름 을 생산 할 수 있 습 니 다.플 링크 SQL 1.12 는 메 인 키 를 자동 으로 추정 하고 원본 데이터 흐름 을 유지 하 는 이벤트 시간 입 니 다.

  • 보통 시계
  • 보통 시 계 는 무엇 입 니까?버 전 표 는 표 가 각 시간 대 에 있 는 버 전 을 보존 하고 일반 표 는 이 표 의 최신 데이터 만 보존 합 니 다.
  • 일반 표를 어떻게 정의 합 니까? 일반 표 의 특성 은 그의 이름과 같 습 니 다. 바로 Flink 의 일반 표 입 니 다. 그 성명 은 Flink 건축 표 DDL 과 일치 합 니 다. 다음 과 같 습 니 다.
  • --   DDL      HBase  ,        SQL             
    -- 'currency'    HBase     rowKey
     CREATE TABLE LatestRates (   
         currency STRING,   
         fam1 ROW<rate DOUBLE>   
     ) WITH (   
        'connector' = 'hbase-1.4',   
        'table-name' = 'rates',   
        'zookeeper.quorum' = 'localhost:2181'   
     );
    
  • 일반 표를 사용 할 때 주의해 야 할 사항: 이론 적 으로 임의로 시제 표 로 사용 할 수 있 고 처리 시간 을 바탕 으로 하 는 시제 표 Join 에서 사용 할 수 있 지만 현재 시제 표 로 지원 하 는 일반 표 는 반드시 인터페이스 LookupableTableSource 를 실현 해 야 한다.인터페이스 LookupableTableSource 의 인 스 턴 스 는 시제 표 로 만 처리 시간 을 바탕 으로 하 는 시제 Join 에 사용 할 수 있 습 니 다.LookupableTableSource 을 통 해 정 의 된 표 의 는 이 표 가 실 행 될 때 하나 이상 의 key 를 통 해 외부 저장 시스템 을 조회 하 는 능력 을 갖 추고 있 음 을 의미 합 니 다. 현재 처리 시간 을 기반 으로 하 는 시제 표 join 에서 사용 할 수 있 는 표 는 JDBC, HBase, Hive 를 포함 합 니 다.처리 시간 을 기반 으로 한 시제 표 Join 에서 임의의 표를 시제 표 로 지원 하 는 것 은 멀 지 않 은 미래 에 지 원 될 것 입 니 다.

  • 시제 표 함수
    시제 표 함 수 는 본 고의 두 번 째 부분 에서 이미 설명 되 었 으 니 주의해 야 할 것 은
  • join 시 왼쪽 표 (왼쪽 입력 / 프로 브 측) 에서 시제 표 (오른쪽 입력 / 구축 측) 와 연결 해 야 합 니 다. 양쪽 의 시간 적 의미 가 같 아야 합 니 다. 그렇지 않 으 면 유사 한 이상 을 던 집 니 다. Non processing timeAttribute [TIME ATTRIBUTE (ROWTIME)] passed as the argument to TemporalTableFunction.
  • 처리 시간 을 바탕 으로 하 는 시제 Join 에서 오른쪽 표 가 외부 시스템 의 표 가 아니 라 일반적인 데이터 흐름 이 라면 시제 표 함수 Join 과 시제 표 Join 의 의미 에 문제 가 있 고 시제 표 함수 Join 은 여전히 사용 할 수 있 지만 시제 표 Join 은 이 기능 을 사용 하지 않 습 니 다.의미 문제 의 원인 은 Join 연산 자가 오른쪽 시제 표 (구조 측) 의 전체 스냅 샷 이 일치 하 는 지 알 수 없 기 때문에 왼쪽 의 흐름 이 시작 할 때 사용자 가 기대 하 는 데이터 와 관련 되 지 않 고 생산 환경 에서 사용 자 를 오도 할 수 있 습 니 다.
  • 처리 시간의 의미 에서 시제 표 함수 의 경우 최신 데이터 만 보존 하고 시간 사건 의 의미 에서 각 수위 에 대응 하 는 동적 표를 보존 합 니 다.

  • 총결산
    본 고 는 Flink 1.11 시제 관련 부족 과 Flink 1. 12 중 시제 표 디자인 의 새로운 개념 과 기본 적 인 정의 표 의 방법 과 주의사항 을 정리 했다.다음 에 Join 장 을 써 서 시제 표, 시제 함수 의 사용 보충 을 할 것 입 니 다.
    – 원숭이 두 마리

    좋은 웹페이지 즐겨찾기