RxJava 구독 취소 다양한 방식 의 실현

9871 단어 RxJava구독 취소
구독 취소
소비자 유형
Observable 생 성 되 돌리 기 Disposable 취소

public class SecondActivity extends AppCompatActivity {

  private static final String TAG = "SecondActivity";
  private Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_second);
    disposable = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d(TAG, "accept: "+s);
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //    
    if(disposable != null && !disposable.isDisposed()){
      disposable.dispose();
      Log.d(TAG, "onDestroy: dispose");
    }
  }
}

일반 타 입 Observer
Observer 에서 Disposable 을 가 져 와 서 취소 합 니 다.

public class ThirdActivity extends AppCompatActivity {
  private static final String TAG = "ThirdActivity";
  Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_third);
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            disposable = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //                
    if (disposable != null && !disposable.isDisposed()) {
      Log.d(TAG, "dispose: ");
      disposable.dispose();
    }
  }
}

DisposableObserver 형식
Disposable Observer 와 Subscribe With 를 이용 하여 Disposable 로 바로 돌아 간 후 취소 합 니 다.

public class FourthActivity extends AppCompatActivity {
  private static final String TAG = "FourthActivity";
  private DisposableObserver<String> observer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_fourth);
    observer = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<String>() {
      @Override
      public void onNext(String o) {
        Log.d(TAG, "onNext: "+o);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
      }

      @Override
      public void onComplete() {
        Log.d(TAG, "onComplete: ");
      }
    });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    if (observer != null && !observer.isDisposed()) {
      Log.d(TAG, "dispose: ");
      observer.dispose();
    }
  }
}

여러 Observer 취소
여러 Observer 에 Composite Disposable 을 추가 하여 한 번 에 취소 합 니 다.

public class ComDisposableActivity extends AppCompatActivity {

  private Disposable disposable1;
  private Disposable disposable2;
  private static final String TAG = "ComDisposableActivity";
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_com_disposable);
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG, "run: Unsubscribing subscription from onCreate()");
          }
        })
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            disposable1 = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            disposable2 = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    //    
    compositeDisposable.add(disposable1);
    compositeDisposable.add(disposable2);
    //           
    compositeDisposable.dispose();
  }
}

RxLifestyle 취소
OnDestory 취소

Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
          }
        })
        .compose(this.<Long>bindToLifecycle())
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long num) throws Exception {
            Log.d(TAG, "accept: " + num);
          }
        });

지정 생명주기 취소

Observable.interval(1,TimeUnit.SECONDS)
        .doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
          }
        }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            Log.d(TAG, "bindUntilEvent accept: " + aLong);
          }
        });
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기