RxJava 및 멀티스레드 동시 실행에 대한 간단한 설명

앞말
RxJava에 대해 모두가 잘 알고 있을 거라고 믿습니다. 그의 가장 핵심적인 두 글자는 바로 비동기입니다. 물론 비동기에 대한 처리는 매우 뛰어나지만 비동기는 절대 병발과 같지 않고 라인 안전과 같지 않습니다. 만약에 이 몇 가지 개념을 헷갈리게 하면 RxJava를 잘못 사용하면 매우 많은 문제를 가져올 수 있습니다.
RxJava 및 동시
먼저 RxJava 프로토콜의 원문을 살펴보겠습니다.

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
위에서 말한 바와 같이 RxJava는 다중 스레드 병발에 대해 사실 매우 많은 보호를 하지 않았다. 이 부분에서 만약에 여러 Observables가 다중 스레드에서 데이터를 발사한다면happens-before 원칙을 만족시켜야 한다.
다음은 간단한 예입니다.

final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  Log.d("TAG", "onNext: " + unSafeCount);
 }
});

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;
  for(int i = 0;i < 10;i++) {
   new Thread(new Runnable() {
    @Override
    public void run() {
     for (int j = 0; j < 1000; j++) {
      subject.onNext(unit);
     }
    }
   }).start();
  }
 }
});
이것은 가장 전형적인 다중 스레드 문제로 10개의 스레드에서 데이터를 발사하고 더하면 최종적으로 얻은 답은 10000보다 적다.RxJava를 사용했지만 이러한 사용은 병발에 의미가 없다. 왜냐하면 RxJava는 병발이 가져오는 문제를 처리하지 않았기 때문이다.우리는subject의 onNext 방법의 원본 코드를 볼 수 있습니다. 그 안에는 간단합니다. 바로observer에 대응하는 onNext 방법을 호출했을 뿐입니다.뿐만 아니라 절대 다수의 Subject는 라인이 안전하지 않기 때문에 이런 종류를 사용할 때(전형적인 장면은 자제된 RxBus), 여러 라인에서 데이터를 발사하면 조심해야 한다.
이러한 문제에 대해 다음과 같은 두 가지 해결 방안이 있습니다.
첫 번째는 전통적인 해결 방법을 간단하게 사용하는 것이다. 예를 들어 int 대신 Atomic Integer를 사용하는 것이다.
두 번째는 RxJava를 사용하는 솔루션입니다. 여기서 Subject 대신 SerializedSubject를 사용합니다.

final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  count.addAndGet(integer);

  Log.d("TAG", "onNext: " + count);
 }
});

final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;

  for(int i = 0;i < 10;i++){
   new Thread(new Runnable() {
    @Override
    public void run() {
     for(int j = 0;j < 1000;j++){
      ser.onNext(unit);
     }
    }
   }).start();
  }
 }
});
SerializedSubject의 onNext 방법이 무엇을 하는지 볼 수 있습니다.

@Override
public void onNext(T t) {
 if (terminated) {
  return;
 }
 synchronized (this) {
  if (terminated) {
   return;
  }
  if (emitting) {
   FastList list = queue;
   if (list == null) {
    list = new FastList();
    queue = list;
   }
   list.add(nl.next(t));
   return;
  }
  emitting = true;
 }
 try {
  actual.onNext(t);
 } catch (Throwable e) {
  terminated = true;
  Exceptions.throwOrReport(e, actual, t);
  return;
 }
 for (;;) {
  for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
   FastList list;
   synchronized (this) {
    list = queue;
    if (list == null) {
     emitting = false;
     return;
    }
    queue = null;
   }
   for (Object o : list.array) {
    if (o == null) {
     break;
    }
    try {
     if (nl.accept(actual, o)) {
      terminated = true;
      return;
     }
    } catch (Throwable e) {
     terminated = true;
     Exceptions.throwIfFatal(e);
     actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
     return;
    }
   }
  }
 }
}
처리 방식은 매우 간단하다. 만약 다른 라인이 데이터를 발사하고 있다면, 데이터를 대기열에 놓고 다음 발사를 기다린다.이것은 같은 시간에 onNext, onComplete, onError 같은 방법만 호출할 수 있음을 보장합니다.
그러나 이러한 조작은 분명히 성능에 영향을 미칠 수 있기 때문에 RxJava는 모든 조작을 안전한 라벨에 붙이지 않는다.
여기서 한 가지 문제를 인용해야 한다. 그것은 사용자가create 방법에 대한 남용이다. 사실 이 방법은 사용자에게 빈번하게 호출되어서는 안 된다. 왜냐하면 모든 데이터 발사, 수신의 논리를 조심스럽게 처리해야 하기 때문이다.반대로 기존의 조작부호를 사용하면 이 문제를 잘 해결할 수 있기 때문에 다음에 문제가 발생할 때create를 간단하게 사용해서 스스로 쓰지 말고 기존의 조작부호가 상응하는 수요를 완성할 수 있는지 생각해 봐야 한다.
RxJava의 일부 연산자
RxJava 중의 일부 조작부호도 다중 스레드와 병발하는 것과 관련이 있습니다. 다음은 merge와concat, 그리고 그들의 변종 조작부호에 대해 말씀드리겠습니다.
다중 스레드 발사 데이터에 대해 때때로 우리가 얻어야 할 결과도 발사 때와 같은 순서를 유지한다. 이때 만약에 우리가merge라는 조작부호를 사용하여 여러 개의 발사원을 결합한다면 일정한 문제가 생길 수 있다. (예에서 매우 나쁜 시범을 보였는데create 조작부호를 사용했으니 이런 작법을 배우지 마세요. 여기는 단순히 결과를 구하기 위해서입니다.)

Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(final Subscriber<? super Integer> subscriber) {
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     Thread.sleep(1000);
     subscriber.onNext(1);
     subscriber.onCompleted();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(Subscriber<? super Integer> subscriber) {
  subscriber.onNext(2);
  subscriber.onCompleted();
 }
});

Observable.merge(o1,o2)
  .subscribe(new Subscriber<Integer>() {
   @Override
   public void onCompleted() {

   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(Integer i) {
    Log.d("TAG", "onNext: " + i);
   }
  });
이런 장면에 대해 우리가 얻은 답은 o1이 발사한 데이터를 먼저 얻고 o2의 데이터를 얻는 것이 아니라 2,1이다.
그 이유는 merge가 사실 무엇을 전달하고 데이터 발사 순서를 상관하지 않기 때문이다.

@Override
public void onNext(Observable<? extends T> t) {
  if (t == null) {
    return;
  }
  if (t == Observable.empty()) {
    emitEmpty();
  } else
  if (t instanceof ScalarSynchronousObservable) {
    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
  } else {
    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
    addInner(inner);
    t.unsafeSubscribe(inner);
    emit();
  }
}
lift 조작을 거친 후에 대응하는 중간인 Merge Subscriber의 onNext를 볼 수 있습니다. 여분의 코드가 없기 때문에 여러 개의 Observable가 다중 노드에서 데이터를 발사할 때 순서는 당연히 보장되지 않습니다.
한 단어가 이 문제를 설명한다.merge 후의 데이터 원본은 엇갈릴 수 있습니다.merge는 이런 데이터가 엇갈리는 문제가 있기 때문에 변종인 flatMap도 같은 문제가 있을 수 있다.
이러한 장면에 대해 우리는 concat 조작부호를 사용하여 완성할 수 있다.

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.
문서에 근거하여 우리는 concat 조작부호가 데이터 원본을 하나하나 처리하는 데이터라는 것을 안다.

if (wip.getAndIncrement() != 0) {
  return;
}

final int delayErrorMode = this.delayErrorMode;

for (;;) {
  if (actual.isUnsubscribed()) {
    return;
  }

  if (!active) {
    if (delayErrorMode == BOUNDARY) {
      if (error.get() != null) {
        Throwable ex = ExceptionsUtils.terminate(error);
        if (!ExceptionsUtils.isTerminated(ex)) {
          actual.onError(ex);
        }
        return;
      }
    }

    boolean mainDone = done;
    Object v = queue.poll();
    boolean empty = v == null;

    if (mainDone && empty) {
      Throwable ex = ExceptionsUtils.terminate(error);
      if (ex == null) {
        actual.onCompleted();
      } else
      if (!ExceptionsUtils.isTerminated(ex)) {
        actual.onError(ex);
      }
      return;
    }

    if (!empty) {

      Observable<? extends R> source;

      try {
        source = mapper.call(NotificationLite.<T>instance().getValue(v));
      } catch (Throwable mapperError) {
        Exceptions.throwIfFatal(mapperError);
        drainError(mapperError);
        return;
      }

      if (source == null) {
        drainError(new NullPointerException("The source returned by the mapper was null"));
        return;
      }

      if (source != Observable.empty()) {

        if (source instanceof ScalarSynchronousObservable) {
          ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

          active = true;

          arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
        } else {
          ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
          inner.set(innerSubscriber);

          if (!innerSubscriber.isUnsubscribed()) {
            active = true;

            source.unsafeSubscribe(innerSubscriber);
          } else {
            return;
          }
        }
        request(1);
      } else {
        request(1);
        continue;
      }
    }
  }
  if (wip.decrementAndGet() == 0) {
    break;
  }
}
원본 코드를 통해 알 수 있듯이active 필드는 만약에 이전 데이터 원본이 데이터를 발사하지 않았다면 for순환에서 기다릴 것입니다. 이전 데이터 원본이 발사될 때까지active 필드를 리셋합니다.
concat에 대해 사실 아직도 문제가 존재한다. 그것은 여러 Observable가 직렬로 바뀌어 전체 RxJava 이벤트 흐름의 처리 시간을 크게 증가시킬 수 있다는 것이다. 이 장면에 대해 우리는 concateEager를 사용하여 해결할 수 있다.concat Eager의 원본 코드는 분석할 필요가 없습니다. 관심 있는 학생들은 스스로 볼 수 있습니다.
총결산
이 글은 비교적 짧고 말하는 것도 비교적 간단명료하다. 사실은 RxJava에서 다중 루틴이 병발하는 몇 가지 문제를 토론한 것이다.마지막으로 나는 RxJava가 결코 높은 것이 아니라고 말하고 싶다. 당신의 프로젝트가 도입되기 전에 정말 이렇게 할 필요가 있는지 고려해야 한다.설령 정말 장면이 RxJava를 필요로 한다 하더라도 프로젝트의 모든 조작을 단숨에 RxJava로 바꾸지 마십시오. 일부 간단한 조작은 반드시 RxJava의 조작부호를 사용해야 하는 것은 아닙니다. 사용하면 오히려 코드의 가독성을 낮출 수 있습니다. Rx를 사용하기 위해 Rx를 사용하지 마십시오.
자, 이상은 이 글의 전체 내용입니다. 본고의 내용이 여러분의 학습이나 업무에 일정한 도움을 줄 수 있기를 바랍니다. 의문이 있으면 댓글을 남겨 주십시오.

좋은 웹페이지 즐겨찾기