zookeeper 분포 식 공유 잠 금 실현

본문 전재:http://blog.csdn.net/desilting/article/details/41280869
zookeeper 를 이용 한 EPHEMERALSEQUENTIAL 유형 노드 와 watcher 메커니즘 은 분포 식 자 물 쇠 를 간단하게 실현 한다.
주요 사상:
1. 10 개의 스 레 드 를 열 고 disLocks 노드 에서 각각 sub 라 는 EPHEMERAL 을 만 듭 니 다.필수 노드;
2. disLocks 노드 아래 의 모든 하위 노드 를 가 져 와 정렬 하고 자신의 노드 번호 가 가장 적 으 면 자 물 쇠 를 가 져 옵 니 다.
3. 그렇지 않 으 면 watch 가 자신의 앞 에 있 는 노드 를 감청 하고 삭 제 를 감청 한 후에 두 번 째 단계 에 들 어 갑 니 다. (순 서 를 재 검 측 하 는 것 은 감청 노드 의 연결 이 효력 을 잃 고 노드 삭제 상황 을 방지 하 는 것 입 니 다)
4. 자신의 sub 노드 를 삭제 하고 연결 해제;우 리 는 가장 작은 노드 가 집행 권 을 가진다 고 생각 합 니 다. 하 나 는 자물쇠 zk 가 분포 식 공유 자 물 쇠 를 실현 하 는 실현 방향 을 얻 었 습 니 다. 1: LogProcessBolt 1 병행 도 는 2 이 고 정시 작업 을 수행 할 때 두 스 레 드 가 동시에 입고 되 어야 합 니 다. 그러면 문제 가 존재 할 수 있 습 니 다.    그래서 이 코드 내용 이 실 행 될 때마다 / locks 이 노드 아래 에 임시 질서 있 는 노드 를 등록 하고 두 스 레 드 는 노드 를 등록 합 니 다.    예 를 들 어 스 레 드 1 에 등 록 된 임시 질서 있 는 노드 는 test 이다.lock_0000000002   스 레 드 2 등 록 된 임시 질서 있 는 노드 는: testlock_00000001 2: 모든 스 레 드 가 노드 를 등록 한 후에 자 물 쇠 를 가 져 오 려 고 시도 해 야 합 니 다. 이때 어느 노드 가 가장 작고 어느 스 레 드 가 자 물 쇠 를 가 져 오 며 아래 코드 를 실행 해 야 합 니 다.                이 예 에서 스 레 드 2 의 노드 가 가장 작 기 때문에 이 노드 는 먼저 자 물 쇠 를 가 져 온 다음 에 코드 를 실행 합 니 다. 코드 가 실 행 된 후에 자 물 쇠 를 풀 것 입 니 다. 사실은 자신의 이 노드 를 삭제 하 는 것 입 니 다.                한편, 스 레 드 1 에서 얻 은 노드 가 가장 작은 것 이 아니 기 때문에 그 보다 작은 노드 를 얻 은 다음 에 어느 노드 의 변화 상황 을 감시 해 야 한다. 만약 에 그 노드 가 효력 을 잃 은 것 을 발견 하면           그리고 / locks 이 노드 아래 의 모든 임시 질서 있 는 하위 노드 를 다시 가 져 옵 니 다. 자신 이 가장 작은 것 인지, 가장 작은 것 이 라면 자 물 쇠 를 성공 적 으로 가 져 와 다음 코드 를 실행 합 니 다.          마지막 으로 자 물 쇠 를 풀 어 줍 니 다.zookeeper 의 4 가지 노드 유형:
    public enum CreateMode {  
         
        /** 
         *     :     ,     ,             ; 
         */  
        PERSISTENT (0, false, false),  
      
        /** 
        *       :           ,        ,zookeeper                     ,       ;  
        */  
        PERSISTENT_SEQUENTIAL (2, false, true),  
      
        /** 
         *      :             ,         ,               ,      :org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException; 
         */  
        EPHEMERAL (1, true, false),  
      
        /** 
         *       :           ,        ,zookeeper                     ,       ;  
         */  
        EPHEMERAL_SEQUENTIAL (3, true, true);  
        private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);  
        private boolean ephemeral;  
        private boolean sequential;  
        private int flag;  
        CreateMode(int flag, boolean ephemeral, boolean sequential) {  
            this.flag = flag;  
            this.ephemeral = ephemeral;  
            this.sequential = sequential;  
        }  
        public boolean isEphemeral() {  
            return ephemeral;  
        }  
        public boolean isSequential() {  
            return sequential;  
        }  
        public int toFlag() {  
            return flag;  
        }  
        static public CreateMode fromFlag(int flag) throws KeeperException {  
            switch(flag) {  
            case 0: return CreateMode.PERSISTENT;  
            case 1: return CreateMode.EPHEMERAL;  
            case 2: return CreateMode.PERSISTENT_SEQUENTIAL;  
            case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;  
            default:  
                LOG.error("Received an invalid flag value to convert to a CreateMode");  
                throw new KeeperException.BadArgumentsException();  
            }  
        }  
    }  

테스트 실례:
    package zookeeper;  
    import org.slf4j.Logger;  
    import org.slf4j.LoggerFactory;  
    import org.apache.zookeeper.*;  
    import org.apache.zookeeper.data.Stat;  
    import java.util.List;  
    import java.io.IOException;  
    import java.util.Collections;  
    import java.util.concurrent.CountDownLatch;  
      
    public class DistributedLock implements Watcher{  
        private int threadId;  
        private ZooKeeper zk = null;  
        private String selfPath;  
        private String waitPath;  
        private String LOG_PREFIX_OF_THREAD;  
        private static final int SESSION_TIMEOUT = 10000;  
        private static final String GROUP_PATH = "/disLocks";  
        private static final String SUB_PATH = "/disLocks/sub";  
        private static final String CONNECTION_STRING = "192.168.*.*:2181";  
          
        private static final int THREAD_NUM = 10;   
        //    zk  ;  
        private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
        //          ;  
        private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);  
        private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class);  
        public DistributedLock(int id) {  
            this.threadId = id;  
            LOG_PREFIX_OF_THREAD = "【 "+threadId+"   】";  
        }  
        public static void main(String[] args) {  
            for(int i=0; i < THREAD_NUM; i++){  
                final int threadId = i+1;  
                new Thread(){  
                    @Override  
                    public void run() {  
                        try{  
                            DistributedLock dc = new DistributedLock(threadId);  
                            dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);  
                            //GROUP_PATH     ,         ;  
                            synchronized (threadSemaphore){  
                                dc.createPath(GROUP_PATH, "      " + threadId + "  ", true);  
                            }  
                            dc.getLock();  
                        } catch (Exception e){  
                            LOG.error("【 "+threadId+"   】      :");  
                            e.printStackTrace();  
                        }  
                    }  
                }.start();  
            }  
            try {  
                threadSemaphore.await();  
                LOG.info("        !");  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
        /** 
         *     
         * @return 
         */  
        private void getLock() throws KeeperException, InterruptedException {  
            selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
            LOG.info(LOG_PREFIX_OF_THREAD+"     :"+selfPath);  
            if(checkMinPath()){  
                getLockSuccess();  
            }  
        }  
        /** 
         *      
         * @param path   path 
         * @param data        
         * @return 
         */  
        public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {  
            if(zk.exists(path, needWatch)==null){  
                LOG.info( LOG_PREFIX_OF_THREAD + "      , Path: "  
                        + this.zk.create( path,  
                        data.getBytes(),  
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,  
                        CreateMode.PERSISTENT )  
                        + ", content: " + data );  
            }  
            return true;  
        }  
        /** 
         *   ZK   
         * @param connectString  ZK        
         * @param sessionTimeout Session     
         */  
        public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {  
                zk = new ZooKeeper( connectString, sessionTimeout, this);  
                connectedSemaphore.await();  
        }  
        /** 
         *       
        */  
        public void getLockSuccess() throws KeeperException, InterruptedException {  
            if(zk.exists(this.selfPath,false) == null){  
                LOG.error(LOG_PREFIX_OF_THREAD+"       ...");  
                return;  
            }  
            LOG.info(LOG_PREFIX_OF_THREAD + "     ,    !");  
            Thread.sleep(2000);  
            LOG.info(LOG_PREFIX_OF_THREAD + "     :"+selfPath);  
            zk.delete(this.selfPath, -1);  
            releaseConnection();  
            threadSemaphore.countDown();  
        }  
        /** 
         *   ZK   
         */  
        public void releaseConnection() {  
            if ( this.zk !=null ) {  
                try {  
                    this.zk.close();  
                } catch ( InterruptedException e ) {}  
            }  
            LOG.info(LOG_PREFIX_OF_THREAD + "    ");  
        }  
        /** 
         *              
         * @return 
         */  
        public boolean checkMinPath() throws KeeperException, InterruptedException {  
             List subNodes = zk.getChildren(GROUP_PATH, false);  
             Collections.sort(subNodes);  
             int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));  
             switch (index){  
                 case -1:{  
                     LOG.error(LOG_PREFIX_OF_THREAD+"       ..."+selfPath);  
                     return false;  
                 }  
                 case 0:{  
                     LOG.info(LOG_PREFIX_OF_THREAD+"    ,      "+selfPath);  
                     return true;  
                 }  
                 default:{  
                     this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);  
                     LOG.info(LOG_PREFIX_OF_THREAD+"      ,      "+waitPath);  
                     try{  
                         zk.getData(waitPath, true, new Stat());  
                         return false;  
                     }catch(KeeperException e){  
                         if(zk.exists(waitPath,false) == null){  
                             LOG.info(LOG_PREFIX_OF_THREAD+"    ,      "+waitPath+"   ,       ?");  
                             return checkMinPath();  
                         }else{  
                             throw e;  
                         }  
                     }  
                 }  
                       
             }  
           
        }  
        @Override  
        public void process(WatchedEvent event) {  
            if(event == null){  
                return;  
            }  
            Event.KeeperState keeperState = event.getState();  
            Event.EventType eventType = event.getType();  
            if ( Event.KeeperState.SyncConnected == keeperState) {  
                if ( Event.EventType.None == eventType ) {  
                    LOG.info( LOG_PREFIX_OF_THREAD + "     ZK   " );  
                    connectedSemaphore.countDown();  
                }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {  
                    LOG.info(LOG_PREFIX_OF_THREAD + "    ,         ,         ?");  
                    try {  
                        if(checkMinPath()){  
                            getLockSuccess();  
                        }  
                    } catch (KeeperException e) {  
                        e.printStackTrace();  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }else if ( Event.KeeperState.Disconnected == keeperState ) {  
                LOG.info( LOG_PREFIX_OF_THREAD + " ZK       " );  
            } else if ( Event.KeeperState.AuthFailed == keeperState ) {  
                LOG.info( LOG_PREFIX_OF_THREAD + "      " );  
            } else if ( Event.KeeperState.Expired == keeperState ) {  
                LOG.info( LOG_PREFIX_OF_THREAD + "    " );  
            }  
        }  
    }  

좋은 웹페이지 즐겨찾기