flink Table의 Time Attributes에 대해서 얘기를 나눠보도록 하겠습니다.
8558 단어 flink
서문
본고는 주로 flink Table의 Time Attributes를 연구하고자 합니다.
Processing time
fromDataStream을 통해 정의
DataStream> stream = ...;
// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
TableSource를 통해 정의
// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    /** Describes the physical rows of this source: two STRING fields, "Username" and "Data". */
    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username", "Data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    /** Builds the underlying stream of rows (construction elided in this example). */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream
        DataStream<Row> stream = ...;
        return stream;
    }

    /** Names the processing time attribute exposed by this source. */
    @Override
    public String getProctimeAttribute() {
        // field with this name will be appended as a third field
        return "UserActionTime";
    }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());

// scan the registered source and window it on the processing time attribute
WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
Event time
fromDataStream을 통해 정의
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream> stream = inputStream.assignTimestampsAndWatermarks(...);
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
// Usage:
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
TableSource를 통해 정의
// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    /**
     * Describes the physical rows of this source: STRING "Username", STRING "Data" and
     * LONG "UserActionTime".
     */
    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username", "Data", "UserActionTime"};
        TypeInformation[] types =
            new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
        return Types.ROW(names, types);
    }

    /** Builds the underlying stream with timestamps and watermarks assigned. */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream
        // ...
        // assign watermarks based on the "UserActionTime" attribute
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }

    /** Returns a single descriptor that marks "UserActionTime" as the rowtime attribute. */
    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // Mark the "UserActionTime" attribute as event-time attribute.
        // We create one attribute descriptor of "UserActionTime".
        RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "UserActionTime",
            new ExistingField("UserActionTime"),
            new AscendingTimestamps());
        List<RowtimeAttributeDescriptor> listRowtimeAttrDescr =
            Collections.singletonList(rowtimeAttrDescr);
        return listRowtimeAttrDescr;
    }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

// scan the registered source and window it on the rowtime attribute
WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
definedTimeAttributes
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/sources/definedTimeAttributes.scala
/**
* Extends a [[TableSource]] to specify a processing time attribute.
*/
trait DefinedProctimeAttribute {
/**
* Returns the name of a processing time attribute or null if no processing time attribute is
* present.
*
* The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
* type [[Types.SQL_TIMESTAMP]].
*
* @return The name of the processing time attribute, or null if there is none.
*/
@Nullable
def getProctimeAttribute: String
}
/**
* Extends a [[TableSource]] to specify rowtime attributes via a
* [[RowtimeAttributeDescriptor]].
*/
trait DefinedRowtimeAttributes {
/**
* Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
*
* All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
* type [[Types.SQL_TIMESTAMP]].
*
* Each descriptor carries the attribute name together with its timestamp extractor and
* watermark strategy.
*
* @return A list of [[RowtimeAttributeDescriptor]].
*/
def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
/**
  * Describes one rowtime attribute of a [[TableSource]]: its name, how its timestamp values
  * are derived, and which watermark strategy applies to it.
  *
  * Two descriptors are equal when all three components are equal; `hashCode` is derived from
  * the same three components.
  *
  * @param attributeName The name of the rowtime attribute.
  * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
  * @param watermarkStrategy The watermark strategy associated with the attribute.
  */
class RowtimeAttributeDescriptor(
    val attributeName: String,
    val timestampExtractor: TimestampExtractor,
    val watermarkStrategy: WatermarkStrategy) {

  /** Java-style accessor for the name of the rowtime attribute. */
  def getAttributeName: String = attributeName

  /** Java-style accessor for the attribute's [[TimestampExtractor]]. */
  def getTimestampExtractor: TimestampExtractor = timestampExtractor

  /** Java-style accessor for the attribute's [[WatermarkStrategy]]. */
  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy

  /** Component-wise, null-safe equality against another descriptor. */
  override def equals(other: Any): Boolean = other match {
    // The name is compared first; the remaining components are only compared on a name match.
    case that: RowtimeAttributeDescriptor
        if Objects.equals(attributeName, that.attributeName) =>
      Objects.equals(timestampExtractor, that.timestampExtractor) &&
        Objects.equals(watermarkStrategy, that.watermarkStrategy)
    case _ => false
  }

  /** Hash over the same three components that `equals` compares. */
  override def hashCode(): Int =
    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
}
소결
doc
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[case52] flink Keyed Stream의 aggregation 작업에 대해 이야기합니다. flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java Keyed Stream의 agg...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.