Apache Flink (3): Flink 도 킹 DataSource
8282 단어 빅 데이터
DataSource 는 스 트림 컴 퓨 팅 의 입력 을 지정 합 니 다. 사용 자 는 flink 실행 환경 streamExecution Environment 의 addSource () 방법 으로 데이터 원본 을 추가 할 수 있 습 니 다. Flink 는 일부 DataSource 의 실현 을 미리 실현 하 였 습 니 다. 사용자 가 자신의 데이터 원본 을 사용자 정의 해 야 한다 면 SourceFunction 인터페이스 (비 병렬 Source) 나 ParallelSourceFunction 인터페이스 를 실현 할 수 있 습 니 다.(병렬 소스 구현) 또는 RichParallelSourceFunction 계승 (병렬 소스 구현 및 상태 조작 지원).
File Based: 텍스트 파일 을 입력 원 으로 합 니 다.
readTextFile (path) - 텍스트 파일 을 읽 습 니 다. 바 텀 은 TextInputFormat 한 줄 을 통 해 파일 데 이 터 를 읽 습 니 다. 되 돌아 오 는 것 은 DataStream [String] 입 니 다. - 한 번 만 처리 합 니 다.
//1. StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. DataStream -
val filePath="file:///D:\\data"
val dataStream: DataStream[String] = fsEnv.readTextFile(filePath)
//3.
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
readFile (fileInputFormat, path) - 텍스트 파일 읽 기, 바 텀 지정 입력 형식 - 한 번 만 처리
//1. StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. DataStream -
val filePath="file:///D:\\data"
val inputFormat = new TextInputFormat(null)
val dataStream: DataStream[String] = fsEnv.readFile(inputFormat,filePath)
//3.
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
readFile (fileInputFormat, path, watchType, interval, pathFilter) - 상기 두 가지 방법 은 모두 이 방법 입 니 다.
//1. StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. DataStream -
val filePath="file:///D:\\data"
val inputFormat = new TextInputFormat(null)
inputFormat.setFilesFilter(new FilePathFilter {
override def filterPath(path: Path): Boolean = {
if(path.getName().startsWith("1")){ //
return true
}
false
}
})
val dataStream: DataStream[String] = fsEnv.readFile(inputFormat,filePath,
FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
//3.
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
정기 적 으로 파일 을 검색 합 니 다. 파일 내용 이 수정 되면 이 파일 은 완전히 다시 읽 힙 니 다. 따라서 중복 계산 이 발생 할 수 있 습 니 다.
Collection: 집합 을 데이터 원본 으로 합 니 다.
//1. StreamExecutionEnvironment
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. DataStream -
val dataStream: DataStream[String] = fsEnv.fromCollection(List("this is a demo","hello world"))
//3.
dataStream.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
사용자 정의 데이터 원본
class UserDefineDataSource extends ParallelSourceFunction[String]{
val lines = Array("Hello Flink", "Hello Spark", "Hello Scala")
@volatile
var isRunning = true
//
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while (isRunning){
Thread.sleep(1000)
sourceContext.collect(lines(new Random().nextInt(lines.length)))
}
}
//
override def cancel(): Unit = {
isRunning = false
}
}
object FlinkUserDefineSource {
def main(args: Array[String]): Unit = {
// 1. StreamExecutionEnvironment
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment
//
val dataStream : DataStream[String] = flinkEnv.addSource[String](
new UserDefineDataSource
)
dataStream
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(0)
.sum(1)
.print()
//
flinkEnv.execute("FlinkWordCount")
}
}
Flink 도 킹 Kafka 데이터 원본
관련 의존 도입
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.11artifactId>
<version>1.8.1version>
dependency>
인 스 턴 스 코드
object FlinkKafkaSourceSimple{
def main(args: Array[String]): Unit = {
// 1. StreamExecutionEnvironment
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 2. DataStream
val prop = new Properties()
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Spark:9092")
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")
// kafka value
val dataStream : DataStream[String] = flinkEnv.addSource[String](
new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), prop)
)
dataStream
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(0)
.sum(1)
.print()
//
flinkEnv.execute("FlinkWordCount")
}
}
상기 코드 는 value 정보 만 얻 을 수 있 습 니 다. 만약 에 사용자 가 key / offset / partition 등 다른 정 보 를 얻 으 려 면 KafkaDeserializationSchema 를 맞 춰 야 합 니 다.
Kafka 레코드 메타 데이터 정보 가 져 오기
class UserDefineKafkaSchema extends KafkaDeserializationSchema[(Int, Long, String, String, String)]{
override def isEndOfStream(t: (Int, Long, String, String, String)): Boolean = {
false
}
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]):
(Int, Long, String, String, String) = {
// key
if(consumerRecord.key() == null){
(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.topic(),
"", new String(consumerRecord.value()))
}else{
(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.topic(),
StringUtils.arrayToString(consumerRecord.key()), new String(consumerRecord.value()))
}
}
//
override def getProducedType: TypeInformation[(Int, Long, String, String, String)] = {
createTypeInformation[(Int, Long, String, String, String)]
}
}
인 스 턴 스 코드
object FlinkKafkaSourceComplex {
def main(args: Array[String]): Unit = {
// 1. StreamExecutionEnvironment
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 2. DataStream
val prop = new Properties()
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Spark:9092")
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")
// kafka
val dataStream = flinkEnv.addSource[(Int, Long, String, String, String)](
new FlinkKafkaConsumer[(Int, Long, String, String, String)]("flink", new UserDefineKafkaSchema, prop)
)
dataStream.print()
//
flinkEnv.execute("FlinkWordCount")
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.