Retrofit 2.1 + Rxjava 소스 분석 (1)
1. Retrofit 객체 작성
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
.client(okHttpClient.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(base_url)
.build();
이것은 일반적인 Retrofit 대상 창설 과정으로 필요한 매개 변수를 전달합니다:
okHttpClient
,converterFactory
,callAdapterFactory
(Rxjava와 어울리지 않을 때 Retrofit callAdapterFactory
,아무것도 하지 않음),baseUrl.여기서 특히 주의해야 할 것은
RxJavaCallAdapterFactory.create()
이것RxjavaCallAdapter
이 전송되었는데, 이 대상은 Retrofit의 사용을 철저히 바꾸게 될 것이다.Retrofit와 Rxjava를 조합하는 것이 가능해졌고 Retrofit 작가의 프로그래밍 기초에 탄복하지 않을 수 없었다. 이 인터페이스를 개방하여 Retrofit의 유연성을 높였다.2. 인터페이스의 동적 프록시 대상 만들기
실험의 인터페이스를 제시하다
public interface NetApiService {
//post
@FormUrlEncoded
@POST("{url}")
Observable executePost(
@Path("url") String url,
@Field("params") String params,
@Field("signature") String signature
);
}
netApiService = retrofit.create(NetApiService .class); //
여기도 Retrofit의 신기한 부분이다. 하나의 인터페이스를 전달하면 이 인터페이스를 실현하는 대상을 생성할 수 있다. 물론 이것은 자바 코드가 생성한 동적 프록시 대상일 뿐이다.다음은
CallAdapterFactory
방법을 들어가 봅시다.public T create(final Class service) {
Utils.validateServiceInterface(service); // “ ”
if (validateEagerly) {
eagerlyValidateMethods(service); // validateEagerly
}
// Proxy 。
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
여기서 우리는 먼저 전송된 인터페이스에 대해 인터페이스인지 확인한 다음에
create()
에 따라 인터페이스의 모든 방법을 캐시하는지 판단하고 마지막으로 validateEagerly
로 일반적인 동적 에이전트 대상을 만들어서 이 대상을 되돌려줍니다.(자바 동적 에이전트 기술을 모르는 학생은 조급해하지 마세요. 제가 문말에 참고 자료를 드릴게요)3. Observable 만들기
Observable observable = netApiService.executePost(url, params, signature);
동적 프록시 대상의 인터페이스 방법을 호출합니다. 이럴 때 호출합니다.
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
}
java.lang.reflect.Proxy;
의InvocationHandler
방법은 여기서 세 가지 파라미터를 볼 수 있다. invoke()
는 proxy
를 통해 생성된 에이전트 대상을 나타낸다.Proxy.newProxyInstance()
는 프록시 대상이 호출된 함수를 나타낸다.method
프록시 대상이 호출된 함수의 매개 변수를 나타낸다.호출 프록시 대상의 모든 함수는 실제적으로 args
의 InvocationHandler
함수를 호출했다.이것은 인터페이스를 연결하는 방법이기 때문에 첫 번째
invoke
는 들어가지 않을 것이고, 기본 방법도 아니기 때문에 두 번째if
도 들어가지 않을 것이다.이렇게 하면 우리의 에이전트 대상이 인터페이스 방법을 호출한 후에 실제로는 newif
대상이고 이 대상을 매개 변수로 okHttpCall<>
방법에 전송한 것을 볼 수 있다.우리가 이전에 전입한 것은
callAdapter.adapt();
이기 때문에 우리는 RxJavaCallAdapterFactory.create()
에 깊이 들어가 구조RxJavaCallAdapterFactory.java
의 방법을 보았고Observable
에서 볼 수 있다.static final class ResponseCallAdapter implements CallAdapter> {
private final Type responseType;
private final Scheduler scheduler;
ResponseCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}
@Override public Type responseType() {
return responseType;
}
@Override public Observable> adapt(Call call) {
Observable> observable = Observable.create(new CallOnSubscribe<>(call));
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}
여기서 우리는 여기에 전달된
adapt()
를 매개 변수로 구성한 것을 보았다okHttp
.CallOnSubscribe
는 어디에서 신성한가???CallOnSubscribe
구조Rxjava
방법에 따르면 이Observable
는 CallOnSubscribe
인터페이스를 실현한 대상이 될 것이다.우리 원본 코드를 보자, 과연 이렇다.
static final class CallOnSubscribe implements Observable.OnSubscribe> {
private final Call originalCall;
CallOnSubscribe(Call originalCall) {
this.originalCall = originalCall;
}
@Override public void call(final Subscriber super Response> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call call = originalCall.clone();
// Wrap the call in a helper which handles both unsubscription and backpressure.
RequestArbiter requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(requestArbiter);
subscriber.setProducer(requestArbiter);
}
}
여기를 보면 생성된 동적 프록시 대상을 호출하는 인터페이스 방법이
Observable.OnSubscribe
만 사용하는 것처럼 Retrofit
를 되돌려 주는 것이 아니라 okHttpCall<>
대상을 되돌려 주는 이유를 알 수 있을 것이다.사실 이것Observable
이 한 전환이다.RxJavaCallAdapterFactory
의CallOnSubscribe
방법을 자세히 보면 여기call()
(사실은 subscriber
방법을 호출할 때 들어온subscribe()
외부 관찰자)에 subscriber
대상이 추가된 것을 알 수 있다.이 대상은 매우 중요하다. requestArbiter
에서는 subscriber.setProducer(requestArbiter);
직접 네트워크를 통해 데이터를 얻고 관찰자 okHttpCall
에게 되돌려준다.4.observable.subscribe(subscriber);가입
여기 코드가 많지 않아요. 한 줄만
subscriber
.observable.subscribe(subscriber);
방법에서 무슨 신기한 일이 일어났는지 자세히 볼까요?(미리 스포일러, subscribe
좋아요. 다리에 해당해요. Observable와 Observer를 연결해요) public final Subscription subscribe(Subscriber super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static Subscription subscribe(Subscriber super T> subscriber, Observable observable) {
// validate and proceed
// , , , 。
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
// ,
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
이 코드에서 우리는
Observable.OnSubscribe
과정에서 먼저 호출subscribe()
하는 것을 보았다. 일반적으로 이 방법은subscribe가 시작되기 전에 호출되고 이벤트가 발송되기 전에 호출된다. 예를 들어 데이터의 삭제나 리셋을 준비하는 데 사용할 수 있다.이것은 선택할 수 있는 방법으로 기본적으로 실행이 비어 있습니다.주의해야 할 것은 준비 작업의 라인에 대한 요구가 있다면 (예를 들어 진도를 표시하는 대화상자가 뜨면 메인 라인에서 실행해야 함)onStart()
는 적용되지 않습니다. (항상 subscribe에서 발생하는 스레드가 호출되기 때문에 스레드를 지정할 수 없습니다. 지정된 스레드에서 준비를 하려면 onStart()
방법을 사용할 수 있습니다.) (이 지식을 기억하고 보류하세요. 지금은 스레드를 바꾸지 않습니다.)고에너지가 왔다!!!우리는 이 코드를 중점적으로 보았다.
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
public OnSubscribe onSubscribeStart(Observable extends T> observableInstance, final OnSubscribe onSubscribe) {
// pass through by default
return onSubscribe;
}
사실
doOnSubscribe()
방법은 onSubscribeStart()
대상을 직접 되돌려주고 onSubscribe
의onSubscribe
방법을 직접 호출한다.우리가 방금 분석한 것을 기억하자면, 여기는 사실 call(subscriber)
의CallOnSubscribe
방법을 호출한 것이다.바로 이곳에서 네트워크를 통해 데이터를 얻고 call()
관찰자를 되돌리는 방법이다.(구체적인 코드는 Subscriber
방법의 call()
이다.public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
마지막으로
subscriber.setProducer(requestArbiter);
방법을 사용합니다.이 producer.request(toRequest);
방법이 바로 request()
의RequestArbiter requestArbiter = new RequestArbiter<>(call, subscriber);
입니다.static final class RequestArbiter extends AtomicBoolean implements Subscription, Producer {
private final Call call;
private final Subscriber super Response> subscriber;
RequestArbiter(Call call, Subscriber super Response> subscriber) {
this.call = call;
this.subscriber = subscriber;
}
@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.
try {
Response response = call.execute();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override public void unsubscribe() {
call.cancel();
}
@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
}
request()
여기에서 네트워크 요청을 진행했습니다.Response response = call.execute();
이곳에서 리셋을 진행한다.다음
if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); }
과onError()
방법의 리셋과 같은 것은 분석하지 않겠습니다.이로써 Retrofit + Rxjava에서 Observable와 Observer를 만들고 Observable에서 Observer를 구독하는 절차와 중간에 숨겨진 네트워크와 리셋의 과정을 완전하게 이해하게 되었다.
참고 자료
Retrofit 2.1 소스 분석 Java 동적 에이전트 기술 Rxjava 소스 분석 1
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.