Flink 흐름 처리 입문 (자바)
13726 단어 Flink
package com.heiheihei.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @author 1212
* @version 1.0
* @date 2020/3/31 17:22
*/
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
//
int port = 8081;
try {
//
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port");
}
//
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
DataStream text = env.socketTextStream("localhost", port, "
");
//
DataStream<WordWithCount> windowCounts = text.flatMap((FlatMapFunction<String, WordWithCount>) (s, collector) -> {
for (String word : s.split(" ")) {
collector.collect(new WordWithCount(word, 1L));
}
});
//
windowCounts.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount wordWithCount, WordWithCount t1) throws Exception {
return new WordWithCount(wordWithCount.word, wordWithCount.count + t1.count);
}
});
windowCounts.print().setParallelism(1);
env.execute("Soket Window WordCount");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Flink On YARN 고가용 클러스터 모드 구축(flink-1.10.0-bin-scala_2.11.tgz)다운로드 주소:https://flink.apache.org/downloads.html 다운로드한 설치 패키지를 서버에 업로드하고 지정한 디렉터리에 압축을 풀십시오. 명령은 다음과 같습니다. 파일 끝에 다음과 같은 내...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.