Dubbo 소스 분석(6) Protocol — Dubbo Protocol을 예로 들어

Protocol은 서비스 게시(export)와 서비스 참조(refer)라는 두 가지 기능을 담당한다. 따라서 서비스를 게시하거나 Reference로 참조할 때, 즉 ServiceBean과 ReferenceBean을 초기화할 때 이 인터페이스가 호출된다는 것을 알 수 있다. 이 계층 자체는 복잡하지 않다.
코드는 다음과 같다.
/**
 * Protocol abstraction: exports local services for remote invocation and
 * creates invokers that refer to remote services.
 */
@SPI("dubbo")
public interface Protocol {

    /**
     * Get default port when user doesn't config the port.
     *
     * @return default port
     */
    int getDefaultPort();

    /**
     * Export service for remote invocation:
     * <ol>
     * <li>Protocol should record request source address after receive a request:
     * {@code RpcContext.getContext().setRemoteAddress();}</li>
     * <li>{@code export()} must be idempotent, that is, there's no difference between
     * invoking once and invoking twice when export the same URL</li>
     * <li>Invoker instance is passed in by the framework, protocol needs not to care</li>
     * </ol>
     *
     * @param <T>     Service type
     * @param invoker Service invoker
     * @return exporter reference for exported service, useful for unexport the service later
     * @throws RpcException thrown when error occurs during export the service,
     *                      for example: port is occupied
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * Refer a remote service:
     * <ol>
     * <li>When user calls {@code invoke()} method of {@code Invoker} object which's returned
     * from {@code refer()} call, the protocol needs to correspondingly execute
     * {@code invoke()} method of {@code Invoker} object</li>
     * <li>It's protocol's responsibility to implement {@code Invoker} which's returned from
     * {@code refer()}. Generally speaking, protocol sends remote request in the
     * {@code Invoker} implementation.</li>
     * <li>When there's check=false set in URL, the implementation must not throw exception
     * but try to recover when connection fails.</li>
     * </ol>
     *
     * @param <T>  Service type
     * @param type Service class
     * @param url  URL address for the remote service
     * @return invoker service's local proxy
     * @throws RpcException when there's any error while connecting to the service provider
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * Destroy protocol:
     * <ol>
     * <li>Cancel all services this protocol exports and refers</li>
     * <li>Release all occupied resources, for example: connection, port, etc.</li>
     * <li>Protocol can continue to export and refer new service even after it's destroyed.</li>
     * </ol>
     */
    void destroy();
}

서비스 게시 방법


ServiceBean이 초기화될 때 ServiceConfig.doExportUrlsFor1Protocol()에서 서비스를 게시한다.
    /**
     * Exports the service for a single protocol configuration (excerpt).
     *
     * <p>When registry URLs are present, the service is exported once per registry,
     * with the provider URL embedded in the registry URL under
     * {@code Constants.EXPORT_KEY}. Otherwise the service is exported once using the
     * provider URL directly. Every resulting exporter is added to {@code exporters}.
     *
     * @param protocolConfig protocol configuration being exported
     * @param registryURLs   registry URLs to export to; may be {@code null} or empty
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // ... (code elided in this excerpt; it prepares the `url` variable used below)
        if (registryURLs != null && !registryURLs.isEmpty()) {
            for (URL registryURL : registryURLs) {

                url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                URL monitorUrl = loadMonitor(registryURL);
                if (monitorUrl != null) {
                    url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                }
                // Wrap the service implementation (ref) in an Invoker via the proxy factory
                // (javassist or jdk); the provider URL travels inside the registry URL.
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                // Hand the wrapped invoker to the protocol and remember the exporter.
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } else {
            // No registry: export using the provider URL as-is.
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

            Exporter<?> exporter = protocol.export(wrapperInvoker);
            exporters.add(exporter);
        }
    }

서비스 게시 진입점
    /**
     * Exports the given invoker: registers an exporter under the service key,
     * records stub event methods when configured, then opens (or resets) the
     * server for the URL's address and applies serialization optimization.
     *
     * @param <T>     service type
     * @param invoker service invoker supplying the URL to export
     * @return the exporter registered in {@code exporterMap}
     * @throws RpcException if exporting fails
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        // export a stub service for dispatching event
        boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                // Stub events were requested but no methods were listed: warn only.
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
    /**
     * Ensures a server exists for the address of {@code url}: creates and caches
     * one in {@code serverMap} when absent, otherwise resets the cached server
     * with the new URL. Does nothing when the URL is flagged as not being a server.
     *
     * @param url URL being exported
     */
    private void openServer(URL url) {
        // Servers are cached per address.
        String address = url.getAddress();
        // client can export a service which's only for server to invoke
        if (!url.getParameter(Constants.IS_SERVER_KEY, true)) {
            return;
        }

        ExchangeServer existing = serverMap.get(address);
        if (existing != null) {
            // server supports reset, use together with override
            existing.reset(url);
            return;
        }
        // No server for this address yet: create one and cache it.
        serverMap.put(address, createServer(url));
    }
    /**
     * Builds and binds an exchange server for {@code url}.
     *
     * <p>Both the server and the client transporter types named in the URL are
     * validated before binding, so a bad configuration can no longer leave a
     * bound server behind that is never returned to the caller.
     *
     * @param url URL describing the server to start
     * @return the bound server
     * @throws RpcException if a configured transporter type is unsupported or binding fails
     */
    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        // Check the client type up front: throwing after bind() would leak the server.
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        try {
            // Bind the server with the shared request handler.
            return Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
    }

서비스를 참조하는 방법


서비스 참조 진입점
    /**
     * Creates an invoker for a remote service: applies serialization
     * optimization, obtains the clients for the URL, and tracks the new
     * invoker in {@code invokers}.
     *
     * @param <T>         service type
     * @param serviceType service interface class
     * @param url         URL of the remote service
     * @return the newly created invoker
     * @throws RpcException if the invoker cannot be created
     */
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // Create the invoker backed by the clients for this URL.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

이 서비스에 대한 연결 가져오기
    /**
     * Returns the clients used to reach the service at {@code url}.
     *
     * <p>When the URL's connections parameter is 0 (also the default when it is
     * absent), a single shared client is used; otherwise that many dedicated
     * clients are created.
     *
     * @param url URL of the remote service
     * @return array of clients, one per connection
     */
    private ExchangeClient[] getClients(URL url) {
        int configured = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // 0 means "not configured": fall back to one shared connection.
        boolean shareConnection = configured == 0;
        int size = shareConnection ? 1 : configured;

        ExchangeClient[] result = new ExchangeClient[size];
        for (int idx = 0; idx < result.length; idx++) {
            result[idx] = shareConnection ? getSharedClient(url) : initClient(url);
        }
        return result;
    }
    /**
     * Returns the shared client for the address of {@code url}, creating it
     * when none is cached or the cached one is closed.
     *
     * <p>The map is re-checked inside the lock so that callers racing on the
     * same address reuse the first client created, instead of each opening a
     * connection and overwriting (and orphaning) the previous entry.
     *
     * @param url URL of the remote service
     * @return a reference-counted shared client
     */
    private ExchangeClient getSharedClient(URL url) {
        // Shared clients are keyed by address.
        String key = url.getAddress();

        // Fast path: reuse an open cached client and bump its reference count.
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null && !client.isClosed()) {
            client.incrementAndGetCount();
            return client;
        }

        synchronized (key.intern()) {
            // Re-check under the lock: another thread may have created the client meanwhile.
            client = referenceClientMap.get(key);
            if (client != null) {
                if (!client.isClosed()) {
                    client.incrementAndGetCount();
                    return client;
                }
                // Cached client is closed: drop it before creating a replacement.
                referenceClientMap.remove(key);
            }

            ExchangeClient exchangeClient = initClient(url);
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            return client;
        }
    }
    /**
     * Creates a new exchange client for {@code url}.
     *
     * <p>The URL is given the Dubbo codec and, if absent, the default heartbeat.
     * The client is either a lazy-connecting wrapper or an immediately connected
     * client, depending on the URL's lazy-connect parameter.
     *
     * @param url URL of the remote service
     * @return the created client
     * @throws RpcException if the client type is unsupported or the client cannot be created
     */
    private ExchangeClient initClient(URL url) {

        // Client type: explicit client key, else the server key, else the default client.
        String fallbackType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT);
        String clientType = url.getParameter(Constants.CLIENT_KEY, fallbackType);

        // Always use the Dubbo codec; add the default heartbeat only if none is set.
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        boolean typeSpecified = clientType != null && clientType.length() > 0;
        if (typeSpecified) {
            ExtensionLoader<Transporter> transporters = ExtensionLoader.getExtensionLoader(Transporter.class);
            if (!transporters.hasExtension(clientType)) {
                throw new RpcException("Unsupported client type: " + clientType + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
        }

        try {
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                // Lazy mode: wrap the URL and handler in a lazy-connect client.
                return new LazyConnectExchangeClient(url, requestHandler);
            }
            // Eager mode: connect right away through Exchangers.
            return Exchangers.connect(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }

총결산


Protocol은 서비스의 게시와 참조를 구현하는 데 사용된다. 간단히 말해, 내 생각에는 서비스를 실어 나르는 매개체다. 아직 내 이해가 부족할 수도 있다. 선배가 말한 대로 나는 아직 수준이 부족하고 시야가 좁다. 정리하면 이 계층은 RPC 호출, 즉 서비스의 참조와 게시를 캡슐화한 것이다.

좋은 웹페이지 즐겨찾기