flink Table의 Time Attributes에 대해서 얘기를 나눠보도록 하겠습니다.

8558 단어 flink

순서


본고는 주로 flink Table의 Time Attributes를 연구하고자 합니다.

Processing time


fromDataStream을 통해 정의

DataStream> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • DataStream에서 Table를 만들면fromDataStream에서 Processing time
  • 를 정의할 수 있습니다.

    TableSource를 통해 정의

    // define a table source with a processing attribute
    public class UserActionSource implements StreamTableSource, DefinedProctimeAttribute {
    
        @Override
        public TypeInformation getReturnType() {
            String[] names = new String[] {"Username" , "Data"};
            TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
            return Types.ROW(names, types);
        }
    
        @Override
        public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
            // create stream
            DataStream stream = ...;
            return stream;
        }
    
        @Override
        public String getProctimeAttribute() {
            // field with this name will be appended as a third field
            return "UserActionTime";
        }
    }
    
    // register table source
    tEnv.registerTableSource("UserActions", new UserActionSource());
    
    WindowedTable windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • TableSource를 통해 Table를 만들면DefinedProctimeAttribute 인터페이스를 실현하여Processing time
  • 를 정의할 수 있습니다

    Event time


    fromDataStream을 통해 정의

    // Option 1:
    
    // extract timestamp and assign watermarks based on knowledge of the stream
    DataStream> stream = inputStream.assignTimestampsAndWatermarks(...);
    
    // declare an additional logical field as an event time attribute
    Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
    
    
    // Option 2:
    
    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    DataStream> stream = inputStream.assignTimestampsAndWatermarks(...);
    
    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
    
    // Usage:
    
    WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • DataStream에서Table를 만들면fromDataStream에서 이벤트타임을 정의할 수 있습니다.구체적으로 두 가지 방법이 있는데 하나는 하나의 필드를 추가로 정의하는 것이고 하나는 원래의 필드를 덮어쓰는 것이다
  • TableSource를 통해 정의

    // define a table source with a rowtime attribute
    public class UserActionSource implements StreamTableSource, DefinedRowtimeAttributes {
    
        @Override
        public TypeInformation getReturnType() {
            String[] names = new String[] {"Username", "Data", "UserActionTime"};
            TypeInformation[] types =
                new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
            return Types.ROW(names, types);
        }
    
        @Override
        public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
            // create stream
            // ...
            // assign watermarks based on the "UserActionTime" attribute
            DataStream stream = inputStream.assignTimestampsAndWatermarks(...);
            return stream;
        }
    
        @Override
        public List getRowtimeAttributeDescriptors() {
            // Mark the "UserActionTime" attribute as event-time attribute.
            // We create one attribute descriptor of "UserActionTime".
            RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
                "UserActionTime",
                new ExistingField("UserActionTime"),
                new AscendingTimestamps());
            List listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
            return listRowtimeAttrDescr;
        }
    }
    
    // register the table source
    tEnv.registerTableSource("UserActions", new UserActionSource());
    
    WindowedTable windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • TableSource를 통해 Table를 만들면DefinedRowtimeAttributes 인터페이스를 통해 이벤트타임
  • 을 정의할 수 있습니다.

    definedTimeAttributes


    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/sources/definedTimeAttributes.scala
    /**
      * Extends a [[TableSource]] to specify a processing time attribute.
      */
    trait DefinedProctimeAttribute {
    
      /**
        * Returns the name of a processing time attribute or null if no processing time attribute is
        * present.
        *
        * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
        * type [[Types.SQL_TIMESTAMP]].
        */
      @Nullable
      def getProctimeAttribute: String
    }
    
    /**
      * Extends a [[TableSource]] to specify rowtime attributes via a
      * [[RowtimeAttributeDescriptor]].
      */
    trait DefinedRowtimeAttributes {
    
      /**
        * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
        *
        * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
        * type [[Types.SQL_TIMESTAMP]].
        *
        * @return A list of [[RowtimeAttributeDescriptor]].
        */
      def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
    }
    
    /**
      * Describes a rowtime attribute of a [[TableSource]].
      *
      * @param attributeName The name of the rowtime attribute.
      * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
      * @param watermarkStrategy The watermark strategy associated with the attribute.
      */
    class RowtimeAttributeDescriptor(
      val attributeName: String,
      val timestampExtractor: TimestampExtractor,
      val watermarkStrategy: WatermarkStrategy) {
    
      /** Returns the name of the rowtime attribute. */
      def getAttributeName: String = attributeName
    
      /** Returns the [[TimestampExtractor]] for the attribute. */
      def getTimestampExtractor: TimestampExtractor = timestampExtractor
    
      /** Returns the [[WatermarkStrategy]] for the attribute. */
      def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy
    
      override def equals(other: Any): Boolean = other match {
        case that: RowtimeAttributeDescriptor =>
            Objects.equals(attributeName, that.attributeName) &&
            Objects.equals(timestampExtractor, that.timestampExtractor) &&
            Objects.equals(watermarkStrategy, that.watermarkStrategy)
        case _ => false
      }
    
      override def hashCode(): Int = {
        Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
      }
    }
  • DefinedProctimeAttribute는 getProctimeAttribute 방법을 정의하고 String을 되돌려줍니다.Process time의 필드 이름을 정의하는 데 사용됩니다.Defined Rowtime Attributes는 get Rowtime Attribute Descriptors 방법을 정의했고 Rowtime Attribute Descriptor의 List를 되돌려주었습니다. Rowtime Attribute Descriptor는 3개의 속성이 있는데 각각attributeName,timestampExtractor와 watermarkStrategy
  • 입니다.

    소결

  • DataStream 또는 TableSource에서 Table를 만들 때Time Attributes를 지정할 수 있으며, 지정한 후에field로 사용하거나 time-based 작업에 참여할 수 있음
  • Processing time에 대해 DataStream에서 Table를 만들면fromDataStream에서 정의할 수 있다.TableSource를 통해 Table를 만들면Defined Proctime Attribute 인터페이스를 실현하여Processing time를 정의할 수 있습니다.Defined Proctime Attribute는 get Proctime Attribute 방법을 정의하고 String을 되돌려줍니다. Process time를 정의하는 필드 이름
  • 이벤트타임에 대해 DataStream에서Table를 만들면fromDataStream에서 정의할 수 있습니다.구체적으로 두 가지 방식이 있는데 하나는 하나의 필드를 추가로 정의하는 것이고 하나는 원래의 필드를 덮어쓰는 것이다.TableSource를 통해 Table를 만들면DefinedRowtimeAttributes 인터페이스를 실현하여 Event time를 정의할 수 있습니다.Defined Rowtime Attributes는 get Rowtime Attribute Descriptors 방법을 정의했고 Rowtime Attribute Descriptor의 List를 되돌려주었습니다. Rowtime Attribute Descriptor는 3개의 속성이 있는데 각각attributeName,timestampExtractor와 watermarkStrategy
  • 입니다.

    doc

  • Time Attributes
  • 좋은 웹페이지 즐겨찾기