RxJava - 작업자 작성
5955 단어 설계사의 길
조작부호
묘사
just
하나 이상의 대상을 이 대상이나 이 대상을 발사하는 Observable로 변환합니다
from
Iterable,Future 또는 배열을 Observable로 변환
create
함수를 사용하여 처음부터 Observable 만들기
defer
구독자가 구독할 때만 Observable를 만들고 구독마다 새로운 Observable를 만듭니다
range
지정된 범위의 정수 시퀀스를 내보내는 Observable 만들기
interval
주어진 시간 간격에 따라 보내는 정수 서열을 만드는 Observable
timer
지정된 지연 시간을 만든 후 단일 데이터를 전송하는 Observable
empty
아무것도 하지 않고 완성된 Observable을 만듭니다.
error
아무 것도 하지 않고 오류를 직접 알리는 Observable 만들기
never
데이터 전송 안 함
2、create
Rxjava는 create 방법의 함수를 전달할 때 관찰자의 isDisposed 상태를 검사하여 관찰자가 없을 때 Observable가 데이터 발사를 멈추고 비싼 연산을 방지하도록 합니다
// create
Observable.create(observableEmitter -> {
try {
//
if (!observableEmitter.isDisposed()) {
for (int i = 0; i < 10; i++) {
observableEmitter.onNext(i);
}
}
} catch (Exception e) {
observableEmitter.onError(e);
} finally {
observableEmitter.onComplete();
}
}).subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);
3、just from
// just
Observable.just("A", "B", "C")
.subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);
// from 1
Observable.fromArray("A", "B", "C")
.subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);
// from 2
ExecutorService executorService = Executors.newCachedThreadPool();
Future submit = executorService.submit(new TestCallable());
// 1 java.util.concurrent.TimeoutException
Observable.fromFuture(submit, 1, TimeUnit.SECONDS)
.subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);
4、repeat
// repeat
Observable
.just("A", "B", "C")
// 3
.repeat(3)
.subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);
// repeatWhen
// Observable
Observable
.range(0, 3)
// 1
.repeatWhen(objectObservable -> Observable.timer(1, TimeUnit.SECONDS))
// .repeatWhen(objectObservable -> Observable.just("A","B"))
.subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);
TimeUnit.SECONDS.sleep(4);
// repeatUntil
//
Observable.interval(500, TimeUnit.MILLISECONDS)
.take(5)
// true false
.repeatUntil(() -> true)
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(10);
}
5、defer
// defer
// Observable, ,
Observable.defer(() -> Observable.just("A"))
.subscribe(
// onNext
System.out::println,
// onError
System.err::println,
// onComplete
() -> System.out.println("onComplete!!"),
// onSubscribe
(disposable) -> System.out.println("onSubscribe!!")
);