Flink Custom Data Sources

A source is where the data is read from. Flink provides the following ways to create a source (a short usage sketch of a few of these methods follows the list):

File-based:

  • readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.
  • readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat; depending on the watchType, the path is either monitored periodically (every interval ms) for new data or processed once.

Socket-based:

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.

Collection-based:

  • fromCollection(Collection) - Creates a data stream from the Java java.util.Collection. All elements in the collection must be of the same type.
  • fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
  • fromElements(T ...) - Creates a data stream from the given sequence of objects. All objects must be of the same type.
  • fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Custom:

  • addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer08<>(...)). See connectors for more details.

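Before the custom-source example below, here is a minimal sketch of the collection-based methods from the list above. The class name CollectionSourcesTest is a placeholder, not from the original post; the file- and socket-based methods are left out because they need an existing file or a running server.

    package com.river.streaming;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Arrays;

    public class CollectionSourcesTest {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // fromElements: builds a stream from the given objects; all must be of the same type.
            env.fromElements("a", "b", "c").print();

            // fromCollection: builds a stream from a java.util.Collection.
            env.fromCollection(Arrays.asList(1, 2, 3)).print();

            // generateSequence: emits the numbers 1..10, in parallel.
            env.generateSequence(1, 10).print();

            env.execute("CollectionSourcesTest");
        }
    }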
Custom data source:

    package com.river.streaming;
    
    import org.apache.commons.lang3.RandomUtils;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.Random;
    
    /**
     * @author river
     * @date 2019/4/24 12:59
     **/
    public class DataSourcesTest {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Attach the custom source and print every record it emits.
            env.addSource(new MySource())
                    .print();
            env.execute("DataSourcesTest");
        }
    
        /**
         * A non-parallel custom source that emits a (name, random int) tuple once per second
         * until cancel() flips the running flag.
         */
        public static class MySource implements SourceFunction<Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;
    
            public MySource() {
                this.isRunning = true;
            }
    
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
                while (isRunning) {
                    Thread.sleep(1000);
                    if (new Random().nextBoolean()) {
                        sourceContext.collect(Tuple2.of("frank", RandomUtils.nextInt(0, 100)));
                        continue;
                    }
                    sourceContext.collect(Tuple2.of("lucy", RandomUtils.nextInt(0, 100)));
                }
            }
    
            @Override
            public void cancel() {
                isRunning = false;
            }
        }
    }
    
    

Execution result (the n> prefix is the index of the parallel subtask of the print sink):

    1> (lucy,77)
    2> (lucy,95)
    3> (lucy,6)
    4> (lucy,40)
    5> (frank,15)
    6> (lucy,11)
    7> (lucy,12)
    ....
    ....
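A side note on the example above: a plain SourceFunction always runs with parallelism 1. If the source itself needs to run in parallel, the same pattern can be written against ParallelSourceFunction. The sketch below is only an illustration under that assumption; MyParallelSource is a hypothetical name, not part of the original post.

    package com.river.streaming;

    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

    /**
     * Hypothetical parallel variant: each parallel subtask runs its own copy of run(),
     * so the overall output interleaves the counters of all subtasks.
     */
    public class MyParallelSource implements ParallelSourceFunction<Long> {
        private static final long serialVersionUID = 1L;
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Long> sourceContext) throws Exception {
            long counter = 0L;
            while (isRunning) {
                Thread.sleep(1000);
                sourceContext.collect(counter++);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }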
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources
