Java 확장 라이브러리 RxJava의 기본 구조와 적용 장면 소결

5711 단어 JavaRxJava
기본 구조
우리는 먼저 가장 기본적인 코드를 보고 이 코드가 RxJava에서 어떻게 실현되었는지 분석한다.

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("1");
    subscriber.onCompleted();
  }
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
  @Override
  public void onCompleted() {

  }

  @Override
  public void onError(Throwable e) {

  }

  @Override
  public void onNext(String s) {

  }
};

Observable.create(onSubscriber1)
    .subscribe(subscriber1);

우선 Observable을 살펴보겠습니다.create 코드

public final static <T> Observable<T> create(OnSubscribe<T> f) {
  return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
  this.onSubscribe = f;
}

직접적으로 Observable의 구조 함수를 호출하여 새로운 Observable 대상을 만들었습니다. 이 대상은 우리가 잠시 Observable1로 표시하여 뒤로 거슬러 올라갈 수 있도록 합니다.
또한 우리가 전송한 Onsubscribe 대상인 onSubscribe1을observable1의 onSubscribe 속성에 저장할 것입니다. 이 속성은 뒤에 있는 상하문에서 매우 중요합니다. 주의하십시오.
다음은 Subscribe 방법을 살펴보겠습니다.

public final Subscription subscribe(Subscriber<? super T> subscriber) {
  return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
  ...
  subscriber.onStart();
  hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
  return hook.onSubscribeReturn(subscriber);
}

보시다시피,subscribe 이후,observable1을 직접 호출했습니다.onSubscribe.call 방법, 즉 우리 코드의 onSubscribe1 대상의call 방법
전송된 매개 변수는 우리 코드에 정의된subscriber1 대상입니다.콜 방법에서 하는 일은 전송된subscriber1 대상의 onNext와 onComplete 방법을 호출하는 것입니다.
이렇게 하면 관찰자와 피관찰자 간의 통신을 실현하는데 매우 간단하지 않습니까?

public void call(Subscriber<? super String> subscriber) {
  subscriber.onNext("1");
  subscriber.onCompleted();
}

RxJava에서 장면 소결 사용하기
1. 데이터를 추출하여 캐시된 장면을 확인합니다.
데이터를 추출하려면 먼저 메모리에 캐시가 있는지 확인하십시오
그런 다음 파일 캐시에 있는지 확인합니다.
마지막으로 네트워크에서
앞의 어떤 조건이 충족되면 뒤의 것을 집행하지 않을 것이다

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
    if (memoryCache != null) {
      subscriber.onNext(memoryCache);
    } else {
      subscriber.onCompleted();
    }
  }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> subscriber) {
    String cachePref = rxPreferences.getString("cache").get();
    if (!TextUtils.isEmpty(cachePref)) {
      subscriber.onNext(cachePref);
    } else {
      subscriber.onCompleted();
    }
  }
});

Observable<String> network = Observable.just("network");

// concat operator 
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
  memoryCache = "memory";
  System.out.println("--------------subscribe: " + s);
});

2. 인터페이스는 여러 개의 인터페이스가 데이터를 보내고 업데이트할 때까지 기다려야 한다

// Observable , , 
private void testMerge() {
  Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
  Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

  Observable.merge(observable1, observable2)
      .subscribeOn(Schedulers.newThread())
      .subscribe(System.out::println);
}

3. 한 인터페이스의 요청은 다른 API 요청에 의해 되돌아오는 데이터에 의존한다
예를 들어 우리는 자주 로그인을 한 후에 얻은 토큰에 따라 메시지 목록을 얻는다.
여기에서 RxJava로 주로 플러그인 리셋 문제를 해결합니다. Callback hell이라는 전문 명사가 있습니다.

NetworkService.getToken("username", "password")
  .flatMap(s -> NetworkService.getMessage(s))
  .subscribe(s -> {
    System.out.println("message: " + s);
  });

4. 인터페이스 버튼은 연속 클릭을 방지해야 합니다

RxView.clicks(findViewById(R.id.btn_throttle))
  .throttleFirst(1, TimeUnit.SECONDS)
  .subscribe(aVoid -> {
    System.out.println("click");
  });

5. 응답식 인터페이스
예를 들어 어떤 체크박스를 선택하면 대응하는preference를 자동으로 업데이트합니다

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
    .subscribe(checked.asAction());

6. 복잡한 데이터 변환

Observable.just("1", "2", "2", "3", "4", "5")
  .map(Integer::parseInt)
  .filter(s -> s > 1)
  .distinct()
  .take(3)
  .reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
  .subscribe(System.out::println);//9

좋은 웹페이지 즐겨찾기