reactor: Synchronous generate
Flux
를 프로그래밍 방식으로 생성하는 가장 간단한 방법은 generator 함수를 사용하는 generate
메서드를 사용하는 것이다.
generate
는 동기적으로 한번에 하나씩 발행(emission)한다. 여기서 사용되는 싱크(sink)는 SynchronousSink
이며 콜백 함수가 호출될 때에 최대 한 번 싱크의 next()
메소드를 호출할 수 있다. 추가적으로 error(Throwable)
이나 complete()
를 호출할 수 도 있다.
가장 유용한 메소드의 변형은 다음번에 무엇을 발행할지 결정하기 위해 싱크가 참조할 수 있는 state를 유지할 수 있게 해주는 변형일 것이다. generator 함수는 BiFunction<S, SynchronousSink<T>, S>
이다.<S>
는 state 객체의 타입이고, <T>
는 발행할 데이터의 타입이고, BiFunction
은 state 객체를 반환해야 한다. 초기 상태를 위한 Supplier<S>
를 제공해야 하며 generator 함수는 이제 각 라운드에서 내부에서 데이터를 발행하고 새 state를 반환한다.
int
를 state로 사용하는 "구구단" 예제이다.
Flux<String> flux = Flux.generate(
() -> 0, // 1
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); // 2
if (state == 10) sink.complete(); // 3
return state + 1; // 4
}
);
(1) 초기 state를 0으로 제공한다.
(2) state를 사용해서 무엇을 발행할지 결정한다.
(3) 종료해야 할지 판단하는데 사용한다.
(4) 다음 번 호출에 사용할 새로운 state를 반환한다.
결과는 다음과 같다.
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
<S>
로 가변 객체를 사용할 수도 있다. 위와 동일한 예제를 AtomicLong
을 사용하여 재작성하였다.
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
}
);
state 객체를 사용해서 리소스 정리 작업을 수행해야 한다면 generate(Supplier<S>, BiFunction, Consumer<S>)
를 사용하면 된다. 마지막 state 객체가 Consumer
에게 전달된다.
위의 예제에 Consumer
를 추가한 버전이다.
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
},
(state) -> System.out.println("state: " + state)
);
Consumer
에는 마지막 state인 11이 전달된다. 이 람다식은 generate
처리가 완료될 때 한 번만 실행된다.
Author And Source
이 문제에 관하여(reactor: Synchronous generate), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@nkjang/reactor-Synchronous-generate저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)