flink 소비kafka 데이터
2210 단어 flink
org.apache.flink
flink-scala_2.11
1.7.2
org.apache.flink
flink-streaming-scala_2.11
1.7.2
org.apache.flink
flink-connector-kafka_2.11
1.7.2
import java.util.Properties
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object FlinKafka {
def main(args: Array[String]): Unit = {
//
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
//kafak
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "a")
// Kafka
//Flink’s Kafka consumer is called FlinkKafkaConsumer08 (
// or 09 for Kafka 0.9.0.x versions, etc.
// or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties))
val stream2 = stream.map(_.split("\\W+")).flatMap(_.toSeq).map((_, 1)).keyBy(0).sum(1)
stream2.addSink(tup=>{ //sink , action
println(tup._1+", count-> ",tup._2)
})
//
env.execute("test kafka")
}
}
### : kafka ---------
wang@wang-pc:~$ kafka-console-producer.sh --broker-list localhost:9092 --topi1 t1
>a a a
>b c
### : --------
(a, count-> ,1)
(a, count-> ,2)
(a, count-> ,3)
(c, count-> ,1)
(b, count-> ,1)
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[case52] flink Keyed Stream의 aggregation 작업에 대해 이야기합니다.flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java Keyed Stream의agg...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.