reactor: 동기적인 블로킹 호출을 감싸기
B.1. How Do I Wrap a Synchronous, Blocking Call?
정보의 소스가 동기적이고 블로킹 방식인 경우가 많이 있다. Reactor 애플리케이션에서 이러한 소스를 처리하려면 다음 패턴을 적용한다.
Mono blockingWrapper = Mono.fromCallable(() -> { // 1
return /* make a remote synchronous call */ // 2
})
.subscribeOn(Schedulers.boundedElastic()); // 3
blockingWrapper.subscribe(...);
(1)
fromCallable
을 사용하여 새로운Mono
로 만든다. 소스가 하나의 값을 반환하기 때문에Mono
를 사용해야 한다.
(2) 동기적인 블로킹 호출의 결과를 반환하는Callable
람다를 작성한다.
(3) 구독할 때 마다 전용 단일 스레드에서 실행되도록 한다.Schedulers.boundedElastic()
의 워커를 지정한다.subscribeOn
은Mono
를 구독하지 않으며, 구독이 실행되는Scheduler
의 종류를 지정하는데 사용된다.
Scheduler
로는 boundedElastic
을 사용해야 한다. 다음과 같은 이유 때문이다.
- 전용 스레드를 사용하여 블로킹 리소스가 처리되기까지 대기하며, 이는 다른 논블로킹 처리에 영향을 주지 않는다.
- 생성할 수 있는 스레드의 갯수에 제한이 있어 너무 많은 스레드가 생성되지 않게 해준다.
- 블로킹 태스크가 급증하면 큐잉되어 처리 시점이 미뤄질 수 있게 해준다.
예제
Mono<Long> blockingLong = Mono
.fromCallable(
() -> {
log.info("blocking start");
Thread.sleep(1000);
log.info("blocking end");
return System.currentTimeMillis() % 100;
}
)
.subscribeOn(Schedulers.boundedElastic());
blockingLong
.doOnSubscribe(s -> log.info("onSubscribe"))
.subscribe(l -> log.info("result: " + l));
결과
18:56:10.908 [main] INFO my.ex.GenerateFlux - onSubscribe
18:56:10.910 [boundedElastic-1] INFO my.ex.GenerateFlux - blocking start
18:56:11.912 [boundedElastic-1] INFO my.ex.GenerateFlux - blocking end // 1000ms의 차이가 있다.
18:56:11.913 [boundedElastic-1] INFO my.ex.GenerateFlux - result: 12
Author And Source
이 문제에 관하여(reactor: 동기적인 블로킹 호출을 감싸기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@nkjang/reactor-동기적인-블로킹-호출을-감싸기저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)