zookeeper 분포 식 공유 잠 금 실현
11872 단어 빅 데이터zookeeper 빅 데이터
zookeeper 를 이용 한 EPHEMERALSEQUENTIAL 유형 노드 와 watcher 메커니즘 은 분포 식 자 물 쇠 를 간단하게 실현 한다.
주요 사상:
1. 10 개의 스 레 드 를 열 고 disLocks 노드 에서 각각 sub 라 는 EPHEMERAL 을 만 듭 니 다.필수 노드;
2. disLocks 노드 아래 의 모든 하위 노드 를 가 져 와 정렬 하고 자신의 노드 번호 가 가장 적 으 면 자 물 쇠 를 가 져 옵 니 다.
3. 그렇지 않 으 면 watch 가 자신의 앞 에 있 는 노드 를 감청 하고 삭 제 를 감청 한 후에 두 번 째 단계 에 들 어 갑 니 다. (순 서 를 재 검 측 하 는 것 은 감청 노드 의 연결 이 효력 을 잃 고 노드 삭제 상황 을 방지 하 는 것 입 니 다)
4. 자신의 sub 노드 를 삭제 하고 연결 해제;우 리 는 가장 작은 노드 가 집행 권 을 가진다 고 생각 합 니 다. 하 나 는 자물쇠 zk 가 분포 식 공유 자 물 쇠 를 실현 하 는 실현 방향 을 얻 었 습 니 다. 1: LogProcessBolt 1 병행 도 는 2 이 고 정시 작업 을 수행 할 때 두 스 레 드 가 동시에 입고 되 어야 합 니 다. 그러면 문제 가 존재 할 수 있 습 니 다. 그래서 이 코드 내용 이 실 행 될 때마다 / locks 이 노드 아래 에 임시 질서 있 는 노드 를 등록 하고 두 스 레 드 는 노드 를 등록 합 니 다. 예 를 들 어 스 레 드 1 에 등 록 된 임시 질서 있 는 노드 는 test 이다.lock_0000000002 스 레 드 2 등 록 된 임시 질서 있 는 노드 는: testlock_00000001 2: 모든 스 레 드 가 노드 를 등록 한 후에 자 물 쇠 를 가 져 오 려 고 시도 해 야 합 니 다. 이때 어느 노드 가 가장 작고 어느 스 레 드 가 자 물 쇠 를 가 져 오 며 아래 코드 를 실행 해 야 합 니 다. 이 예 에서 스 레 드 2 의 노드 가 가장 작 기 때문에 이 노드 는 먼저 자 물 쇠 를 가 져 온 다음 에 코드 를 실행 합 니 다. 코드 가 실 행 된 후에 자 물 쇠 를 풀 것 입 니 다. 사실은 자신의 이 노드 를 삭제 하 는 것 입 니 다. 한편, 스 레 드 1 에서 얻 은 노드 가 가장 작은 것 이 아니 기 때문에 그 보다 작은 노드 를 얻 은 다음 에 어느 노드 의 변화 상황 을 감시 해 야 한다. 만약 에 그 노드 가 효력 을 잃 은 것 을 발견 하면 그리고 / locks 이 노드 아래 의 모든 임시 질서 있 는 하위 노드 를 다시 가 져 옵 니 다. 자신 이 가장 작은 것 인지, 가장 작은 것 이 라면 자 물 쇠 를 성공 적 으로 가 져 와 다음 코드 를 실행 합 니 다. 마지막 으로 자 물 쇠 를 풀 어 줍 니 다.zookeeper 의 4 가지 노드 유형:
public enum CreateMode {
/**
* : , , ;
*/
PERSISTENT (0, false, false),
/**
* : , ,zookeeper , ;
*/
PERSISTENT_SEQUENTIAL (2, false, true),
/**
* : , , , :org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException;
*/
EPHEMERAL (1, true, false),
/**
* : , ,zookeeper , ;
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);
private boolean ephemeral;
private boolean sequential;
private int flag;
CreateMode(int flag, boolean ephemeral, boolean sequential) {
this.flag = flag;
this.ephemeral = ephemeral;
this.sequential = sequential;
}
public boolean isEphemeral() {
return ephemeral;
}
public boolean isSequential() {
return sequential;
}
public int toFlag() {
return flag;
}
static public CreateMode fromFlag(int flag) throws KeeperException {
switch(flag) {
case 0: return CreateMode.PERSISTENT;
case 1: return CreateMode.EPHEMERAL;
case 2: return CreateMode.PERSISTENT_SEQUENTIAL;
case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;
default:
LOG.error("Received an invalid flag value to convert to a CreateMode");
throw new KeeperException.BadArgumentsException();
}
}
}
테스트 실례:
package zookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
public class DistributedLock implements Watcher{
private int threadId;
private ZooKeeper zk = null;
private String selfPath;
private String waitPath;
private String LOG_PREFIX_OF_THREAD;
private static final int SESSION_TIMEOUT = 10000;
private static final String GROUP_PATH = "/disLocks";
private static final String SUB_PATH = "/disLocks/sub";
private static final String CONNECTION_STRING = "192.168.*.*:2181";
private static final int THREAD_NUM = 10;
// zk ;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
// ;
private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class);
public DistributedLock(int id) {
this.threadId = id;
LOG_PREFIX_OF_THREAD = "【 "+threadId+" 】";
}
public static void main(String[] args) {
for(int i=0; i < THREAD_NUM; i++){
final int threadId = i+1;
new Thread(){
@Override
public void run() {
try{
DistributedLock dc = new DistributedLock(threadId);
dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
//GROUP_PATH , ;
synchronized (threadSemaphore){
dc.createPath(GROUP_PATH, " " + threadId + " ", true);
}
dc.getLock();
} catch (Exception e){
LOG.error("【 "+threadId+" 】 :");
e.printStackTrace();
}
}
}.start();
}
try {
threadSemaphore.await();
LOG.info(" !");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
*
* @return
*/
private void getLock() throws KeeperException, InterruptedException {
selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOG.info(LOG_PREFIX_OF_THREAD+" :"+selfPath);
if(checkMinPath()){
getLockSuccess();
}
}
/**
*
* @param path path
* @param data
* @return
*/
public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {
if(zk.exists(path, needWatch)==null){
LOG.info( LOG_PREFIX_OF_THREAD + " , Path: "
+ this.zk.create( path,
data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT )
+ ", content: " + data );
}
return true;
}
/**
* ZK
* @param connectString ZK
* @param sessionTimeout Session
*/
public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {
zk = new ZooKeeper( connectString, sessionTimeout, this);
connectedSemaphore.await();
}
/**
*
*/
public void getLockSuccess() throws KeeperException, InterruptedException {
if(zk.exists(this.selfPath,false) == null){
LOG.error(LOG_PREFIX_OF_THREAD+" ...");
return;
}
LOG.info(LOG_PREFIX_OF_THREAD + " , !");
Thread.sleep(2000);
LOG.info(LOG_PREFIX_OF_THREAD + " :"+selfPath);
zk.delete(this.selfPath, -1);
releaseConnection();
threadSemaphore.countDown();
}
/**
* ZK
*/
public void releaseConnection() {
if ( this.zk !=null ) {
try {
this.zk.close();
} catch ( InterruptedException e ) {}
}
LOG.info(LOG_PREFIX_OF_THREAD + " ");
}
/**
*
* @return
*/
public boolean checkMinPath() throws KeeperException, InterruptedException {
List subNodes = zk.getChildren(GROUP_PATH, false);
Collections.sort(subNodes);
int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));
switch (index){
case -1:{
LOG.error(LOG_PREFIX_OF_THREAD+" ..."+selfPath);
return false;
}
case 0:{
LOG.info(LOG_PREFIX_OF_THREAD+" , "+selfPath);
return true;
}
default:{
this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);
LOG.info(LOG_PREFIX_OF_THREAD+" , "+waitPath);
try{
zk.getData(waitPath, true, new Stat());
return false;
}catch(KeeperException e){
if(zk.exists(waitPath,false) == null){
LOG.info(LOG_PREFIX_OF_THREAD+" , "+waitPath+" , ?");
return checkMinPath();
}else{
throw e;
}
}
}
}
}
@Override
public void process(WatchedEvent event) {
if(event == null){
return;
}
Event.KeeperState keeperState = event.getState();
Event.EventType eventType = event.getType();
if ( Event.KeeperState.SyncConnected == keeperState) {
if ( Event.EventType.None == eventType ) {
LOG.info( LOG_PREFIX_OF_THREAD + " ZK " );
connectedSemaphore.countDown();
}else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
LOG.info(LOG_PREFIX_OF_THREAD + " , , ?");
try {
if(checkMinPath()){
getLockSuccess();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else if ( Event.KeeperState.Disconnected == keeperState ) {
LOG.info( LOG_PREFIX_OF_THREAD + " ZK " );
} else if ( Event.KeeperState.AuthFailed == keeperState ) {
LOG.info( LOG_PREFIX_OF_THREAD + " " );
} else if ( Event.KeeperState.Expired == keeperState ) {
LOG.info( LOG_PREFIX_OF_THREAD + " " );
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.