tomcat 세 션 복사 (2)
수신 자 tomcat 가 session 이 필요 한 메 시 지 를 받 았 을 때 최종 적 으로 GroupChannel 의 message Received () 방법 을 호출 하 였 습 니 다.
public void messageReceived(ChannelMessage msg) {
if ( msg == null ) return;
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
}
Serializable fwd = null;
if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
fwd = new ByteMessage(msg.getMessage().getBytes());
} else {
try {
fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength());
}catch (Exception sx) {
log.error("Unable to deserialize message:"+msg,sx);
return;
}
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
}
//get the actual member with the correct alive time
Member source = msg.getAddress();
boolean rx = false;
boolean delivered = false;
for ( int i=0; i<channelListeners.size(); i++ ) {
ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
if (channelListener != null && channelListener.accept(fwd, source)) {
channelListener.messageReceived(fwd, source);
delivered = true;
//if the message was accepted by an RPC channel, that channel
//is responsible for returning the reply, otherwise we send an absence reply
if ( channelListener instanceof RpcChannel ) rx = true;
}
}//for
if ((!rx) && (fwd instanceof RpcMessage)) {
//if we have a message that requires a response,
//but none was given, send back an immediate one
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
}
} catch ( Exception x ) {
//this could be the channel listener throwing an exception, we should log it
//as a warning.
if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
throw new RemoteProcessException("Exception:"+x.getMessage(),x);
}
}
이 를 통 해 알 수 있 듯 이 여 기 는 주로 Channel Listener 를 호출 하 는 message Received 방법 입 니 다. 이 실현 유형 은 Simple Tcp Cluster 입 니 다.
public void messageReceived(Serializable message, Member sender) {
ClusterMessage fwd = (ClusterMessage)message;
fwd.setAddress(sender);
messageReceived(fwd);
}
public void messageReceived(ClusterMessage message) {
long start = 0;
if (log.isDebugEnabled() && message != null)
log.debug("Assuming clocks are synched: Replication for "
+ message.getUniqueId() + " took="
+ (System.currentTimeMillis() - (message).getTimestamp())
+ " ms.");
//invoke all the listeners
boolean accepted = false;
if (message != null) {
for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {
ClusterListener listener = (ClusterListener) iter.next();
if (listener.accept(message)) {
accepted = true;
listener.messageReceived(message);
}
}
}
if (!accepted && log.isDebugEnabled()) {
if (notifyLifecycleListenerOnFailure) {
Member dest = message.getAddress();
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,
new SendMessageData(message, dest, null));
}
log.debug("Message " + message.toString() + " from type "
+ message.getClass().getName()
+ " transfered but no listener registered");
}
return;
}
사실 여기 서 호출 된 것 은 ClusterListener 의 message Received () 방법 입 니 다. 기본 적 인 두 개의 ClusterListener 는 ClusterSession Listener 와 JvmRoute Session ID BinderListener 입 니 다. JvmRoute Session ID BinderListener 는 session 에 관 한 메시지 만 보 내 고 있 습 니 다. id 가 변 경 된 메 시 지 는 클 러 스 터 Session Listener. messageReceived () 방법 에 중점 을 두 고 있 습 니 다. 사실은 Delta Manager 의 message DataReceived () 방법 으로 호출 되 었 습 니 다.
public void messageDataReceived(ClusterMessage cmsg) {
if (cmsg != null && cmsg instanceof SessionMessage) {
SessionMessage msg = (SessionMessage) cmsg;
switch (msg.getEventType()) {
case SessionMessage.EVT_GET_ALL_SESSIONS:
case SessionMessage.EVT_SESSION_CREATED:
case SessionMessage.EVT_SESSION_EXPIRED:
case SessionMessage.EVT_SESSION_ACCESSED:
case SessionMessage.EVT_SESSION_DELTA:
case SessionMessage.EVT_CHANGE_SESSION_ID: {
synchronized(receivedMessageQueue) {
if(receiverQueue) {
receivedMessageQueue.add(msg);
return ;
}
}
break;
}
default: {
//we didn't queue, do nothing
break;
}
} //switch
messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
}
여기 사건 은 EVTGET_ALL_SESSIONS, message Received () 방법 에 이 분기 의 처리 코드 가 있 습 니 다. 맨 뒤에 호출 됩 니 다.
handleGET_ALL_SESSIONS () 방법:
protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_GET_ALL_SESSIONS++;
//get a list of all the session from this manager
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
// Write the number of active sessions, followed by the details
// get all sessions and serialize without sync
Session[] currentSessions = findSessions();
long findSessionTimestamp = System.currentTimeMillis() ;
if (isSendAllSessions()) {
sendSessions(sender, currentSessions, findSessionTimestamp);
} else {
// send session at blocks
for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions,findSessionTimestamp);
if (getSendAllSessionsWaitTime() > 0) {
try {
Thread.sleep(getSendAllSessionsWaitTime());
} catch (Exception sleep) {
}
}//end if
}//for
}//end if
SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
newmsg.setTimestamp(findSessionTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
cluster.send(newmsg, sender);
}
주로 모든 session 을 찾 은 다음 에 모든 session 을 보 낸 다음 에 표지 가 끝 난 메 시 지 를 보 내 서 발송 이 완료 되 었 음 을 나타 낸다.이렇게 첫 번 째 tomcat 의 시작 으로 클 러 스 터 에 가입 하 는 것 은 사용 하 는 session 을 가 져 오 는 것 입 니 다. 주요 한 절 차 는 다음 과 같 습 니 다.
발송 자:
StandardContext.start()--->SimpleTcpCluster.createManager()--->StandardContext.setManager()--->
DeltaManager.start()--->DeltaManager.getAllClusterSessions()--->SimpleTcpCluster.send()
---->GroupChannel.send()---->ParallelNioSender.sendMessage()--->NioSender.process()
수신 자:
NioReceiver.listen()--->NioReceiver.readDataFromSocket()---->NioReplicationTask.drainChannel()--->
ListenCallback.messageDataReceived()---->ChannelCoordinator.messageReceived()--->
GroupChannel.messageReceived()--->SimpleTcpCluster.messageReceived()--->
DeltaManager.messageDataReceived()---->DeltaManager.handleGET_ALL_SESSIONS()
수신 자가 데 이 터 를 준비 하고 발송 자 에 게 보 내야 합 니 다. 쉽게 말 하면 통신 은 이 렇 습 니 다.
발송 자 발송 요청 - > 수신 자가 요청 을 받 아들 이 고 데 이 터 를 보 냅 니 다. 하 나 는 session 의 데이터 이 고 하 나 는 session 이 보 낸 데이터 입 니 다 - >
발송 자 는 수신 자가 온 두 가지 데 이 터 를 받 았 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Exception in thread main java.lang. NoClassDefFoundError 오류 해결 방법즉,/home/hadoop/jarfile) 시스템은 Hello World 패키지 아래의class라는 클래스 파일을 실행하고 있다고 오인하여 시스템의 CLASSPATH 아래 (일반적으로 현재 디렉터리를 포함) Hell...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.