[스프링 리액티브] Operator

🌱 토비의 봄 TV 스프링 리액티브 프로그래밍을 시청한 후 학습한 내용을 정리하고 기록하기 위해 작성하는 포스팅입니다.

리액티브 프로그래밍에서 Operator는 기존에 pub-sub가 바로 연결되는 구조에서 중간에 다양한 Operator들을 추가하여 데이터를 다양한 방식으로 가공해주는 역할을 한다.

Operator가 포함된 Pub/Sub 흐름
Publisher -> Data1 -> Operator1 -> Data2 -> Operator2 -> Data3 -> Subscriber

사실 원래 Publisher 인터페이스의 구현체인 Flux/Mono에서는 자바 스트림과 같은 형식으로 손쉽게 데이터를 가공해줄 수 있다.

Flux.<Integer>create(e -> {
            e.next(1);
            e.next(2);
            e.next(3);
            e.complete();
        }).log().map(s->s*10).reduce(0, (a,b) -> a+b).log().subscribe();

또 Subscriber의 경우 직접 만들어주지 않아도, 스프링에서 필요한 시점에 알아서 만들고 subscribe하고 어떤 이벤트가 일어난 것을 탐지해서 Publisher의 request를 실행해주는 것까지 알아서 해준다.

그러나 오늘 포스팅에서는 이렇게 내부적으로 구현되고 추상화되어 쉽게 사용할 수 있는 라이브러리를 사용하기 이전에, 이런 Operator들이 어떻게 체이닝되어 구현되어있는지를 코드를 통해 알아보려 한다.

우선 가장 간단한 pub-sub 구조는 다음과 같이 구현된다.

Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
pub.subscribe(logSub());
private static <T> Subscriber<T> logSub() {
        return new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("subscribe");
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T val) {
                System.out.println("job: " + val);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("error detected");
            }

            @Override
            public void onComplete() {
                System.out.println("completed");
            }
        };
    }

    private static Publisher<Integer> iterPub(Iterable<Integer> iter) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                sub.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        try {
                            iter.forEach(s -> sub.onNext(s));
                            sub.onComplete();
                        } catch (Throwable t) {
                            sub.onError(t);
                        }
                    }

                    @Override
                    public void cancel() {
                        System.out.println("job canceled");
                    }
                });
            }
        };
    }

예시를 단순화하기 위해 request가 한 번 호출되면 모든 작업이 처리되고 complete가 호출되도록 구성해주었다.

이 상태에서 만약

Publisher -> Data1 -> map func1 -> Data2 -> map func2 -> Data3 -> Subscriber

와 같은 흐름을 만들어주고 싶다면, 우선 Map을 수행하는 Operator를 만들어줘야한다.

private static Publisher<Integer> mapPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
        return new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> sub) {
                //pub: 본인 왼쪽의 pub
                pub.subscribe(new DelegateSub(sub) {
                    @Override
                    public void onNext(Integer integer) {
                        sub.onNext(f.apply(integer));
                    }
                });
            }
        };
    }
public class DelegateSub implements Subscriber<Integer> {
    Subscriber sub;

    public DelegateSub(Subscriber sub) {
        this.sub = sub;
    }

    @Override
    public void onSubscribe(Subscription s) {
        sub.onSubscribe(s);
    }

    @Override
    public void onNext(Integer integer) {
        sub.onNext(integer);
    }

    @Override
    public void onError(Throwable t) {
        sub.onError(t);
    }

    @Override
    public void onComplete() {
        sub.onComplete();
    }
}

이때 mapPub도 원래 Pub처럼 subscribe를 overriding해서 구현해주는데, 다른 점은 subscribe 메서드 안에서 sub.onSubscribe(subscription객체)를 해주는 것이 아니라, pub.subscribe(..)를 해준다는 것이다.
이 코드를 통해 원래 Subscriber를 DelegateSub 객체에 넘겨줌으로써 원래 Subscriber로 데이터를 다 넘겨주되, onNext 메서드에서 별도의 조작을 통해 map을 수행해준다.

Publisher<Integer> pub = iterPub(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, s -> s * 10);
Publisher<Integer> map2Pub = mapPub(mapPub, s -> -s);
map2Pub.subscribe(logSub());

결국 pub/sub 구조는 내부적으로 이와 같은 방식으로 데이터를 가공하여 subscriber에 전달해줄 수 있다. sum, reduce 등은 아래와 같이 map과는 조금 다르게 동작하지만, 기본적인 동작 방식은 모두 같다.

private static <T, R> Publisher<R> reducePub(Publisher<T> pub, R init, BiFunction<R, T, R> bf) {
        return new Publisher<R>() {
            @Override
            public void subscribe(Subscriber<? super R> sub) {
                pub.subscribe(new DelegateSub<T, R>(sub){
                    R result = init;

                    @Override
                    public void onNext(T integer) {
                        // 이렇게 하면 + 아니라도 가능
                        result = bf.apply(result, integer);
                    }

                    @Override
                    public void onComplete() {
                        sub.onNext(result);
                        sub.onComplete();
                    }
                });
            }
        };
    }

마블 다이어그램

1) buffer

2) reduce

3) map

좋은 웹페이지 즐겨찾기