Apache Flink (3): Flink 도 킹 DataSource

8282 단어 빅 데이터
DataSource
DataSource 는 스 트림 컴 퓨 팅 의 입력 을 지정 합 니 다. 사용 자 는 flink 실행 환경 streamExecution Environment 의 addSource () 방법 으로 데이터 원본 을 추가 할 수 있 습 니 다. Flink 는 일부 DataSource 의 실현 을 미리 실현 하 였 습 니 다. 사용자 가 자신의 데이터 원본 을 사용자 정의 해 야 한다 면 SourceFunction 인터페이스 (비 병렬 Source) 나 ParallelSourceFunction 인터페이스 를 실현 할 수 있 습 니 다.(병렬 소스 구현) 또는 RichParallelSourceFunction 계승 (병렬 소스 구현 및 상태 조작 지원).
File Based: 텍스트 파일 을 입력 원 으로 합 니 다.
readTextFile (path) - 텍스트 파일 을 읽 습 니 다. 바 텀 은 TextInputFormat 한 줄 을 통 해 파일 데 이 터 를 읽 습 니 다. 되 돌아 오 는 것 은 DataStream [String] 입 니 다. - 한 번 만 처리 합 니 다.
    //1.  StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2.  DataStream -  
    val filePath="file:///D:\\data"
    val dataStream: DataStream[String] = fsEnv.readTextFile(filePath)
    //3.      
    dataStream.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")

readFile (fileInputFormat, path) - 텍스트 파일 읽 기, 바 텀 지정 입력 형식 - 한 번 만 처리
    //1.  StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2.  DataStream -  
    val filePath="file:///D:\\data"
    val inputFormat = new TextInputFormat(null)
    val dataStream: DataStream[String] = fsEnv.readFile(inputFormat,filePath)
    //3.      
    dataStream.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")

readFile (fileInputFormat, path, watchType, interval, pathFilter) - 상기 두 가지 방법 은 모두 이 방법 입 니 다.
     //1.  StreamExecutionEnvironment
        val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
        //2.  DataStream -  
        val filePath="file:///D:\\data"
        val inputFormat = new TextInputFormat(null)
    
        inputFormat.setFilesFilter(new FilePathFilter {
          override def filterPath(path: Path): Boolean = {
            if(path.getName().startsWith("1")){ //        
              return true
            }
            false
          }
        })
        val dataStream: DataStream[String] = fsEnv.readFile(inputFormat,filePath,
          FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
        //3.      
        dataStream.flatMap(_.split("\\s+"))
          .map((_,1))
          .keyBy(0)
          .sum(1)
          .print()
    
        fsEnv.execute("FlinkWordCountsQuickStart")
    

정기 적 으로 파일 을 검색 합 니 다. 파일 내용 이 수정 되면 이 파일 은 완전히 다시 읽 힙 니 다. 따라서 중복 계산 이 발생 할 수 있 습 니 다.
Collection: 집합 을 데이터 원본 으로 합 니 다.
    //1.  StreamExecutionEnvironment
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2.  DataStream -  
    val dataStream: DataStream[String] = fsEnv.fromCollection(List("this is a demo","hello world"))
    //3.      
    dataStream.flatMap(_.split("\\s+"))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")

사용자 정의 데이터 원본
class UserDefineDataSource extends ParallelSourceFunction[String]{

  val lines = Array("Hello Flink", "Hello Spark", "Hello Scala")

  @volatile
  var isRunning = true

  //   
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(1000)
      sourceContext.collect(lines(new Random().nextInt(lines.length)))
    }
  }

  //   
  override def cancel(): Unit = {
    isRunning = false
  }

}
object FlinkUserDefineSource {

  def main(args: Array[String]): Unit = {

    // 1.  StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    //            
    val dataStream : DataStream[String] = flinkEnv.addSource[String](
      new UserDefineDataSource
    )

    dataStream
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()

    //     
    flinkEnv.execute("FlinkWordCount")

  }

}

Flink 도 킹 Kafka 데이터 원본
관련 의존 도입
    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-connector-kafka_2.11artifactId>
        <version>1.8.1version>
    dependency>

인 스 턴 스 코드
object FlinkKafkaSourceSimple{

  def main(args: Array[String]): Unit = {

    // 1.  StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // 2.  DataStream
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Spark:9092")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

    //     kafka  value
    val dataStream : DataStream[String] = flinkEnv.addSource[String](
      new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), prop)
    )

    dataStream
        .flatMap(_.split("\\s+"))
        .map((_, 1))
        .keyBy(0)
        .sum(1)
        .print()

    //     
    flinkEnv.execute("FlinkWordCount")

  }

}

상기 코드 는 value 정보 만 얻 을 수 있 습 니 다. 만약 에 사용자 가 key / offset / partition 등 다른 정 보 를 얻 으 려 면 KafkaDeserializationSchema 를 맞 춰 야 합 니 다.
Kafka 레코드 메타 데이터 정보 가 져 오기
   class UserDefineKafkaSchema extends KafkaDeserializationSchema[(Int, Long, String, String, String)]{

  override def isEndOfStream(t: (Int, Long, String, String, String)): Boolean = {
    false
  }

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]):
  (Int, Long, String, String, String) = {
    //   key  
    if(consumerRecord.key() == null){
      (consumerRecord.partition(), consumerRecord.offset(), consumerRecord.topic(),
        "", new String(consumerRecord.value()))
    }else{
      (consumerRecord.partition(), consumerRecord.offset(), consumerRecord.topic(),
        StringUtils.arrayToString(consumerRecord.key()), new String(consumerRecord.value()))
    }

  }

  //       
  override def getProducedType: TypeInformation[(Int, Long, String, String, String)] = {
    createTypeInformation[(Int, Long, String, String, String)]
  }

}
    

인 스 턴 스 코드

object FlinkKafkaSourceComplex {

  def main(args: Array[String]): Unit = {

    // 1.  StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // 2.  DataStream
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Spark:9092")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")

    //     kafka        
    val dataStream = flinkEnv.addSource[(Int, Long, String, String, String)](
      new FlinkKafkaConsumer[(Int, Long, String, String, String)]("flink", new UserDefineKafkaSchema, prop)
    )

    dataStream.print()

    //     
    flinkEnv.execute("FlinkWordCount")

  }

}

좋은 웹페이지 즐겨찾기