ZooKeeper 소스 코드 를 보면 분포 식 시스템(4)session 관 리 를 어떻게 실현 합 니까?

15374 단어 zookeeperJavaZooKeeper
이 편 은 ZooKeeper 가 세 션 을 어떻게 관리 하 는 지 살 펴 보 자.Session 관련 인 터 페 이 스 는 다음 과 같 습 니 다.
Session:session 의 실체 클래스 를 표시 하고 sessionId 와 timeout 두 가지 주요 상 태 를 유지 합 니 다.
Session Tracker:Session 라 이 프 사이클 관리 에 관 한 작업
Session Expier:Session 만 료 작업
세 션 인터페이스 와 구현 클래스 인 SessionImpl 을 살 펴 보고 5 개의 속성 을 유지 합 니 다.sessionId,timeout 은 시간 초과,tickTime 은 클 라 이언 트 와 서버 의 심장 박동 시간 을 표시 합 니 다.isClosing 은 닫 을 지 여 부 를 표시 합 니 다.owner 는 해당 하 는 클 라 이언 트 를 표시 합 니 다.
public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }

public static class SessionImpl implements Session {
        SessionImpl(long sessionId, int timeout, long expireTime) {
            this.sessionId = sessionId;
            this.timeout = timeout;
            this.tickTime = expireTime;
            isClosing = false;
        }

        final long sessionId;
        final int timeout;
        long tickTime;
        boolean isClosing;

        Object owner;

        public long getSessionId() { return sessionId; }
        public int getTimeout() { return timeout; }
        public boolean isClosing() { return isClosing; }
    }
 
   
  

SessionTracker的实现类是SessionTrackerImpl,它是一个单独运行的线程,根据tick周期来批量检查处理当前的session。SessionTrackerImpl直接继承了Thread类,它的定义和构造函数如下,几个主要的属性:

expirer是SessionExpirer的实现类

expirationInterval表示过期的周期,可以看到它的值是tickTime,即如果服务器端在tickTime里面没有收到客户端的心跳,就认为该session过期了

sessionsWithTimeout是一个ConcurrentHashMap,维护了一组sessionId和它对应的timeout过期时间

nextExpirationTime表示下次过期时间,线程会在nextExpirationTime时间来批量过期session

nextSessionId是根据sid计算出的下一个新建的sessionId

sessionById这个HashMap保存了sessionId和Session对象的映射

sessionSets这个HashMap保存了一个过期时间和一组保存在SessionSet中的Session的映射,用来批量清理过期的Session

public interface SessionTracker {
    public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }
    public static interface SessionExpirer {
        void expire(Session session);

        long getServerId();
    }

    long createSession(int sessionTimeout);

    void addSession(long id, int to);

    boolean touchSession(long sessionId, int sessionTimeout);

    void setSessionClosing(long sessionId);

    void shutdown();

    void removeSession(long sessionId);

    void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException;

    void setOwner(long id, Object owner) throws SessionExpiredException;

    void dumpSessions(PrintWriter pwriter);
}

public class SessionTrackerImpl extends Thread implements SessionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);

    HashMap sessionsById = new HashMap();

    HashMap sessionSets = new HashMap();

    ConcurrentHashMap sessionsWithTimeout;
    long nextSessionId = 0;
    long nextExpirationTime;

    int expirationInterval;

public SessionTrackerImpl(SessionExpirer expirer,
            ConcurrentHashMap sessionsWithTimeout, int tickTime,
            long sid)
    {
        super("SessionTracker");
        this.expirer = expirer;
        this.expirationInterval = tickTime;
        this.sessionsWithTimeout = sessionsWithTimeout;
        nextExpirationTime = roundToInterval(System.currentTimeMillis());
        this.nextSessionId = initializeNextSession(sid);
        for (Entry e : sessionsWithTimeout.entrySet()) {
            addSession(e.getKey(), e.getValue());
        }
    }

 
 
   
  

看一下SessionTrackerImpl这个线程的run方法实现,实现了批量处理过期Session的逻辑

1. 如果下一次过期时间nextExpirationTime大于当前时间,那么当前线程等待nextExpirationTime - currentTime时间

2. 如果到了过期时间,就从sessionSets里面把当前过期时间对应的一组SessionSet取出

3. 批量关闭和过期这组session

4. 把当前过期时间nextExpirationTime 加上 expirationInterval作为下一个过期时间nextExpiration,继续循环

其中expirer.expire(s)这个操作,这里的expirer的实现类是ZooKeeperServer,它的expire方法会给给客户端发送session关闭的请求

// SessionTrackerImpl 
synchronized public void run() {
        try {
            while (running) {
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

// ZookeeperServer
public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        close(sessionId);
    }

private void close(long sessionId) {
        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
    }

세 션 을 만 드 는 과정 을 다시 한 번 보도 록 하 겠 습 니 다.
1.createSession 방법 은 session Timeout 매개 변수 하나만 있 으 면 Session 의 만 료 시간 을 지정 합 니 다.현재 전역 의 nextSessionId 를 sessionId 로 addSession 방법 에 전달 합 니 다.
2.addSession 방법 은 sessionId 와 만 료 된 시간의 맵 을 sessions Withtimeout 이라는 맵 에 추가 합 니 다.sessionById 라 는 맵 에서 sessionId 에 대응 하 는 session 대상 을 찾 지 못 하면 session 대상 을 만 든 다음 session ById 맵 에 넣 습 니 다.마지막 으로 touch Session 방법 을 호출 하여 session 의 만 료 시간 등 정 보 를 설정 합 니 다.
3.touch Session 방법 은 session 상 태 를 먼저 판단 하고 닫 으 면 되 돌려 줍 니 다.현재 session 의 만 료 시간 을 계산 합 니 다.이 session 을 처음 터치 하면 tickTime 은 만 료 시간 인 expireTime 으로 설정 되 어 해당 하 는 sessuibSets 에 추가 합 니 다.첫 번 째 touch 가 아니라면 tickTime 은 현재 만 료 된 시간 입 니 다.만 료 되 지 않 았 다 면 돌아 갑 니 다.만 료 되면 만 료 시간 을 다시 계산 하고 tickTime 에 설정 한 다음 해당 하 는 session Sets 에서 먼저 이동 한 다음 새로운 session Sets 에 추가 합 니 다.touch Session 방법 은 주로 session 의 만 료 시간 을 업데이트 하기 위 한 것 입 니 다.
synchronized public long createSession(int sessionTimeout) {
        addSession(nextSessionId, sessionTimeout);
        return nextSessionId++;
    }

synchronized public void addSession(long id, int sessionTimeout) {
        sessionsWithTimeout.put(id, sessionTimeout);
        if (sessionsById.get(id) == null) {
            SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
            sessionsById.put(id, s);
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "SessionTrackerImpl --- Adding session 0x"
                        + Long.toHexString(id) + " " + sessionTimeout);
            }
        } else {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "SessionTrackerImpl --- Existing session 0x"
                        + Long.toHexString(id) + " " + sessionTimeout);
            }
        }
        touchSession(id, sessionTimeout);
    }

synchronized public boolean touchSession(long sessionId, int timeout) {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.CLIENT_PING_TRACE_MASK,
                                     "SessionTrackerImpl --- Touch session: 0x"
                    + Long.toHexString(sessionId) + " with timeout " + timeout);
        }
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
        s.tickTime = expireTime;
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }

SessionTracker 라 는 인 터 페 이 스 는 주로 ZooKeeper Server 와 같은 종류 로 사 용 됩 니 다.ZooKeeper Server 는 ZooKeeper 의 서버 클래스 를 표시 하고 ZooKeeper 서버 상 태 를 유지 합 니 다.
ZooKeeper Server 의 startup 방법 에서 sessionTracker 대상 이 비어 있 으 면 SessionTracker 대상 을 만 든 다음 startSessionTracker 방법 으로 SessionTracker Impl 을 시작 합 니 다.
 public void startup() {        
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        registerJMX();

        synchronized (this) {
            running = true;
            notifyAll();
        }
    }

 protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                tickTime, 1);
    } 

protected void startSessionTracker() {
        ((SessionTrackerImpl)sessionTracker).start();
    }

ZooKeeper 서버 의 shutdown 방법 에서 session Tracker 의 shutdown 방법 을 사용 하여 session Tracker Impl 스 레 드 를 닫 습 니 다.
 public void shutdown() {
        LOG.info("shutting down");

        // new RuntimeException("Calling shutdown").printStackTrace();
        this.running = false;
        // Since sessionTracker and syncThreads poll we just have to
        // set running to false and they will detect it during the poll
        // interval.
        if (sessionTracker != null) {
            sessionTracker.shutdown();
        }
        if (firstProcessor != null) {
            firstProcessor.shutdown();
        }
        if (zkDb != null) {
            zkDb.clear();
        }

        unregisterJMX();
    }

// SessionTrackerImpl
public void shutdown() {
        LOG.info("Shutting down");

        running = false;
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                                     "Shutdown SessionTrackerImpl!");
        }
    }

ZooKeeper 서버 의 createSession 방법 은 ServerCnxn 에 연결 하여 해당 하 는 session 을 만 든 다음 클 라 이언 트 에 session 을 만 드 는 요청 을 보 냅 니 다.
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }

ZooKeeper Server 의 reopenSession 은 연결 을 끊 은 후 다시 연결 하 는 session 에 게 상 태 를 업데이트 하여 session 을 계속 사용 할 수 있 도록 합 니 다.
1.session 의 비밀번호 가 틀 리 면 finish Session Init 방법 으로 session 을 닫 고 비밀번호 가 정확 하 다 면 revaidate Session 방법 을 호출 합 니 다.
2.revateSession 방법 은 session Tracker 의 touch Session 을 호출 합 니 다.session 이 만 료 되 었 다 면 rc=false,session 이 만 료 되 지 않 았 다 면 session 의 만 료 시간 정 보 를 업데이트 합 니 다.마지막 으로 finish Session Init 방법 도 호출 합 니 다.
3.finishSession Init 방법 은 클 라 이언 트 에 응답 대상 ConnectResponse 를 보 내 고 인증 이 통과 되 지 않 으 면 연결 을 닫 습 니 다. cnxn.sendBuffer(ServerCnxnFactory.closeConn)。인증 통과,cnxn.enableRecv()호출;방법 은 연결 상 태 를 설정 하여 서버 측 연결 을 등록 합 니 다 SelectionKey.OPREAD 이벤트,클 라 이언 트 요청 접수 준비
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
            int sessionTimeout) throws IOException {
        if (!checkPasswd(sessionId, passwd)) {
            finishSessionInit(cnxn, false);
        } else {
            revalidateSession(cnxn, sessionId, sessionTimeout);
        }
    }

protected void revalidateSession(ServerCnxn cnxn, long sessionId,
            int sessionTimeout) throws IOException {
        boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                     "Session 0x" + Long.toHexString(sessionId) +
                    " is valid: " + rc);
        }
        finishSessionInit(cnxn, rc);
    }

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        // register with JMX
        try {
            if (valid) {
                serverCnxnFactory.registerConnection(cnxn);
            }
        } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
        }

        try {
            ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                            // longer valid
                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            bos.writeInt(-1, "len");
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool(
                        this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.remaining() - 4).rewind();
            cnxn.sendBuffer(bb);    

            if (!valid) {
                LOG.info("Invalid session 0x"
                        + Long.toHexString(cnxn.getSessionId())
                        + " for client "
                        + cnxn.getRemoteSocketAddress()
                        + ", probably expired");
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            } else {
                LOG.info("Established session 0x"
                        + Long.toHexString(cnxn.getSessionId())
                        + " with negotiated timeout " + cnxn.getSessionTimeout()
                        + " for client "
                        + cnxn.getRemoteSocketAddress());
                cnxn.enableRecv();
            }
                
        } catch (Exception e) {
            LOG.warn("Exception while establishing session, closing", e);
            cnxn.close();
        }
    }

// NIOServerCnxn
public void enableRecv() {
        synchronized (this.factory) {
            sk.selector().wakeup();
            if (sk.isValid()) {
                int interest = sk.interestOps();
                if ((interest & SelectionKey.OP_READ) == 0) {
                    sk.interestOps(interest | SelectionKey.OP_READ);
                }
            }
        }
    }

좋은 웹페이지 즐겨찾기