아 리 개원 분포 식 사무 구성 요소 seata: seata client 통신 층 분석

머리말
앞서 '아 리 개원 분포 식 사무 구성 요소 seata: seata server 통신 층 분석' 이라는 글 에서 server 엔 드 의 측면 에서 seata 의 네트워크 통신 모듈 을 간단하게 분석 한 적 이 있다.상기 글 에서 일부 개념 을 언급 했다. 예 를 들 어 비동기 전환 동기 화 된 Future 체제, 홍수 방지 체제, 그리고 메시지 큐 체제 등 은 서버 만 가지 고 있 는 기능 특성 이 아니 라 실제 클 라 이언 트 에 게 도 똑 같이 적용 되 고 이들 은 전체 통용 되 는 통신 체제 이다.이 글 은 client 각도 의 통신 세부 사항 과 지난 글 에서 언급 되 지 않 은 관건 적 인 부분 을 이야기 하 는데 이것 은 seata 전체 client - server 통신 모듈 에 대한 정리 라 고 할 수 있다.이미 지난 글 에서 언급 한 개념 은 이 글 에서 더 이상 군말 하지 않 는 다.
클 라 이언 트 역할
seata 에서 클 라 이언 트 에 대해 역할 구분 이 있 고 클 라 이언 트 를 두 가지 역할 로 나 누 는데 각각 TM 와 RM 이다.seata 의 디자인 에서 TM 는 전체 업무 의 발기인 이 고 RM 은 전체 업무 의 참여 자 이다.이 두 가지 역할 은 시작 할 때 모두 server 에 등록 해 야 합 니 다. 이 등록 은 하나의 인증 역할 과 같 을 수 있 고 이미 등 록 된 클 라 이언 트 만 다른 작업 을 할 수 있 습 니 다.그러나 등록 의 의 미 는 인증 뿐만 아니 라 특히 RM 에 있어 서 매우 중요 한 것 은 server 에 resourceId 를 등록 하 는 것 이다. 이 점 은 매우 중요 하 다. RM 의 높 은 사용 가능 문제 와 관련된다.만약 에 어떤 RM 이 정지 되 었 고 전체 업무 가 아직 끝나 지 않 았 다 면 이 럴 때 다른 RM 에 의뢰 하 는 방식 으로 전체 업무 가 정상적으로 끝 날 수 있 습 니 다. 여기 서 관건 적 인 부분 은 바로 resourceId 입 니 다.이 글 은 잠시 이곳 에서 전개 되 지 않 고, 뒤에 여기에 있 는 디자인 을 따로 써 라.
이 글 에서 우 리 는 클 라 이언 트 든 서버 의 실현 이 든 모두 AbstractRpc Remoting 을 추상 적 인 골격 으로 하 는 것 을 보 았 다.AbstractRpc Remoting 이라는 종 류 는 클 라 이언 트 와 서버 의 공공 논 리 를 포함 합 니 다.
메시지 의 발송 과 수신 논리
future 메커니즘
홍수 방지 메커니즘
연결 탱크
이것들 은 앞의 문장 에서 이미 언급 한 적 이 있다.클 라 이언 트 의 실현 디 테 일 은 주로 AbstractRpc Remoting Client 클래스 에서 AbstractRpc Remoting 클래스 를 계승 합 니 다.netty 를 사용 하기 때문에 반드시 Bootstrap 논리 가 있 을 것 입 니 다. seata 는 이 논 리 를 RpcClient Bootstrap 같은 종류 에 단독으로 넣 고 getNewChannel 방법 만 대외 적 으로 제공 합 니 다.이렇게 보면 연결 을 만 드 는 디 테 일 은 잘 봉인 되 었 고 RpcClient Bootstrap 의 직책 도 매우 명확 하 다. 그것 이 바로 새로운 연결 을 만 드 는 것 이다.그러나 대부분의 클 라 이언 트 는 연결 탱크 의 디자인 을 도입 하여 연결 에 대한 재 활용 을 달성 하고 연결 이 가 져 오 는 추가 비용 을 자주 만 들 거나 없 애 는 것 을 피한다.seata 도 연결 탱크 관리 체 제 를 사용 했다.그러나 seata 의 연결 풀 관 리 는 apache 의 comon pools 를 통 해 이 루어 진 것 입 니 다. comon pools 는 하나의 대상 풀 입 니 다. 여기 서 연결 채널 을 하나의 대상 으로 하면 연결 풀 이 됩 니 다.comon pools 를 통 해 대상 풀 을 실현 하려 면 Keyed Poolable Object Factory 인터페이스 에 대응 하 는 관련 방법 을 실현 해 야 합 니 다. 관건 은 다음 과 같은 세 가지 방법 입 니 다.
V makeObject(K key) throws Exception; //              。
void destroyObject(K key, V obj) throws Exception; //               。
boolean validateObject(K key, V obj);  //               。

대상 의 유형 과 액세스 대상 에 사용 할 키 의 유형 도 지정 해 야 합 니 다.
Netty PoolableFactory 라 는 종 류 는 Keyed PoolableObjectFactory 인 터 페 이 스 를 실현 하고 대상 의 유형 은 Channel 이 고 key 는 Netty PoolKey 라 고 정의 했다.MakeObject 의 실현 은 주로 몇 단계 로 나 뉘 는데 첫 번 째 는 새로운 연결 을 만 드 는 것 입 니 다.
  InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
  if (LOGGER.isInfoEnabled()) {
   LOGGER.info("NettyPool create channel to " + key);
  }
  Channel tmpChannel = clientBootstrap.getNewChannel(address);
  long start = System.currentTimeMillis();
  Object response;
  Channel channelToServer = null;
  if (null == key.getMessage()) {
   throw new FrameworkException(
     "register msg is null, role:" + key.getTransactionRole().name());
  }

다음은 연결 을 등록 합 니 다. 등록 에 실패 하면 이상 을 던 지고 방금 만 든 연결 을 닫 습 니 다. 이것 은 연결 이 만들어 진 후에 반드시 등록 에 성공 해 야 사용 할 수 있다 는 것 을 의미 합 니 다.다음 코드 와 같이:
  try {
   response = rpcRemotingClient.sendAsyncRequestWithResponse(tmpChannel, key.getMessage());
   if (!isResponseSuccess(response, key.getTransactionRole())) {
    rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
   } else {
    channelToServer = tmpChannel;
    rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
      key.getMessage());
   }
  } catch (Exception exx) {
   if (tmpChannel != null) {
    tmpChannel.close();
   }
   throw new FrameworkException(
     "register error,role:" + key.getTransactionRole().name() + ",err:" + exx.getMessage());
  }

위 에서 도 볼 수 있 듯 이 등록 메 시 지 는 Netty PoolKey 를 통 해 들 어 왔 다.
대상 풀 을 없 애 는 대상 은 여기 서 연결 을 닫 는 것 이 고 대상 의 유효성 을 검증 하 는 것 은 연결 의 유효성 을 검증 하 는 것 이다.이 두 가지 방법의 실현 은 비교적 간단 하 다.
 @Override
 public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {

  if (null != channel) {
   if (LOGGER.isInfoEnabled()) {
    LOGGER.info("will destroy channel:" + channel);
   }
   channel.disconnect();
   channel.close();
  }
 }


 @Override
 public boolean validateObject(NettyPoolKey key, Channel obj) {
  if (null != obj && obj.isActive()) {
   return true;
  }
  if (LOGGER.isInfoEnabled()) {
   LOGGER.info("channel valid false,channel:" + obj);
  }
  return false;
 }

연결 관리
Netty Client Channel Manager 와 같은 통합 연결 탱크 메커니즘 과 새로운 연결 을 구축 한 후에 연결 관리자 가 되 었 습 니 다.내부 논리 에 따라 서버 에 메 시 지 를 보 내야 할 때 Netty Client Channel Manager 를 통 해 연결 을 받 습 니 다.Netty Client 채널 관리 자 는 연결 탱크 를 통합 하 였 지만, 내부 에서 채널 캐 시 를 유지 하고 있 습 니 다.
private final ConcurrentMap channels = new ConcurrentHashMap<>();

외부 에서 연결 을 가 져 올 때 캐 시 에서 찾 습 니 다. 찾 으 면 채널 의 유효성 을 확인 하고 효과 가 있 으 면 이 채널 로 돌아 갑 니 다.채널 이 잘못 되 었 거나 캐 시 가 없 으 면 연결 풀 에 새 연결 을 의뢰 합 니 다.이 디 테 일 은 acquireChannel 방법 에서:
    /**
     * Acquire netty client channel connected to remote server.
     *
     * @param serverAddress server address
     * @return netty channel
     */
    Channel acquireChannel(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null) {
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (null != channelToServer) {
                return channelToServer;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to " + serverAddress);
        }
        channelLocks.putIfAbsent(serverAddress, new Object());
        synchronized (channelLocks.get(serverAddress)) {
            return doConnect(serverAddress);
        }
    }

새 연결 이 필요 하 다 면 자 물 쇠 를 추가 하여 동기 화 해 야 하 며, Netty Client Channel Manager 내부 에서 도 자 물 쇠 를 유지 하고 있 음 을 알 수 있 습 니 다.연결 을 만 드 는 디 테 일 을 살 펴 보 겠 습 니 다. 앞에서 등록 메 시 지 는 Netty PoolKey 를 통 해 가 져 왔 다 고 했 습 니 다.연결 을 만 들 때 Netty PoolKey 를 생 성 해 야 합 니 다. seata 는 자바 8 의 새로운 기능 인 함수 식 인 터 페 이 스 를 사용 합 니 다.즉, 하나의 Netty PoolKey 가 어떻게 생 성 되 는 지 외부 에서 하나의 함수 로 전달 되 어 결정 된다.Netty Client 채널 관리자 자체 가 관심 을 가 질 필요 가 없습니다. 이 생 성 방식 과 관련 된 변 수 는 바로
    private Function poolKeyFunction;

이것 은 Netty Client 채널 관리자 의 구조 함수 의 매개 변수 로 들 어 옵 니 다.RM Client 의 Netty PoolKey 생 성 방식 은 다음 과 같 습 니 다.
    @Override
    protected Function getPoolKeyFunction() {
        return (serverAddress) -> {
            String resourceIds = customerKeys == null ? getMergedResourceKeys() : customerKeys;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("RM will register :" + resourceIds);
            }
            RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
            message.setResourceIds(resourceIds);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
        };
    }

Netty PoolKey 에 들 어 있 는 message 가 RegisterRMRequest 인지 RegisterTM Request 인지 결정 합 니 다.
마찬가지 로 TM Client 에 대해 서도 Netty PoolKey 생 성 규칙 이 있 습 니 다.
    @Override
    protected Function getPoolKeyFunction() {
        return (severAddress) -> {
            RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
        };
    }

함수 식 프로 그래 밍 의 특징 은 갑자기 IoC 의 느낌 을 준다.
그러면 Netty Client 채널 관리자 로 돌아 가 연결 을 만 드 는 논리 입 니 다. 위 에서 우 리 는 함수 식 과 Netty PoolKey 의 배경 을 설 명 했 습 니 다. doConnect 방법 을 직접 보 세 요.
    private Channel doConnect(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return channelToServer;
        }
        Channel channelFromPool;
        try {
            NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
            NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
            if (null != previousPoolKey && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
                RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
                ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
            }
            channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
            channels.put(serverAddress, channelFromPool);
        } catch (Exception exx) {
            LOGGER.error(FrameworkErrorCode.RegisterRM.getErrCode(), "register RM failed.", exx);
            throw new FrameworkException("can not register RM,err:" + exx.getMessage());
        }
        return channelFromPool;
    }

이 를 통 해 알 수 있 듯 이 논 리 는 특별한 것 이 없다. 대상 풀 에 가서 대상 이 되 어 새로운 연결 을 만 드 는 것 이다.그러나 여기 서도 주의 할 점 이 있 습 니 다. Netty Client Channel Manager 는 연결 을 캐 시 하고 자 물 쇠 를 유지 할 뿐만 아니 라 Netty PoolKey 도 캐 시 합 니 다. 새 연결 을 발견 하고 Netty PoolKey 가 이전에 캐 시 한 적 이 있다 면 RM Client 가 만 든 연결 인지 확인 할 것 입 니 다. 그렇다면 resource ids 를 업데이트 해 야 합 니 다.앞에서 말 했 듯 이 RM 등록 시 리 소스 ids 를 등록 하 는 것 이 매우 중요 하 다.
또한, Netty Client Channel Manager 에는 abstractRpc RemotingClient 가 초기 화 될 때 정시 작업 으로 실행 되 는 reconnect 방법 도 있 습 니 다.
    @Override
    public void init() {
        clientBootstrap.start();
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        super.init();

실제 등록 센터 에 가서 현재 서비스 그룹 에 사용 할 수 있 는 서버 가 있 는 지 확인 하고, 서버 와 의 연결 이 사용 가능 한 지 확인 하 며, 사용 할 수 없 으 면 즉시 새 연결 과 캐 시 를 만 듭 니 다.
메시지 패키지
이 안에서 도 mergeSendExecutor Service 라 는 스 레 드 풀 을 볼 수 있 습 니 다. 이전 글 은 '아 리 소스 분포 식 사무 구성 요소 seata: seata server 통신 층 분석' 에서 말 했 습 니 다. 실제로 메시지 큐 체제 로 메 시 지 를 합 쳐 보 냅 니 다.우 리 는 이전 글 에서 도 seata server 엔 드 의 실현 에 있어 서 정기 적 으로 basketMap 을 제거 하지 않 았 다 고 제시 한 적 이 있 습 니 다. 사실은 server 엔 드 가 통신 할 때 이 basketMap 을 전혀 사용 하지 않 았 기 때 문 입 니 다.클 라 이언 트 가 사용 할 수 있 기 때문에 여 기 는 스 레 드 탱크 가 있 습 니 다. basketMap 의 포장 정 보 를 비 웁 니 다.
seata 의 통신 모델 과 디 테 일 은 대체적으로 이 정도 이 고 본 고 는 다음 에 끝난다.

좋은 웹페이지 즐겨찾기