Flink 1.12 새로운 기능 의 Flink SQL 시제 표 (Temporal Tables) 설명 및 요약
Flink 1.12 가 정식 적 으로 발 표 된 후에 새로운 특성 을 많이 가 져 왔 습 니 다. 본 고 는 Flink 1.11 과 Flink 1. 12 에서 시제 표 의 사용 과 자신의 작은 정 리 를 중점적으로 학습 하고 정리 하 는 데 중심 을 두 었 습 니 다. 글 에 문제 가 있 으 면 댓 글로 교류 하고 토론 하 십시오. 저 는 신속하게 고 치 겠 습 니 다.
본 고 는 주로 Flink 1.12 에서 새로운 시제 표 의 새로운 개념 과 주의사항 을 어떻게 Join 에서 사용 하 는 지 에 대해 다음 글 에서 구체 적 으로 토론 할 것 이다.
Flink 의 시제 표 디자인 취지
우선, 여러분 은 명확 한 개념 을 가 져 야 합 니 다. 바로 전통 적 인 SQL 에서 표 는 일반적으로 경계 가 있 는 데 이 터 를 나타 내 는데 스 트림 컴 퓨 팅 과 같은 끊 임 없 는 데이터 에 문제 가 존재 하기 때문에 Flink SQL 에서 동적 표 라 는 개념 을 제 시 했 습 니 다. 이 점 은 홈 페이지 에 명확 한 설명 이 있 습 니 다.
자세 한 내용 은 링크 를 보십시오: 동적 테이블
하지만 여기 에는 추가 설명 이 있다.
입 니 다. SQL
위의 세 가지 개념 을 명 확 히 한 후에 시제 표 의 디자인 취 지 를 살 펴 보 자.
업무 에서 우 리 는 차원 표 가 항상 업데이트 되 는 것 을 만 날 수 있다. 정상 적 으로 볼 때 우 리 는 최근 한 시간의 차원 표 데 이 터 를 얻 을 수 밖 에 없다. 그러나 업무 에서 우 리 는 특정한 시간 에 발생 할 때 이 사건 의 사건 시간 에 대응 하 는 차원 이 어떻게 되 어야 하 는 지 에 가장 관심 을 가진다. 홈 페이지 의 한 예 와 결합 하여 설명 한다.
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
우 리 는 위의 데 이 터 를 보면 시간 에 따라 끊임없이 변화 하 는 환율 표를 가지 고 있다. 예 를 들 어 주문 이 9: 00 에 왔 을 때 해당 하 는 차원 의 결 과 는 다음 과 같다.
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
그리고 주문 이 12: 00 에 왔 을 때 해당 하 는 차원 의 결 과 는 다음 과 같 아야 한다.
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Yen 1
11:15 Euro 119 (Euro )
11:49 Pounds 108 (Pounds )
Flink SQL 1.11 때 SQL 의 DDL 에 서 는 시간의 의 미 를 처리 하 는 시제 표 join 만 지원 합 니 다. 만약 에 우리 가 사건 시간의 의미 효 과 를 얻 으 려 면 시제 표 함수 로 만 실현 할 수 있 습 니 다. 예 를 들 어:
log.info(" ");
tEnv.createTemporaryView("RatesHistory", ratesHistory);
log.info(" ");
//
// "r_proctime" , "r_currency"
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
$("rowtime"), // <==== (1)
$("currency")); // <==== (2)
log.info(" ");
tEnv.createTemporarySystemFunction("Rates", rates);
log.info(" ");
String dml = "SELECT * FROM Orders AS o , LATERAL TABLE (Rates(o.time)) AS r WHERE r.currency = o.currency";
여기 서 주의해 야 할 것 은 TemporalTableFunction 이벤트 시간 속성 을 입력 하려 면 TemporalTableFunction 을 정의 할 때 도 이벤트 시간 으로 정의 해 야 합 니 다. 그렇지 않 으 면 오류 가 발생 합 니 다. Non processing timeAttribute [TIME ATTRIBUTE (ROWTIME)] passed as the argument to TemporalTableFunction.
한편, Flink 1.12 에 서 는 1.11 의 부족 함 을 보완 하고 DDL 에 서 는 이벤트 시간 과 처리 시간 두 가지 의 미 를 직접 지원 하 며 버 전 표 (1.12), 버 전 보기 (1.12), 일반 표 (1.12), 시제 표 함수 (1.11) 등 개념 도 도입 했다.
Flink 1. 12 중 시제 표 의 유형
시제 표 는 일련의 버 전의 표 스냅숏 집합 으로 나 눌 수 있다. 표 스냅숏 의 버 전 은 스냅숏 에 기 록 된 모든 유효 구간 을 대표 하고 유효 구간 의 시작 시간 과 종료 시간 은 사용자 가 지정 할 수 있 으 며 시제 표 가 자신의 역사 버 전 여 부 를 추적 할 수 있 는 지 에 따라 시제 표 는
와
로 나 눌 수 있다.버 전 목록:
CREATE
표 문 구 는 반드시 PRIMARY KEY
과 사건 시간 속성 을 포함해 야 한 다 는 것 이다.메 인 키 제약 과 이벤트 시간 속성 을 정의 한 표 가 버 전 표 입 니 다.--
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product_id) NOT ENFORCED, -- (1)
WATERMARK FOR update_time AS update_time -- (2) watermark
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
이상 의 DML 은
(1)
에서 표 product_changelog
로 메 인 키 를 정 의 했 고 (2)
는 update_time
를 표 product_changelog
의 이벤트 시간 으로 정 의 했 기 때문에 product_changelog
는 버 전 표 이다.METADATA FROM 'value.source.timestamp' VIRTUAL
문법 은 모든 changelog 에서 changelog 에 대응 하 는 데이터베이스 시트 에서 작 동 하 는 실행 시간 을 추출 한 다 는 뜻 이다.product_changelog
의 메 인 키 P.product_id
는 join 조건 O.product_id = P.product_id
에 포함 되 어야 합 니 다.이것 은 이해 하기 쉽다. 메 인 키 와 이벤트 시간 에 유일 하 게 데 이 터 를 확정한다.버 전 보기
흐름 에서 우 리 는 흔히 append - only 흐름 을 얻 습 니 다. 이것 은 우리 가 정의 할 수 없다 는 것 을 의미 합 니 다
PRIMARY KEY
. 그러나 우 리 는 이 표 가 버 전 표를 정의 하 는 모든 필요 한 정 보 를 가지 고 있다 는 것 을 잘 알 고 있 기 때문에 우 리 는 Flink SQL 이 제공 하 는 DISTINCT 를 통 해 재 처리 하고 재 조회 하면 질서 있 는 changelog 흐름 을 생산 할 수 있 습 니 다.SELECT * FROM RatesHistory;
currency_time currency rate
============= ========= ====
09:00:00 US Dollar 102
09:00:00 Euro 114
09:00:00 Yen 1
10:45:00 Euro 116
11:15:00 Euro 119
11:49:00 Pounds 108
-- query Flink changelog stream, changelog 。
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time`
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` query unique key,
ORDER BY currency_time DESC) AS rowNum
FROM RatesHistory )
WHERE rowNum = 1;
-- `versioned_rates` changelog:
(changelog kind) currency_time currency rate
================ ============= ========= ====
+(INSERT) 09:00:00 US Dollar 102
+(INSERT) 09:00:00 Euro 114
+(INSERT) 09:00:00 Yen 1
+(UPDATE_AFTER) 10:45:00 Euro 116
+(UPDATE_AFTER) 11:15:00 Euro 119
+(INSERT) 11:49:00 Pounds 108
보통 시계
-- DDL HBase , SQL
-- 'currency' HBase rowKey
CREATE TABLE LatestRates (
currency STRING,
fam1 ROW<rate DOUBLE>
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'rates',
'zookeeper.quorum' = 'localhost:2181'
);
LookupableTableSource
를 실현 해 야 한다.인터페이스 LookupableTableSource
의 인 스 턴 스 는 시제 표 로 만 처리 시간 을 바탕 으로 하 는 시제 Join 에 사용 할 수 있 습 니 다.LookupableTableSource
을 통 해 정 의 된 표 의 는 이 표 가 실 행 될 때 하나 이상 의 key 를 통 해 외부 저장 시스템 을 조회 하 는 능력 을 갖 추고 있 음 을 의미 합 니 다. 현재 처리 시간 을 기반 으로 하 는 시제 표 join 에서 사용 할 수 있 는 표 는 JDBC, HBase, Hive 를 포함 합 니 다.처리 시간 을 기반 으로 한 시제 표 Join 에서 임의의 표를 시제 표 로 지원 하 는 것 은 멀 지 않 은 미래 에 지 원 될 것 입 니 다.시제 표 함수
시제 표 함 수 는 본 고의 두 번 째 부분 에서 이미 설명 되 었 으 니 주의해 야 할 것 은
총결산
본 고 는 Flink 1.11 시제 관련 부족 과 Flink 1. 12 중 시제 표 디자인 의 새로운 개념 과 기본 적 인 정의 표 의 방법 과 주의사항 을 정리 했다.다음 에 Join 장 을 써 서 시제 표, 시제 함수 의 사용 보충 을 할 것 입 니 다.
– 원숭이 두 마리
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
flink 패키지 컴파일 문제1.9flink를 포장 컴파일할 때 이 문제가 발생했습니다. 그리고 가다https://mvnrepository.com다운로드를 검색하세요.발견하다https://maven.ceon.pl/artifactory/repo/...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.