Flink JOIN Execution Plan

Code:
    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
    val table = table1.join(table2).where("b = d").select("a, c")

Execution plan:

== Abstract Syntax Tree ==
LogicalProject(a=[$0], c=[$2])
  LogicalFilter(condition=[=($1, $3)])
    LogicalJoin(condition=[true], joinType=[inner])
      LogicalTableScan(table=[[_DataSetTable_0]])
      LogicalTableScan(table=[[_DataSetTable_1]])

== Optimized Logical Plan ==
DataSetCalc(select=[a, c])
  DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
    DataSetScan(table=[[_DataSetTable_0]])
    DataSetScan(table=[[_DataSetTable_1]])

== Physical Execution Plan ==
Stage 4 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 3 : Map
        content : from: (a, b)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

Stage 6 : Data Source
    content : collect elements with CollectionInputFormat
    Partitioning : RANDOM_PARTITIONED

    Stage 5 : Map
        content : from: (c, d)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Map
        Partitioning : RANDOM_PARTITIONED

        Stage 2 : Join
            content : where: (=(b, d)), join: (a, b, c, d)
            ship_strategy : Hash Partition on [1]
            exchange_mode : PIPELINED
            driver_strategy : Hybrid Hash (build: from: (a, b) (id: 3))
            Partitioning : RANDOM_PARTITIONED

            Stage 1 : FlatMap
                content : select: (a, c)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : FlatMap
                Partitioning : RANDOM_PARTITIONED

                Stage 0 : Data Sink
                    content : org.apache.flink.api.java.io.DiscardingOutputFormat
                    ship_strategy : Forward
                    exchange_mode : PIPELINED
                    Partitioning : RANDOM_PARTITIONED
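
The Join stage in the plan reports `Hash Partition on [1]` as its ship strategy and a hybrid hash driver that builds its table from the (a, b) input. The build and probe phases behind that can be sketched in plain Java as below. This is only an illustration under assumptions: `HashJoinSketch` and its `join` method are hypothetical names, rows are modeled as `Object[]`, and the code is a simple in-memory hash join rather than Flink's hybrid hash join, which can additionally spill partitions to disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Flink.
public class HashJoinSketch {

    /** Joins (a, b) rows with (c, d) rows on b = d and returns the projected (a, c) pairs. */
    public static List<int[]> join(List<Object[]> buildSide, List<Object[]> probeSide) {
        // Build phase: index the build side, here (a, b), by its join key at field 1.
        Map<Object, List<Object[]>> table = new HashMap<>();
        for (Object[] row : buildSide) {
            table.computeIfAbsent(row[1], key -> new ArrayList<>()).add(row);
        }

        // Probe phase: look up each (c, d) row by field 1 and emit (a, c) for every match.
        List<int[]> result = new ArrayList<>();
        for (Object[] probe : probeSide) {
            List<Object[]> matches = table.getOrDefault(probe[1], Collections.emptyList());
            for (Object[] build : matches) {
                result.add(new int[] {(Integer) build[0], (Integer) probe[0]});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same data as the example: one (1, "hello") row on each side.
        List<Object[]> left = Collections.singletonList(new Object[] {1, "hello"});
        List<Object[]> right = Collections.singletonList(new Object[] {1, "hello"});
        for (int[] pair : join(left, right)) {
            System.out.println(Arrays.toString(pair));
        }
    }
}
```

Hash partitioning both inputs on the join key first is what lets each parallel task run this build and probe step on its own partition, because rows with equal keys end up in the same task.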


Depth-first traversal in Flink's utility classes:
    /**
     * Plan.accept
     * Traverses the job depth first from all data sinks on towards the sources.
     *
     * @see Visitable#accept(Visitor)
     */
    @Override
    public void accept(Visitor<Operator<?>> visitor) {
        for (GenericDataSinkBase<?> sink : this.sinks) {
            sink.accept(visitor);
        }
    }

    /**
     * GenericDataSinkBase.accept
     * Accepts the visitor and applies it this instance. This method applies the visitor in a depth-first traversal.
     * The visitors pre-visit method is called and, if returning
     * true, the visitor is recursively applied on the single input. After the recursion returned,
     * the post-visit method is called.
     *
     * @param visitor The visitor.
     *
     * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
     */
    @Override
    public void accept(Visitor<Operator<?>> visitor) {
        boolean descend = visitor.preVisit(this);
        if (descend) {
            this.input.accept(visitor);
            visitor.postVisit(this);
        }
    }
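
The two accept methods above follow a pre-visit / descend / post-visit pattern that starts at the sinks. The sketch below reproduces that pattern on a small stand-alone tree, with no Flink dependency. `PlanTraversalSketch`, `Node`, and this `Visitor` interface are hypothetical stand-ins, not Flink classes. Numbering the nodes in pre-visit order happens to yield the same stage ids as the physical plan shown earlier (sink 0, FlatMap 1, Join 2, and so on); that is an observation about this example, not a statement of how Flink assigns ids internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Flink.
public class PlanTraversalSketch {

    /** Stand-in for a visitor: preVisit decides whether the traversal descends. */
    public interface Visitor {
        boolean preVisit(Node node);
        void postVisit(Node node);
    }

    /** Stand-in for a plan operator; inputs are listed in plan order. */
    public static final class Node {
        final String name;
        final List<Node> inputs;

        public Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }

        // Same shape as the accept method above, generalized to several inputs.
        public void accept(Visitor visitor) {
            if (visitor.preVisit(this)) {
                for (Node input : inputs) {
                    input.accept(visitor);
                }
                visitor.postVisit(this);
            }
        }
    }

    /** Numbers the nodes in pre-visit order, starting from the sink. */
    public static List<String> numberStages(Node sink) {
        List<String> stages = new ArrayList<>();
        sink.accept(new Visitor() {
            @Override
            public boolean preVisit(Node node) {
                stages.add("Stage " + stages.size() + " : " + node.name);
                return true; // always descend in this sketch
            }

            @Override
            public void postVisit(Node node) {
                // Nothing to do on the way back up in this sketch.
            }
        });
        return stages;
    }

    /** Operator tree of the join example: sink, FlatMap, Join, then one Map and source per input. */
    public static Node joinExample() {
        Node left = new Node("Map (a, b)", new Node("Data Source"));
        Node right = new Node("Map (c, d)", new Node("Data Source"));
        return new Node("Data Sink", new Node("FlatMap", new Node("Join", left, right)));
    }

    public static void main(String[] args) {
        numberStages(joinExample()).forEach(System.out::println);
    }
}
```

Returning `false` from `preVisit` skips a whole subtree, which is how a visitor can avoid descending into a part of the plan more than once.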
