RPC 프레임워크에 동적 에이전트 적용

8895 단어 동적 프록시 RPC

인스턴스


1. 첫 번째 실례는 황용의 경량급 분포식 RPC 프레임워크 데모(https://gitee.com/huangyong/rpc) 실현중 통신 프레임워크에서 Netty를 사용했기 때문에 분석 과정에서 일부 Netty 코드의 정보가 있을 수 있지만 걱정하지 마세요. Netty를 모르더라도 설명하는 과정에서 최대한 피하고 반사와 동적 에이전트의 역할을 돋보이게 할 것입니다.rpc-simple-client에서 HelloClient.Class에는 다음과 같은 코드가 있습니다.
HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
System.out.println(result);

이 코드는 무슨 일을 합니까?에이전트를 통해 Hello 서비스 대상을 생성하고 Hello 방법을 실행합니다.우리의 인상에서 집행하는 방법은 결국 인터페이스에서 실현되는 방법이다.그 사실이 이렇습니까?아래의 분석을 보시오.rpcProxy 코드는 다음과 같습니다.
public  T create(final Class> interfaceClass, final String serviceVersion) {
        //         
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //    RPC            
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(method.getDeclaringClass().getName());
                        request.setServiceVersion(serviceVersion);
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        //    RPC     
                        if (serviceDiscovery != null) {
                            String serviceName = interfaceClass.getName();
                            if (StringUtil.isNotEmpty(serviceVersion)) {
                                serviceName += "-" + serviceVersion;
                            }
                            serviceAddress = serviceDiscovery.discover(serviceName);
                            LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
                        }
                        if (StringUtil.isEmpty(serviceAddress)) {
                            throw new RuntimeException("server address is empty");
                        }
                        //   RPC               
                        String[] array = StringUtil.split(serviceAddress, ":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        //    RPC          RPC   
                        RpcClient client = new RpcClient(host, port);
                        long time = System.currentTimeMillis();
                        RpcResponse response = client.send(request);
                        LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
                        if (response == null) {
                            throw new RuntimeException("response is null");
                        }
                        //    RPC     
                        if (response.hasException()) {
                            throw response.getException();
                        } else {
                            return response.getResult();
                        }
                    }
                }
        );
    }


위의 코드에서 알 수 있듯이 에이전트를 거쳐 Hello 방법을 실행하는 것은 사실 요청을 하는 것이다.하나의 요청인 이상 클라이언트 측과 서버 측을 언급해야 한다. 위에는 사실 클라이언트 측 코드가 있다.그러면 서버가 무엇을 했는지 살펴보고 본고에서 소개한 것과 관련이 없는 코드를 제거하면 RpcServerHandler에서 핵심 코드는 다음과 같다.
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
   Object result = handle(request);
   response.setResult(result);
   ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  //  ,      
}
    hanlde       
       //            ,    Client       。
  Class> serviceClass = serviceBean.getClass();
  String methodName = request.getMethodName();
  Class>[] parameterTypes = request.getParameterTypes();
  Object[] parameters = request.getParameters();
   //    CGLib       
   FastClass serviceFastClass = FastClass.create(serviceClass);
   FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
   return serviceFastMethod.invoke(serviceBean, parameters);

2. 두 번째 실례는 xxl-job 분포식 임무 스케줄링 플랫폼 설명에서 따온 것이다. 이 소스 오픈 프로젝트의 RPC 통신은 Jetty로 이루어진 것이다.
xxl-job-admin에서 XxlJobTrigger.Class의 runExecutor는 다음과 같습니다.
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);  //         
  runResult = executorBiz.run(triggerParam);

아주 간단하게 실행기를 꺼내서 실행을 촉발합니다.하지만 getExecutorBiz에 들어가는 방법은 다음과 같습니다.
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, 
                           accessToken).getObject();
  executorBizRepository.put(address, executorBiz);
  return executorBiz;

익숙하지 않아요? 맞아요. 동적 에이전트는 NetComClientProxy의 실현인 것 같아요. 구조적으로 첫 번째 실례의 rpcProxy 코드와 비슷하지 않나요?new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();뭘 했을까요?
public Object getObject() throws Exception {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                        // request  
                        RpcRequest request = new RpcRequest();
                        request.setServerAddress(serverAddress);
                        request.setCreateMillisTime(System.currentTimeMillis());
                        request.setAccessToken(accessToken);
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);

                        // send  
                        RpcResponse response = client.send(request);

                        // valid response
                        if (response == null) {
                            logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
                            throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
                        }
                        if (response.isError()) {
                            throw new RuntimeException(response.getError());
                        } else {
                            return response.getResult();
                        }

                    }
                });
    }


여전히 RpcRequest를 봉하여 요청을 보냅니다.그래서runResult=executorBiz에서.run (trigger Param) 은 사실 요청을 보내는 것입니다.위에는 클라이언트 쪽 코드가 있는데, 예전처럼 서버 코드를 보면, 여전히 아는 사이인 것 같다.본문과 무관한 코드를 제거하면 다음과 같다. xxl-job-core에서 JettyServerHandler.Class는 다음과 같습니다.
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
    :
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class> serviceClass = serviceBean.getClass();  //  
            String methodName = request.getMethodName();    //   run
            Class>[] parameterTypes = request.getParameterTypes();  //    
            Object[] parameters = request.getParameters();   //    

            FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            //    CGLib       
            Object result = serviceFastMethod.invoke(serviceBean, parameters);
            response.setResult(result);
        } catch (Throwable t) {
            t.printStackTrace();
            response.setError(t.getMessage());
        }
        return response;
}

반사에 따라 구체적인 종류를 생성하여 관련 방법을 집행하여 원하는 목적을 달성한다.위의 두 인스턴스 프로세스는 다음 그림으로 요약할 수 있습니다.
 
RPC, 원격 프로세스 호출.원격 기계를 조종하는 방법입니다.
원리는 사실 매우 간단하다. 클라이언트에서 실행하는 프로그램이 대상 방법을 호출할 때 밑바닥에서 이 방법에 대한 호출을 한다.
/**
 *    RPC   
 *
 * @author huangyong
 * @since 1.0.0
 */
public class RpcRequest {

    private String requestId;//   id
    private String interfaceName;//           
    private String serviceVersion;
    private String methodName;//     
    private Class>[] parameterTypes;//         
    private Object[] parameters;//       

이를 TCP/HTTP 요청의 매개 변수로 원격 서버를 보내고 원격 서버는 고정 포트를 감청한다. 이 TCP/HTTP 요청을 받은 후 관련 정보를 해석한다. 즉,client에서 전송된 데이터에 따라 서버를 반사적으로 호출하는 방법, 클라이언트가 어떤 종류의 방법, 매개 변수가 무엇인지 등을 포함하여 해당하는 호출을 하고 호출 결과를 패키지를 통해 다시 보내면 된다.

좋은 웹페이지 즐겨찾기