reactor: Synchronous generate

원문: reactor reference

Flux를 프로그래밍 방식으로 생성하는 가장 간단한 방법은 generator 함수를 사용하는 generate 메서드를 사용하는 것이다.
generate는 동기적으로 한번에 하나씩 발행(emission)한다. 여기서 사용되는 싱크(sink)는 SynchronousSink이며 콜백 함수가 호출될 때에 최대 한 번 싱크의 next() 메소드를 호출할 수 있다. 추가적으로 error(Throwable) 이나 complete()를 호출할 수 도 있다.

가장 유용한 메소드의 변형은 다음번에 무엇을 발행할지 결정하기 위해 싱크가 참조할 수 있는 state를 유지할 수 있게 해주는 변형일 것이다. generator 함수는 BiFunction<S, SynchronousSink<T>, S>이다.<S>는 state 객체의 타입이고, <T>는 발행할 데이터의 타입이고, BiFunction은 state 객체를 반환해야 한다. 초기 상태를 위한 Supplier<S>를 제공해야 하며 generator 함수는 이제 각 라운드에서 내부에서 데이터를 발행하고 새 state를 반환한다.

int를 state로 사용하는 "구구단" 예제이다.

Flux<String> flux = Flux.generate(
    () -> 0, // 1
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); // 2
      if (state == 10) sink.complete();  // 3
      return state + 1; // 4
    }
);

(1) 초기 state를 0으로 제공한다.
(2) state를 사용해서 무엇을 발행할지 결정한다.
(3) 종료해야 할지 판단하는데 사용한다.
(4) 다음 번 호출에 사용할 새로운 state를 반환한다.

결과는 다음과 같다.

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

<S>로 가변 객체를 사용할 수도 있다. 위와 동일한 예제를 AtomicLong을 사용하여 재작성하였다.

Flux<String> flux = Flux.generate(
    AtomicLong::new, 
    (state, sink) -> {
      long i = state.getAndIncrement(); 
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; 
    }
);

state 객체를 사용해서 리소스 정리 작업을 수행해야 한다면 generate(Supplier<S>, BiFunction, Consumer<S>)를 사용하면 된다. 마지막 state 객체가 Consumer에게 전달된다.
위의 예제에 Consumer를 추가한 버전이다.

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { 
      long i = state.getAndIncrement(); 
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; 
    },
    (state) -> System.out.println("state: " + state) 
);

Consumer에는 마지막 state인 11이 전달된다. 이 람다식은 generate 처리가 완료될 때 한 번만 실행된다.

좋은 웹페이지 즐겨찾기