reactor: 동기적인 블로킹 호출을 감싸기

B.1. How Do I Wrap a Synchronous, Blocking Call?

원문: reactor reference

정보의 소스가 동기적이고 블로킹 방식인 경우가 많이 있다. Reactor 애플리케이션에서 이러한 소스를 처리하려면 다음 패턴을 적용한다.

Mono blockingWrapper = Mono.fromCallable(() -> {  // 1
    return /* make a remote synchronous call */ // 2
})
.subscribeOn(Schedulers.boundedElastic()); // 3

blockingWrapper.subscribe(...);

(1) fromCallable을 사용하여 새로운 Mono로 만든다. 소스가 하나의 값을 반환하기 때문에 Mono를 사용해야 한다.
(2) 동기적인 블로킹 호출의 결과를 반환하는 Callable 람다를 작성한다.
(3) 구독할 때 마다 전용 단일 스레드에서 실행되도록 한다. Schedulers.boundedElastic()의 워커를 지정한다. subscribeOnMono를 구독하지 않으며, 구독이 실행되는 Scheduler의 종류를 지정하는데 사용된다.

Scheduler로는 boundedElastic을 사용해야 한다. 다음과 같은 이유 때문이다.

  • 전용 스레드를 사용하여 블로킹 리소스가 처리되기까지 대기하며, 이는 다른 논블로킹 처리에 영향을 주지 않는다.
  • 생성할 수 있는 스레드의 갯수에 제한이 있어 너무 많은 스레드가 생성되지 않게 해준다.
  • 블로킹 태스크가 급증하면 큐잉되어 처리 시점이 미뤄질 수 있게 해준다.

예제

Mono<Long> blockingLong = Mono
    .fromCallable(
        () -> {
            log.info("blocking start");
            Thread.sleep(1000);
            log.info("blocking end");
            return System.currentTimeMillis() % 100;
        }
    )
    .subscribeOn(Schedulers.boundedElastic());

blockingLong
    .doOnSubscribe(s -> log.info("onSubscribe"))
    .subscribe(l -> log.info("result: " + l));

결과

18:56:10.908 [main] INFO  my.ex.GenerateFlux - onSubscribe
18:56:10.910 [boundedElastic-1] INFO  my.ex.GenerateFlux - blocking start
18:56:11.912 [boundedElastic-1] INFO  my.ex.GenerateFlux - blocking end // 1000ms의 차이가 있다.
18:56:11.913 [boundedElastic-1] INFO  my.ex.GenerateFlux - result: 12

좋은 웹페이지 즐겨찾기