[case52] flink Keyed Stream의 aggregation 작업에 대해 이야기합니다.

12336 단어 flink

순서


본고는 주로 flink Keyed Stream의aggregation 조작을 연구한다

인스턴스

    @Test
    public void testMax() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WordCount[] data = new WordCount[]{new WordCount(1,"Hello", 1), new
                WordCount(1,"World", 3), new WordCount(2,"Hello", 1)};
        env.fromElements(data)
                .keyBy("word")
                .max("frequency")
                .addSink(new SinkFunction() {
                    @Override
                    public void invoke(WordCount value, Context context) throws Exception {
                        LOGGER.info("value:{}",value);
                    }
                });
        env.execute("testMax");
    }
  • 여기서 먼저 워드 필드에 키By 조작을 한 다음에Keyed Stream의 max 방법으로frequency 필드에 따라 최대의Word Count
  • 를 취한다.

    KeyedStream.aggregate


    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
        public SingleOutputStreamOperator sum(int positionToSum) {
            return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator sum(String field) {
            return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator max(int positionToMax) {
            return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
                    getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator max(String field) {
            return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
                    false, getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator min(int positionToMin) {
            return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
                    getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator min(String field) {
            return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
                    false, getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator maxBy(int positionToMaxBy) {
            return this.maxBy(positionToMaxBy, true);
        }
    
        public SingleOutputStreamOperator maxBy(String positionToMaxBy) {
            return this.maxBy(positionToMaxBy, true);
        }
    
        public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) {
            return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
                    getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator maxBy(String field, boolean first) {
            return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
                    first, getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator minBy(int positionToMinBy) {
            return this.minBy(positionToMinBy, true);
        }
    
        public SingleOutputStreamOperator minBy(String positionToMinBy) {
            return this.minBy(positionToMinBy, true);
        }
    
        public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) {
            return aggregate(new ComparableAggregator(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
                    getExecutionConfig()));
        }
    
        public SingleOutputStreamOperator minBy(String field, boolean first) {
            return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
                    first, getExecutionConfig()));
        }
    
        protected SingleOutputStreamOperator aggregate(AggregationFunction aggregate) {
            StreamGroupedReduce operator = new StreamGroupedReduce(
                    clean(aggregate), getType().createSerializer(getExecutionConfig()));
            return transform("Keyed Aggregation", getType(), operator);
        }
  • Keyed Stream의aggregation 방법은 보호된 것으로 sum,max,min,maxBy,minBy 이 몇 가지 방법은 실제적으로aggregate 방법을 사용했다. 단지 그들이 만든Comparable Aggregator의AggregationType는 다르다. 각각SUM,MAX,MIN,MAXBY,MINBY
  • 이다.
  • 각sum,max,min,maxBy,minBy는 두 가지 재부팅 방법이 있는데 하나는 int 유형의 매개 변수이고 하나는String 유형의 매개 변수
  • maxBy,minBy는sum,max,min보다 퍼스트boolean 파라미터가 많습니다. 이 파라미터는 여러 개의compare 값이 같을 때 첫 번째 되돌아올지 여부를 지정하는 데 사용됩니다
  • ComparableAggregator


    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
    @Internal
    public class ComparableAggregator extends AggregationFunction {
    
        private static final long serialVersionUID = 1L;
    
        private Comparator comparator;
        private boolean byAggregate;
        private boolean first;
        private final FieldAccessor fieldAccessor;
    
        private ComparableAggregator(AggregationType aggregationType, FieldAccessor fieldAccessor, boolean first) {
            this.comparator = Comparator.getForAggregation(aggregationType);
            this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
            this.first = first;
            this.fieldAccessor = fieldAccessor;
        }
    
        public ComparableAggregator(int positionToAggregate,
                TypeInformation typeInfo,
                AggregationType aggregationType,
                ExecutionConfig config) {
            this(positionToAggregate, typeInfo, aggregationType, false, config);
        }
    
        public ComparableAggregator(int positionToAggregate,
                TypeInformation typeInfo,
                AggregationType aggregationType,
                boolean first,
                ExecutionConfig config) {
            this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
        }
    
        public ComparableAggregator(String field,
                TypeInformation typeInfo,
                AggregationType aggregationType,
                boolean first,
                ExecutionConfig config) {
            this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public T reduce(T value1, T value2) throws Exception {
            Comparable o1 = (Comparable) fieldAccessor.get(value1);
            Object o2 = fieldAccessor.get(value2);
    
            int c = comparator.isExtremal(o1, o2);
    
            if (byAggregate) {
                // if they are the same we choose based on whether we want to first or last
                // element with the min/max.
                if (c == 0) {
                    return first ? value1 : value2;
                }
    
                return c == 1 ? value1 : value2;
    
            } else {
                if (c == 0) {
                    value1 = fieldAccessor.set(value1, o2);
                }
                return value1;
            }
        }
    }
  • Comparable Aggregator는Aggregation Function을 계승했고 Aggregation Function는 Reduce Function 인터페이스를 실현했다. 여기서Comparable Aggregator가 실현한 Reduce 방법은 먼저Comparator를 빌려 두 대상을 비교한 다음byAggregate인지 여부에 따라 다른 처리를 한다. 만약byAggregate라면 비교값이 0일 때 가장 먼저 만났던 요소를 되돌려줄지, 만약value1을 되돌려줄지 판단한다.그렇지 않으면value2를 되돌려주고 비교값이 0이 아니면 비교값이 가장 큰 요소를 되돌려줍니다.byAggregate가 아니면 비교값이 0( value1 value2 )이면 반사법으로value2의 비교필드의 값을value1로 업데이트하고 마지막으로value1
  • 을 되돌려줍니다

    AggregationFunction

    @Internal
    public abstract class AggregationFunction implements ReduceFunction {
        private static final long serialVersionUID = 1L;
    
        /**
         * Aggregation types that can be used on a windowed stream or keyed stream.
         */
        public enum AggregationType {
            SUM, MIN, MAX, MINBY, MAXBY,
        }
    }
  • AggregationFunction 성명은 ReduceFunction을 실현했고 다섯 가지 유형의AggregationType을 정의했다. 그것이 바로 SUM, MIN, MAX, MINBY, MAXBY
  • 이다.

    Comparator


    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
    @Internal
    public abstract class Comparator implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        public abstract  int isExtremal(Comparable o1, R o2);
    
        public static Comparator getForAggregation(AggregationType type) {
            switch (type) {
            case MAX:
                return new MaxComparator();
            case MIN:
                return new MinComparator();
            case MINBY:
                return new MinByComparator();
            case MAXBY:
                return new MaxByComparator();
            default:
                throw new IllegalArgumentException("Unsupported aggregation type.");
            }
        }
    
        private static class MaxComparator extends Comparator {
    
            private static final long serialVersionUID = 1L;
    
            @Override
            public  int isExtremal(Comparable o1, R o2) {
                return o1.compareTo(o2) > 0 ? 1 : 0;
            }
    
        }
    
        private static class MaxByComparator extends Comparator {
    
            private static final long serialVersionUID = 1L;
    
            @Override
            public  int isExtremal(Comparable o1, R o2) {
                int c = o1.compareTo(o2);
                if (c > 0) {
                    return 1;
                }
                if (c == 0) {
                    return 0;
                } else {
                    return -1;
                }
            }
    
        }
    
        private static class MinByComparator extends Comparator {
    
            private static final long serialVersionUID = 1L;
    
            @Override
            public  int isExtremal(Comparable o1, R o2) {
                int c = o1.compareTo(o2);
                if (c < 0) {
                    return 1;
                }
                if (c == 0) {
                    return 0;
                } else {
                    return -1;
                }
            }
    
        }
    
        private static class MinComparator extends Comparator {
    
            private static final long serialVersionUID = 1L;
    
            @Override
            public  int isExtremal(Comparable o1, R o2) {
                return o1.compareTo(o2) < 0 ? 1 : 0;
            }
    
        }
    }
  • Comparator는 Serializable 인터페이스를 실현하고 isExtremal 추상적인 방법을 정의했으며 getForAggregation 공장 방법을 제공하여 서로 다른 AggregationType에 따라 서로 다른 Comparator
  • 를 창설했다.
  • Comparator에서 MaxComparator, MinComparator, MinByComparator, MaxByComparator 네 개의 하위 클래스를 정의했는데 모두 isExtremal 방법을 실현했다
  • MaxComparator는Comparable 인터페이스에서 정의한compareTo 방법을 직접 이용하지만 그의 반환은 0과 1이고compareTo가 0보다 크면 1을 반환한다. 그렇지 않으면 0을 반환한다. 즉 0보다 크면 1을 반환하고 그렇지 않으면 0을 반환한다.MaxByComparator도 먼저Comparable 인터페이스에 정의된compareTo 방법에 따라 값을 얻지만 그의 반환값은 3가지가 있는데 0보다 크면 1을 반환하고 0보다 작으면 0을 반환하며 0보다 작으면 -1을 반환한다. 즉, 크면 1을 반환하고 같은 경우 0을 반환하고 작으면 -1
  • 을 반환한다.

    소결

  • Keyed Stream의aggregation 조작은 주로sum,max,min,maxBy,minBy로 나뉘는데 그들 내부에 보호된 수식의aggregation 방법을 사용했다. 단지 그들이 만든Comparable Aggregator의AggregationType는 다르다. 각각SUM,MAX,MIN,MAXBY,MINBY
  • 이다.
  • Comparable Aggregator는Aggregation Function을 계승했고 Aggregation Function는 Reduce Function 인터페이스를 실현했다. 여기서Comparable Aggregator가 실현한 Reduce 방법은 먼저 Comparator를 빌려 두 대상을 비교한 다음byAggregate인지 여부에 따라 다른 처리를 한다. 만약byAggregate라면 비교값이 0일 때 가장 먼저 만났던 요소를 되돌려줄지 판단하고 만약에 가장 먼저 만났던 요소를 되돌려준다.그렇지 않으면 마지막에 만났던 비교값이 0이 아닐 때 비교값이 가장 큰 요소를 되돌려준다.byAggregate가 아니면 비교값이 0이면 반사법으로 후자의 값을value1로 업데이트하고 마지막으로value1
  • 을 되돌려줍니다
  • Comparator에서 MaxComparator, MinComparator, MinByComparator, MaxByComparator 네 개의 부류를 정의했는데 모두 isExtremal 방법을 실현했다.MaxComparator와 MaxByComparator의 차이점은 MaxComparator가 되돌아오는 1보다 크고 0보다 작으며 MaxByComparator의 되돌아오는 값이 더욱 정교하다는 것이다. 되돌아오는 1보다 크고 0보다 작으며 되돌아오는 -1보다 작다는 것이다.이 차이도ComparableAggregator의 Reduce 방법에 나타난다. 또한maxBy,minBy는 다른 방법보다 퍼스트boolean 파라미터가 하나 더 많아서 비교값이 0일 때 어떤 요소를 되돌려줄지 선택하는 데 사용된다.한편,reduce 방법은byAggregate 작업이 아닌 경우에는value1을 항상 되돌려줍니다. 비교값이 같을 때 반사 업데이트value1을 사용하고value1
  • 을 되돌려줍니다.

    doc

  • DataStream Transformations
  • 좋은 웹페이지 즐겨찾기