Flink 시스템 의 Table API 와 SQL
오늘 주로 쓴 것 은 다음 과 같은 몇 가지 로 나 뉘 어 다음 과 같은 몇 가지 측면 에 따라 전개 된다.
1. Flink 의 서로 다른 API 의 계층 적 개요 입 니 다.
2. FlinkSQL 의 프로 그래 밍 절차.
3. Flink 프로 그래 밍 의 예.
하나 Flink 는 서로 다른 등급 의 API 를 가지 고 있 으 며, 서로 다른 등급 의 API 는 서로 다른 사용자 가 처리 하기에 편리 하 다.일반 사용 자 는 Datastream 과 Dataset 를 사용 하여 프로그램 을 작성 합 니 다. 우 리 는 더 높 은 기초 위 에서 Table API 와 SQL 을 사용 할 수 있 습 니 다. 이것 도 Flink 의 강력 한 점 입 니 다. 처리 표를 사용 하 는 것 처럼 데 이 터 를 처리 할 수 있 습 니 다.더 높이 연구 하고 싶다 면 더 밑바닥 을 볼 수 있다.
SQL
High-level Language
Table API
Declarative DSL
Datastream / Dataset API
Core API
Stateful Stream Processing
Low-level building block (streams, state, [event] time)
둘째, Flink 의 Table API 화해시키다 SQL 프로 그래 밍 절 차 는 다음 과 같 습 니 다.
1) 후속 사용 을 위해 Table Environment 표 환경 을 만 듭 니 다.Table Environment 는... SQL 화해시키다 Table API 의 핵심 개념 은 실행 에 필요 한 데이터 속성 을 설정 하 는 데 사 용 됩 니 다. Execution Environment 와 유사 하 며 주로 책임 을 집 니 다.
a) 레 지 스 트 데이터 원본, 내부 또는 외부 원본.
b) 해당 SQL 문 구 를 실행 합 니 다.
c) 사용자 정의 집합 수 를 등록 합 니 다.
d. 결과 집합 을 대상 데이터 원본 에 스 캔 하고 기록 합 니 다.
e) 같은 environment 에서 join unin 작업 을 수행 할 수 있 습 니 다.
2) 다음 에 데이터 원본 을 어떻게 등록 하 는 지 살 펴 보 겠 습 니 다. 서로 다른 Flink 버 전 은 서로 다른 실현 이 있 지만 핵심 내용 은 변 하지 않 습 니 다.
a) 데이터 세트 에서 직접 등록 할 수 있 습 니 다.예 를 들 어 table Environment. registerDataSet ().
b) 이미 존재 하 는 Table 에서 scan 이나 select 를 직접 실행 하면 새로운 Table 이 생 성 됩 니 다. 즉, 데 이 터 는 기 존의 Table 에서 다시 가 져 올 수 있 습 니 다. Table t = tableEnv. scan ("x"). select ("a, b, c").
c) TableSource 일 수도 있 고, 서로 다른 파일, 데이터베이스, 메시지 시스템 에서 읽 는 것 이다. csv 파일, TableSource csvSource = new CsvTableSource("path/to/file")。
3) 데 이 터 를 읽 고 처리 한 후 저장 하려 면 파일 이나 데이터베이스, 메시지 시스템 등에 Sink (저장) 이 필요 합 니 다.
a) 예 를 들 어 싱 크 에서 CSV 파일 까지. TableSink csvSink = new TableCSVSink("path/to/sink", ..)。
b) Sink 는 CSV 파일 에 필드 문장 과 형식 을 지정 합 니 다.
표 필드 지정:
String[] fieldNames = {"fild1", "filed2", "field3"};
지정 한 필드 종류: TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
테이블 이름과 csv 파일 지정:
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
、 , 。
1) 주어진 단어 와 단어의 개수 에서 각 단어 에 나타 난 데 이 터 를 통계 하고 SQL 문 구 를 사용 하여 조회 통 계 를 실현 한다.전체 사례 는 다음 과 같 습 니 다 (주의, 서로 다른 FLink 버 전 구현 에 약간의 차이 가 있 음).
package myflink.sql;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class WordCountSQL {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);
DataSet input = env.fromElements(
WC.of("hello", 1),
WC.of("hqs", 1),
WC.of("world", 1),
WC.of("hello", 1)
);
//
tEnv.registerDataSet("WordCount", input, "word, frequency");
// SQL,
Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet result = tEnv.toDataSet(table, WC.class);
result.print();
}
public static class WC {
public String word; //hello
public long frequency;
// , flink
public WC() {}
public static WC of(String word, long frequency) {
WC wc = new WC();
wc.word = word;
wc.frequency = frequency;
return wc;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
}
출력 결 과 는 우리 가 생각 하 는 결과 와 같다 는 것 이다.
WC world 1
WC hello 2
WC hqs 1
2) 다음 예 는 복잡 할 것 입 니 다. txt 파일 에서 데 이 터 를 읽 습 니 다. txt 파일 에는 id, 헤링본, 책 이름, 가격 정보 가 포함 되 어 있 습 니 다.그리고 데 이 터 를 하나의 표 로 등록 한 다음 에 이 표 의 결 과 를 통계 하고 사람의 이름 에 따라 이 사람 이 책 을 사 는 데 쓴 돈 을 통계 하여 결 과 를 하나의 파일 에 넣 습 니 다.코드 를 올리다.
package myflink.sql;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
public class SQLFromFile {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
env.setParallelism(1);
//
DataSource input = env.readTextFile("test.txt");
//
input.print();
// DataSet
DataSet inputDataSet = input.map(new MapFunction() {
@Override
public Orders map(String s) throws Exception {
String[] splits = s.split(" ");
return Orders.of(Integer.valueOf(splits[0]), String.valueOf(splits[1]), String.valueOf(splits[2]), Double.valueOf(splits[3]));
}
});
// table
Table order = tableEnv.fromDataSet(inputDataSet);
// Orders
tableEnv.registerTable("Orders", order);
Table nameResult = tableEnv.scan("Orders").select("name");
//
nameResult.printSchema();
//
Table sqlQueryResult = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");
// DataSet
DataSet result = tableEnv.toDataSet(sqlQueryResult, Result.class);
result.print();
// tuple
result.map(new MapFunction>() {
@Override
public Tuple2 map(Result result) throws Exception {
String name = result.name;
Double total = result.total;
return Tuple2.of(name, total);
}
}).print();
TableSink sink = new CsvTableSink("SQLText.txt", " | ");
//
String[] filedNames = {"name", "total"};
//
TypeInformation[] filedTypes = {Types.STRING(), Types.DOUBLE()};
tableEnv.registerTableSink("SQLTEXT", filedNames, filedTypes, sink);
sqlQueryResult.insertInto("SQLTEXT");
env.execute();
}
public static class Orders {
public Integer id;
public String name;
public String book;
public Double price;
public Orders() {
super();
}
public static Orders of(Integer id, String name, String book, Double price) {
Orders orders = new Orders();
orders.id = id;
orders.name = name;
orders.book = book;
orders.price = price;
return orders;
}
}
public static class Result {
public String name;
public Double total;
public Result() {
super();
}
public static Result of(String name, Double total) {
Result result = new Result();
result.name = name;
result.total = total;
return result;
}
}
}
완전한 코드 를 참고 하려 면 접근 할 수 있 습 니 다.https://github.com/stonehqs/flink-demo 。
문제 가 있 으 면 벽돌 을 찍 는 것 을 환영 합 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.