flink Table 의 Orderby 및 Limit 에 대해 이야기 합 니 다.

6231 단어 flink
순서.
본 고 는 주로 flink Table 의 Orderby 와 Limit 를 연구 하고 자 한다.
실례
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");

Table in = tableEnv.fromDataSet(ds, "a, b, c");

// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); 

// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);

// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • orderby 방법 은 sql 의 orderby 와 유사 합 니 다.limit 는 offset 및 fetch 두 가지 방법 으로 구성 되 어 있 으 며, sql 과 유사 한 offset 및 fetch
  • Table
    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {
    
      //......
    
      def orderBy(fields: String): Table = {
        val parsedFields = ExpressionParser.parseExpressionList(fields)
        orderBy(parsedFields: _*)
      }
    
      def orderBy(fields: Expression*): Table = {
        val order: Seq[Ordering] = fields.map {
          case o: Ordering => o
          case e => Asc(e)
        }
        new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
      }
    
      def offset(offset: Int): Table = {
        new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
      }
    
      def fetch(fetch: Int): Table = {
        if (fetch < 0) {
          throw new ValidationException("FETCH count must be equal or larger than 0.")
        }
        this.logicalPlan match {
          case Limit(o, -1, c) =>
            // replace LIMIT without FETCH by LIMIT with FETCH
            new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
          case Limit(_, _, _) =>
            throw new ValidationException("FETCH is already defined.")
          case _ =>
            new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
        }
      }
    
      //......
    }
  • Table 의 orderby 방법 은 String 또는 Expression 형식의 인 자 를 지원 합 니 다. 그 중에서 String 유형 은 최종 적 으로 Expression 형식 으로 전 환 됩 니 다.orderby 방법 은 마지막 으로 Sort 를 사용 하여 Table 을 다시 만 들 었 습 니 다.offset 및 fetch 방법, Limit 를 사용 하여 Table offset Limit fetch -1;fetch offset Limit offset 0
  • 을 다시 만 들 었 습 니 다.
    Sort
    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
    case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
      override def output: Seq[Attribute] = child.output
    
      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        child.construct(relBuilder)
        relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
      }
    
      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
          failValidation(s"Sort on stream tables is currently not supported.")
        }
        super.validate(tableEnv)
      }
    }
  • Sort 는 Unary Node 를 계승 하 였 으 며, 구조 기 는 set 형식의 Ordering 을 받 았 으 며, construct 방법 은 relBuilder. sort 를 사용 하여 sort 조건 을 구축 하 였 다
  • .
    Ordering
    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala
    abstract class Ordering extends UnaryExpression {
      override private[flink] def validateInput(): ValidationResult = {
        if (!child.isInstanceOf[NamedExpression]) {
          ValidationFailure(s"Sort should only based on field reference")
        } else {
          ValidationSuccess
        }
      }
    }
    
    case class Asc(child: Expression) extends Ordering {
      override def toString: String = s"($child).asc"
    
      override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
        child.toRexNode
      }
    
      override private[flink] def resultType: TypeInformation[_] = child.resultType
    }
    
    case class Desc(child: Expression) extends Ordering {
      override def toString: String = s"($child).desc"
    
      override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
        relBuilder.desc(child.toRexNode)
      }
    
      override private[flink] def resultType: TypeInformation[_] = child.resultType
    }
  • Ordering 은 추상 적 인 유형 으로 Asc 와 Desc 두 가지 유형
  • 이 있다.
    Limit
    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
    case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
      override def output: Seq[Attribute] = child.output
    
      override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
        child.construct(relBuilder)
        relBuilder.limit(offset, fetch)
      }
    
      override def validate(tableEnv: TableEnvironment): LogicalNode = {
        if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
          failValidation(s"Limit on stream tables is currently not supported.")
        }
        if (!child.isInstanceOf[Sort]) {
          failValidation(s"Limit operator must be preceded by an OrderBy operator.")
        }
        if (offset < 0) {
          failValidation(s"Offset should be greater than or equal to zero.")
        }
        super.validate(tableEnv)
      }
    }
  • Limit 는 Unary Node 를 계 승 했 습 니 다. 구조 기 는 offset 및 fetch 인 자 를 받 습 니 다. construct 방법 은 relBuilder. limit 를 통 해 offset 및 fetch
  • 를 설정 합 니 다.
    작은 매듭
  • Table 의 orderby 방법 은 sql 의 orderby 와 유사 합 니 다.limit 는 offset 및 fetch 두 가지 방법 으로 구성 되 어 있 으 며, sql 과 유사 한 offset 및 fetch
  • Table 의 orderby 방법 은 String 또는 Expression 형식의 인 자 를 지원 합 니 다. 그 중에서 String 유형 은 최종 적 으로 Expression 형식 으로 전 환 됩 니 다.orderby 방법 은 마지막 으로 Sort 를 사용 하여 Table 을 다시 만 들 었 습 니 다.offset 및 fetch 방법, Limit 를 사용 하여 Table offset Limit fetch -1;fetch offset Limit offset 0
  • 을 다시 만 들 었 습 니 다.
  • Sort 는 Unary Node 를 계승 하 였 으 며, 구조 기 는 set 형식의 Ordering 을 받 았 으 며, construct 방법 은 relBuilder. sort 를 사용 하여 sort 조건 을 구축 하 였 습 니 다.Ordering 은 추상 적 인 유형 으로 Asc 와 Desc 두 가지 키 가 있 습 니 다.Limit 는 Unary Node 를 계승 합 니 다. 구조 기 는 offset 및 fetch 인 자 를 받 습 니 다. construct 방법 은 relBuilder. limit 를 통 해 offset 및 fetch
  • 를 설정 합 니 다.
    doc
  • OrderBy, Offset & Fetch
  • 좋은 웹페이지 즐겨찾기