Zookeeper 서버 독립 실행형 시작

16560 단어 분산zookeeper
먼저 ZooKeeper 서버의 전체 구조를 살펴보겠습니다.

1. 독립 실행형 서버 시작


ZooKeeper 서버의 시작은 크게 다음과 같은 5단계로 나뉜다. 프로필 분석, 데이터 관리자 초기화, 네트워크 I/O 관리자 초기화, 데이터 복구와 대외 서비스이다.다음 그림은 독립 실행형 서버 시작 프로세스입니다.

1.1 사전 시작


사전 시작 단계는 다음과 같습니다.
(1) QuorumPeerMain을 시작 클래스로 통일합니다.
독립 실행형 버전이든 클러스터 버전이든 ZooKeeper 서버를 시작할 때 zkServer에 있습니다.cmd 및 zkServer.sh 두 스크립트에서 org를 사용하도록 설정했습니다.apach.zookeeper.server.quorum.QuorumPeerMain을 시작 클래스 입구로 사용합니다.
public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
 
    private static final String USAGE = "Usage: QuorumPeerMain configfile";
 
    protected QuorumPeer quorumPeer;
 
    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }
 
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);  // zoo.cfg
        }
 
        // Start and schedule the the purge task  // 
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
 
        if (args.length == 1 && config.servers.size() > 0) {   // 
            runFromConfig(config); 
        } else {   // 
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);  // 
        }
    }

(2) 프로필 zoo를 확인합니다.cfg.
ZooKeeper는 먼저 프로필을 해석합니다. 프로필의 해석은 사실 zoo입니다.cfg 파일의 해석입니다. 이 파일은 ZooKeeper가 실행될 때의 기본 매개 변수를 설정합니다. ticketTime, DataDir, clientPort 등 매개 변수를 포함합니다.
(3) 히스토리 파일 청소기 DatadirCleanupManager를 만들고 시작합니다.
3.4.0 버전부터 ZooKeeper는 트랜잭션 로그와 스냅샷 파일을 정기적으로 정리하는 등 자동으로 히스토리 데이터 파일을 정리하는 메커니즘을 추가했습니다.
(4) 현재 집단 모드인지 단기 모드의 시작인지 판단한다.
위 단계 (2) 에서 해석한 클러스터 서버 주소 목록을 통해 현재 클러스터 모드인지 단일 모드인지 판단하고 단일 모드라면 ZooKeeperServerMain에 시작 처리를 의뢰합니다.
(5) 프로필 zoo를 다시 실행합니다.cfg의 해석.
(6) 서버 인스턴스 ZooKeeperServer를 만듭니다.
      org.apach.zookeeper.server.ZooKeeperServer는 독립 실행형 ZooKeeper Server의 가장 핵심적인 실체 클래스입니다.ZooKeeper 서버는 먼저 서버 실례를 생성합니다. 다음 단계는 이 서버 실례에 대한 초기화 작업입니다. 연결기, 메모리 데이터베이스, 요청 프로세서 등 구성 요소의 초기화를 포함합니다.
public class ZooKeeperServerMain {
    private static final Logger LOG =
        LoggerFactory.getLogger(ZooKeeperServerMain.class);
 
    private static final String USAGE =
        "Usage: ZooKeeperServerMain configfile | port datadir [ticktime] [maxcnxns]";
 
    private ServerCnxnFactory cnxnFactory;
 
    /*
     * Start up the ZooKeeper server.
     *
     * @param args the configfile or the port datadir [ticktime]
     */
    public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }
 
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }
 
        // zoo.cfg 
        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        } else {
            config.parse(args);
        }
 
        // ZooKeeperServer。
        runFromConfig(config);
    }
 
    /**
     * Run from a ServerConfig.
     * @param config ServerConfig to use.
     * @throws IOException
     */
    public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            ZooKeeperServer zkServer = new ZooKeeperServer();  // ZooKeeperServer。
 
            // ZooKeeper FileTxnSnapLog
            FileTxnSnapLog ftxn = new FileTxnSnapLog(new
                   File(config.dataLogDir), new File(config.dataDir));
            zkServer.setTxnLogFactory(ftxn);
  
            // tickTime 
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
  
            // ServerCnxnFactory
            cnxnFactory = ServerCnxnFactory.createFactory();
  
            // ServerCnxnFactory
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
  
            // ServerCnxnFactory   。。
            cnxnFactory.startup(zkServer);
            cnxnFactory.join();
            if (zkServer.isRunning()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        }
    }
 
    /**
     * Shutdown the serving instance
     */
    protected void shutdown() {
        cnxnFactory.shutdown();
    }
}

1.2 초기화


초기화 절차는 다음과 같다.
(1) 서버 통계 ServerStats를 생성합니다.
ServerStats는 ZooKeeper 서버가 실행될 때 가장 기본적인 실행 시 통신을 포함하는 통계기입니다.ZooKeeper 서버의 기본 통계는 다음과 같습니다.
(2) ZooKeeper 데이터 관리자 FileTxnSnapLog를 생성합니다.
FileTxnSnapLog는 ZooKeeper 상부 서비스와 하부 데이터 저장소 간의 연결층으로 사무 로그 파일과 스냅샷 파일을 포함한 일련의 데이터 파일을 조작하는 인터페이스를 제공합니다.ZooKeeper는 zoo를 기준으로 합니다.cfg 파일에서 해석된 스냅샷 데이터 디렉터리 dataDIr와 사무 로그 디렉터리 dataLodDir를 사용하여 FIleTxnSnapLog를 만듭니다.
(3) 서버 tickTime 및 세션 시간 초과 제한을 설정합니다.
(4) ServerCnxnFactory를 생성합니다.
3.4.0 이후 사용자는 시스템 속성 zookeeper를 설정할 수 있습니다.server Cnxn Factory는 ZooKeeper가 자체적으로 실현하는 NIO를 사용할 것인지 아니면 Netty 프레임워크를 사용하여 ZooKeeper 서버 네트워크로 공장을 연결할 것인지를 지정합니다.
public abstract class ServerCnxnFactory {
 
    public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
    public abstract int getLocalPort();
    public abstract Iterable getConnections();
    public abstract void closeSession(long sessionId);
    public abstract void configure(InetSocketAddress addr,
                                   int maxClientCnxns) throws IOException;
    protected SaslServerCallbackHandler saslServerCallbackHandler;
    public Login login;
    /** Maximum number of connections allowed from particular host (ip) */
    public abstract int getMaxClientCnxnsPerHost();
    /** Maximum number of connections allowed from particular host (ip) */
    public abstract void setMaxClientCnxnsPerHost(int max);
 
    public abstract void startup(ZooKeeperServer zkServer)
        throws IOException, InterruptedException;
 
    public abstract void join() throws InterruptedException;
    public abstract void shutdown();
    public abstract void start();
 
    protected ZooKeeperServer zkServer;
    
    // ZooKeeper 。
    final public void setZooKeeperServer(ZooKeeperServer zk) {
        this.zkServer = zk;
        if (zk != null) {
            zk.setServerCnxnFactory(this);
        }
    }
     
    // ServerCnxnFactory。
    static public ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName =
            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
        }
    }
}
public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
 
    ...  ...
    Thread thread;
    @Override
    // ServerCnxnFactory
    //ZooKeeper Thread, ServerCnxnFactory , NIO 
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        configureSaslLogin();
 
        thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);
        maxClientCnxns = maxcc;
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);
    }
 
   
    @Override
    // ServerCnxnFactory 
    public void start() {
        // ensure thread is started once and only once
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
 
    @Override
    public void startup(ZooKeeperServer zks) throws IOException,
            InterruptedException {
        start();  // ServerCnxnFactory 
        zks.startdata(); // 。
        zks.startup();
        setZooKeeperServer(zks);
    }
 
    //   run 
    public void run() {
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                ArrayList selectedList = new ArrayList(
                        selected);
                Collections.shuffle(selectedList);
                for (SelectionKey k : selectedList) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                                .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                            LOG.warn("Too many connections from " + ia
                                     + " - max is " + maxClientCnxns );
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                      + k.readyOps());
                        }
                    }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }
}

(5) ServerCnxnFactory를 초기화합니다.
ZooKeeper는 먼저 Thread를 전체 ServerCnxnFactory의 주 스레드로 초기화한 다음 NIO 서버를 초기화합니다.
(6) ServerCnxnFactory 마스터 스레드를 시작합니다.
시작 (5) 에서 초기화된 주 루틴 ServerCnxnFactory의 주 논리 (run 방법) 입니다.주의해야 할 것은 ZooKeeper의 NIO 서버가 외부에 개방되어 있고 클라이언트가 ZooKeeper의 클라이언트 서비스 포트 2181에 접근할 수 있지만 이때 ZooKeeper 서버는 클라이언트의 요청을 정상적으로 처리할 수 없습니다.
// 。
public void startdata()
throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    } 
    if (!zkDb.isInitialized()) {
        loadData();
    }
}
  
  
public void startup() {       
    if (sessionTracker == null) {
        createSessionTracker(); // 
    }
    startSessionTracker();   // 
    setupRequestProcessors();  // ZooKeeper 
 
    registerJMX();  // JMX 。
 
    synchronized (this) {
        running = true;
        notifyAll();
    }
}
  
// 
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1);
}
// 
protected void startSessionTracker() {
    ((SessionTrackerImpl)sessionTracker).start();
}
  
// ZooKeeper 
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

(7) 로컬 데이터를 복구합니다.
ZooKeeper를 시작할 때마다 로컬 스냅샷 파일과 트랜잭션 로그 파일에서 데이터를 복구해야 합니다.
(8) 세션 관리를 생성하고 시작합니다.
ZooKeeper 시작 단계에서 세션 관리자 SessionTracker가 생성됩니다.SessionTracker는 주로 ZooKeeper 서버의 세션 관리를 담당합니다.SessionTracker를 만들 때 expirationInterval, next Expiration Time, Session With Timeout을 초기화하고 초기화된 sessionId를 계산합니다.
(9) ZooKeeper의 요청 처리 체인을 초기화합니다.
ZooKeeper의 요청 처리 방식은 전형적인 책임 체인 모델의 실현이다. ZooKeeper 서버에서 여러 개의 요청 프로세서가 차례대로 클라이언트 요청을 처리한다.서버가 시작될 때 이 요청 프로세서를 연결시켜 요청 처리 체인을 형성합니다.독립 실행형 서버의 요청 처리 체인은 PrepRequestProcessor,syncRequestProcessor, FinalRequestProcessor 단일 요청 프로세서를 포함합니다. 아래와 같습니다.
(10) JMX 서비스를 등록합니다.
ZooKeeper는 서버에서 실행되는 일부 정보를 JML 방식으로 외부에 노출합니다.
(11) ZooKeeper 서버 인스턴스를 등록합니다.
(6) 에서 ZooKeeper는 ServerCnxnFactory 메인 스레드를 시작했지만, 이때 ZooKeeper는 클라이언트 요청을 처리할 수 없습니다. 왜냐하면 네트워크 층이 ZooKeeper 서버에 접근할 수 없기 때문입니다.다음 단계의 초기화를 거친 후에 ZooKeeper 서버의 실례가 초기화되었습니다. Server Cnxn Factory에 등록하면 됩니다. 그 후에 ZooKeeper는 대외적으로 정상적인 서비스를 제공할 수 있습니다.
Paxos에서 ZooKeeper까지의 분산 일관성 원리와 실천 참조

좋은 웹페이지 즐겨찾기