Flink consuming Kafka data

  • Maven configuration
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <!-- universal Kafka connector (Kafka >= 1.0.0) -->
        <version>1.7.2</version>
    </dependency>

  • The code is as follows
  • import java.util.Properties
    
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    
    object FlinKafka {
      def main(args: Array[String]): Unit = {
        // create the streaming execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    
        // kafka consumer configuration
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("zookeeper.connect", "localhost:2181")
        properties.setProperty("group.id", "a")
    
        // create the Kafka source
        // Flink's Kafka consumer is called FlinkKafkaConsumer08
        // (or 09 for Kafka 0.9.0.x versions, etc.,
        // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
        val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties))
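        // word count: split each line into words, emit (word, 1), key by the word, and sum the counts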
        val stream2 = stream.map(_.split("\\W+")).flatMap(_.toSeq).map((_, 1)).keyBy(0).sum(1)
        stream2.addSink(tup => { // sink operation (acts like an action): print each (word, count) result
          println(tup._1+", count->  ",tup._2)
        })
    
        // submit and execute the job
        env.execute("test kafka")
      }
    }
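
    A note on the consumer itself: besides the properties above, FlinkKafkaConsumer also lets you choose where to start reading explicitly. The sketch below reuses the topic, schema, and properties from the example; it is only a sketch, and which start position is appropriate depends on your use case.

        // sketch: configuring the start position of the Kafka source (same topic/properties as above)
        val consumer = new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties)

        consumer.setStartFromGroupOffsets() // default: resume from committed group offsets
        //consumer.setStartFromEarliest()   // re-read the topic from the beginning
        //consumer.setStartFromLatest()     // only read records produced from now on

        val stream: DataStream[String] = env.addSource(consumer)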
    
    ### Test: produce messages to the kafka topic ---------
    wang@wang-pc:~$ kafka-console-producer.sh --broker-list localhost:9092 --topic t1
    >a a a
    >b c
    
    ### Console output: --------
    (a, count->  ,1)
    (a, count->  ,2)
    (a, count->  ,3)
    (c, count->  ,1)
    (b, count->  ,1)
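
    If you would rather write the counts back to Kafka than print them, the same universal connector also ships a producer sink. The following is a hypothetical extension of the example, not part of the original post: the output topic name "t1-counts" is an assumption, and the tuples are simply rendered as text before being sent.

        // sketch: sinking the (word, count) stream to another Kafka topic (topic name assumed)
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

        val producer = new FlinkKafkaProducer[String](
          "localhost:9092",   // broker list
          "t1-counts",        // output topic (assumed, not part of the original example)
          new SimpleStringSchema())

        stream2
          .map(tup => s"${tup._1},${tup._2}") // render each tuple as "word,count"
          .addSink(producer)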
    
    
