Dubbo 분석의 Exchange 계층
10091 단어 dubbo
시리즈
Dubbo 분석 Serialize 층 Dubbo 분석의 Transport 층 Dubbo 분석의 Exchange 층
앞말
이어 상기 Dubbo분석의 Transport층에 이어 본고는 Exchange층을 계속 소개한다. 이 층의 공식 소개는 정보 교환층이다. 봉인 요청 응답 모델, 동기화 비동기화, Request, Response를 중심으로 확장 인터페이스는 Exchanger, Exchange Channel, Exchange Client, Exchange Server이다.다음은 각각 소개하겠습니다.
Exchanger 분석
Exchanger는 이 층의 핵심 인터페이스 클래스로connect ()와bind () 인터페이스를 제공하여 각각 ExchangeClient와 ExchangeServer를 되돌려줍니다.dubbo는 이 인터페이스의 기본 구현 클래스인 HeaderExchanger를 제공합니다. 코드는 다음과 같습니다.
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
실현 클래스에서connect와bind에서 각각 HeaderExchangeClient와 HeaderExchangeServer를 실례화했고 전송된 매개 변수는Transporters로 여기가 Transport층의 입구 클래스라고 할 수 있다.이곳의 Exchange Client/Exchange Server는 사실 Client/Server에 대한 포장이고 자신의 ChannelHandler에 전송되었다.ChannelHandler는 이미 Transport층에서 소개를 했고 연결 구축, 연결 포트, 요청 발송, 요청 수락 등 인터페이스를 제공했다.기본적으로 사용된 Netty를 예로 들면 Netty Client와 Netty Server에 대한 포장이며 DecodeHandler에 전송되어 NettyHandler에서 호출됩니다.
ExchangeClient 분석
ExchangeClient 자체도 Client에 계승되고 ExchangeChannel에도 계승됩니다.
public interface ExchangeClient extends Client, ExchangeChannel {
}
public interface ExchangeChannel extends Channel {
ResponseFuture request(Object request) throws RemotingException;
ResponseFuture request(Object request, int timeout) throws RemotingException;
ExchangeHandler getExchangeHandler();
@Override
void close(int timeout);
}
ExchangeChannel은 상부의 데이터를 Request로 포장하여 Transport층에 보내는 것을 책임진다.구체적인 논리는 HeaderExchangeChannel에 있습니다.
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
Request를 생성합니다. 구조기에서 RequestId가 동시에 생성됩니다.프로토콜 버전을 설정하고 양방향 통신 여부를 설정한 다음에 실제 업무 데이터를 설정했다.다음은 DefaultFuture 클래스를 실례화했습니다. 이런 식으로 동기화 비동기화 방식을 실현했습니다. 채널은send 전송 요청을 호출한 후에 결과를 기다리지 않고 DefaultFuture를 상층부에 되돌려줍니다. 상층부는 DefaultFuture의 get 방법을 호출해서 응답을 얻을 수 있고 get 방법은 서버의 응답을 기다리는 것을 막아서 되돌려줍니다.Client 수신 메시지는handler에 있습니다. 예를 들어 Netty가 NettyHandler에서 메시지Received 방법으로 응답 메시지를 소개합니다. NettyHandler는 최종적으로 위에서 전송된 DecodeHandler를 호출합니다. DecodeHandler는 디코딩이 되었는지 먼저 판단하고 디코딩을 하면 HeaderExchangeHandler를 호출합니다. 기본적으로 디코딩기가 설정되어 있기 때문에 HeaderExchangeHandler에서received 방법을 호출합니다.
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
서버와 클라이언트가 모두 이 방법을 사용합니다. 여기는 클라이언트가 받아들인 Response입니다. handleResponse 방법을 직접 호출합니다.
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
응답을 받은 후에 Default Future가 응답을 받았음을 알려줍니다. Default Future 자체에 Request Id가 Default Future에 대응하는 Concurrent Hash Map을 저장합니다.구체적으로 어떻게 비추는지, Response도responseId를 포함하는데, 이responseId와requestId는 같다.
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
responseId를 통해 이전에 요청할 때 만든 DefaultFuture를 가져온 다음 DefaultFuture 내부의 response 대상을 업데이트합니다. 업데이트가 끝난 후에 Condition의 signal 방법을 호출하면 사용자는 DefaultFuture의 get 방법을 통해 응답을 받는 막힌 라인을 불러옵니다.
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
막히거나signal 방법으로 깨어나거나 시간 초과를 기다리는 것을 발견할 수 있습니다.이상은 대체로 클라이언트가 응답을 받는 프로세스입니다. 다음은 서버 측 프로세스를 보십시오
ExchangeServer 분석
ExchangeServer는 서버에서 상속되며 두 개의 패키지 서버 채널 방법을 제공합니다
public interface ExchangeServer extends Server {
Collection getExchangeChannels();
ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}
서버 측은 주로 Request 메시지를 수신한 다음에 메시지를 처리하고 마지막으로 응답을 클라이언트에게 보냅니다. 관련 수신 메시지는 위에서 소개했습니다. 마찬가지로 Header Exchange Handler에서의received 방법에서 이곳의 메시지 형식은 Request에 불과합니다.
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
먼저 Response를 만들고 responseId를 requestId로 지정하여 클라이언트에서 구체적인 DefaultFuture를 찾을 수 있도록 합니다.그리고handler의reply방법을 호출하여 메시지를 처리하고 결과를 되돌려줍니다. 어떻게 처리하는지는 뒤에 있는protocol층에서 소개합니다. 대체적으로 Request의 정보를 통해 서버 측의 서비스를 반사하여 결과를 되돌려주고 결과를 Response 대상에 넣고 채널을 통해 메시지를 클라이언트에게 보내는 것입니다.
총결산
본고는 Exchanger, ExchangeClient와 ExchangeServer를 둘러싼 Exchange층의 대체적인 절차를 소개한다.요청은 Request로 봉인되고 응답은 Response로 봉인되며 클라이언트는 비동기적인 방식으로 서버 요청을 수신합니다.
예제 코드 주소
https://github.com/ksfzhaohui...https://gitee.com/OutOfMemory...
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
간편한 디버깅dubbo 서비스의 범용 호출최근에 새로운 프로젝트를 만들었는데 마이크로서비스dubbo+zookeeper를 사용했습니다. 그 중 일부 인터페이스는 다른 부서에서 제공했습니다. 이 인터페이스에 대한 디버깅 검증을 할 때 문제가 발생했습니다. 그 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.