Java Condition 학습 노트

Table of Contents
작용
인터페이스 정의
Condition Object 실현 류
  • AbstractQueuedSynchronizer
  • Node
  • ConditionObject


  • 역할.
    Condition 은 jdk 1.5 가 도입 한 모니터 대체 방법 입 니 다. 예 를 들 어 wait, notify, notify All 은 더욱 사용 하기 쉽 고 유연 한 동기 화 차단 방법 을 제공 합 니 다.Condition 번역 은 경쟁 조건 으로 잠시 번역 되 며, 다른 스 레 드 알림 이 깨 어 나 기 전에 스 레 드 를 막 는 데 사 용 됩 니 다.다 중 스 레 드 환경 에서 자원 경쟁 문제 가 발생 하면 다 중 스 레 드 가 같은 자원 을 조작 할 때 스 레 드 안전 문제 가 발생 하지 않도록 순서대로 실행 해 야 한다.스 레 드 가 경쟁 조건 을 사용 할 때 차단 효 과 는 wait () 방법 과 유사 하지만 본질 적 으로 정적 조건 은 자물쇠 이다.정적 조건 을 만족 시 키 기 를 기다 리 는 것 은 정적 조건 의 new Condition () 방법 으로 특수 한 자 물 쇠 를 초기 화 하 는 것 입 니 다.
    다음은 경쟁 조건 의 예 이다.
     class BoundedBuffer {
      final Lock lock = new ReentrantLock();
      final Condition notFull  = lock.newCondition();
      final Condition notEmpty = lock.newCondition();
    
      final Object[] items = new Object[100];
      int putptr, takeptr, count;
    
      public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
          while (count == items.length)
            notFull.await();
          items[putptr] = x;
          if (++putptr == items.length) putptr = 0;
          ++count;
          notEmpty.signal();
        } finally {
          lock.unlock();
        }
      }
    
      public Object take() throws InterruptedException {
        lock.lock();
        try {
          while (count == 0)
            notEmpty.await();
          Object x = items[takeptr];
          if (++takeptr == items.length) takeptr = 0;
          --count;
          notFull.signal();
          return x;
        } finally {
          lock.unlock();
        }
      }
    }
    

    위 는 put (하나의 요 소 를 저장 합 니 다) 와 take (하나의 요 소 를 가 져 갑 니 다) 를 포함 하고 경계 가 있 는 용기 의 예 입 니 다.용기 가 용량 상한 선 에 이 르 렀 다 면 put 방법 을 사용 하 는 스 레 드 가 막 힐 것 입 니 다.다른 스 레 드 가 요 소 를 성공 적 으로 가 져 갔 을 때 put 가 막 힌 스 레 드 를 호출 하면 깨 어 납 니 다.경쟁 조건 은 모니터 의 차단 과 깨 우기 보다 일부 기능 을 증가 시 켰 다. 예 를 들 어 깨 우 는 순서 와 통 지 를 지정 할 때 자 물 쇠 를 가지 지 않 아 도 된다.경쟁 조건 의 인 스 턴 스 도 일반적인 자바 대상 으로 Object. wait 방법 과 notify 방법 을 갖 추고 있 으 며 이 모니터 방법 을 단독으로 사용 할 수 있 습 니 다.그러나 이러한 모니터 방법 과 정적 조건 자체 의 디자인 방법 은 서로 독립 되 어 있 기 때문에 특수 한 디자인 을 실현 하지 않 는 한 정적 조건 의 모니터 방법 을 사용 하지 않 고 헷 갈 리 지 않도록 하 는 것 을 권장 합 니 다.플랫폼 자체 에 허위 각성 (Spurious Wakeup) 이 발생 할 수 있 으 므 로 대기 조건 을 설계 할 때 는 하나의 순환 체 에 넣 어 판단 해 야 합 니 다. if 조건 판단 문구 에 넣 었 을 때 허위 각성 이 발생 하면 조건 이 통과 되 지 않 고 프로그램 은 예 정 된 디자인 에 따라 가지 않 습 니 다. 경쟁 조건 의 대기 방법 (interruptible, non - interruptible, timed)각 플랫폼 에서 의 실현 과 성능 의 표현 이 일치 하지 않 을 수 있 습 니 다. 또한 일부 플랫폼 에 서 는 순서대로 깨 우 는 특성 을 제공 하기 어렵 습 니 다. 더욱이 스 레 드 가 중단 되 는 기능 도 모든 플랫폼 에서 스 레 드 를 즉시 중단 시 킬 수 있 는 것 이 아 닙 니 다. 일반적으로 경쟁 조건 의 실현 류 가 순서대로 깨 우 는 기능 이 필요 하지 않 고 스 레 드 를 즉시 종료 시 킬 필요 도 없습니다. 그러나오해 가 발생 하지 않도록 이러한 기능 의 정확 한 표현 형식 을 명확 하 게 지적 해 야 합 니 다. 작업 을 중단 하 는 것 은 보통 하나의 스 레 드 를 중지 하 는 것 을 의미 합 니 다. 그러나 작업 을 중단 하 는 것 이 스 레 드 가 하나의 임 무 를 처리 하 는 과정 에서 발생 한다 면 스 레 드 는 기록 되 어 막 힐 수 없습니다. 실현 류 는 이러한 행 위 를 명확 하 게 묘사 해 야 합 니 다.
    인터페이스 정의
    /**
       * Causes the current thread to wait until it is signalled or
       * {@linkplain Thread#interrupt interrupted}.
       *
       *                      。
       *
       * 

    The lock associated with this {@code Condition} is atomically * released and the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of four things happens: *

      *
    • Some other thread invokes the {@link #signal} method for this * {@code Condition} and the current thread happens to be chosen as the * thread to be awakened; or *
    • Some other thread invokes the {@link #signalAll} method for this * {@code Condition}; or *
    • Some other thread {@linkplain Thread#interrupt interrupts} the * current thread, and interruption of thread suspension is supported; or *
    • A "spurious wakeup" occurs. *
    * * :1) (Condition.signal), ; * 2) (Condition.signalAll); * 3) (spurious wakeup); * *

    In all cases, before this method can return the current thread must * re-acquire the lock associated with this condition. When the * thread returns it is guaranteed to hold this lock. * * , , 。 * BoundedBuffer , put , 。 , notFull.await()。 * notFull.await() 。 * *

    If the current thread: *

      *
    • has its interrupted status set on entry to this method; or *
    • is {@linkplain Thread#interrupt interrupted} while waiting * and interruption of thread suspension is supported, *
    * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. It is not specified, in the first * case, whether or not the test for interruption occurs before the lock * is released. * * (waiting) (Blocked) (Bloked) , (interrupted), * InterruptedException , (interrupted) 。 * , 。 * *

    Implementation Considerations * *

    The current thread is assumed to hold the lock associated with this * {@code Condition} when this method is called. * It is up to the implementation to determine if this is * the case and if not, how to respond. Typically, an exception will be * thrown (such as {@link IllegalMonitorStateException}) and the * implementation must document that fact. * *

    An implementation can favor responding to an interrupt over normal * method return in response to a signal. In that case the implementation * must ensure that the signal is redirected to another waiting thread, if * there is one. * * : * 。 * , IllegalMonitorStateException 。 。 * , signal 。 * * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ void await() throws InterruptedException; /** * Causes the current thread to wait until it is signalled. * * 。 , 。 * */ void awaitUninterruptibly(); /** * Causes the current thread to wait until it is signalled or interrupted, * or the specified waiting time elapses. * * * * * @param nanosTimeout the maximum time to wait, in nanoseconds * @return an estimate of the {@code nanosTimeout} value minus * the time spent waiting upon return from this method. * A positive value may be used as the argument to a * subsequent call to this method to finish waiting out * the desired time. A value less than or equal to zero * indicates that no time remains. * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ long awaitNanos(long nanosTimeout) throws InterruptedException; /** * Causes the current thread to wait until it is signalled or interrupted, * or the specified waiting time elapses. This method is behaviorally * equivalent to: * * awaitNanos 。 *

     {@code awaitNanos(unit.toNanos(time)) > 0}
    *
    * @param time the maximum time to wait
    * @param unit the time unit of the {@code time} argument
    * @return {@code false} if the waiting time detectably elapsed
    * before return from the method, else {@code true}
    * @throws InterruptedException if the current thread is interrupted
    * (and interruption of thread suspension is supported)
    */
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    /*
    * awaitNanos 방법 을 사용 합 니 다. awaitNanos 는 현재 의 나 초 를 정 하고 awaitUntil 은 마감 일 을 정 합 니 다.
    */
    boolean awaitUntil(Date deadline) throws InterruptedException;
    /**
    * Wakes up one waiting thread.
    *
    * 기타 대기 라인 깨 우기
    *
    * If any threads are waiting on this condition then one
    * is selected for waking up. That thread must then re-acquire the
    * lock before returning from {@code await}.
    *
    * 깨 어 난 스 레 드 는 await 방법 을 호출 하기 전에 자 물 쇠 를 가 져 야 합 니 다.
    *
    * Implementation Considerations
    *
    * An implementation may (and typically does) require that the
    * current thread hold the lock associated with this {@code
    * Condition} when this method is called. Implementations must
    * document this precondition and any actions taken if the lock is
    * not held. Typically, an exception such as {@link
    * IllegalMonitorStateException} will be thrown.
    *
    * 주의사항:
    * 구현 클래스 는 signal 방법 을 호출 하기 전에 자 물 쇠 를 가지 고 있어 야 합 니 다. 자 물 쇠 를 가지 고 있 지 않 지만 signal 방법 을 사용 했다 는 것 을 명확 하 게 설명 합 니 다.
    * IllegalMonitor State Exception 이상 을 던 집 니 다
    */
    void signal();
    /**
    * Wakes up all waiting threads.
    *
    * 모든 대기 라인 을 깨 웁 니 다.
    * 기타 주의사항 은 시그 널 과 유사 합 니 다.
    *
    * If any threads are waiting on this condition then they are
    * all woken up. Each thread must re-acquire the lock before it can
    * return from {@code await}.
    *
    * Implementation Considerations
    *
    * An implementation may (and typically does) require that the
    * current thread hold the lock associated with this {@code
    * Condition} when this method is called. Implementations must
    * document this precondition and any actions taken if the lock is
    * not held. Typically, an exception such as {@link
    * IllegalMonitorStateException} will be thrown.
    */
    void signalAll();
    Condition Object 실현 클래스
    AbstractQueuedSynchronizer
    다 중 스 레 드 환경 에서 스 레 드 차단 과 차단 을 취소 하 는 동기 화 기 를 제공 합 니 다.
    Node
    AbstractQueued Synchronizer. ConditionObject 는 AbstractQueued Synchronizer. Node 에 의존 합 니 다. Condition Object 를 소개 하 는 데 는 Node 의 실현 만 살 펴 보 겠 습 니 다. Node 는 대기 열 을 나타 내 는 요소 로 이전 노드 의 지향 과 다음 노드 의 지향 이 있 습 니 다. 여러 노드 는 하나의 링크 특성 을 가 진 대기 열 을 구성 합 니 다. 대기 열 은 CLH 입 니 다.(Craig, Landin, and Hagersten) 잠 금 대기 열의 변종 입 니 다. CLH 잠 금 은 대기 열 (자체 노드 로 구 성 된 링크) 입 니 다.자 물 쇠 는 배 고 픔 이 없고 먼저 얻 는 공평 성 을 확보 할 수 있 습 니 다. 일반적으로 자 물 쇠 를 사용 합 니 다. Abstract Queued Synchronizer 에 서 는 Node 로 구 성 된 대기 열 자 물 쇠 는 자 물 쇠 를 사용 하 는 것 이 아니 라 동기 화 체 제 를 만 듭 니 다. 동기 화 체 제 를 실현 하 는 전략 은 자 물 쇠 를 제어 하 는 전략 과 유사 하 며, 또한 이전 노드 의 정 보 를 통 해 이 루어 집 니 다. 노드 에 서 는 속성 "status"가 있 습 니 다.스 레 드 의 차단 상 태 를 추적 하면 스 레 드 의 차단 상태 에 따라 변 경 됩 니 다. 현재 노드 의 스 레 드 가 실행 되 었 거나 공유 자원 을 방출 할 수 있 을 때 노드 를 방출 합 니 다. 즉, 링크 의 첫 번 째 노드 가 대기 열 에서 나 올 때 후속 노드 를 알려 줍 니 다. 후속 노드 는 해당 하 는 노드 통 지 를 받 기 전에 테스트 를 해 왔 습 니 다.그림 에서 아래로 실행 할 수 있 는 검사 가 져 오기 (프로 그래 밍 차단 상태 가 아 닌 아래로 실행 할 수 있 는 조건 이 충족 되 는 지 순환 검사)이렇게 하면 스 레 드 가 순서대로 자동 으로 막 히 고 순서대로 실 행 될 수 있 습 니 다. 스 레 드 가 노드 를 가 져 올 때 다른 스 레 드 와 같은 순서 로 경쟁 할 수 있 기 때문에 가 져 오 는 데 실패 하고 기다 릴 수 있 습 니 다. 하나의 요 소 를 CLH 대기 열 에 추가 하려 면 팀 꼬리 요 소 를 수정 하고 팀 꼬리 요소 의 다음 요 소 를 새 요소 로 가리 키 며 하나의 요 소 를 새 요소 에 추가 해 야 합 니 다.팀 끝의 인용 은 새로 추 가 된 요 소 를 가리킨다. (구체 적 인 조작 은 이보다 복잡 할 것 이다.)이 작업 은 원자 조작 을 보장 해 야 합 니 다. 같은 대기 열 에서 도 첫 번 째 요 소 를 조작 해 야 합 니 다. 대기 열의 첫 번 째 요 소 를 이전 첫 번 째 요소 의 다음 요소 로 가 리 킵 니 다. 이 작업 도 원자 성 을 보장 해 야 합 니 다. 또한 대기 시간 이 너무 길 거나 프로그램 이 중단 되 는 등 작업 이 진정 으로 성공 하 는 지 고려 해 야 합 니 다. Node 는 여러 가지 모양 으로 설계 되 어 있 습 니 다.상태, 예 를 들 어 CANCELLED (취소 상태, 노드 관련 스 레 드 가 종료 되 었 음 을 나타 낸다), SIGNAL (알림, 노드 경쟁 스 레 드 가 실 행 될 수 있 음 을 나타 낸다), CONDITION (대기, 스 레 드 가 적당 한 시 기 를 기다 리 고 있 음 을 나타 낸다), PROPAGATE (전파, 공유 모드 에서 (여러 스 레 드 가 하나의 노드 를 공유 함) 에서 다음 공헌 노드 는 중 계 를 기다 릴 수 있다). 노드 가 CANCELLED 상태 로 설정 되면 이전 노드 가 이 노드 를 가리 키 는 것 을 참조 하여 직위 가 취소 되 지 않 은 CANCELLED 노드 를 다시 가리 키 도록 합 니 다.
    ConditionObject
    Condition Object 는 단 방향 목록 의 조건 부 대기 열 입 니 다. Node 가 구현 하 는 단 방향 링크 와 결합 하여 AbstractQueued Synchronizer 에 차단 (await, awaitNanos, awaitUntil 등) 과 차단 (doSignal, doSignal All) 기능 을 취소 합 니 다.
    public class ConditionObject implements Condition, java.io.Serializable {
          private static final long serialVersionUID = 1173984872572414699L;
          /**
           *         Node   。    (signal   signalAll)               。
           *              。
           *
           */
          private transient Node firstWaiter;
    
          /**
           *          。    (addConditionWaiter)                   ,
           *             ,              。
           */
          private transient Node lastWaiter;
    
          /**
           * Creates a new {@code ConditionObject} instance.
           */
          public ConditionObject() { }
    
          // Internal methods
    
          /**
           *          。                  。
           * @return its new wait node
           */
          private Node addConditionWaiter() {
              Node t = lastWaiter;
              // If lastWaiter is cancelled, clean out.
              if (t != null && t.waitStatus != Node.CONDITION) {
                  unlinkCancelledWaiters();
                  t = lastWaiter;
              }
              Node node = new Node(Thread.currentThread(), Node.CONDITION);
              if (t == null)
                  firstWaiter = node;
              else
                  t.nextWaiter = node;
              lastWaiter = node;
              return node;
          }
    
          /**
           *   Node   (ConditionObject       Node   )      (           )
           *     Node      CONDITION(-2)    0,
           *   Node                   ,
           *           (AbstractQueuedSynchronizer     Node   )。
           * @param first (non-null) the first node on condition queue
           */
          private void doSignal(Node first) {
              do {
                  if ( (firstWaiter = first.nextWaiter) == null)
                      lastWaiter = null;
                  first.nextWaiter = null;
              } while (!transferForSignal(first) &&
                       (first = firstWaiter) != null);
          }
    
          /**
           *   ConditionObject       Node             AbstractQueuedSynchronizer   
           *     Node      。
           * @param first (non-null) the first node on condition queue
           */
          private void doSignalAll(Node first) {
              lastWaiter = firstWaiter = null;
              do {
                  Node next = first.nextWaiter;
                  first.nextWaiter = null;
                  transferForSignal(first);
                  first = next;
              } while (first != null);
          }
    
          /**
           *                   (Condition)   ,        。
           */
          private void unlinkCancelledWaiters() {
              Node t = firstWaiter;
              Node trail = null;
              while (t != null) {
                  Node next = t.nextWaiter;
                  if (t.waitStatus != Node.CONDITION) {
                      t.nextWaiter = null;
                      if (trail == null)
                          firstWaiter = next;
                      else
                          trail.nextWaiter = next;
                      if (next == null)
                          lastWaiter = trail;
                  }
                  else
                      trail = t;
                  t = next;
              }
          }
    
          // public methods
    
          /**
           *                  ConditionObject       Node           
           * AbstractQueuedSynchronizer       Node    
           *
           * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
           *         returns {@code false}
           */
          public final void signal() {
              if (!isHeldExclusively())
                  throw new IllegalMonitorStateException();
              Node first = firstWaiter;
              if (first != null)
                  doSignal(first);
          }
    
          /**
           *   signal   ,               。
           *
           * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
           *         returns {@code false}
           */
          public final void signalAll() {
              if (!isHeldExclusively())
                  throw new IllegalMonitorStateException();
              Node first = firstWaiter;
              if (first != null)
                  doSignalAll(first);
          }
    
          /**
           *                            。
           *    Node           ,     InterruptedException。
           *       。                ,          ;
           *    LockSupport.park(this)       。
           *          (LockSupport.park(this))   ,        (InterruptedException)
           */
          public final void await() throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException();
              Node node = addConditionWaiter();
              int savedState = fullyRelease(node);
              int interruptMode = 0;
              while (!isOnSyncQueue(node)) {
                  LockSupport.park(this);
                  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                      break;
              }
              if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                  interruptMode = REINTERRUPT;
              if (node.nextWaiter != null) // clean up if cancelled
                  unlinkCancelledWaiters();
              if (interruptMode != 0)
                  reportInterruptAfterWait(interruptMode);
          }
    
          /**
           *  await  ,          。
           */
          public final long awaitNanos(long nanosTimeout)
                  throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException();
              Node node = addConditionWaiter();
              int savedState = fullyRelease(node);
              final long deadline = System.nanoTime() + nanosTimeout;
              int interruptMode = 0;
              while (!isOnSyncQueue(node)) {
                  if (nanosTimeout <= 0L) {
                      transferAfterCancelledWait(node);
                      break;
                  }
                  if (nanosTimeout >= spinForTimeoutThreshold)
                      LockSupport.parkNanos(this, nanosTimeout);
                  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                      break;
                  nanosTimeout = deadline - System.nanoTime();
              }
              if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                  interruptMode = REINTERRUPT;
              if (node.nextWaiter != null)
                  unlinkCancelledWaiters();
              if (interruptMode != 0)
                  reportInterruptAfterWait(interruptMode);
              return deadline - System.nanoTime();
          }
    
          /**
           *  await  ,         
           */
          public final boolean awaitUntil(Date deadline)
                  throws InterruptedException {
              long abstime = deadline.getTime();
              if (Thread.interrupted())
                  throw new InterruptedException();
              Node node = addConditionWaiter();
              int savedState = fullyRelease(node);
              boolean timedout = false;
              int interruptMode = 0;
              while (!isOnSyncQueue(node)) {
                  if (System.currentTimeMillis() > abstime) {
                      timedout = transferAfterCancelledWait(node);
                      break;
                  }
                  LockSupport.parkUntil(this, abstime);
                  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                      break;
              }
              if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                  interruptMode = REINTERRUPT;
              if (node.nextWaiter != null)
                  unlinkCancelledWaiters();
              if (interruptMode != 0)
                  reportInterruptAfterWait(interruptMode);
              return !timedout;
          }
      }
    

    좋은 웹페이지 즐겨찾기