아 리 개원 분포 식 사무 구성 요소 seata: seata client 통신 층 분석
앞서 '아 리 개원 분포 식 사무 구성 요소 seata: seata server 통신 층 분석' 이라는 글 에서 server 엔 드 의 측면 에서 seata 의 네트워크 통신 모듈 을 간단하게 분석 한 적 이 있다.상기 글 에서 일부 개념 을 언급 했다. 예 를 들 어 비동기 전환 동기 화 된 Future 체제, 홍수 방지 체제, 그리고 메시지 큐 체제 등 은 서버 만 가지 고 있 는 기능 특성 이 아니 라 실제 클 라 이언 트 에 게 도 똑 같이 적용 되 고 이들 은 전체 통용 되 는 통신 체제 이다.이 글 은 client 각도 의 통신 세부 사항 과 지난 글 에서 언급 되 지 않 은 관건 적 인 부분 을 이야기 하 는데 이것 은 seata 전체 client - server 통신 모듈 에 대한 정리 라 고 할 수 있다.이미 지난 글 에서 언급 한 개념 은 이 글 에서 더 이상 군말 하지 않 는 다.
클 라 이언 트 역할
seata 에서 클 라 이언 트 에 대해 역할 구분 이 있 고 클 라 이언 트 를 두 가지 역할 로 나 누 는데 각각 TM 와 RM 이다.seata 의 디자인 에서 TM 는 전체 업무 의 발기인 이 고 RM 은 전체 업무 의 참여 자 이다.이 두 가지 역할 은 시작 할 때 모두 server 에 등록 해 야 합 니 다. 이 등록 은 하나의 인증 역할 과 같 을 수 있 고 이미 등 록 된 클 라 이언 트 만 다른 작업 을 할 수 있 습 니 다.그러나 등록 의 의 미 는 인증 뿐만 아니 라 특히 RM 에 있어 서 매우 중요 한 것 은 server 에 resourceId 를 등록 하 는 것 이다. 이 점 은 매우 중요 하 다. RM 의 높 은 사용 가능 문제 와 관련된다.만약 에 어떤 RM 이 정지 되 었 고 전체 업무 가 아직 끝나 지 않 았 다 면 이 럴 때 다른 RM 에 의뢰 하 는 방식 으로 전체 업무 가 정상적으로 끝 날 수 있 습 니 다. 여기 서 관건 적 인 부분 은 바로 resourceId 입 니 다.이 글 은 잠시 이곳 에서 전개 되 지 않 고, 뒤에 여기에 있 는 디자인 을 따로 써 라.
이 글 에서 우 리 는 클 라 이언 트 든 서버 의 실현 이 든 모두 AbstractRpc Remoting 을 추상 적 인 골격 으로 하 는 것 을 보 았 다.AbstractRpc Remoting 이라는 종 류 는 클 라 이언 트 와 서버 의 공공 논 리 를 포함 합 니 다.
메시지 의 발송 과 수신 논리
future 메커니즘
홍수 방지 메커니즘
연결 탱크
이것들 은 앞의 문장 에서 이미 언급 한 적 이 있다.클 라 이언 트 의 실현 디 테 일 은 주로 AbstractRpc Remoting Client 클래스 에서 AbstractRpc Remoting 클래스 를 계승 합 니 다.netty 를 사용 하기 때문에 반드시 Bootstrap 논리 가 있 을 것 입 니 다. seata 는 이 논 리 를 RpcClient Bootstrap 같은 종류 에 단독으로 넣 고 getNewChannel 방법 만 대외 적 으로 제공 합 니 다.이렇게 보면 연결 을 만 드 는 디 테 일 은 잘 봉인 되 었 고 RpcClient Bootstrap 의 직책 도 매우 명확 하 다. 그것 이 바로 새로운 연결 을 만 드 는 것 이다.그러나 대부분의 클 라 이언 트 는 연결 탱크 의 디자인 을 도입 하여 연결 에 대한 재 활용 을 달성 하고 연결 이 가 져 오 는 추가 비용 을 자주 만 들 거나 없 애 는 것 을 피한다.seata 도 연결 탱크 관리 체 제 를 사용 했다.그러나 seata 의 연결 풀 관 리 는 apache 의 comon pools 를 통 해 이 루어 진 것 입 니 다. comon pools 는 하나의 대상 풀 입 니 다. 여기 서 연결 채널 을 하나의 대상 으로 하면 연결 풀 이 됩 니 다.comon pools 를 통 해 대상 풀 을 실현 하려 면 Keyed Poolable Object Factory 인터페이스 에 대응 하 는 관련 방법 을 실현 해 야 합 니 다. 관건 은 다음 과 같은 세 가지 방법 입 니 다.
V makeObject(K key) throws Exception; // 。
void destroyObject(K key, V obj) throws Exception; // 。
boolean validateObject(K key, V obj); // 。
대상 의 유형 과 액세스 대상 에 사용 할 키 의 유형 도 지정 해 야 합 니 다.
Netty PoolableFactory 라 는 종 류 는 Keyed PoolableObjectFactory 인 터 페 이 스 를 실현 하고 대상 의 유형 은 Channel 이 고 key 는 Netty PoolKey 라 고 정의 했다.MakeObject 의 실현 은 주로 몇 단계 로 나 뉘 는데 첫 번 째 는 새로운 연결 을 만 드 는 것 입 니 다.
InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("NettyPool create channel to " + key);
}
Channel tmpChannel = clientBootstrap.getNewChannel(address);
long start = System.currentTimeMillis();
Object response;
Channel channelToServer = null;
if (null == key.getMessage()) {
throw new FrameworkException(
"register msg is null, role:" + key.getTransactionRole().name());
}
다음은 연결 을 등록 합 니 다. 등록 에 실패 하면 이상 을 던 지고 방금 만 든 연결 을 닫 습 니 다. 이것 은 연결 이 만들어 진 후에 반드시 등록 에 성공 해 야 사용 할 수 있다 는 것 을 의미 합 니 다.다음 코드 와 같이:
try {
response = rpcRemotingClient.sendAsyncRequestWithResponse(tmpChannel, key.getMessage());
if (!isResponseSuccess(response, key.getTransactionRole())) {
rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
} else {
channelToServer = tmpChannel;
rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
key.getMessage());
}
} catch (Exception exx) {
if (tmpChannel != null) {
tmpChannel.close();
}
throw new FrameworkException(
"register error,role:" + key.getTransactionRole().name() + ",err:" + exx.getMessage());
}
위 에서 도 볼 수 있 듯 이 등록 메 시 지 는 Netty PoolKey 를 통 해 들 어 왔 다.
대상 풀 을 없 애 는 대상 은 여기 서 연결 을 닫 는 것 이 고 대상 의 유효성 을 검증 하 는 것 은 연결 의 유효성 을 검증 하 는 것 이다.이 두 가지 방법의 실현 은 비교적 간단 하 다.
@Override
public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
if (null != channel) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will destroy channel:" + channel);
}
channel.disconnect();
channel.close();
}
}
@Override
public boolean validateObject(NettyPoolKey key, Channel obj) {
if (null != obj && obj.isActive()) {
return true;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("channel valid false,channel:" + obj);
}
return false;
}
연결 관리
Netty Client Channel Manager 와 같은 통합 연결 탱크 메커니즘 과 새로운 연결 을 구축 한 후에 연결 관리자 가 되 었 습 니 다.내부 논리 에 따라 서버 에 메 시 지 를 보 내야 할 때 Netty Client Channel Manager 를 통 해 연결 을 받 습 니 다.Netty Client 채널 관리 자 는 연결 탱크 를 통합 하 였 지만, 내부 에서 채널 캐 시 를 유지 하고 있 습 니 다.
private final ConcurrentMap channels = new ConcurrentHashMap<>();
외부 에서 연결 을 가 져 올 때 캐 시 에서 찾 습 니 다. 찾 으 면 채널 의 유효성 을 확인 하고 효과 가 있 으 면 이 채널 로 돌아 갑 니 다.채널 이 잘못 되 었 거나 캐 시 가 없 으 면 연결 풀 에 새 연결 을 의뢰 합 니 다.이 디 테 일 은 acquireChannel 방법 에서:
/**
* Acquire netty client channel connected to remote server.
*
* @param serverAddress server address
* @return netty channel
*/
Channel acquireChannel(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null) {
channelToServer = getExistAliveChannel(channelToServer, serverAddress);
if (null != channelToServer) {
return channelToServer;
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will connect to " + serverAddress);
}
channelLocks.putIfAbsent(serverAddress, new Object());
synchronized (channelLocks.get(serverAddress)) {
return doConnect(serverAddress);
}
}
새 연결 이 필요 하 다 면 자 물 쇠 를 추가 하여 동기 화 해 야 하 며, Netty Client Channel Manager 내부 에서 도 자 물 쇠 를 유지 하고 있 음 을 알 수 있 습 니 다.연결 을 만 드 는 디 테 일 을 살 펴 보 겠 습 니 다. 앞에서 등록 메 시 지 는 Netty PoolKey 를 통 해 가 져 왔 다 고 했 습 니 다.연결 을 만 들 때 Netty PoolKey 를 생 성 해 야 합 니 다. seata 는 자바 8 의 새로운 기능 인 함수 식 인 터 페 이 스 를 사용 합 니 다.즉, 하나의 Netty PoolKey 가 어떻게 생 성 되 는 지 외부 에서 하나의 함수 로 전달 되 어 결정 된다.Netty Client 채널 관리자 자체 가 관심 을 가 질 필요 가 없습니다. 이 생 성 방식 과 관련 된 변 수 는 바로
private Function poolKeyFunction;
이것 은 Netty Client 채널 관리자 의 구조 함수 의 매개 변수 로 들 어 옵 니 다.RM Client 의 Netty PoolKey 생 성 방식 은 다음 과 같 습 니 다.
@Override
protected Function getPoolKeyFunction() {
return (serverAddress) -> {
String resourceIds = customerKeys == null ? getMergedResourceKeys() : customerKeys;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("RM will register :" + resourceIds);
}
RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
message.setResourceIds(resourceIds);
return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
};
}
Netty PoolKey 에 들 어 있 는 message 가 RegisterRMRequest 인지 RegisterTM Request 인지 결정 합 니 다.
마찬가지 로 TM Client 에 대해 서도 Netty PoolKey 생 성 규칙 이 있 습 니 다.
@Override
protected Function getPoolKeyFunction() {
return (severAddress) -> {
RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
};
}
함수 식 프로 그래 밍 의 특징 은 갑자기 IoC 의 느낌 을 준다.
그러면 Netty Client 채널 관리자 로 돌아 가 연결 을 만 드 는 논리 입 니 다. 위 에서 우 리 는 함수 식 과 Netty PoolKey 의 배경 을 설 명 했 습 니 다. doConnect 방법 을 직접 보 세 요.
private Channel doConnect(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null && channelToServer.isActive()) {
return channelToServer;
}
Channel channelFromPool;
try {
NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
if (null != previousPoolKey && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
}
channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
channels.put(serverAddress, channelFromPool);
} catch (Exception exx) {
LOGGER.error(FrameworkErrorCode.RegisterRM.getErrCode(), "register RM failed.", exx);
throw new FrameworkException("can not register RM,err:" + exx.getMessage());
}
return channelFromPool;
}
이 를 통 해 알 수 있 듯 이 논 리 는 특별한 것 이 없다. 대상 풀 에 가서 대상 이 되 어 새로운 연결 을 만 드 는 것 이다.그러나 여기 서도 주의 할 점 이 있 습 니 다. Netty Client Channel Manager 는 연결 을 캐 시 하고 자 물 쇠 를 유지 할 뿐만 아니 라 Netty PoolKey 도 캐 시 합 니 다. 새 연결 을 발견 하고 Netty PoolKey 가 이전에 캐 시 한 적 이 있다 면 RM Client 가 만 든 연결 인지 확인 할 것 입 니 다. 그렇다면 resource ids 를 업데이트 해 야 합 니 다.앞에서 말 했 듯 이 RM 등록 시 리 소스 ids 를 등록 하 는 것 이 매우 중요 하 다.
또한, Netty Client Channel Manager 에는 abstractRpc RemotingClient 가 초기 화 될 때 정시 작업 으로 실행 되 는 reconnect 방법 도 있 습 니 다.
@Override
public void init() {
clientBootstrap.start();
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
super.init();
실제 등록 센터 에 가서 현재 서비스 그룹 에 사용 할 수 있 는 서버 가 있 는 지 확인 하고, 서버 와 의 연결 이 사용 가능 한 지 확인 하 며, 사용 할 수 없 으 면 즉시 새 연결 과 캐 시 를 만 듭 니 다.
메시지 패키지
이 안에서 도 mergeSendExecutor Service 라 는 스 레 드 풀 을 볼 수 있 습 니 다. 이전 글 은 '아 리 소스 분포 식 사무 구성 요소 seata: seata server 통신 층 분석' 에서 말 했 습 니 다. 실제로 메시지 큐 체제 로 메 시 지 를 합 쳐 보 냅 니 다.우 리 는 이전 글 에서 도 seata server 엔 드 의 실현 에 있어 서 정기 적 으로 basketMap 을 제거 하지 않 았 다 고 제시 한 적 이 있 습 니 다. 사실은 server 엔 드 가 통신 할 때 이 basketMap 을 전혀 사용 하지 않 았 기 때 문 입 니 다.클 라 이언 트 가 사용 할 수 있 기 때문에 여 기 는 스 레 드 탱크 가 있 습 니 다. basketMap 의 포장 정 보 를 비 웁 니 다.
seata 의 통신 모델 과 디 테 일 은 대체적으로 이 정도 이 고 본 고 는 다음 에 끝난다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
아 리 개원 분포 식 사무 구성 요소 seata: seata client 통신 층 분석예 를 들 어 비동기 전환 동기 화 된 Future 체제, 홍수 방지 체제, 그리고 메시지 큐 체제 등 은 서버 만 가지 고 있 는 기능 특성 이 아니 라 실제 클 라 이언 트 에 게 도 똑 같이 적용 되 고 이들 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.