Java - RxJava 2 Notes

References: http://blog.csdn.net/maplejaw_/article/details/52442065 http://www.jianshu.com/nb/5864063
RxJava 2.x was released last year. Compared with RxJava 1.x, a lot has changed in how it is used, but the changes are mostly API names; the overall workflow is the same. These notes record the basics.
Basic usage
1. Creating an Observable (the observed side / publisher / emitter)
(1)create()
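Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
        observableEmitter.onNext("item 1");
        observableEmitter.onNext("item 2");
        // onError and onComplete are both terminal events: whichever comes first ends the stream
        observableEmitter.onError(new Throwable("error"));
        observableEmitter.onComplete(); // ignored here because onError was already signalled
    }
});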

(2)just
Observable<String> observable = Observable.just("item 1", "item 2");

(3)fromIterable, fromArray
ArrayList<String> list = new ArrayList<>();
list.add("item 1");
list.add("item 2");
Observable<String> observable = Observable.fromIterable(list);

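(4)range: emits a sequence of integers; the first argument is the start value and the second is the count. A count of 0 emits nothing, and a negative count throws an exception
Observable<Integer> observable = Observable.range(10, 5); // emits 10, 11, 12, 13, 14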

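(5)defer: lazy creation; a new Observable is built each time an observer subscribes
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just("item 1", "item 2");
    }
});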

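(6)interval: emits an increasing number at a fixed interval
Observable<Long> observable = Observable.interval(500, TimeUnit.MILLISECONDS); // every 500 ms

(7)timer: emits a single item after a delay
Observable<Long> observable = Observable.timer(300, TimeUnit.MILLISECONDS); // after 300 ms

(8)repeat: repeats the emission
Observable<String> observable = Observable.just("item 1").repeat(3); // repeat 3 times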

2. Creating an Observer (observer / subscriber / receiver)
(1).Observer: full interface
Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onSubscribe(@NonNull Disposable disposable) {
        // Disposable corresponds to Subscription in RxJava 1.x and is used to cancel the subscription
    }

    @Override
    public void onNext(@NonNull Object o) {
        // receives an item
    }

    @Override
    public void onError(@NonNull Throwable throwable) {
        // receives an error
    }

    @Override
    public void onComplete() {
        // receives the completion signal
    }
};

(2).Observer: separate callbacks (Consumer / Action)
Consumer<Object> onNext = new Consumer<Object>() { // receives items
    @Override
    public void accept(Object o) throws Exception {
    }
};

Consumer<Throwable> onError = new Consumer<Throwable>() { // receives an error
    @Override
    public void accept(Throwable throwable) throws Exception {
    }
};

Action onComplete = new Action() { // receives the completion signal
    @Override
    public void run() throws Exception {
    }
};

Consumer<Disposable> onSubscribe = new Consumer<Disposable>() { // receives the Disposable on subscription
    @Override
    public void accept(Disposable disposable) throws Exception {
    }
};

3. Subscribing an Observer to an Observable
(1).Observer: full interface
observable.subscribe(observer); // subscribe

(2).Observer: separate callbacks (Consumer / Action)
observable.subscribe(onNext);
observable.subscribe(onNext, onError);
observable.subscribe(onNext, onError, onComplete);
observable.subscribe(onNext, onError, onComplete, onSubscribe);

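Thread scheduling
Scheduler types
Schedulers.computation( )    For computational work such as event loops and callback processing; do not use it for IO (use Schedulers.io() for IO). The default number of threads equals the number of processors
Schedulers.from(executor)    Uses the given Executor as the scheduler
Schedulers.io( )    For IO-bound work such as asynchronous blocking IO; its thread pool grows as needed;
                    for ordinary computational work, use Schedulers.computation();
                    by default Schedulers.io( ) behaves like a CachedThreadScheduler, much like a new-thread scheduler with a thread cache
Schedulers.newThread( )    Creates a new thread for each task
Schedulers.trampoline( )    Queues the task on the current thread and runs it once the other queued tasks have finished
AndroidSchedulers.mainThread()    Specific to RxAndroid; as the name suggests, it runs on the Android UI thread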

Observable.just("some data...")
    .subscribeOn(Schedulers.io()) // IO thread - the Observable emits here
    .observeOn(AndroidSchedulers.mainThread()) // main thread - the Observer receives here
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
        }
    });
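The hand-off between the two threads can be sketched with the JDK alone. This is only an analogy, assuming CompletableFuture stands in for the chain: the supplier plays the role of the source placed by subscribeOn, and thenApplyAsync plays the role of the consumer placed by observeOn. The class name SchedulingSketch and the thread names are made up for illustration and are not part of RxJava or RxAndroid.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogy for subscribeOn/observeOn; not RxJava code.
class SchedulingSketch {
    // Produces a value on one thread, consumes it on another, and reports both thread names.
    static String run() throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            return CompletableFuture
                // "subscribeOn": the producing work runs on the io executor
                .supplyAsync(() -> Thread.currentThread().getName(), io)
                // "observeOn": the consuming work runs on the main executor
                .thenApplyAsync(producer -> producer + " -> " + Thread.currentThread().getName(), main)
                .get();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // io-thread -> main-thread
    }
}
```

The point of the sketch is only that where the data is produced and where it is consumed are chosen independently.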

3. Common operators
1. map - converts each item to another type
Observable.just("123")
    .map(new Function<String, Integer>() {
        @Override
        public Integer apply(@NonNull String s) throws Exception {
            return Integer.parseInt(s);
        }
    })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });

2. flatMap - flattens a collection of data (iterates over every element)
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
List<List<String>> listSSS = new ArrayList<>(); // a list of lists
listSSS.add(list);
Observable.fromIterable(listSSS)
    .flatMap(new Function<List<String>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(@NonNull List<String> list) throws Exception {
            return Observable.fromIterable(list);
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

3. buffer - collects items and, once the buffer is full, emits them as a List
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
Observable.fromIterable(list)
    .buffer(list.size()) // buffer size
    .subscribe(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> list) throws Exception {
            System.out.println(list.size());
        }
    });

4. take(n) - emits only the first n items
Observable.just(1, 2, 1, 1, 2, 3)
    .take(3) // emit only the first 3 items
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });

5. distinct - removes duplicate items
Observable.just(1, 2, 1, 1, 2, 3)
    .distinct() // de-duplicate
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });

6. filter - keeps only the items that match a condition
Observable.just(1, 2, 3, 4, 5)
    .filter(new Predicate<Integer>() {
        @Override
        public boolean test(@NonNull Integer integer) throws Exception {
            return integer > 3; // keep values greater than 3
        }
    })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });
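To check what the buffer, take, distinct, and filter examples above deliver, here is a JDK-only sketch that runs the same inputs through plain collections and java.util.stream. It is an analogy, not RxJava: streams are synchronous and pull-based, and the class name OperatorSketch and its method names are made up for illustration. For these finite inputs the resulting items should match what the corresponding operators emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; it mirrors the operator examples with JDK collections and streams.
class OperatorSketch {
    // Analog of buffer(count): split the source into lists of at most `count` items.
    static List<List<Integer>> buffer(List<Integer> source, int count) {
        List<List<Integer>> chunks = new ArrayList<>();
        for (int i = 0; i < source.size(); i += count) {
            chunks.add(new ArrayList<>(source.subList(i, Math.min(i + count, source.size()))));
        }
        return chunks;
    }

    // Analog of take(n): keep the first n items.
    static List<Integer> takeFirst(List<Integer> source, int n) {
        return source.stream().limit(n).collect(Collectors.toList());
    }

    // Analog of distinct(): keep the first occurrence of each value.
    static List<Integer> distinct(List<Integer> source) {
        return source.stream().distinct().collect(Collectors.toList());
    }

    // Analog of filter(x -> x > threshold).
    static List<Integer> greaterThan(List<Integer> source, int threshold) {
        return source.stream().filter(x -> x > threshold).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 1, 1, 2, 3);
        System.out.println(buffer(Arrays.asList(1, 2, 3), 2));            // [[1, 2], [3]]
        System.out.println(takeFirst(input, 3));                          // [1, 2, 1]
        System.out.println(distinct(input));                              // [1, 2, 3]
        System.out.println(greaterThan(Arrays.asList(1, 2, 3, 4, 5), 3)); // [4, 5]
    }
}
```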

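5. Flowable - backpressure
Flowable is new in RxJava 2.x and is designed to deal with the backpressure problem.
Backpressure: the problem that arises when the producer emits faster than the consumer can process. A common example on Android is click events: clicking too fast often makes the action fire twice!
Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
        for (int i = 0; i < 10000; i++)
            e.onNext(i);
        e.onComplete();
    }
}, BackpressureStrategy.ERROR) // signals an error when the downstream cannot keep up
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.newThread())
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
            Thread.sleep(1000);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable);
        }
    });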

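// The RxJava 1.x style, using an onBackpressureXxx operator
Flowable.range(1, 10000)
    .onBackpressureDrop() // drop items the downstream has not requested
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });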
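The difference between the ERROR and DROP strategies can be sketched in a few lines of plain Java. This is a deliberately simplified model, not RxJava's implementation: it assumes the consumer has a fixed outstanding demand that is never replenished, whereas a real consumer requests more as it drains its buffer. The class BackpressureSketch and its members are hypothetical names. The demand of 128 in main matches the default buffer size used by observeOn in RxJava 2.x.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of two backpressure strategies; not RxJava code.
class BackpressureSketch {
    enum Strategy { ERROR, DROP }

    // Outcome of one run: the items the consumer received and whether an error was signalled.
    static final class Result {
        final List<Integer> delivered = new ArrayList<>();
        boolean failed;
    }

    // The producer tries to emit `total` items while the consumer has only
    // requested `demand` of them.
    static Result emit(int total, long demand, Strategy strategy) {
        Result result = new Result();
        long remaining = demand;
        for (int i = 0; i < total; i++) {
            if (remaining > 0) {
                result.delivered.add(i);   // demand available: deliver the item
                remaining--;
            } else if (strategy == Strategy.ERROR) {
                result.failed = true;      // ERROR: fail as soon as demand runs out
                break;
            }
            // DROP: the item is silently discarded and the loop continues
        }
        return result;
    }

    public static void main(String[] args) {
        Result error = emit(10000, 128, Strategy.ERROR);
        Result drop = emit(10000, 128, Strategy.DROP);
        System.out.println(error.delivered.size() + " delivered, failed=" + error.failed); // 128 delivered, failed=true
        System.out.println(drop.delivered.size() + " delivered, failed=" + drop.failed);   // 128 delivered, failed=false
    }
}
```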

4. Subject
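Subject<T> extends Observable<T> implements Observer<T>
Role:
    It can act as an Observable
    It can act as an Observer
    So it can serve as a bridge between an Observable and an Observer
Common Subject subclasses: AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject
Note:
    If onNext (or any other onXxx method) is called from multiple threads, use the serialized version to stay thread-safe!
    Subject<String> ser = publishSubject.toSerialized(); // in RxJava 2.x, SerializedSubject is not public; use toSerialized()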

Processor works much like Subject, but Processor is new in RxJava 2.x and extends Flowable, so it supports backpressure
// Processor
AsyncProcessor<String> processor = AsyncProcessor.create();
processor.subscribe(o -> Log.d("JG", o)); // logs "three"
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");
processor.onComplete();

1. AsyncSubject delivers only the last item emitted before onComplete() is called.
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.onNext("asyncSubject1");
asyncSubject.onNext("asyncSubject2");
asyncSubject.onNext("asyncSubject3");
asyncSubject.onComplete();
asyncSubject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s); // prints asyncSubject3
    }
});

2. BehaviorSubject delivers the last item emitted before subscription, plus every item emitted after it.
BehaviorSubject<String> behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext("behaviorSubject1");
behaviorSubject.onNext("behaviorSubject2");
behaviorSubject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s); // prints behaviorSubject2, behaviorSubject3, behaviorSubject4
    }
});
behaviorSubject.onNext("behaviorSubject3");
behaviorSubject.onNext("behaviorSubject4");

3. PublishSubject delivers only the items emitted after subscription.
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("publishSubject1");
publishSubject.onNext("publishSubject2");
publishSubject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s); // prints publishSubject3, publishSubject4
    }
});
publishSubject.onNext("publishSubject3");
publishSubject.onNext("publishSubject4");

4. ReplaySubject delivers every item no matter when you subscribe. However, when the cache is bounded by size or by time, older items are discarded!
ReplaySubject<String> replaySubject = ReplaySubject.create(); // unbounded cache; initial capacity defaults to 16
//replaySubject = ReplaySubject.create(100); // unbounded cache with an initial capacity of 100
//replaySubject = ReplaySubject.createWithSize(2); // caches only the last 2 items
//replaySubject = ReplaySubject.createWithTime(1, TimeUnit.SECONDS, Schedulers.computation()); // caches only items from the last 1 second
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s); // prints all five items: pre1, pre2, pre3, after1, after2
    }
});
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");
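The four Subject types differ only in which earlier items a late subscriber sees. The following JDK-only sketch models that rule side by side. It is a simplified model, not RxJava code: it assumes an unbounded ReplaySubject, and for the ASYNC case it assumes onComplete() is called after all items have been emitted. The class SubjectSketch and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of what a subscriber receives from each Subject type; not RxJava code.
class SubjectSketch {
    enum Kind { ASYNC, BEHAVIOR, PUBLISH, REPLAY }

    // `before` holds the items emitted before the subscriber subscribed, `after` the later ones.
    static List<String> received(Kind kind, List<String> before, List<String> after) {
        List<String> out = new ArrayList<>();
        switch (kind) {
            case ASYNC: {
                // Only the very last item, delivered once the subject completes.
                List<String> all = new ArrayList<>(before);
                all.addAll(after);
                if (!all.isEmpty()) out.add(all.get(all.size() - 1));
                return out;
            }
            case REPLAY:
                out.addAll(before); // everything emitted so far is replayed
                break;
            case BEHAVIOR:
                if (!before.isEmpty()) out.add(before.get(before.size() - 1)); // latest item only
                break;
            case PUBLISH:
                break; // nothing from before the subscription
        }
        out.addAll(after);
        return out;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("pre1", "pre2", "pre3");
        List<String> after = Arrays.asList("after1", "after2");
        System.out.println(received(Kind.ASYNC, before, after));    // [after2]
        System.out.println(received(Kind.BEHAVIOR, before, after)); // [pre3, after1, after2]
        System.out.println(received(Kind.PUBLISH, before, after));  // [after1, after2]
        System.out.println(received(Kind.REPLAY, before, after));   // [pre1, pre2, pre3, after1, after2]
    }
}
```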

5. Example: using a Subject as a bridge
// 1. Create the Subject that will act as the bridge
Subject<String> subject = BehaviorSubject.create();

// 2. Subscribe to the Subject
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

// 3. Feed the Subject from an Observable
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
        observableEmitter.onNext("as Bridge");
    }
}).subscribe(subject);

Jianshu: http://www.jianshu.com/p/724c937e3d0c
CSDN blog: http://blog.csdn.net/qq_32115439/article/details/78090944
GitHub blog: http://lioil.win/2017/09/25/JavaSE-RxJava.html
Coding blog: http://c.lioil.win/2017/09/25/JavaSE-RxJava.html
