Rxjava의 스레드 스케줄링 소스 분석
코드 호출
Observable.just(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Integer integer) throws Exception {
}
});
바로 주제에 들어가서subscribe에서 어떤 방법을 사용했는지 먼저 봅시다
//Observable.java
public final Disposable subscribe(Consumer super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,
Action onComplete, Consumer super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
// Observable subscribeActual
protected abstract void subscribeActual(Observer super T> observer);
다음은 subscribeOn 방법에서 어떤 조작이 진행되었는지 살펴보겠습니다.
//Observable.java
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// ObservableSubscribeOn , RxJavaPlugins.onAssembly ,
// , ,
// subscribeActual , ObservableSubscribeOn subscribeActual
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
다음은 Observable Subscribe On의subscribe Actual 방법을 보도록 하겠습니다.
//ObservableSubscribeOn.java
public void subscribeActual(final Observer super T> observer) {
final SubscribeOnObserver parent = new SubscribeOnObserver(observer);
observer.onSubscribe(parent);
// SubscribeTask, Runnable
// scheduler.scheduleDirect
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
scheduler를 보세요.scheduleDirect, 다시 한 번 말씀드리기 전에 전송된Scheduler를 살펴보겠습니다.IO 수신된 Schedule 보기
public static Scheduler io() {
// IO
return RxJavaPlugins.onIoScheduler(IO);
}
//new IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Callable {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
// , Schedulers.io IoScheduler
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//scheduler
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// Worker, createWorker ,
// IoScheduler.createWorker, EventLoopWorker
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
// EventLoopWorker.schedule
w.schedule(task, delay, unit);
return task;
}
이제 EventLoopWorker를 보도록 하겠습니다.
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//NewThreadWorker.scheduleActual
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
진정으로 스레드 스케줄링에 들어가는 코드는 New ThreadWorker에서
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future> f;
try {
if (delayTime <= 0) {
//executor
f = executor.submit((Callable
그래서 마지막에 진정으로 스레드 스케줄링을 하는 것은 하나의 스레드 탱크이다. subscribe On을 보고 나서 observe On을 보자. 우선 안드로이드 Schedulers를 보자.mainThread () 는 도대체 어떤 라인입니까
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT
// Looper.getMainLooper() , Looper
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
자, 이 문제를 확정했으니, 우리 다시 계속 아래를 봅시다.
public final Observable observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// ObservableObserverOn
return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}
이어서 Observable ObserveOn 대상을 보도록 하겠습니다.
protected void subscribeActual(Observer super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// createWorker, HandlerScheduler.createWorker HandlerWorker
Scheduler.Worker w = scheduler.createWorker();
// ObserveOnObserver
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}
//내부 클래스 Observe OnObserver, 다음은 리셋 방법
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable qd = (QueueDisposable) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
// schedule
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// schedule
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
// schedule
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
// schedule
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// , worker.schedule
worker.schedule(this);
}
}
// HandlerWorker schedule , , ,Handler
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.