RxJava의 다양한 장면 구현 요약

7146 단어 rxjava장면
1. 동작 실행 지연
timer+map 방법으로 실현할 수 있습니다.코드는 다음과 같습니다.

Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
   return doSomething();
  }).subscribe(System.out::println);
 }
2. 발송 실행 지연 결과
이런 장면은 데이터가 생성되는 동작을 즉시 실행하도록 요구하지만 결과는 발송을 늦춘다.이것은 위의 장면과 다르다.
이런 장면은 Observable.zip를 사용하여 실현할 수 있다.
zip 조작부호는 여러 Observable가 발사한 데이터를 순서대로 조합합니다. 모든 데이터는 한 번만 조합할 수 있고 질서정연합니다.최종 조합된 데이터의 수량은 발사 데이터가 가장 적은 Observable에 의해 결정된다.
각observable의 같은 위치의 데이터에 대해 서로 기다려야 한다. 즉, 첫 번째observable의 첫 번째 위치의 데이터가 발생한 후에 두 번째observable의 첫 번째 위치의 데이터가 발생하는 것을 기다려야 각 Observable의 같은 위치의 데이터가 모두 발생한 후에야 지정된 규칙에 따라 조합할 수 있다.이것은 정말 우리가 이용해야 할 것이다.
zip는 여러 가지 성명이 있지만 대체적으로 같다. 몇 개의observable를 전송한 다음에 하나의 규칙을 지정하여 각각observable가 대응하는 위치의 데이터를 처리하고 새로운 데이터를 생성하는 것이다. 다음은 그 중에서 가장 간단한 것이다.

 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
zip으로 푸시 전송 실행 결과는 다음과 같습니다.

 Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
         ,Observable.just(doSomething()), (x,y)->y)
   .subscribe(System.out::println));
3. defer를 사용하여 지정된 라인에서 어떤 동작을 수행합니다
아래의 코드와 같이, 비록 우리는 라인의 운행 방식을 지정했지만, doSomething() 이 함수는 현재 코드가 호출한 라인에서 실행된다.

 Observable.just(doSomething())
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
     .subscribe(v->Utils.printlnWithThread(v.toString()););
일반적으로 우리는 다음과 같은 방법으로 목적을 달성한다.

 Observable.create(s->{s.onNext(doSomething());})
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{
     Utils.printlnWithThread(v.toString());
  });
그러나 사실 우리는 defer를 채택해도 같은 목적을 달성할 수 있다.
defer 정보
defer 조작부호는create,just,from 등 조작부호와 마찬가지로 클래스 조작부호를 만들지만, 이 조작부호와 관련된 모든 데이터는 구독에 의해 효력이 발생합니다.
선언:

 public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
defer의 Func0의 Observable는 구독(subscribe)을 할 때 만들어집니다.
역할:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
즉, Observable는 구독할 때 만든 것이다.
위의 문제는 defer로 이루어집니다.

 Observable.defer(()->Observable.just(doSomething()))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
  });
4. compose를 사용하면 체인 구조를 끊지 마라
우리는 아래의 코드를 자주 본다.

 Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
위의 코드에서 subscribeOn(xxx).observeOn(xxx) 많은 곳에서 똑같을 수 있습니다. 만약에 우리가 그것을 어느 곳에서 통일하여 실현하려고 한다면 우리는 이렇게 쓸 수 있습니다.

 private static <T> Observable<T> applySchedulers(Observable<T> observable) {
  return observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }
그러나 이렇게 매번 우리가 위의 방법을 사용해야 한다면 대체적으로 아래와 같이 가장 바깥쪽은 하나의 함수로 링크 구조를 깨뜨리는 것과 같다.

 applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
   @Override public Data call(Data data) {
   return manipulate(data);
   }
  })
 ).subscribe(new Action1<Data>() {
  @Override public void call(Data data) {
  doSomething(data);
  }
 });
compose 조작부호를 사용하여 링크 구조를 깨뜨리지 않는 목적을 달성할 수 있습니다.
compose의 설명은 다음과 같습니다.

 public Observable compose(Transformer<? super T, ? extends R> transformer);
그것의 인삼은 Transformer 인터페이스이고 출력은 Observable이다.Transformer는 실제적으로 하나Func1<Observable<T>,Observable<R>> 이다. 다시 말하면 그것을 통해 한 종류의 Observable를 다른 유형의 Observable로 전환할 수 있다.
간단하게 말하면compose는 지정한 전환 방식(입력 매개 변수transformer)을 통해 원래의observable를 다른 Observable로 전환할 수 있다.
compose를 통해 다음 방법으로 스레드 방식을 지정합니다.

 private static <T> Transformer<T, T> applySchedulers() {
   return new Transformer<T, T>() {
    @Override
    public Observable<T> call(Observable<T> observable) {
     return observable.subscribeOn(Schedulers.io())
       .observeOn(Schedulers.computation());
    }
   };
  }

 Observable.just(doSomething()).compose(applySchedulers())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
   });
함수 applySchedulers는 lambda 표현식을 사용하여 다음과 같이 간소화할 수 있습니다.

 private static <T> Transformer<T, T> applySchedulers() { 
  return observable->observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }
5. 우선 순위에 따라 다른 실행 결과 사용
위의 이 제목은 내가 표현하고 싶은 장면을 분명히 표현하지 못했을 것이다.사실 내가 표현하고자 하는 장면은 일반적인 네트워크 데이터 획득 장면과 유사하다. 캐시가 있으면 캐시에서 얻고, 없으면 네트워크에서 얻는다.
캐시가 있으면 네트워크에서 데이터를 가져오는 동작을 하지 않기를 요구합니다.
이것은concat+first로 실현할 수 있습니다.
concat은 몇 개의 Observable를 하나의 Observable로 통합하여 최종적인 Observable로 되돌려줍니다.그 데이터는 마치 Observable에서 나온 것과 같다.매개변수는 여러 Observable 또는 Observalbe를 포함하는 Iterator일 수 있습니다.
새로운observable의 데이터 배열은 원래concat의observable의 순서에 따라 배열됩니다. 즉, 새로운 결과의 데이터는 원래의 순서에 따라 배열됩니다.
다음은 상술한 수요의 실현이다.

 Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
    .subscribe(v->System.out.println("result:"+v));
 // 
 private static Observable<String> getDataFromCache(){
  return Observable.create(s -> {
   //dosomething to get data
   int value = new Random().nextInt();
   value = value%2;
   if (value!=0){
    s.onNext("data from cache:"+value); // 
   }
   //s.onError(new Throwable("none"));
   s.onCompleted();
  }
    );
 }
 // 
 private static Observable<String> getDataFromNetwork(){
  return Observable.create(s -> {
   for (int i = 0; i < 10; i++) {
    Utils.println("obs2 generate "+i);
    s.onNext("data from network:" + i); // 
   }
   s.onCompleted();
  }
    );
 }
위의 구현은 만약에 getDataFromCache에 데이터가 있다면 getDataFromNetwork의 코드는 실행되지 않을 것입니다. 이것이 바로 우리가 원하는 것입니다.
위에서 몇 가지 주의해야 할 사항이 있다.
1. 두 곳에서 데이터를 얻지 못할 수 있습니다. 이 장면에서first를 사용하면 이상 NoSuch Element Exception을 던집니다. 이런 장면이라면first OrDefault로 위의first를 교체해야 합니다.
2. 위getDataFromCache()에서 데이터가 없으면 onCompleted를 직접 호출하고, onCompleted가 아닌 onError를 호출하면 상술한 concat을 사용하면 어떤 결과도 얻을 수 없습니다.concat이 어떤 error를 받고 있기 때문에, 합병은 멈출 것입니다.따라서 onError를 사용하려면 concatDelayError로 대체해야 합니다. concat.concatDelayError 먼저 error를 무시하고 error를 마지막까지 연기합니다.
총결산
이상은 바로 이 글의 전체 내용입니다. 본고의 내용이 여러분의 학습이나 업무에 어느 정도 도움이 되고 의문이 있으면 댓글로 교류하시기 바랍니다.

좋은 웹페이지 즐겨찾기