ZooKeeper 소스 코드 를 보면 분포 식 시스템(4)session 관 리 를 어떻게 실현 합 니까?
Session:session 의 실체 클래스 를 표시 하고 sessionId 와 timeout 두 가지 주요 상 태 를 유지 합 니 다.
Session Tracker:Session 라 이 프 사이클 관리 에 관 한 작업
Session Expier:Session 만 료 작업
세 션 인터페이스 와 구현 클래스 인 SessionImpl 을 살 펴 보고 5 개의 속성 을 유지 합 니 다.sessionId,timeout 은 시간 초과,tickTime 은 클 라 이언 트 와 서버 의 심장 박동 시간 을 표시 합 니 다.isClosing 은 닫 을 지 여 부 를 표시 합 니 다.owner 는 해당 하 는 클 라 이언 트 를 표시 합 니 다.
public static interface Session {
long getSessionId();
int getTimeout();
boolean isClosing();
}
public static class SessionImpl implements Session {
SessionImpl(long sessionId, int timeout, long expireTime) {
this.sessionId = sessionId;
this.timeout = timeout;
this.tickTime = expireTime;
isClosing = false;
}
final long sessionId;
final int timeout;
long tickTime;
boolean isClosing;
Object owner;
public long getSessionId() { return sessionId; }
public int getTimeout() { return timeout; }
public boolean isClosing() { return isClosing; }
}
SessionTracker的实现类是SessionTrackerImpl,它是一个单独运行的线程,根据tick周期来批量检查处理当前的session。SessionTrackerImpl直接继承了Thread类,它的定义和构造函数如下,几个主要的属性:
expirer是SessionExpirer的实现类
expirationInterval表示过期的周期,可以看到它的值是tickTime,即如果服务器端在tickTime里面没有收到客户端的心跳,就认为该session过期了
sessionsWithTimeout是一个ConcurrentHashMap,维护了一组sessionId和它对应的timeout过期时间
nextExpirationTime表示下次过期时间,线程会在nextExpirationTime时间来批量过期session
nextSessionId是根据sid计算出的下一个新建的sessionId
sessionById这个HashMap保存了sessionId和Session对象的映射
sessionSets这个HashMap保存了一个过期时间和一组保存在SessionSet中的Session的映射,用来批量清理过期的Session
public interface SessionTracker { public static interface Session { long getSessionId(); int getTimeout(); boolean isClosing(); } public static interface SessionExpirer { void expire(Session session); long getServerId(); } long createSession(int sessionTimeout); void addSession(long id, int to); boolean touchSession(long sessionId, int sessionTimeout); void setSessionClosing(long sessionId); void shutdown(); void removeSession(long sessionId); void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException; void setOwner(long id, Object owner) throws SessionExpiredException; void dumpSessions(PrintWriter pwriter); } public class SessionTrackerImpl extends Thread implements SessionTracker { private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class); HashMap
sessionsById = new HashMap (); HashMap sessionSets = new HashMap (); ConcurrentHashMap sessionsWithTimeout; long nextSessionId = 0; long nextExpirationTime; int expirationInterval; public SessionTrackerImpl(SessionExpirer expirer, ConcurrentHashMap sessionsWithTimeout, int tickTime, long sid) { super("SessionTracker"); this.expirer = expirer; this.expirationInterval = tickTime; this.sessionsWithTimeout = sessionsWithTimeout; nextExpirationTime = roundToInterval(System.currentTimeMillis()); this.nextSessionId = initializeNextSession(sid); for (Entry e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); } }
看一下SessionTrackerImpl这个线程的run方法实现,实现了批量处理过期Session的逻辑
1. 如果下一次过期时间nextExpirationTime大于当前时间,那么当前线程等待nextExpirationTime - currentTime时间
2. 如果到了过期时间,就从sessionSets里面把当前过期时间对应的一组SessionSet取出
3. 批量关闭和过期这组session
4. 把当前过期时间nextExpirationTime 加上 expirationInterval作为下一个过期时间nextExpiration,继续循环
其中expirer.expire(s)这个操作,这里的expirer的实现类是ZooKeeperServer,它的expire方法会给给客户端发送session关闭的请求
// SessionTrackerImpl synchronized public void run() { try { while (running) { currentTime = System.currentTimeMillis(); if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue; } SessionSet set; set = sessionSets.remove(nextExpirationTime); if (set != null) { for (SessionImpl s : set.sessions) { setSessionClosing(s.sessionId); expirer.expire(s); } } nextExpirationTime += expirationInterval; } } catch (InterruptedException e) { LOG.error("Unexpected interruption", e); } LOG.info("SessionTrackerImpl exited loop!"); } // ZookeeperServer public void expire(Session session) { long sessionId = session.getSessionId(); LOG.info("Expiring session 0x" + Long.toHexString(sessionId) + ", timeout of " + session.getTimeout() + "ms exceeded"); close(sessionId); } private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); }
세 션 을 만 드 는 과정 을 다시 한 번 보도 록 하 겠 습 니 다.
1.createSession 방법 은 session Timeout 매개 변수 하나만 있 으 면 Session 의 만 료 시간 을 지정 합 니 다.현재 전역 의 nextSessionId 를 sessionId 로 addSession 방법 에 전달 합 니 다.
2.addSession 방법 은 sessionId 와 만 료 된 시간의 맵 을 sessions Withtimeout 이라는 맵 에 추가 합 니 다.sessionById 라 는 맵 에서 sessionId 에 대응 하 는 session 대상 을 찾 지 못 하면 session 대상 을 만 든 다음 session ById 맵 에 넣 습 니 다.마지막 으로 touch Session 방법 을 호출 하여 session 의 만 료 시간 등 정 보 를 설정 합 니 다.
3.touch Session 방법 은 session 상 태 를 먼저 판단 하고 닫 으 면 되 돌려 줍 니 다.현재 session 의 만 료 시간 을 계산 합 니 다.이 session 을 처음 터치 하면 tickTime 은 만 료 시간 인 expireTime 으로 설정 되 어 해당 하 는 sessuibSets 에 추가 합 니 다.첫 번 째 touch 가 아니라면 tickTime 은 현재 만 료 된 시간 입 니 다.만 료 되 지 않 았 다 면 돌아 갑 니 다.만 료 되면 만 료 시간 을 다시 계산 하고 tickTime 에 설정 한 다음 해당 하 는 session Sets 에서 먼저 이동 한 다음 새로운 session Sets 에 추가 합 니 다.touch Session 방법 은 주로 session 의 만 료 시간 을 업데이트 하기 위 한 것 입 니 다.synchronized public long createSession(int sessionTimeout) { addSession(nextSessionId, sessionTimeout); return nextSessionId++; } synchronized public void addSession(long id, int sessionTimeout) { sessionsWithTimeout.put(id, sessionTimeout); if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout, 0); sessionsById.put(id, s); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Adding session 0x" + Long.toHexString(id) + " " + sessionTimeout); } } else { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Existing session 0x" + Long.toHexString(id) + " " + sessionTimeout); } } touchSession(id, sessionTimeout); } synchronized public boolean touchSession(long sessionId, int timeout) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, "SessionTrackerImpl --- Touch session: 0x" + Long.toHexString(sessionId) + " with timeout " + timeout); } SessionImpl s = sessionsById.get(sessionId); // Return false, if the session doesn't exists or marked as closing if (s == null || s.isClosing()) { return false; } long expireTime = roundToInterval(System.currentTimeMillis() + timeout); if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; }
SessionTracker 라 는 인 터 페 이 스 는 주로 ZooKeeper Server 와 같은 종류 로 사 용 됩 니 다.ZooKeeper Server 는 ZooKeeper 의 서버 클래스 를 표시 하고 ZooKeeper 서버 상 태 를 유지 합 니 다.
ZooKeeper Server 의 startup 방법 에서 sessionTracker 대상 이 비어 있 으 면 SessionTracker 대상 을 만 든 다음 startSessionTracker 방법 으로 SessionTracker Impl 을 시작 합 니 다.public void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); registerJMX(); synchronized (this) { running = true; notifyAll(); } } protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1); } protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); }
ZooKeeper 서버 의 shutdown 방법 에서 session Tracker 의 shutdown 방법 을 사용 하여 session Tracker Impl 스 레 드 를 닫 습 니 다.public void shutdown() { LOG.info("shutting down"); // new RuntimeException("Calling shutdown").printStackTrace(); this.running = false; // Since sessionTracker and syncThreads poll we just have to // set running to false and they will detect it during the poll // interval. if (sessionTracker != null) { sessionTracker.shutdown(); } if (firstProcessor != null) { firstProcessor.shutdown(); } if (zkDb != null) { zkDb.clear(); } unregisterJMX(); } // SessionTrackerImpl public void shutdown() { LOG.info("Shutting down"); running = false; if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "Shutdown SessionTrackerImpl!"); } }
ZooKeeper 서버 의 createSession 방법 은 ServerCnxn 에 연결 하여 해당 하 는 session 을 만 든 다음 클 라 이언 트 에 session 을 만 드 는 요청 을 보 냅 니 다.long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; }
ZooKeeper Server 의 reopenSession 은 연결 을 끊 은 후 다시 연결 하 는 session 에 게 상 태 를 업데이트 하여 session 을 계속 사용 할 수 있 도록 합 니 다.
1.session 의 비밀번호 가 틀 리 면 finish Session Init 방법 으로 session 을 닫 고 비밀번호 가 정확 하 다 면 revaidate Session 방법 을 호출 합 니 다.
2.revateSession 방법 은 session Tracker 의 touch Session 을 호출 합 니 다.session 이 만 료 되 었 다 면 rc=false,session 이 만 료 되 지 않 았 다 면 session 의 만 료 시간 정 보 를 업데이트 합 니 다.마지막 으로 finish Session Init 방법 도 호출 합 니 다.
3.finishSession Init 방법 은 클 라 이언 트 에 응답 대상 ConnectResponse 를 보 내 고 인증 이 통과 되 지 않 으 면 연결 을 닫 습 니 다. cnxn.sendBuffer(ServerCnxnFactory.closeConn)。인증 통과,cnxn.enableRecv()호출;방법 은 연결 상 태 를 설정 하여 서버 측 연결 을 등록 합 니 다 SelectionKey.OPREAD 이벤트,클 라 이언 트 요청 접수 준비public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { if (!checkPasswd(sessionId, passwd)) { finishSessionInit(cnxn, false); } else { revalidateSession(cnxn, sessionId, sessionTimeout); } } protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); } finishSessionInit(cnxn, rc); } public void finishSessionInit(ServerCnxn cnxn, boolean valid) { // register with JMX try { if (valid) { serverCnxnFactory.registerConnection(cnxn); } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } try { ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no // longer valid valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); if (!cnxn.isOldClient) { bos.writeBool( this instanceof ReadOnlyZooKeeperServer, "readOnly"); } baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); cnxn.sendBuffer(bb); if (!valid) { LOG.info("Invalid session 0x" + Long.toHexString(cnxn.getSessionId()) + " for client " + cnxn.getRemoteSocketAddress() + ", probably expired"); cnxn.sendBuffer(ServerCnxnFactory.closeConn); } else { LOG.info("Established session 0x" + Long.toHexString(cnxn.getSessionId()) + " with negotiated timeout " + cnxn.getSessionTimeout() + " for client " + cnxn.getRemoteSocketAddress()); cnxn.enableRecv(); } } catch (Exception e) { LOG.warn("Exception while establishing session, closing", e); cnxn.close(); } } // NIOServerCnxn public void enableRecv() { synchronized (this.factory) { sk.selector().wakeup(); if (sk.isValid()) { int interest = sk.interestOps(); if ((interest & SelectionKey.OP_READ) == 0) { sk.interestOps(interest | SelectionKey.OP_READ); } } } }
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
ZooKeeper 서버의 예는 하나뿐입니다.-- Start zookeeper-3.4.6/conf 디렉터리에 zoo_sample.cfg의 파일입니다. 이 파일을zoo로 개명합니다.cfg, 파일 이름은zoo만 가능합니다.cfg, ZooKeeper의 스크립트가 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.