DataStream API 기반 Flink 프로그램으로 TopN 구현

배경 설명:
5분 간격으로 지난 1시간 동안의 인기 상품을 집계한다.
데이터 세트:
https://tianchi.aliyun.com/dataset/dataDetail?dataId=60747
코드:
데이터 구조:
package com.flink.topn;

/**
 * One row of the UserBehavior CSV data set, mapped as a Flink POJO.
 *
 * <p>Flink's POJO rules require public fields (or getters/setters) and a public no-arg
 * constructor; the fields are filled by name from the CSV columns.
 */
public class UserDataBean {
    /** User id. */
    public long userId;
    /** Item (product) id. */
    public long itemId;
    /** Category id of the item. */
    public int categoryId;
    /** Behavior type: "pv", "buy", "cart" or "fav". */
    public String behavior;
    /** Event time; NOTE(review): appears to be epoch seconds, since the timestamp assigner multiplies it by 1000. */
    public long timestamp;

    /** Public no-arg constructor required by Flink's POJO type extraction. */
    public UserDataBean() {
    }
}

메인 함수:
package com.flink.topn;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.File;
import java.net.URL;

/**
 * Flink job that, every 5 minutes, prints the most-viewed items of the preceding hour.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>read the UserBehavior CSV;</li>
 *   <li>assign ascending event-time timestamps;</li>
 *   <li>keep only "pv" events;</li>
 *   <li>count views per item in 1h/5m sliding windows;</li>
 *   <li>rank items per window end.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code setStreamTimeCharacteristic}, {@code AscendingTimestampExtractor} and
 * string-based {@code keyBy} are deprecated in newer Flink releases; they are kept here because
 * the downstream functions rely on the {@code Tuple} keys that {@code keyBy(String)} produces.
 */
public class HotGoodTopN {
    /** Classpath resource holding the UserBehavior data set. */
    private static final String DATA_FILE = "UserBehavior.csv";

    /** Number of items printed per window. */
    private static final int TOP_N = 3;

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws IllegalStateException if the data file is missing or the job fails; the original
     *     exception is attached as the cause so the process does not exit successfully
     */
    public static void main(String[] args) {
        try {
            run();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("HotGoodTopN job failed", e);
        }
    }

    /** Wires the pipeline and blocks until the job finishes. */
    private static void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so that all rankings are printed by a single task.
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<UserDataBean> input = readUserBehavior(env);

        // Timestamps in the CSV are multiplied by 1000 to obtain the milliseconds Flink expects;
        // the extractor assumes the file is sorted by time (ascending watermarks).
        DataStream<UserDataBean> withTimestamps = input.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<UserDataBean>() {
                    @Override
                    public long extractAscendingTimestamp(UserDataBean userDataBean) {
                        return userDataBean.timestamp * 1000;
                    }
                });

        // Keep only page-view events; constant-first equals tolerates a null behavior column.
        DataStream<UserDataBean> pv = withTimestamps.filter(new FilterFunction<UserDataBean>() {
            @Override
            public boolean filter(UserDataBean userDataBean) {
                return "pv".equals(userDataBean.behavior);
            }
        });

        // Count views per item in 1h windows sliding every 5m, then rank the items of each window.
        DataStream<String> topN = pv
                .keyBy("itemId")
                .window(SlidingEventTimeWindows.of(Time.hours(1L), Time.minutes(5L)))
                .aggregate(new CountGoodNum(), new CountResultWindow())
                .keyBy("windowEnd")
                .process(new TopNGoodsInfo(TOP_N));
        topN.print();

        env.execute("HotGoodTopN");
    }

    /**
     * Creates the CSV source of {@link UserDataBean} rows.
     *
     * @throws IllegalStateException if the data file is not on the classpath
     */
    private static DataStream<UserDataBean> readUserBehavior(StreamExecutionEnvironment env)
            throws Exception {
        URL fileUrl = HotGoodTopN.class.getClassLoader().getResource(DATA_FILE);
        if (fileUrl == null) {
            throw new IllegalStateException(DATA_FILE + " not found on the classpath");
        }
        System.out.println(fileUrl);
        Path path = Path.fromLocalFile(new File(fileUrl.toURI()));

        // createTypeInfo returns TypeInformation<?>; UserDataBean is a valid POJO, so this is a PojoTypeInfo.
        @SuppressWarnings("unchecked")
        PojoTypeInfo<UserDataBean> typeInfo =
                (PojoTypeInfo<UserDataBean>) TypeExtractor.createTypeInfo(UserDataBean.class);
        // CSV column order mapped onto POJO field names.
        String[] fieldOrder = {"userId", "itemId", "categoryId", "behavior", "timestamp"};
        PojoCsvInputFormat<UserDataBean> csvInputFormat =
                new PojoCsvInputFormat<>(path, typeInfo, fieldOrder);
        return env.createInput(csvInputFormat, typeInfo);
    }
}

중간 출력 데이터 구조 POJO:
package com.flink.topn;

/**
 * View count of one item within one sliding window; the intermediate record between the
 * window aggregation and the TopN ranking.
 *
 * <p>Written as a Flink POJO (public fields plus a public no-arg constructor). Without the
 * no-arg constructor Flink treats the class as a GenericType, and keying by a field expression
 * such as {@code keyBy("windowEnd")} then fails in {@code DataStream.keyBy} with
 * {@code InvalidProgramException: This type (GenericType) cannot be used as key.}
 */
public class CountResultWindowOutput {
    /** Item id. */
    public long itemId;
    /** End timestamp of the window this count belongs to (the value of TimeWindow#getEnd). */
    public long windowEnd;
    /** Number of views of the item inside that window. */
    public long viewCount;

    /** No-arg constructor required by Flink's POJO rules; all fields start at zero. */
    public CountResultWindowOutput() {
        this(0L, 0L, 0L);
    }

    /**
     * @param itemId item id
     * @param windowEnd end timestamp of the window
     * @param viewCount views counted in the window
     */
    public CountResultWindowOutput(long itemId, long windowEnd, long viewCount) {
        this.itemId = itemId;
        this.windowEnd = windowEnd;
        this.viewCount = viewCount;
    }
}

윈도우 함수:
1. 증분 집계 함수(AggregateFunction):
package com.flink.topn;

import org.apache.flink.api.common.functions.AggregateFunction;

/**
 * Incremental aggregate that counts the events falling into a window.
 *
 * <p>The accumulator and the result are both the running count, so each window keeps a single
 * {@code Long} instead of buffering every {@link UserDataBean}. Type arguments are required:
 * with the raw {@code AggregateFunction} the {@code @Override} methods below would not override
 * anything and the class would not compile.
 */
public class CountGoodNum implements AggregateFunction<UserDataBean, Long, Long> {

    /** Every window's count starts at zero. */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /** Adds one per element; the element's contents are not inspected. */
    @Override
    public Long add(UserDataBean userDataBean, Long acc) {
        return acc + 1;
    }

    /** The count itself is the result. */
    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    /** Combines two partial counts. */
    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}

2. 윈도우 출력 함수(WindowFunction):
package com.flink.topn;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Window function that attaches the item id and window end to the pre-aggregated count.
 *
 * <p>It is used together with {@link CountGoodNum} in {@code aggregate(...)}, so the iterable
 * holds exactly one element: the window's count. The key is a {@link Tuple} because the stream
 * is keyed with the field expression {@code keyBy("itemId")}; its only field is the item id.
 */
public class CountResultWindow
        implements WindowFunction<Long, CountResultWindowOutput, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow timeWindow, Iterable<Long> iterable,
                      Collector<CountResultWindowOutput> collector) throws Exception {
        Long itemId = key.getField(0);
        long viewCount = iterable.iterator().next();
        collector.collect(new CountResultWindowOutput(itemId, timeWindow.getEnd(), viewCount));
    }
}

3. 최종 TopN 출력 함수(KeyedProcessFunction):
package com.flink.topn;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TopNGoodsInfo extends KeyedProcessFunction {
    private int topSize = 0;
    //        
    private ListState goodsState;

    public TopNGoodsInfo(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor stateDescriptor = new ListStateDescriptor<>("goods_state", CountResultWindowOutput.class);
        goodsState = this.getRuntimeContext().getListState(stateDescriptor);
    }

    @Override
    public void processElement(CountResultWindowOutput countResultWindowOutput, Context context, Collector collector) throws Exception {
        goodsState.add(countResultWindowOutput);
        context.timerService().registerEventTimeTimer(countResultWindowOutput.windowEnd + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        ArrayList list = new ArrayList<>();
        for (CountResultWindowOutput goodsInfo : goodsState.get()) {
            list.add(goodsInfo);
        }
        list.sort(new Comparator() {
            @Override
            public int compare(CountResultWindowOutput o1, CountResultWindowOutput o2) {
                return (int) (o2.viewCount - o1.viewCount);
            }
        });

        //       
        StringBuilder builder = new StringBuilder();
        builder.append("====================================
"); builder.append(" : ").append(new Timestamp(timestamp - 1)).append("
"); for (int i = 0; i < topSize && i < list.size(); i++) { CountResultWindowOutput countResultWindowOutput = list.get(i); builder.append("No").append(i + 1).append(":") .append(" ID=").append(countResultWindowOutput.itemId) .append(" =").append(countResultWindowOutput.viewCount) .append("
"); } builder.append("====================================
"); out.collect(builder.toString()); } @Override public void close() throws Exception { super.close(); } }

참고: https://ververica.cn/developers/computing-real-time-hot-goods/

좋은 웹페이지 즐겨찾기