[오픈소스 기여][Armeria] no-op subscribe 기능 추가하기

안녕하세요!🖐

오픈소스 Armeria의 이슈를 해결하는 과정을 공유하고자 합니다.

이 글을 읽는 분들께서 제가 컨트리뷰터로 활동하는 Armeria에 관심이 생기셨으면 하는 마음과 기여를 하며 배운점을 기록하고 같은 실수를 방지하기위한 목적으로 글을 작성하게 되었습니다.

이슈 선정


Provide a method to do a no-op subscribe to a StreamMessage #4145

저번 첫 기여에 이어 이번에도 StreamMessage에 기능을 추가하는 이슈입니다.

StreamMessage에 대해선 저번 기여를 통해 꽤 익숙해졌기에 바로 댓글을 달아 이슈해결에 도전했습니다. 😃

이슈를 읽어보니 StreamMessage를 사용할때 데이터를 전부 넘겨주는 subscribe를 간단하게 하고싶은데 지금의 상태론 넘길때마다 특정 Subscriber를 만들어서 조건을 설정해줘야하는 불편함이 있다고 하네요! 이를 해결하기 위해 ReactorFlux#subscribe 처럼 no-op한 구독기능을 추가하셨으면 한다고 합니다.

어떻게 해결하는게 좋을까요? 흠...

제시해주신 Subscriber를 만들어주거나 Fluxsubscribe기능을 참고하면 될 것 같은데요. trustin님께서 Flux 처럼 이름을 subscribe로 만들면 좋겠다고 하시니
StreamMessage.아무기능.subscribe() 이런식으로 사용하고 CompletableFuture<Void>를 반환하도록 만들면 될것같습니다. 저는 우선 Fluxsubscribe기능에 대해서 알아봤습니다

여기서 Flux는 리액터의 Publisher로 Armeria의 StreamMessage와 같은 역할을하는 추상클래스 입니다.

Reactor Flux#subscribe

subscribe()를 사용하면 모든데이터를 request하고 구독을 취소하기위한 Disposable을 반환합니다. 오류처리를 지정하지 않으므로 다른변형을 고려해야합니다.

Flux#subscribe의 기능을 알아보니 이슈에서 제시해주신 Subscribersubscribe받는다면 Flux#subscribe의 기능을 비슷하게 구현할 수 있을것 같았습니다.

PR


기존에 있던NoopSubscriber가 이슈에서 제시한 Subscriber와 비슷한 기능을 제공하고있어 이것을 쓰기로 하였습니다.

public final class NoopSubscriber<T> implements Subscriber<T> {

    private static final NoopSubscriber<?> INSTANCE = new NoopSubscriber<>();

    /**
     * Returns a singleton {@link NoopSubscriber}.
     */
    @SuppressWarnings("unchecked")
    public static <T> NoopSubscriber<T> get() {
        return (NoopSubscriber<T>) INSTANCE;
    }

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T t) {}

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
}

StreamMessage를 완료해주는 whenComplete기능을 사용하여 CompletableFuture<Void>를 반환하고 StreamMessage를 닫아주었습니다.

  default CompletableFuture<Void> subscribe() {
      subscribe(new NoopSubscriber<>());
      return whenComplete();
	}

완성된 기능을 간단한 javadoc와 테스트를 작성해서 PR을 날렸습니다.

어째 goodfirstissue 보다 간단한게 뭔가 빼먹은게 있나 걱정되는 순간이었습니다 😰

산으로 가는 방향


감사하게도🙇 메인테이너인 ikhoon님과 jrhee17님께서 코드리뷰를 해주셨는데요! 피드백에서 주요하게 고려해야할 사항은 두가지였습니다.

  • Should we use NoopSubscriber.get()?
  • We need to release / close the subscribed objects. It is not related to this PR. But there was a bug.
    Could we close the objects passed in onNext(T t)?
    You might want to check out StreamMessageUtil.closeOrAbort().
  • NoopSubscriber의 싱글톤구성을 get()을 이용하면 사용할 수 있는데 이를 사용해도 문제가 없는지
  • onNext에서 구독된 객체는 방출하거나 닫을 필요가 있는데 이를 하지 않아서 버그가 발생하는데 이를 StreamMessageUtil.closeOrAbort()를 이용해서 해결하는것

우선 NoopSubscriber를 사용하던 기존에 클래스들에게 영향이 없는지 살펴보았는데요 ..!🧐 테스트를 이용하여 디버깅을 돌려보니 큰 문제는 없는것 같아 사용해도 될 것 같습니다.

두번째 피드백을 보면 closeOrAbort라는 기능을 사용하는것을 권해주시고 있습니다. closeOrAbort는 객체를 받아 GC의 RefereceCounting 기법을 이용해 메모리에서 객체를 제거해주는 기능을 해주는것을 소스코드를 통해 알수 있었습니다. onNext를 통해 받은 객체들을 이 기능을 통해 메모리에서 제거해주면 될 것 같습니다.

(여기서부터 피드백을 잘못 이해해서 수정이 산으로 가게됩니다...⛰️⛰️)

기존의 FuseableStreamMessageStreamMessageInputStreamcloseOrAbort() 에 객체를 넘기는 방식을 참고해보니 미리 선언한 boolean값을 onError에서 true로 바꿔주고 onNext로 가게되면 그로인해 closeOrAbort 가 동작하는 방식이었기에 이 방식을 따라하고자 했습니다. 그런데 여기에 boolean값을 쓰자니 싱글톤이기 때문에 이 후에는 제대로 동작하지 않을 것 같아 CompletableFuture<Void>을 사용하고 예외적으로 완료되거나 정상적으로 완료된 CompletableFuture<Void>를 반환하려 했습니다..

public final class NoopSubscriber<T> implements Subscriber<T> {

    private static final NoopSubscriber<?> INSTANCE = new NoopSubscriber<>();

    private CompletableFuture<Void> whenSubscribed = new CompletableFuture<>();

    CompletableFuture<Void> whenSubscribed() {
        return whenSubscribed;
    }
    @Override
    public void onNext(T item) {
        requireNonNull(item, "item");
        if (whenSubscribed.isDone()) {
            StreamMessageUtil.closeOrAbort(item);
            return;
        }
    }

    @Override
    public void onError(Throwable t) {
        whenSubscribed.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        whenSubscribed.complete(null);
    }

subscribe에서도 CompletableFuture<Void> 반환하도록 만들고 Test를 만들어 돌려본결과 모두 통과하였기에 그대로 커밋을 날리게 됩니다..😰

😵‍


수정한 커밋에 대한 코드리뷰는 trustin님께서 해주셨습니다.

처음 PR에 closeOrAbort만 붙이면 된다는 피드백에 머리를 한대맞은것 같았습니다.. 😵‍

이 경험을 통해서 백문이 불여일커밋.. 협업에 있어 혼자 머리싸매고 있는것 보단 커밋한번이 더 났다는 교훈을 얻게 되었습니다..

2번째 피드백과 ikhoon님께서 해주신 커밋을 통해 PR이 merge가 될 수 있었습니다.

No-op Subscribe 기능 정리


이번 이슈를 통해 만든 기능을 정리해보겠습니다

  1. StreamMessage.subscribe() 를하면 싱글톤인 NoopSubscriberPublisherStreamMessagesubscribe하게됩니다.
  2. Subscriber에서 onSubscribe를 통해SubscriptionPublisher의 모든 데이터를 요청합니다.
  1. onNext를 통해 Subscriber 로 데이터를 전달하게되면 closeOrAbort를 통해 메모리에서 제거됩니다.

이 기능을 통해 StreamMessage에 담긴 데이터를 이용하는 peek 같은 기능들을 보다 편리하게 사용할 수 있습니다.

마무리


배운것

  • CompletableFuture의 사용법
  • 협업엔 백문이 불여일 커밋
  • Reactor의 기능
  • Subscriber의 기능 구현 및 활용

반성

  • 아프다고 너무 늦게 수정함
  • ByteBuf를 활용하지 않고 테스트코드를 작성함

일부러 좀 어려운 이슈가 하고싶어 goodfirstissue가 아닌 이슈를 선택했는데 생각보다 쉬워서 당황했습니다ㅎㅎ 다음엔 더어려운 이슈를 해결 더 많이 성장해봐야겠습니다.

긴글봐주셔서 감사합니다! 🙇

좋은 웹페이지 즐겨찾기