DataStream API 기반 Flink 프로그램으로 TopN 구현하기
9392 단어 · 태그: Flink, 공식 문서 노트
5분 간격으로 지난 1시간 동안의 인기 상품(조회수 상위 N개)을 집계한다.
데이터 세트:
https://tianchi.aliyun.com/dataset/dataDetail?dataId=60747
코드:
데이터 구조:
package com.flink.topn;
/**
 * One row of the UserBehavior CSV data set.
 *
 * <p>Written as a Flink POJO (public class, public mutable fields, public no-argument
 * constructor) so that Flink can populate it from CSV columns by field name and key
 * streams by its fields.
 */
public class UserDataBean {

    /** User id. */
    public long userId;

    /** Item (product) id. */
    public long itemId;

    /** Item category id. */
    public int categoryId;

    /** Behavior type: one of "pv", "buy", "cart", "fav". */
    public String behavior;

    /** Event time; presumably epoch seconds (the job converts it with {@code * 1000}). */
    public long timestamp;

    /** Creates an empty record that Flink fills in field by field. */
    public UserDataBean() {
    }
}
메인 함수:
package com.flink.topn;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.File;
import java.net.URL;
/**
 * Flink job that, every 5 minutes, reports the most viewed ("pv") items of the last hour.
 *
 * <p>Reads {@code UserBehavior.csv} from the classpath, assigns event time from the CSV
 * {@code timestamp} column, counts page views per item in a 1h/5m sliding event-time window,
 * and ranks each window's counts with {@link TopNGoodsInfo}.
 */
public class HotGoodTopN {

    /** Classpath resource holding the input data set. */
    private static final String INPUT_RESOURCE = "UserBehavior.csv";

    /** Number of items reported per window. */
    private static final int TOP_N = 3;

    public static void main(String[] args) {
        URL fileUrl = HotGoodTopN.class.getClassLoader().getResource(INPUT_RESOURCE);
        if (fileUrl == null) {
            // Fail with a clear message instead of an NPE inside fileUrl.toURI().
            throw new IllegalStateException("Classpath resource not found: " + INPUT_RESOURCE);
        }
        System.out.println(fileUrl);

        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // One task, so the printed rankings come from a single sink in order.
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // NOTE(review): File(URI) only works when the resource is a plain file on disk,
            // not an entry inside a packaged jar.
            Path path = Path.fromLocalFile(new File(fileUrl.toURI()));
            PojoTypeInfo<UserDataBean> typeInfo =
                    (PojoTypeInfo<UserDataBean>) TypeExtractor.createTypeInfo(UserDataBean.class);
            // CSV columns in file order; each name must match a public field of UserDataBean.
            String[] fieldInfo = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
            PojoCsvInputFormat<UserDataBean> csvInputFormat =
                    new PojoCsvInputFormat<>(path, typeInfo, fieldInfo);
            DataStream<UserDataBean> input = env.createInput(csvInputFormat, typeInfo);

            // Assumes the file is already ordered by timestamp, which AscendingTimestampExtractor requires.
            DataStream<UserDataBean> withTimestamps = input.assignTimestampsAndWatermarks(
                    new AscendingTimestampExtractor<UserDataBean>() {
                        @Override
                        public long extractAscendingTimestamp(UserDataBean userDataBean) {
                            // CSV value is in seconds; Flink event time is in milliseconds.
                            return userDataBean.timestamp * 1000;
                        }
                    });

            // Keep only page-view events; the constant-first equals is null-safe.
            DataStream<UserDataBean> pageViews = withTimestamps.filter(new FilterFunction<UserDataBean>() {
                @Override
                public boolean filter(UserDataBean userDataBean) throws Exception {
                    return "pv".equals(userDataBean.behavior);
                }
            });

            // Views per item over a 1h window sliding every 5m: incremental count + window metadata.
            DataStream<CountResultWindowOutput> itemViewCounts = pageViews
                    .keyBy("itemId")
                    .window(SlidingEventTimeWindows.of(Time.hours(1L), Time.minutes(5L)))
                    .aggregate(new CountGoodNum(), new CountResultWindow());

            // Group all counts of the same window and emit the top N of each.
            DataStream<String> topItems = itemViewCounts
                    .keyBy("windowEnd")
                    .process(new TopNGoodsInfo(TOP_N));
            topItems.print();

            env.execute("HotGoodTopN");
        } catch (Exception e) {
            // Propagate (with cause) instead of printing, so a failed job does not exit normally.
            throw new IllegalStateException("HotGoodTopN job failed", e);
        }
    }
}
중간 출력 데이터 구조(POJO):
package com.flink.topn;
public class CountResultWindowOutput {
public long itemId; // ID
public long windowEnd; //
public long viewCount; //
// , ( flink pojo , key )
/**
* org.apache.flink.api.common.InvalidProgramException: This type (GenericType) cannot be used as key.
* at org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:330)
* at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
* at com.suning.flink.topn.HotGoodTopN.main(HotGoodTopN.java:55)
*/
public CountResultWindowOutput() {
}
public CountResultWindowOutput(long itemId, long windowEnd, long viewCount) {
this.itemId = itemId;
this.windowEnd = windowEnd;
this.viewCount = viewCount;
}
}
윈도우 함수:
1. 증분 집계 함수(AggregateFunction):
package com.flink.topn;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
 * Incremental window aggregate that counts the {@link UserDataBean} events in a window.
 *
 * <p>Type parameters: input {@code UserDataBean}, accumulator {@code Long}, result {@code Long}.
 * They must be declared: with the raw interface the {@code @Override} on
 * {@code add(UserDataBean, Long)} does not match any interface method and the class fails to
 * compile.
 */
public class CountGoodNum implements AggregateFunction<UserDataBean, Long, Long> {

    /** Starts every window's count at zero. */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /** Counts one more event; the event's contents are not inspected. */
    @Override
    public Long add(UserDataBean userDataBean, Long acc) {
        return acc + 1;
    }

    /** The accumulator already is the final count. */
    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    /** Combines two partial counts (used when windows are merged). */
    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}
2. 윈도우 결과 출력 함수(WindowFunction):
package com.flink.topn;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Window function that attaches the item id and window end to the pre-aggregated count.
 *
 * <p>Used as {@code aggregate(new CountGoodNum(), new CountResultWindow())} after
 * {@code keyBy("itemId")}, so the key is a {@link Tuple} whose single field is the item id and
 * the iterable carries the one count produced by the aggregate function. The generic
 * parameters are required for the {@code @Override} to match.
 */
public class CountResultWindow
        implements WindowFunction<Long, CountResultWindowOutput, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow timeWindow, Iterable<Long> iterable,
                      Collector<CountResultWindowOutput> collector) throws Exception {
        // Field-expression keying wraps the key in a tuple; field 0 is the itemId.
        Long itemId = key.getField(0);
        // After a pre-aggregation the iterable holds only the aggregate result.
        Long viewCount = iterable.iterator().next();
        collector.collect(new CountResultWindowOutput(itemId, timeWindow.getEnd(), viewCount));
    }
}
3. 최종 TopN 출력 함수(KeyedProcessFunction):
package com.flink.topn;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
public class TopNGoodsInfo extends KeyedProcessFunction {
private int topSize = 0;
//
private ListState goodsState;
public TopNGoodsInfo(int topSize) {
this.topSize = topSize;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor stateDescriptor = new ListStateDescriptor<>("goods_state", CountResultWindowOutput.class);
goodsState = this.getRuntimeContext().getListState(stateDescriptor);
}
@Override
public void processElement(CountResultWindowOutput countResultWindowOutput, Context context, Collector collector) throws Exception {
goodsState.add(countResultWindowOutput);
context.timerService().registerEventTimeTimer(countResultWindowOutput.windowEnd + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
super.onTimer(timestamp, ctx, out);
ArrayList list = new ArrayList<>();
for (CountResultWindowOutput goodsInfo : goodsState.get()) {
list.add(goodsInfo);
}
list.sort(new Comparator() {
@Override
public int compare(CountResultWindowOutput o1, CountResultWindowOutput o2) {
return (int) (o2.viewCount - o1.viewCount);
}
});
//
StringBuilder builder = new StringBuilder();
builder.append("====================================
");
builder.append(" : ").append(new Timestamp(timestamp - 1)).append("
");
for (int i = 0; i < topSize && i < list.size(); i++) {
CountResultWindowOutput countResultWindowOutput = list.get(i);
builder.append("No").append(i + 1).append(":")
.append(" ID=").append(countResultWindowOutput.itemId)
.append(" =").append(countResultWindowOutput.viewCount)
.append("
");
}
builder.append("====================================
");
out.collect(builder.toString());
}
@Override
public void close() throws Exception {
super.close();
}
}
참고: https://ververica.cn/developers/computing-real-time-hot-goods/