[case52] flink Keyed Stream의 aggregation 작업에 대해 이야기합니다.
12336 단어 flink
순서
이 글에서는 주로 flink KeyedStream의 aggregation 연산을 살펴본다.
인스턴스
/**
 * Builds a three-element {@code WordCount} stream, keys it by {@code "word"}, applies
 * {@code max("frequency")} and logs every record that reaches the sink.
 *
 * @throws Exception if building or executing the job fails
 */
@Test
public void testMax() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    WordCount[] data = new WordCount[]{
        new WordCount(1, "Hello", 1),
        new WordCount(1, "World", 3),
        new WordCount(2, "Hello", 1)};
    env.fromElements(data)
        .keyBy("word")
        .max("frequency")
        // The sink must be parameterized: a raw SinkFunction only exposes
        // invoke(Object, Context), so invoke(WordCount, Context) would override
        // nothing and the @Override below would be rejected by the compiler.
        .addSink(new SinkFunction<WordCount>() {
            @Override
            public void invoke(WordCount value, Context context) throws Exception {
                LOGGER.info("value:{}", value);
            }
        });
    env.execute("testMax");
}
KeyedStream.aggregate
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
/**
 * Creates a {@code SumAggregator} for the field at the given position, using this stream's
 * type and execution config, and passes it to {@code aggregate}.
 *
 * @param positionToSum position of the field handed to the {@code SumAggregator}
 * @return whatever {@code aggregate} returns for that aggregator
 */
public SingleOutputStreamOperator sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}
/**
 * Creates a {@code SumAggregator} for the named field, using this stream's type and
 * execution config, and passes it to {@code aggregate}.
 *
 * @param field field expression handed to the {@code SumAggregator}
 * @return whatever {@code aggregate} returns for that aggregator
 */
public SingleOutputStreamOperator sum(String field) {
return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MAX} over the field at the
 * given position.
 *
 * @param positionToMax position of the field to compare
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator max(int positionToMax) {
    final AggregationFunction.AggregationType kind = AggregationFunction.AggregationType.MAX;
    return aggregate(
        new ComparableAggregator<>(positionToMax, getType(), kind, getExecutionConfig()));
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MAX} over the named field.
 * The aggregator's {@code first} flag is always {@code false} here.
 *
 * @param field field expression to compare
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator max(String field) {
    final AggregationFunction.AggregationType kind = AggregationFunction.AggregationType.MAX;
    final boolean first = false;
    return aggregate(
        new ComparableAggregator<>(field, getType(), kind, first, getExecutionConfig()));
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MIN} over the field at the
 * given position.
 *
 * @param positionToMin position of the field to compare
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator min(int positionToMin) {
    final AggregationFunction.AggregationType kind = AggregationFunction.AggregationType.MIN;
    return aggregate(
        new ComparableAggregator<>(positionToMin, getType(), kind, getExecutionConfig()));
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MIN} over the named field.
 * The aggregator's {@code first} flag is always {@code false} here.
 *
 * @param field field expression to compare
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator min(String field) {
    final AggregationFunction.AggregationType kind = AggregationFunction.AggregationType.MIN;
    final boolean first = false;
    return aggregate(
        new ComparableAggregator<>(field, getType(), kind, first, getExecutionConfig()));
}
/**
 * Convenience overload of {@code maxBy(int, boolean)} that passes {@code true} as the
 * {@code first} flag.
 *
 * @param positionToMaxBy position of the field to compare
 * @return the result of {@code maxBy(positionToMaxBy, true)}
 */
public SingleOutputStreamOperator maxBy(int positionToMaxBy) {
    final boolean first = true;
    return maxBy(positionToMaxBy, first);
}
/**
 * Convenience overload of {@code maxBy(String, boolean)} that passes {@code true} as the
 * {@code first} flag.
 *
 * @param positionToMaxBy field expression to compare
 * @return the result of {@code maxBy(positionToMaxBy, true)}
 */
public SingleOutputStreamOperator maxBy(String positionToMaxBy) {
    final boolean first = true;
    return maxBy(positionToMaxBy, first);
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MAXBY} over the field at the
 * given position.
 *
 * @param positionToMaxBy position of the field to compare
 * @param first flag forwarded to the aggregator; it is consulted when two compared values tie
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) {
    final AggregationFunction.AggregationType kind = AggregationFunction.AggregationType.MAXBY;
    return aggregate(
        new ComparableAggregator<>(positionToMaxBy, getType(), kind, first, getExecutionConfig()));
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MAXBY} over the named field.
 *
 * @param field field expression to compare
 * @param first flag forwarded to the aggregator; it is consulted when two compared values tie
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator maxBy(String field, boolean first) {
    final AggregationFunction.AggregationType kind = AggregationFunction.AggregationType.MAXBY;
    return aggregate(
        new ComparableAggregator<>(field, getType(), kind, first, getExecutionConfig()));
}
/**
 * Convenience overload of {@code minBy(int, boolean)} that passes {@code true} as the
 * {@code first} flag.
 *
 * @param positionToMinBy position of the field to compare
 * @return the result of {@code minBy(positionToMinBy, true)}
 */
public SingleOutputStreamOperator minBy(int positionToMinBy) {
    final boolean first = true;
    return minBy(positionToMinBy, first);
}
/**
 * Convenience overload of {@code minBy(String, boolean)} that passes {@code true} as the
 * {@code first} flag.
 *
 * @param positionToMinBy field expression to compare
 * @return the result of {@code minBy(positionToMinBy, true)}
 */
public SingleOutputStreamOperator minBy(String positionToMinBy) {
    final boolean first = true;
    return minBy(positionToMinBy, first);
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MINBY} over the field at the
 * given position.
 *
 * @param positionToMinBy position of the field to compare
 * @param first flag forwarded to the aggregator; it is consulted when two compared values tie
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) {
    // Diamond instead of a raw constructor call, matching the maxBy overloads.
    return aggregate(new ComparableAggregator<>(positionToMinBy, getType(),
        AggregationFunction.AggregationType.MINBY, first, getExecutionConfig()));
}
/**
 * Aggregates with a {@code ComparableAggregator} of type {@code MINBY} over the named field.
 *
 * @param field field expression to compare
 * @param first flag forwarded to the aggregator; it is consulted when two compared values tie
 * @return the operator produced by {@code aggregate}
 */
public SingleOutputStreamOperator minBy(String field, boolean first) {
    // Diamond instead of a raw constructor call, matching the maxBy overloads.
    return aggregate(new ComparableAggregator<>(field, getType(),
        AggregationFunction.AggregationType.MINBY, first, getExecutionConfig()));
}
/**
 * Wraps the given aggregation function in a {@code StreamGroupedReduce} operator and registers
 * it on this stream under the name "Keyed Aggregation".
 *
 * @param aggregate the aggregation function; it is passed through {@code clean} before use
 * @return the operator returned by {@code transform}
 */
protected SingleOutputStreamOperator aggregate(AggregationFunction aggregate) {
    final AggregationFunction cleanedFunction = clean(aggregate);
    final StreamGroupedReduce reduceOperator = new StreamGroupedReduce(
        cleanedFunction, getType().createSerializer(getExecutionConfig()));
    return transform("Keyed Aggregation", getType(), reduceOperator);
}
maxBy, minBy는 sum, max, min보다 first(boolean) 파라미터가 하나 더 많습니다. 이 파라미터는 여러 요소의 비교 값이 같을 때 첫 번째 요소를 반환할지 여부를 지정하는 데 사용됩니다.
ComparableAggregator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@Internal
public class ComparableAggregator extends AggregationFunction {
private static final long serialVersionUID = 1L;
private Comparator comparator;
private boolean byAggregate;
private boolean first;
private final FieldAccessor fieldAccessor;
private ComparableAggregator(AggregationType aggregationType, FieldAccessor fieldAccessor, boolean first) {
this.comparator = Comparator.getForAggregation(aggregationType);
this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
this.first = first;
this.fieldAccessor = fieldAccessor;
}
/**
 * Creates an aggregator for the field at the given position by delegating to the
 * five-argument positional constructor with {@code first} set to {@code false}.
 *
 * @param positionToAggregate position of the field to compare
 * @param typeInfo type information forwarded to the delegate constructor
 * @param aggregationType aggregation type forwarded to the delegate constructor
 * @param config execution config forwarded to the delegate constructor
 */
public ComparableAggregator(int positionToAggregate,
TypeInformation typeInfo,
AggregationType aggregationType,
ExecutionConfig config) {
this(positionToAggregate, typeInfo, aggregationType, false, config);
}
/**
 * Creates an aggregator for the field at the given position. The field accessor is obtained
 * from {@code FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config)}.
 *
 * @param positionToAggregate position of the field to compare
 * @param typeInfo type information used to look up the field accessor
 * @param aggregationType aggregation type passed to the private constructor
 * @param first flag passed to the private constructor
 * @param config execution config used to look up the field accessor
 */
public ComparableAggregator(int positionToAggregate,
TypeInformation typeInfo,
AggregationType aggregationType,
boolean first,
ExecutionConfig config) {
this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
}
/**
 * Creates an aggregator for the named field. The field accessor is obtained from
 * {@code FieldAccessorFactory.getAccessor(typeInfo, field, config)}.
 *
 * @param field field expression of the field to compare
 * @param typeInfo type information used to look up the field accessor
 * @param aggregationType aggregation type passed to the private constructor
 * @param first flag passed to the private constructor
 * @param config execution config used to look up the field accessor
 */
public ComparableAggregator(String field,
TypeInformation typeInfo,
AggregationType aggregationType,
boolean first,
ExecutionConfig config) {
this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
}
@SuppressWarnings("unchecked")
@Override
public T reduce(T value1, T value2) throws Exception {
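Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
Object o2 = fieldAccessor.get(value2);
int c = comparator.isExtremal(o1, o2);
if (byAggregate) {
// if they are the same we choose based on whether we want to first or last
// element with the min/max.
if (c == 0) {
return first ? value1 : value2;
}
return c == 1 ? value1 : value2;
} else {
if (c == 0) {
value1 = fieldAccessor.set(value1, o2);
}
return value1;
}
}
}
ComparableAggregator는 AggregationFunction을 상속하고 reduce 메서드를 구현한다. reduce는 먼저 Comparator로 두 요소의 비교 필드를 비교한 뒤 byAggregate 여부에 따라 다르게 처리한다. byAggregate인 경우 비교값이 0이면 first 값에 따라 value1 또는 value2를 반환하고, 0이 아니면 비교값이 1일 때 value1을, 그렇지 않으면 value2를 반환한다. byAggregate가 아닌 경우 비교값이 0(예: max에서 비교 필드의 값이 value1 <= value2인 경우)이면 리플렉션으로 value2의 비교 필드 값을 value1에 업데이트하고, 마지막으로 항상 value1을 반환한다.
AggregationFunction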
/**
 * Base class of the aggregation functions used by keyed-stream aggregations. It is a
 * {@link ReduceFunction} and additionally defines the supported {@link AggregationType}s.
 *
 * <p>The class is generic in the element type so that subclasses can declare
 * {@code T reduce(T, T)} and callers can instantiate them with the diamond operator.
 *
 * @param <T> type of the elements being reduced
 */
@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Aggregation types that can be used on a windowed stream or keyed stream.
     */
    public enum AggregationType {
        SUM, MIN, MAX, MINBY, MAXBY,
    }
}
Comparator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
/**
 * Serializable comparison strategy used by {@code ComparableAggregator}. One concrete
 * comparator exists per supported aggregation type and is obtained through
 * {@link #getForAggregation(AggregationType)}.
 */
@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Compares {@code o1} against {@code o2} for this comparator's aggregation type.
     *
     * <p>The method declares its own type parameter {@code R}; without that declaration
     * {@code R} would be an unresolved symbol.
     *
     * @param o1 the left-hand value, whose {@code compareTo} is invoked
     * @param o2 the right-hand value
     * @param <R> type {@code o1} can be compared to
     * @return for {@code MAX}/{@code MIN}: 1 if {@code o1} is strictly greater/smaller than
     *     {@code o2}, otherwise 0; for {@code MAXBY}/{@code MINBY}: 1 if {@code o1} is strictly
     *     greater/smaller, 0 on a tie, -1 otherwise
     */
    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    /**
     * Returns the comparator for the given aggregation type.
     *
     * @param type one of {@code MAX}, {@code MIN}, {@code MINBY}, {@code MAXBY}
     * @return a new comparator instance for {@code type}
     * @throws IllegalArgumentException for any other type (for example {@code SUM})
     */
    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
            case MAX:
                return new MaxComparator();
            case MIN:
                return new MinComparator();
            case MINBY:
                return new MinByComparator();
            case MAXBY:
                return new MaxByComparator();
            default:
                throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    /** Reports 1 only when {@code o1} is strictly greater than {@code o2}. */
    private static class MaxComparator extends Comparator {
        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) > 0 ? 1 : 0;
        }
    }

    /** Three-way result: 1 if {@code o1} is greater, 0 on a tie, -1 if smaller. */
    private static class MaxByComparator extends Comparator {
        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            // compareTo may return any int; signum normalizes it to -1, 0 or 1.
            return Integer.signum(o1.compareTo(o2));
        }
    }

    /** Three-way result: 1 if {@code o1} is smaller, 0 on a tie, -1 if greater. */
    private static class MinByComparator extends Comparator {
        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            // Same as MaxByComparator with the sign flipped.
            return -Integer.signum(o1.compareTo(o2));
        }
    }

    /** Reports 1 only when {@code o1} is strictly smaller than {@code o2}. */
    private static class MinComparator extends Comparator {
        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) < 0 ? 1 : 0;
        }
    }
}
소결
maxBy, minBy는 sum, max, min보다 boolean 파라미터(first)가 하나 더 많으며, 이 파라미터는 비교값이 0일 때 어떤 요소를 반환할지 선택하는 데 사용된다. 한편 reduce 메서드는 byAggregate 연산이 아닌 경우 항상 value1을 반환하는데, 비교값이 0일 때는 리플렉션으로 value1의 비교 필드를 value2의 값으로 업데이트한 뒤 value1을 반환한다.
doc
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[case52] flink Keyed Stream의 aggregation 작업에 대해 이야기합니다.flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java Keyed Stream의agg...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.