자바 병렬 시리즈 의 Semaphore 소스 코드 분석

12280 단어 자바Semaphore
Semaphore(신 호 량)는 JUC 패키지 에서 자주 사용 되 는 클래스 로 AQS 공유 모드 의 응용 프로그램 으로 여러 라인 이 동시에 공유 자원 을 조작 할 수 있 고 병발 수 를 효과적으로 제어 할 수 있 으 며 이 를 이용 하여 데이터 제 어 를 잘 실현 할 수 있다.Semaphore 는 이 허가증 을 버스 차표 로 볼 수 있 는 허가증 개념 을 제 공 했 습 니 다.차 표를 성공 적 으로 얻 은 사람 만 탈 수 있 고 차 표 는 일정한 수량 이 있 기 때문에 제한 없 이 발 급 될 수 없습니다.그러면 버스 가 과 적 될 수 있 습 니 다.그래서 차표 가 다 나 왔 을 때 다른 사람들 은 다음 차 를 기다 릴 수 밖 에 없 었 다.중간 에 내 리 는 사람 이 있 으 면 그의 자 리 는 비어 있 기 때문에 다른 사람 이 타 려 고 하면 차 표를 얻 을 수 있다.Semaphore 를 이용 하여 각종 풀 을 실현 할 수 있 습 니 다.우 리 는 이 편 말미 에 간단 하고 쉬 운 데이터베이스 연결 풀 을 손 으로 쓸 것 입 니 다.우선 Semaphore 의 구조 기 를 살 펴 보 겠 습 니 다.

//   1
public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}

//   2
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore 는 두 개의 파 라 메 트릭 구조 기 를 제공 하 였 으 며,파 라 메 트릭 구조 기 를 제공 하지 않 았 습 니 다.이 두 구조 기 는 모두 초기 허가증 수량 을 입력 해 야 한다.구조 기 1 로 구 성 된 신 호 량 은 허가증 을 얻 을 때 불공 정 한 방식 으로 얻 을 수 있 고 구조 기 2 를 사용 하면 매개 변 수 를 통 해 허가증 을 얻 을 수 있 는 방식(공평 or 불공 정)을 지정 할 수 있다.Semaphore 는 주로 대외 적 으로 두 가지 API 를 제공 합 니 다.허가증 을 얻 고 허가증 을 방출 합 니 다.기본 적 인 것 은 허가증 을 얻 고 방출 하 는 것 입 니 다.또한 매개 변 수 를 입력 하여 여러 개의 허가증 을 동시에 얻 고 방출 할 수 있 습 니 다.이 편 에서 우 리 는 매번 허가증 을 얻 고 방출 하 는 상황 만 말한다.
1.허가증 획득

//       (    )
public void acquire() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

//       (     )
public void acquireUninterruptibly() {
  sync.acquireShared(1);
}

//       (     )
public boolean tryAcquire() {
  return sync.nonfairTryAcquireShared(1) >= 0;
}

//       (    )
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
  return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
위의 API 는 Semaphore 가 제공 하 는 기본 라이선스 획득 작업 입 니 다.매번 하나의 허가증 만 받 는 것 도 현실 생활 에서 흔히 볼 수 있 는 상황 이다.직접 가 져 오 는 것 외 에 가 져 오 려 는 시도 도 제공 합 니 다.직접 가 져 오 는 작업 은 실패 한 후에 스 레 드 를 막 을 수 있 으 며 가 져 오 려 고 시도 하면 그렇지 않 습 니 다.또 주의해 야 할 것 은 try Acquire 방법 은 불공 정 한 방식 으로 시도 한 것 이다.평소에 우리 가 비교적 자주 사용 하 는 것 은 acquire 방법 으로 허가증 을 얻 는 것 이다.그것 이 어떻게 얻 었 는 지 살 펴 보 자.acquire 방법 에서 바로 sync.acquire Shared Interruptibly(1)를 호출 하 는 것 을 볼 수 있 습 니 다.이 방법 은 AQS 안의 방법 입 니 다.우 리 는 AQS 소스 코드 시 리 즈 를 이야기 할 때 말 한 적 이 있 습 니 다.지금 다시 한 번 살 펴 보 겠 습 니 다.

//         (    )
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  //          ,         
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  //1.      
  if (tryAcquireShared(arg) < 0) {
    //2.             
    doAcquireSharedInterruptibly(arg);
  }
}
acquireShared Interruptibly 방법 은 먼저 try AcquireShared 방법 을 사용 하여 얻 으 려 고 시도 하 는 것 입 니 다.try AcquireShared 는 AQS 에서 추상 적 인 방법 입 니 다.FairSync 와 NonfairSync 라 는 두 파생 류 는 이 방법의 논 리 를 실현 합 니 다.FairSync 는 공정 하 게 가 져 오 는 논 리 를 실현 하고,NonfairSync 는 불공 정 하 게 가 져 오 는 논 리 를 실현 합 니 다.

abstract static class Sync extends AbstractQueuedSynchronizer {
  //         
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      //       
      int available = getState();
      //       
      int remaining = available - acquires;
      //1.  remaining  0     remaining
      //2.  remaining  0           remaining
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}

//      
static final class NonfairSync extends Sync {
  private static final long serialVersionUID = -2694183684443567898L;

  NonfairSync(int permits) {
    super(permits);
  }

  //       
  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
  }
}

//     
static final class FairSync extends Sync {
  private static final long serialVersionUID = 2014338818796000944L;

  FairSync(int permits) {
    super(permits);
  }

  //       
  protected int tryAcquireShared(int acquires) {
    for (;;) {
      //              
      if (hasQueuedPredecessors()) {
        //          -1,        
        return -1;
      }
      //       
      int available = getState();
      //       
      int remaining = available - acquires;
      //1.  remaining  0     remaining
      //2.  remaining  0           remaining
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}
여기 서 주의해 야 할 것 은 NonfairSync 의 try AcquireShared 방법 은 nonfair Try AcquireShared 방법 을 직접 호출 하 는 것 입 니 다.이 방법 은 부모 클래스 Sync 에 있 습 니 다.불공 정 하 게 자 물 쇠 를 가 져 오 는 논 리 는 현재 동기 화 상태(동기 화 상 태 는 허가증 개수 표시)를 먼저 꺼 내 현재 동기 화 상 태 를 참 여 된 매개 변 수 를 뺀 것 이다.결과 가 0 보다 적 지 않 으 면 사용 가능 한 허가증 이 있다 는 것 을 증명 한다 면 CAS 작업 으로 동기 화 상 태 를 업데이트 하 는 값 을 직접 사용 하고 마지막 으로 결과 가 0 보다 적 든 작 든 이 결과 값 을 되 돌려 준다.여기 서 우 리 는 try AcquireShared 방법 이 값 을 되 돌려 주 는 의 미 를 알 아야 합 니 다.마이너스 로 돌아 가 는 것 은 얻 는 데 실 패 를 표시 합 니 다.0 은 현재 스 레 드 가 성공 을 거 두 었 지만 후속 스 레 드 는 더 이상 얻 을 수 없습니다.정 수 는 현재 스 레 드 가 성공 을 거 두 었 고 후속 스 레 드 도 얻 을 수 있 음 을 나타 냅 니 다.acquireShared Interruptibly 방법의 코드 를 다시 보 겠 습 니 다.

//         (    )
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  //          ,         
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  //1.      
  //  :      
  //  :          ,             
  //  :          ,               
  if (tryAcquireShared(arg) < 0) {
    //2.             
    doAcquireSharedInterruptibly(arg);
  }
}
되 돌아 오 는 remaining 이 0 보다 적 으 면 가 져 오 는 데 실 패 했 음 을 의미 합 니 다.따라서 try Acquire Shared(arg)<0 은 true 입 니 다.그래서 다음 에 doAcquire Shared Interruptibly 방법 을 호출 합 니 다.이 방법 은 우리 가 AQS 를 이야기 할 때 말 했 듯 이 현재 스 레 드 를 노드 로 포장 하여 동기 화 대기 열 끝 에 넣 고 스 레 드 를 걸 수 있 습 니 다.이것 도 remaining 이 0 보다 적 을 때 라인 이 줄 을 서서 막 히 는 원인 이다.돌아 오 는 remaining>=0 은 현재 스 레 드 가 성공 한 것 을 의미 하기 때문에 try AcquireShared(arg)<0 은 flase 이기 때문에 doAcquireShared Interruptibly 방법 으로 현재 스 레 드 를 막 지 않 습 니 다.이상 은 불공 정 하 게 얻 은 전체 논리 입 니 다.공정 하 게 얻 을 때 는 그 전에 hasQueued Predecessors 방법 으로 동기 화 대기 열 에 누가 줄 을 서 있 는 지 판단 합 니 다.있 으 면 return-1 로 얻 는 데 실 패 했 음 을 표시 합 니 다.그렇지 않 으 면 다음 과 같은 절 차 를 계속 수행 합 니 다.
2.라이선스 발급

//       
public void release() {
  sync.releaseShared(1);
}
release 방법 을 호출 하 는 것 은 허가증 을 발급 하 는 것 입 니 다.조작 이 간단 합 니 다.AQS 의 release Shared 방법 을 호출 했 습 니 다.이 방법 을 살 펴 보 겠 습 니 다.

//      (    )
public final boolean releaseShared(int arg) {
  //1.      
  if (tryReleaseShared(arg)) {
    //2.             
    doReleaseShared();
    return true;
  }
  return false;
}
AQS 의 releaseShared 방법 은 먼저 try ReleaseShared 방법 으로 자 물 쇠 를 풀 려 고 시도 합 니 다.이 방법의 실현 논 리 는 하위 클래스 Sync 에 있 습 니 다.

abstract static class Sync extends AbstractQueuedSynchronizer {
  ...
  //      
  protected final boolean tryReleaseShared(int releases) {
    for (;;) {
      //        
      int current = getState();
      //              
      int next = current + releases;
      //                   
      if (next < current) {
        throw new Error("Maximum permit count exceeded");
      }
      // CAS          ,        true,       
      if (compareAndSetState(current, next)) {
        return true;
      }
    }
  }
  ...
}
try ReleaseShared 방법 에서 for 순환 으로 자전 하 는 것 을 볼 수 있 습 니 다.먼저 동기 화 상 태 를 가 져 오고 동기 화 상태 에 들 어 오 는 인 자 를 추가 한 다음 에 CAS 방식 으로 동기 화 상 태 를 업데이트 하고 업데이트 에 성공 하면 true 로 돌아 가 방법 을 뛰 어 내 립 니 다.그렇지 않 으 면 성공 할 때 까지 계속 순환 합 니 다.이것 이 바로 Semaphore 가 허가증 을 방출 하 는 절차 입 니 다.
3.연결 풀 을 쓰기
Semaphore 코드 는 복잡 하지 않 습 니 다.자주 사용 하 는 작업 은 허가증 을 얻 고 방출 하 는 것 입 니 다.이러한 작업 의 실현 논리 도 비교적 간단 하지만 이것 은 Semaphore 의 광범 위 한 응용 에 방해 가 되 지 않 습 니 다.다음은 Semaphore 를 이용 하여 간단 한 데이터베이스 연결 탱크 를 실현 하고 이 예 를 통 해 독자 들 이 Semaphore 의 운용 을 더욱 깊이 있 게 파악 하 기 를 바란다.

public class ConnectPool {
  
  //     
  private int size;
  //       
  private Connect[] connects;
  //      
  private boolean[] connectFlag;
  //       
  private volatile int available;
  //   
  private Semaphore semaphore;
  
  //   
  public ConnectPool(int size) { 
    this.size = size;
    this.available = size;
    semaphore = new Semaphore(size, true);
    connects = new Connect[size];
    connectFlag = new boolean[size];
    initConnects();
  }
  
  //     
  private void initConnects() {
    //            
    for(int i = 0; i < this.size; i++) {
      connects[i] = new Connect();
    }
  }
  
  //       
  private synchronized Connect getConnect(){ 
    for(int i = 0; i < connectFlag.length; i++) {
      //            
      if(!connectFlag[i]) {
        //         
        connectFlag[i] = true;
        //      1
        available--;
        System.out.println("【"+Thread.currentThread().getName()+"】             :" + available);
        //      
        return connects[i];
      }
    }
    return null;
  }
  
  //      
  public Connect openConnect() throws InterruptedException {
    //     
    semaphore.acquire();
    //       
    return getConnect();
  }
  
  //      
  public synchronized void release(Connect connect) { 
    for(int i = 0; i < this.size; i++) {
      if(connect == connects[i]){
        //         
        connectFlag[i] = false;
        //      1
        available++;
        System.out.println("【"+Thread.currentThread().getName()+"】             :" + available);
        //     
        semaphore.release();
      }
    }
  }
  
  //       
  public int available() {
    return available;
  }
  
}
테스트 코드:

public class TestThread extends Thread {
  
  private static ConnectPool pool = new ConnectPool(3);
  
  @Override
  public void run() {
    try {
      Connect connect = pool.openConnect();
      Thread.sleep(100); //    
      pool.release(connect);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  
  public static void main(String[] args) {
    for(int i = 0; i < 10; i++) {
      new TestThread().start();
    }
  }

}
테스트 결과:

데이터베이스 연결 에 대한 인용 을 저장 하기 위해 서 배열 을 사용 합 니 다.연결 풀 을 초기 화 할 때 initConnects 방법 으로 지정 한 수량의 데이터 베 이 스 를 만 들 고 인용 을 배열 에 저장 합 니 다.또한 같은 크기 의 배열 로 연결 이 사용 가능 한 지 기록 합 니 다.외부 스 레 드 가 연결 을 요청 할 때마다 먼저 semaphore.acquire()방법 으로 허가증 을 받 은 다음 연결 상 태 를 사용 중 으로 설정 하고 마지막 으로 이 연결 의 참조 로 되 돌려 줍 니 다.허가증 의 수량 은 구조 할 때 들 어 오 는 매개 변수 에 의 해 결정 되 며,매번 semaphore.acquire()방법 을 호출 할 때마다 허가증 의 수량 이 1 로 줄 어 들 고,수량 이 0 으로 줄 어 들 때 연결 이 사용 할 수 없다 는 것 을 설명 합 니 다.이 때 다른 스 레 드 를 다시 가 져 오 면 막 힙 니 다.스 레 드 가 연결 을 풀 때마다 semaphore.release()를 호출 하여 허가증 을 방출 합 니 다.이때 허가증 의 총량 이 증가 합 니 다.사용 가능 한 연결 수가 증가 한 것 을 의미 합 니 다.그러면 이전에 막 힌 스 레 드 가 깨 어 나 계속 연결 을 얻 을 것 입 니 다.이때 다시 가 져 오 면 연결 을 성공 적 으로 얻 을 수 있 습 니 다.테스트 예제 에서 3 개의 연결 풀 을 초기 화 했 습 니 다.테스트 결과 에서 볼 수 있 듯 이 스 레 드 가 연결 을 가 져 올 때마다 나머지 연결 수 는 1 로 줄 어 들 고 0 으로 줄 어 들 때 다른 스 레 드 는 더 이상 가 져 올 수 없습니다.이 때 는 하나의 스 레 드 가 연결 이 풀 릴 때 까지 기 다 려 야 계속 가 져 올 수 있 습 니 다.남 은 연결 수가 항상 0 에서 3 사이 에 변동 하 는 것 을 볼 수 있 는데 이것 은 우리 의 이번 테스트 가 성공 적 이라는 것 을 설명 한다.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기