Dubbo 소스 해석(6) Dubbo Protocol은 Dubbo Protocol을 예로 들면
코드는 다음과 같다.
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation:
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL
* 3. Invoker instance is passed in by the framework, protocol needs not to care
*
* @param Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
@Adaptive
Exporter export(Invoker invoker) throws RpcException;
/**
* Refer a remote service:
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation.
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
*
* @param Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
Invoker refer(Class type, URL url) throws RpcException;
/**
* Destroy protocol:
* 1. Cancel all services this protocol exports and refers
* 2. Release all occupied resources, for example: connection, port, etc.
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}
서비스 게시 방법
ServiceBean이 초기화되었을 때 서비스 서비스Config를 발표합니다.doExportUrlsFor1Protocol()
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
// ....
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// ServiceImpl , javassist、 jdk
Invoker> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// , ,
Exporter> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
게시 서비스 포털
public Exporter export(Invoker invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// .
String key = url.getAddress();
//
// client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
//
ExchangeServer server = serverMap.get(key);
if (server == null) {
// server
serverMap.put(key, createServer(url));
} else {
//
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
어떻게 서비스의
참조 서비스 포털
public Invoker refer(Class serviceType, URL url) throws RpcException {
//
optimizeSerialization(url);
// Invoker
DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
이 서비스에 대한 연결 가져오기
private ExchangeClient[] getClients(URL url) {
// connection
boolean service_share_connect = false;
// connections
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// , ,
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
/**
* ,
*/
private ExchangeClient getSharedClient(URL url) {
// ip
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
// client , 1
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
synchronized (key.intern()) {
ExchangeClient exchangeClient = initClient(url);
// client
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
}
/**
*
*/
private ExchangeClient initClient(URL url) {
// , netty3
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
//
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// , 60s
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
//
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
//
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// , Exchangers
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
총결산
Protocol은 게시 및 참조 서비스를 구현하는 데 사용됩니다.간단해, 내 생각에는 서비스의 매개체야.어쩌면 아는 것도 모자랄지도 몰라요.맏형이 말한 대로 나는 아직 등급이 부족하고 시야가 낮다.그래서 이 층은 봉인된 RPC 호출, 즉 인용과 발표 서비스를 봉인한 것이다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
간편한 디버깅dubbo 서비스의 범용 호출최근에 새로운 프로젝트를 만들었는데 마이크로서비스dubbo+zookeeper를 사용했습니다. 그 중 일부 인터페이스는 다른 부서에서 제공했습니다. 이 인터페이스에 대한 디버깅 검증을 할 때 문제가 발생했습니다. 그 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.