Flink JOIN 실행 계획
5310 단어 Flink
코드:
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
val table = table1.join(table2).where("b = d").select("a, c")
실행 계획:
== Abstract Syntax Tree ==
LogicalProject(a=[$0], c=[$2])
LogicalFilter(condition=[=($1, $3)])
LogicalJoin(condition=[true], joinType=[inner])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])
== Optimized Logical Plan ==
DataSetCalc(select=[a, c])
DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
== Physical Execution Plan ==
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 3 : Map
content : from: (a, b)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 6 : Data Source
content : collect elements with CollectionInputFormat
Partitioning : RANDOM_PARTITIONED
Stage 5 : Map
content : from: (c, d)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : Map
Partitioning : RANDOM_PARTITIONED
Stage 2 : Join
content : where: (=(b, d)), join: (a, b, c, d)
ship_strategy : Hash Partition on [1]
exchange_mode : PIPELINED
driver_strategy : Hybrid Hash (build: from: (a, b) (id: 3))
Partitioning : RANDOM_PARTITIONED
Stage 1 : FlatMap
content : select: (a, c)
ship_strategy : Forward
exchange_mode : PIPELINED
driver_strategy : FlatMap
Partitioning : RANDOM_PARTITIONED
Stage 0 : Data Sink
content : org.apache.flink.api.java.io.DiscardingOutputFormat
ship_strategy : Forward
exchange_mode : PIPELINED
Partitioning : RANDOM_PARTITIONED
Flink 유 틸 리 티 의 깊이 우선 순위:
/** * Plan.accept * Traverses the job depth first from all data sinks on towards the sources. * * @see Visitable#accept(Visitor) */
@Override
public void accept(Visitor> visitor) {
for (GenericDataSinkBase> sink : this.sinks) {
sink.accept(visitor);
}
}
/** * GenericDataSinkBase.accept * Accepts the visitor and applies it this instance. This method applies the visitor in a depth-first traversal. * The visitors pre-visit method is called and, if returning * true, the visitor is recursively applied on the single input. After the recursion returned, * the post-visit method is called. * * @param visitor The visitor. * * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor) */
@Override
public void accept(Visitor> visitor) {
boolean descend = visitor.preVisit(this);
if (descend) {
this.input.accept(visitor);
visitor.postVisit(this);
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Flink On YARN 고가용 클러스터 모드 구축(flink-1.10.0-bin-scala_2.11.tgz)다운로드 주소:https://flink.apache.org/downloads.html 다운로드한 설치 패키지를 서버에 업로드하고 지정한 디렉터리에 압축을 풀십시오. 명령은 다음과 같습니다. 파일 끝에 다음과 같은 내...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.