[오픈소스 기여][Armeria] no-op subscribe 기능 추가하기
안녕하세요!🖐
오픈소스 Armeria의 이슈를 해결하는 과정을 공유하고자 합니다.
이 글을 읽는 분들께서 제가 컨트리뷰터로 활동하는 Armeria에 관심이 생기셨으면 하는 마음과 기여를 하며 배운점을 기록하고 같은 실수를 방지하기위한 목적으로 글을 작성하게 되었습니다.
이슈 선정
Provide a method to do a no-op subscribe to a StreamMessage #4145
저번 첫 기여에 이어 이번에도 StreamMessage
에 기능을 추가하는 이슈입니다.
StreamMessage
에 대해선 저번 기여를 통해 꽤 익숙해졌기에 바로 댓글을 달아 이슈해결에 도전했습니다. 😃
이슈를 읽어보니 StreamMessage
를 사용할때 데이터를 전부 넘겨주는 subscribe
를 간단하게 하고싶은데 지금의 상태론 넘길때마다 특정 Subscriber
를 만들어서 조건을 설정해줘야하는 불편함이 있다고 하네요! 이를 해결하기 위해 Reactor
의 Flux#subscribe
처럼 no-op한 구독기능을 추가하셨으면 한다고 합니다.
어떻게 해결하는게 좋을까요? 흠...
제시해주신 Subscriber
를 만들어주거나 Flux
의 subscribe
기능을 참고하면 될 것 같은데요. trustin님께서 Flux 처럼 이름을 subscribe로 만들면 좋겠다고 하시니
StreamMessage.아무기능.subscribe()
이런식으로 사용하고 CompletableFuture<Void>
를 반환하도록 만들면 될것같습니다. 저는 우선 Flux
의 subscribe
기능에 대해서 알아봤습니다
여기서 Flux
는 리액터의 Publisher
로 Armeria의 StreamMessage
와 같은 역할을하는 추상클래스 입니다.
Reactor Flux#subscribe
subscribe()
를 사용하면 모든데이터를request
하고 구독을 취소하기위한Disposable
을 반환합니다. 오류처리를 지정하지 않으므로 다른변형을 고려해야합니다.
Flux#subscribe
의 기능을 알아보니 이슈에서 제시해주신 Subscriber
를 subscribe
받는다면 Flux#subscribe
의 기능을 비슷하게 구현할 수 있을것 같았습니다.
PR
기존에 있던NoopSubscriber
가 이슈에서 제시한 Subscriber
와 비슷한 기능을 제공하고있어 이것을 쓰기로 하였습니다.
public final class NoopSubscriber<T> implements Subscriber<T> {
private static final NoopSubscriber<?> INSTANCE = new NoopSubscriber<>();
/**
* Returns a singleton {@link NoopSubscriber}.
*/
@SuppressWarnings("unchecked")
public static <T> NoopSubscriber<T> get() {
return (NoopSubscriber<T>) INSTANCE;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T t) {}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
}
StreamMessage
를 완료해주는 whenComplete
기능을 사용하여 CompletableFuture<Void>
를 반환하고 StreamMessage
를 닫아주었습니다.
default CompletableFuture<Void> subscribe() {
subscribe(new NoopSubscriber<>());
return whenComplete();
}
완성된 기능을 간단한 javadoc와 테스트를 작성해서 PR을 날렸습니다.
어째 goodfirstissue 보다 간단한게 뭔가 빼먹은게 있나 걱정되는 순간이었습니다 😰
산으로 가는 방향
감사하게도🙇 메인테이너인 ikhoon님과 jrhee17님께서 코드리뷰를 해주셨는데요! 피드백에서 주요하게 고려해야할 사항은 두가지였습니다.
- Should we use NoopSubscriber.get()?
- We need to release / close the subscribed objects. It is not related to this PR. But there was a bug.
Could we close the objects passed in onNext(T t)?
You might want to check out StreamMessageUtil.closeOrAbort().
NoopSubscriber
의 싱글톤구성을get()
을 이용하면 사용할 수 있는데 이를 사용해도 문제가 없는지onNext
에서 구독된 객체는 방출하거나 닫을 필요가 있는데 이를 하지 않아서 버그가 발생하는데 이를StreamMessageUtil.closeOrAbort()
를 이용해서 해결하는것
우선 NoopSubscriber
를 사용하던 기존에 클래스들에게 영향이 없는지 살펴보았는데요 ..!🧐 테스트를 이용하여 디버깅을 돌려보니 큰 문제는 없는것 같아 사용해도 될 것 같습니다.
두번째 피드백을 보면 closeOrAbort
라는 기능을 사용하는것을 권해주시고 있습니다. closeOrAbort
는 객체를 받아 GC의 RefereceCounting 기법을 이용해 메모리에서 객체를 제거해주는 기능을 해주는것을 소스코드를 통해 알수 있었습니다. onNext
를 통해 받은 객체들을 이 기능을 통해 메모리에서 제거해주면 될 것 같습니다.
(여기서부터 피드백을 잘못 이해해서 수정이 산으로 가게됩니다...⛰️⛰️)
기존의 FuseableStreamMessage
와 StreamMessageInputStream
의 closeOrAbort()
에 객체를 넘기는 방식을 참고해보니 미리 선언한 boolean값을 onError
에서 true로 바꿔주고 onNext
로 가게되면 그로인해 closeOrAbort
가 동작하는 방식이었기에 이 방식을 따라하고자 했습니다. 그런데 여기에 boolean값을 쓰자니 싱글톤이기 때문에 이 후에는 제대로 동작하지 않을 것 같아 CompletableFuture<Void>
을 사용하고 예외적으로 완료되거나 정상적으로 완료된 CompletableFuture<Void>
를 반환하려 했습니다..
public final class NoopSubscriber<T> implements Subscriber<T> {
private static final NoopSubscriber<?> INSTANCE = new NoopSubscriber<>();
private CompletableFuture<Void> whenSubscribed = new CompletableFuture<>();
CompletableFuture<Void> whenSubscribed() {
return whenSubscribed;
}
@Override
public void onNext(T item) {
requireNonNull(item, "item");
if (whenSubscribed.isDone()) {
StreamMessageUtil.closeOrAbort(item);
return;
}
}
@Override
public void onError(Throwable t) {
whenSubscribed.completeExceptionally(t);
}
@Override
public void onComplete() {
whenSubscribed.complete(null);
}
subscribe에서도 CompletableFuture<Void>
반환하도록 만들고 Test를 만들어 돌려본결과 모두 통과하였기에 그대로 커밋을 날리게 됩니다..😰
😵
수정한 커밋에 대한 코드리뷰는 trustin님께서 해주셨습니다.
처음 PR에 closeOrAbort
만 붙이면 된다는 피드백에 머리를 한대맞은것 같았습니다.. 😵
이 경험을 통해서 백문이 불여일커밋.. 협업에 있어 혼자 머리싸매고 있는것 보단 커밋한번이 더 났다는 교훈을 얻게 되었습니다..
2번째 피드백과 ikhoon님께서 해주신 커밋을 통해 PR이 merge가 될 수 있었습니다.
No-op Subscribe 기능 정리
이번 이슈를 통해 만든 기능을 정리해보겠습니다
StreamMessage.subscribe()
를하면 싱글톤인NoopSubscriber
가Publisher
인StreamMessage
를subscribe
하게됩니다.Subscriber
에서onSubscribe
를 통해Subscription
에Publisher
의 모든 데이터를 요청합니다.
onNext
를 통해Subscriber
로 데이터를 전달하게되면closeOrAbort
를 통해 메모리에서 제거됩니다.
이 기능을 통해 StreamMessage
에 담긴 데이터를 이용하는 peek
같은 기능들을 보다 편리하게 사용할 수 있습니다.
마무리
배운것
CompletableFuture
의 사용법- 협업엔 백문이 불여일 커밋
- Reactor의 기능
Subscriber
의 기능 구현 및 활용
반성
- 아프다고 너무 늦게 수정함
ByteBuf
를 활용하지 않고 테스트코드를 작성함
일부러 좀 어려운 이슈가 하고싶어 goodfirstissue가 아닌 이슈를 선택했는데 생각보다 쉬워서 당황했습니다ㅎㅎ 다음엔 더어려운 이슈를 해결 더 많이 성장해봐야겠습니다.
긴글봐주셔서 감사합니다! 🙇
Author And Source
이 문제에 관하여([오픈소스 기여][Armeria] no-op subscribe 기능 추가하기), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@tlsdntjd95/Armeria-기여하기-4145-Provide-a-method-to-do-a-no-op-subscribe-to-a-StreamMessage-1저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)