RPC 프레임워크에 동적 에이전트 적용
8895 단어 동적 프록시 RPC
인스턴스
1. 첫 번째 실례는 황용의 경량급 분포식 RPC 프레임워크 데모(https://gitee.com/huangyong/rpc) 실현중 통신 프레임워크에서 Netty를 사용했기 때문에 분석 과정에서 일부 Netty 코드의 정보가 있을 수 있지만 걱정하지 마세요. Netty를 모르더라도 설명하는 과정에서 최대한 피하고 반사와 동적 에이전트의 역할을 돋보이게 할 것입니다.rpc-simple-client에서 HelloClient.Class에는 다음과 같은 코드가 있습니다.
HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
System.out.println(result);
이 코드는 무슨 일을 합니까?에이전트를 통해 Hello 서비스 대상을 생성하고 Hello 방법을 실행합니다.우리의 인상에서 집행하는 방법은 결국 인터페이스에서 실현되는 방법이다.그 사실이 이렇습니까?아래의 분석을 보시오.rpcProxy 코드는 다음과 같습니다.
public T create(final Class> interfaceClass, final String serviceVersion) {
//
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// RPC
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(method.getDeclaringClass().getName());
request.setServiceVersion(serviceVersion);
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// RPC
if (serviceDiscovery != null) {
String serviceName = interfaceClass.getName();
if (StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
serviceAddress = serviceDiscovery.discover(serviceName);
LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
}
if (StringUtil.isEmpty(serviceAddress)) {
throw new RuntimeException("server address is empty");
}
// RPC
String[] array = StringUtil.split(serviceAddress, ":");
String host = array[0];
int port = Integer.parseInt(array[1]);
// RPC RPC
RpcClient client = new RpcClient(host, port);
long time = System.currentTimeMillis();
RpcResponse response = client.send(request);
LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
if (response == null) {
throw new RuntimeException("response is null");
}
// RPC
if (response.hasException()) {
throw response.getException();
} else {
return response.getResult();
}
}
}
);
}
위의 코드에서 알 수 있듯이 에이전트를 거쳐 Hello 방법을 실행하는 것은 사실 요청을 하는 것이다.하나의 요청인 이상 클라이언트 측과 서버 측을 언급해야 한다. 위에는 사실 클라이언트 측 코드가 있다.그러면 서버가 무엇을 했는지 살펴보고 본고에서 소개한 것과 관련이 없는 코드를 제거하면 RpcServerHandler에서 핵심 코드는 다음과 같다.
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
Object result = handle(request);
response.setResult(result);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); // ,
}
hanlde
// , Client 。
Class> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// CGLib
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
2. 두 번째 실례는 xxl-job 분포식 임무 스케줄링 플랫폼 설명에서 따온 것이다. 이 소스 오픈 프로젝트의 RPC 통신은 Jetty로 이루어진 것이다.
xxl-job-admin에서 XxlJobTrigger.Class의 runExecutor는 다음과 같습니다.
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address); //
runResult = executorBiz.run(triggerParam);
아주 간단하게 실행기를 꺼내서 실행을 촉발합니다.하지만 getExecutorBiz에 들어가는 방법은 다음과 같습니다.
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address,
accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
익숙하지 않아요? 맞아요. 동적 에이전트는 NetComClientProxy의 실현인 것 같아요. 구조적으로 첫 번째 실례의 rpcProxy 코드와 비슷하지 않나요?new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();뭘 했을까요?
public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// request
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// send
RpcResponse response = client.send(request);
// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}
}
});
}
여전히 RpcRequest를 봉하여 요청을 보냅니다.그래서runResult=executorBiz에서.run (trigger Param) 은 사실 요청을 보내는 것입니다.위에는 클라이언트 쪽 코드가 있는데, 예전처럼 서버 코드를 보면, 여전히 아는 사이인 것 같다.본문과 무관한 코드를 제거하면 다음과 같다. xxl-job-core에서 JettyServerHandler.Class는 다음과 같습니다.
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
:
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class> serviceClass = serviceBean.getClass(); //
String methodName = request.getMethodName(); // run
Class>[] parameterTypes = request.getParameterTypes(); //
Object[] parameters = request.getParameters(); //
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// CGLib
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
반사에 따라 구체적인 종류를 생성하여 관련 방법을 집행하여 원하는 목적을 달성한다.위의 두 인스턴스 프로세스는 다음 그림으로 요약할 수 있습니다.
RPC, 원격 프로세스 호출.원격 기계를 조종하는 방법입니다.
원리는 사실 매우 간단하다. 클라이언트에서 실행하는 프로그램이 대상 방법을 호출할 때 밑바닥에서 이 방법에 대한 호출을 한다.
/**
* RPC
*
* @author huangyong
* @since 1.0.0
*/
public class RpcRequest {
private String requestId;// id
private String interfaceName;//
private String serviceVersion;
private String methodName;//
private Class>[] parameterTypes;//
private Object[] parameters;//
이를 TCP/HTTP 요청의 매개 변수로 원격 서버를 보내고 원격 서버는 고정 포트를 감청한다. 이 TCP/HTTP 요청을 받은 후 관련 정보를 해석한다. 즉,client에서 전송된 데이터에 따라 서버를 반사적으로 호출하는 방법, 클라이언트가 어떤 종류의 방법, 매개 변수가 무엇인지 등을 포함하여 해당하는 호출을 하고 호출 결과를 패키지를 통해 다시 보내면 된다.