Flink 시스템 의 Table API 와 SQL

Flink 는 표 처럼 처 리 된 API 와 SQL 문 구 를 실행 하 는 것 처럼 결과 집합 을 실행 합 니 다.이렇게 하면 모두 가 데이터 처 리 를 편리 하 게 할 수 있다.예 를 들 어 일부 조 회 를 수행 하고 무한 데이터 와 일괄 처리 작업 에서 이 를 일정한 형식 으로 출력 하면 SQL 을 수행 하 는 것 처럼 간단 합 니 다.
오늘 주로 쓴 것 은 다음 과 같은 몇 가지 로 나 뉘 어 다음 과 같은 몇 가지 측면 에 따라 전개 된다.
  1. Flink 의 서로 다른 API 의 계층 적 개요 입 니 다.
2. FlinkSQL 의 프로 그래 밍 절차.
  3. Flink 프로 그래 밍 의 예.
  
하나  Flink 는 서로 다른 등급 의 API 를 가지 고 있 으 며, 서로 다른 등급 의 API 는 서로 다른 사용자 가 처리 하기에 편리 하 다.일반 사용 자 는 Datastream 과 Dataset 를 사용 하여 프로그램 을 작성 합 니 다. 우 리 는 더 높 은 기초 위 에서 Table API 와 SQL 을 사용 할 수 있 습 니 다. 이것 도 Flink 의 강력 한 점 입 니 다. 처리 표를 사용 하 는 것 처럼 데 이 터 를 처리 할 수 있 습 니 다.더 높이 연구 하고 싶다 면 더 밑바닥 을 볼 수 있다.
SQL 
High-level Language
Table API
Declarative  DSL
Datastream / Dataset API
Core API
Stateful Stream Processing
Low-level building block (streams, state, [event] time)
둘째, Flink 의 Table API 화해시키다 SQL 프로 그래 밍 절 차 는 다음 과 같 습 니 다.
  1) 후속 사용 을 위해 Table Environment 표 환경 을 만 듭 니 다.Table Environment 는... SQL 화해시키다 Table API 의 핵심 개념 은 실행 에 필요 한 데이터 속성 을 설정 하 는 데 사 용 됩 니 다. Execution Environment 와 유사 하 며 주로 책임 을 집 니 다.
    a) 레 지 스 트 데이터 원본, 내부 또는 외부 원본.
    b) 해당 SQL 문 구 를 실행 합 니 다.
    c) 사용자 정의 집합 수 를 등록 합 니 다.
d. 결과 집합 을 대상 데이터 원본 에 스 캔 하고 기록 합 니 다.
    e) 같은 environment 에서 join unin 작업 을 수행 할 수 있 습 니 다.
2) 다음 에 데이터 원본 을 어떻게 등록 하 는 지 살 펴 보 겠 습 니 다. 서로 다른 Flink 버 전 은 서로 다른 실현 이 있 지만 핵심 내용 은 변 하지 않 습 니 다.
    a) 데이터 세트 에서 직접 등록 할 수 있 습 니 다.예 를 들 어 table Environment. registerDataSet ().
    b) 이미 존재 하 는 Table 에서 scan 이나 select 를 직접 실행 하면 새로운 Table 이 생 성 됩 니 다. 즉, 데 이 터 는 기 존의 Table 에서 다시 가 져 올 수 있 습 니 다. Table t = tableEnv. scan ("x"). select ("a, b, c").
    c) TableSource 일 수도 있 고, 서로 다른 파일, 데이터베이스, 메시지 시스템 에서 읽 는 것 이다. csv 파일, TableSource csvSource = new CsvTableSource("path/to/file")。
3) 데 이 터 를 읽 고 처리 한 후 저장 하려 면 파일 이나 데이터베이스, 메시지 시스템 등에 Sink (저장) 이 필요 합 니 다.
    a) 예 를 들 어 싱 크 에서 CSV 파일 까지. TableSink csvSink = new TableCSVSink("path/to/sink", ..)。
    b) Sink 는 CSV 파일 에 필드 문장 과 형식 을 지정 합 니 다.
표 필드 지정:  String[] fieldNames = {"fild1", "filed2", "field3"}; 지정 한 필드 종류:  TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 
테이블 이름과 csv 파일 지정: tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink); 、 , 。

    1) 주어진 단어 와 단어의 개수 에서 각 단어 에 나타 난 데 이 터 를 통계 하고 SQL 문 구 를 사용 하여 조회 통 계 를 실현 한다.전체 사례 는 다음 과 같 습 니 다 (주의, 서로 다른 FLink 버 전 구현 에 약간의 차이 가 있 음).
package myflink.sql;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;


public class WordCountSQL {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);

        DataSet input = env.fromElements(
                WC.of("hello", 1),
                WC.of("hqs", 1),
                WC.of("world", 1),
                WC.of("hello", 1)
        );
        //     
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        //  SQL,          
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet result = tEnv.toDataSet(table, WC.class);

        result.print();

    }

    public static class WC {
        public String word; //hello
        public long frequency;

        //      , flink     
        public WC() {}

        public static WC of(String word, long frequency) {
            WC wc = new WC();
            wc.word = word;
            wc.frequency = frequency;
            return wc;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

}

 
출력 결 과 는 우리 가 생각 하 는 결과 와 같다 는 것 이다.
WC world 1
WC hello 2
WC hqs 1

2) 다음 예 는 복잡 할 것 입 니 다. txt 파일 에서 데 이 터 를 읽 습 니 다. txt 파일 에는 id, 헤링본, 책 이름, 가격 정보 가 포함 되 어 있 습 니 다.그리고 데 이 터 를 하나의 표 로 등록 한 다음 에 이 표 의 결 과 를 통계 하고 사람의 이름 에 따라 이 사람 이 책 을 사 는 데 쓴 돈 을 통계 하여 결 과 를 하나의 파일 에 넣 습 니 다.코드 를 올리다.
  
package myflink.sql;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;

public class SQLFromFile {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        env.setParallelism(1);
        //    
        DataSource input = env.readTextFile("test.txt");
        //           
        input.print();
        //   DataSet
        DataSet inputDataSet = input.map(new MapFunction() {
            @Override
            public Orders map(String s) throws Exception {
                String[] splits = s.split(" ");
                return Orders.of(Integer.valueOf(splits[0]), String.valueOf(splits[1]), String.valueOf(splits[2]), Double.valueOf(splits[3]));
            }
        });
        //   table
        Table order = tableEnv.fromDataSet(inputDataSet);
        //  Orders  
        tableEnv.registerTable("Orders", order);
        Table nameResult = tableEnv.scan("Orders").select("name");
        //     
        nameResult.printSchema();

        //      
        Table sqlQueryResult = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");
        //       DataSet
        DataSet result = tableEnv.toDataSet(sqlQueryResult, Result.class);
        result.print();

        // tuple       
        result.map(new MapFunction>() {
            @Override
            public Tuple2 map(Result result) throws Exception {
                String name = result.name;
                Double total = result.total;
                return Tuple2.of(name, total);
            }
        }).print();

        TableSink sink  = new CsvTableSink("SQLText.txt", " | ");

        //     
        String[] filedNames = {"name", "total"};
        //      
        TypeInformation[] filedTypes = {Types.STRING(), Types.DOUBLE()};

        tableEnv.registerTableSink("SQLTEXT", filedNames, filedTypes, sink);

        sqlQueryResult.insertInto("SQLTEXT");

        env.execute();

    }

    public static class Orders {
        public Integer id;
        public String name;
        public String book;
        public Double price;

        public Orders() {
            super();
        }

        public static Orders of(Integer id, String name, String book, Double price) {
            Orders orders = new Orders();
            orders.id = id;
            orders.name = name;
            orders.book = book;
            orders.price = price;
            return orders;
        }
    }

    public static class Result {
        public String name;
        public Double total;

        public Result() {
            super();
        }

        public static Result of(String name, Double total) {
            Result result = new Result();
            result.name = name;
            result.total = total;
            return result;
        }
    }

}

  
완전한 코드 를 참고 하려 면 접근 할 수 있 습 니 다.https://github.com/stonehqs/flink-demo 。
 
문제 가 있 으 면 벽돌 을 찍 는 것 을 환영 합 니 다.

좋은 웹페이지 즐겨찾기