[RxJava] RxJava 스케쥴러

📝RxJava의 학습 순서
1. Observable 클래스를 명확하게 이해(특히 Hot Observable과 Cold Observable의 개념을 꼭 이해해야함)
2. 변환, 제어, 결합 연산자 등 카테고리별 주요 함수 공부
3. 스케줄러의 의미, subscribeOn()과 observeOn()함수의 차이
4. 그 밖의 디버깅, 흐름 제어함수


1.RxJava Scheduler

RxJava에서의 Scheduler는 RxJava 비동기 프로그래밍을 위한 쓰레드 관리자이며
즉, 스케쥴러를 이용해서 어떤 쓰레드에서 무엇을 처리할 지에 대해서 제어할 수 있다.
RxJava만 사용한다고 해서 비동기 처리가되는 것이 아닌 Scheduler를 통해 쓰레드를 분리해주어야 비동기 작업이 가능한 것이다.

Scheduler를 이용해서 데이터를 통지하는 쪽과 데이터를 처리하는 쪽 쓰레드를 별도로 지정해서 분리할 수 있고 쓰레드를 위한 코드의 간결성과 쓰레드의 복잡함을 줄일 수 있다.
스케줄러를 지정하기 위해서 생산자쪽의 데이터 흐름을 제어하기 위해서는 subscribeOn(), 구독자쪽에서 전달받은 데이터 처리를 제어하기 위해서는 observeOn() 연산자를 사용한다.

subscribeOn은 여러번 호출되더라도 맨 처음의 호출만 영향을 주며 어디에 위치하든 상관 없고, observeOn은 여러번 호출될 수 있으며 이후에는 실행되는 연산에 영향을 주므로 위치가 중요하다.


Observable.fromIterable(shapes) // shapes 리스트는 (Red,Ball), (Green, Ball), (Blue,Ball) 이렇게 구성되어 있습니다.
        .subscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.io())
        .doOnSubscribe(data -> MyUtil.printData("doOnSubscribe")) // printData System.out.println(""+Thread.currentThread().getName()+" | "+message+" | ")
        .doOnNext(data -> MyUtil.printData("doOnNext", data))
        .observeOn(Schedulers.newThread())                  
        .map(data -> {data.shape = "Square"; return data;})
        .doOnNext(data -> MyUtil.printData("map(Square)", data))
        .observeOn(Schedulers.newThread())                  
        .map(data -> {data.shape = "Triangle"; return data;})
        .doOnNext(data -> MyUtil.printData("map(Triangle)", data))
        .observeOn(Schedulers.newThread())                  
        .subscribe(data -> MyUtil.printData("subscribe", data));

subscribeOn의 경우 여러번 호출되더라도 맨 처음의 호출만 영향을 주기 때문에 computation 스케줄러에서 데이터 흐름이 발생되고 , 그 후 observeOn(Schedulers.newThread())에 의해 연산이 new thread에서 실행된다.

main | doOnSubscribe | 
RxComputationThreadPool-1 | doOnNext | MyShape{color='Red', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Green', shape='Ball'}
RxComputationThreadPool-1 | doOnNext | MyShape{color='Blue', shape='Ball'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Red', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Green', shape='Square'}
RxNewThreadScheduler-1 | map(Square) | MyShape{color='Blue', shape='Square'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-2 | map(Triangle) | MyShape{color='Blue', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Red', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Green', shape='Triangle'}
RxNewThreadScheduler-3 | subscribe | MyShape{color='Blue', shape='Triangle'}

2.Scheduler 종류

Scheduler의 종류로는 single, computation, io, trampoline, new_thread가 존재하고,
RxJava에서는 이 중 Computation, IO, Trampoline 세 가지의 Scheduler를 권장합니다.

public final class Schedulers {

    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;
    
    @NonNull
    static final Scheduler NEW_THREAD;
}
Scheduler생성 방법내용
SINGLESchedulers.single()단일 스레드를 생성해 계속 재사용
COMPUTATIONSchedulers.computation()내부적으로 스레드 풀 생성, 스레드 개수 = 프로세서 개수
IOSchedulers.io()필요할 때 마다 스레드를 계속 생성
TRAMPOLINESchedulers.trampoline()현재 스레드에 무한한 크기의 대기 큐 생성
NEW_THREADSchedulers.newThread()매번 새로운 스레드 생성

1. Single Thread Scheduler

단일 스레드를 계속 재사용합니다. RxJava 내부에서 스레드를 별로도 생성하며, 한 번 생성된 스레드로 여러 작업을 처리한다. 비동기 처리를 지향한다면 Single 스레드 스케쥴러를 사용할 일은 거의 없다.

2. Computation Thread Scheduler

CPU에 대응하는 계산용 스케줄러이며 IO 작업을 하지 않고 일반적인 계산/연산 작업을 할 때 사용한다.

3. IO Thread Scheduler

파일 입출력 등의 IO 작업을 하거나 네트워크 요청 처리 시에 사용하는 스케쥴러이다. Computation 스케쥴러와 다르게 필요할 때 마다 스레드를 계속 생성한다.

4. Trampoline Thread Scheduler

새로운 스레드를 생성하지 않고 사용하고 있는 현재 스레드에 무한한 크기의 대기 큐를 생성한다.

5. New Thread Scheduler

다른 스케쥴러와 달리 요청을 받을 때 마다 매번 새로운 스레드를 생성한다.

출처
https://4z7l.github.io/2020/12/14/rxjava-5.html

좋은 웹페이지 즐겨찾기