flink Datastream 어셈블리

4513 단어 flink
transformation은 flink에서stream의 정적 대상으로sink와source를 포함하는transformation를 조립하여 정의된 코드에 따라stream의 정적 토폴로지도를 구성할 수 있다. 다음과 같다.
*   Source              Source
*      +                   +
*      |                   |
*      v                   v
*  Rebalance          HashPartition
*      +                   +
*      |                   |
*      |                   |
*      +------>Union

그 중에서 Source와 Sink는 토폴로지의 시작점과 끝점이고 그 중에서 spilt,select는 중간에 데이터를 구체적으로 조작하는transformation이다.
또한 DataStream 클래스는 다음과 같이 생성된 stream 데이터 추상화입니다.
protected final StreamExecutionEnvironment environment;

protected final StreamTransformation transformation;

그 중에서enviroment는 전체 흐름의 상하문으로 그 중의 수조의 형식으로 이전의 모든transformation을 순서대로 저장하고transformation는 현재strean이 조립하는 과정에서 논리적으로 마지막transformation이다.
 
다음은 간단한stream 토폴로지의 예입니다.
*  Source              Source
*    +                   +
*    |                   |
*    |                   |
*    +------->Map

그 중의 맵도 구체적인streamTransformation으로 DataStream에서 맵 () 방법을 호출하여stream 작업에 구체적으로 추가합니다.
public  SingleOutputStreamOperator map(MapFunction mapper) {

   TypeInformation outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
         Utils.getCallLocationName(), true);

   return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

이 곳에서 현재 DataStream이 맵 () 방법을 호출할 때, 우선 현재 DataStream의treanformation에서 출력된 데이터 형식을 얻어 그 데이터 형식에 따라 새로운transformation의 입력 데이터 형식으로 사용해야 한다.
맵 논리를 구체적으로 구현하는 MapFunction을 매개 변수로 사용해야 합니다. 그렇지 않으면 의미가 없습니다.
이후 이 데이터 유형을 매개 변수의 일부로 삼아 매개 변수의 MapFunction에 따라 새로운 StreamMap, 즉 동적 조작부호를 생성하고transform() 방법을 통해 새로운 DataStream을 얻을 수 있다.
@PublicEvolving
public  SingleOutputStreamOperator transform(String operatorName, TypeInformation outTypeInfo, OneInputStreamOperator operator) {

   // read the output type of the input Transform to coax out errors about MissingTypeInfo
   transformation.getOutputType();

   OneInputTransformation resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());

   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, resultTransform);

   getExecutionEnvironment().addOperator(resultTransform);

   return returnStream;
}

여기서 하나의 맵 작업이기 때문에 자연 입력과 출력은 하나만 필요하기 때문에 먼저 OneInputTransformation을 생성합니다. 이곳의 매개 변수는 전의transformation과 현재의 맵 작업과 입력 데이터 형식, 그리고 현재 상하문 환경의 병행도를 필요로 합니다.
마지막으로 생성된tramsform과 상하문에서 새로운 DataStream을 생성하여 되돌려줍니다. 새로운 DataStream으로 아래의transform을 조립합니다.
 
또 다른 예는 유니언 작업이다. 이 작업은 두 개의 다른 DataStream을 통합할 것이다.
@SafeVarargs
public final DataStream union(DataStream... streams) {
   List> unionedTransforms = new ArrayList<>();
   unionedTransforms.add(this.transformation);

   for (DataStream newStream : streams) {
      if (!getType().equals(newStream.getType())) {
         throw new IllegalArgumentException("Cannot union streams of different types: "
               + getType() + " and " + newStream.getType());
      }

      unionedTransforms.add(newStream.getTransformation());
   }
   return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
public UnionTransformation(List> inputs) {
   super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());

   for (StreamTransformation input: inputs) {
      if (!input.getOutputType().equals(getOutputType())) {
         throw new UnsupportedOperationException("Type mismatch in input " + input);
      }
   }

   this.inputs = Lists.newArrayList(inputs);
}

병합이 필요한 모든 DataStream을 배열에 저장하고 새로운 UnionTransformation을 생성합니다. 이 배열에 병합이 필요한 모든 DataStream 배열 매개 변수를 예로 저장합니다.새로운 UnionTransformation도 새로운 DataStream의transform 매개 변수로 되돌아옵니다.
 
spilt와 select를 동시에 사용하면 분류의 목적을 달성할 수 있습니다.
spilt () 는 호출될 때 Output Select를 사용하고 select () 방법을 다시 써야 합니다. 유입된 데이터에 따라 다른 출력 결과를 되돌려주고, 다음에 select () 방법을 호출하고 매개 변수에서 정해진 되돌림 결과를 결정하면 전자의 출력 결과에 따라 지정한 흐름으로 이동합니다.

좋은 웹페이지 즐겨찾기