java.util.concurrent 패키지 원본 읽기 03 자물쇠

43487 단어 Concurrent
Condition 커넥터
응용 장면: 하나의 라인이 어떤 condition이 만족하지 않기 때문에 끊어집니다. 이 condition이 만족할 때까지.
Object의 wait/notify와 유사하기 때문에 Condition 대상은 다중 루트에 공유되어야 하며, 자물쇠를 사용하여 상태의 일치성을 보호해야 한다
 
코드 예:
class BoundedBuffer {

     final Lock lock = new ReentrantLock();

     final Condition notFull     = lock.newCondition();

     final Condition notEmpty = lock.newCondition();



     final Object[] items = new Object[100];

     int putptr, takeptr, count;



     public void put(Object x) throws InterruptedException {

          lock.lock();

          try {

               while (count == items.length)

                    notFull.await();

               items[putptr] = x;

               if (++putptr == items.length) putptr = 0;

               ++count;

               notEmpty.signal();

          } finally {

               lock.unlock();

          }

     }         



     public Object take() throws InterruptedException {

          lock.lock();

          try {

               while (count == 0)

                    notEmpty.await();

               Object x = items[takeptr];

               if (++takeptr == items.length) takeptr = 0;

               --count;

               notFull.signal();

               return x;

          } finally {

               lock.unlock();

          }

     }

}

 
상기 코드는 Condition이 어떻게 사용되는지 잘 알 수 있고 뒤에 있는 BlockingXXX 유형의 데이터 구조는 모두 Condition에 사용된다.
 
signal (notify) 알림을 사용할 때 어떤 순서로 알림을 해야 합니까?
 
세 가지 대기 방식: 끊기지 않고 일정한 시간 간격으로 어느 시점을 기다린다
 
Lock 및 ReadWriteLock
두 개의 인터페이스, 후자는 전자의 하위 인터페이스가 아니다. 다음 ReadWriteLock 코드를 통해 이들의 관계를 알 수 있다.
 
public interface ReadWriteLock {

    /**

     * Returns the lock used for reading.

     *

     * @return the lock used for reading.

     */

    Lock readLock();



    /**

     * Returns the lock used for writing.

     *

     * @return the lock used for writing.

     */

    Lock writeLock();

}

 
 
두 인터페이스 모두 리셋 가능(Reentrant Lock,Reentrant ReadWrite Lock)의 실현을 가지고 있다.
 
LockSupport
도구 클래스, 작업 객체는 스레드이며 Unsafe 클래스를 기반으로 합니다.
기본 조작 Park와 unpark.Park는 현재 스레드를 실효시킵니다. (다른 스레드를 조작하는 것은 제공하지 않았습니다. 사실은 실현할 수 있습니다.) 다음 몇 가지 상황 중 하나가 나타날 때까지 잠시 끊습니다.
1) 다른 스레드는 unpark 방법으로 이 스레드를 조작합니다 2) 이 스레드가 중단되었습니다 3)park 방법은 즉시 되돌아갑니다
Blocker에 대해 라인이 끊긴 동기화 대상은 Blocker가 필요하지 않습니다.
역할이 뭘까요?
Park에는 세 가지 시간 종류의 호출이 있습니다
public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline)
 
위의 세 가지 방법은 대응하는 재부팅 방법이 있는데, 바로 Blocker 대상을 매개 변수로 추가하는 것이다
public static void park(Object blocker)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline)
 
unpark 정보
public static void unpark(Thread thread)
처음에는 왜 퍼블릭 static void unpark () 가 현재 라인을 조작하지 않았는지 알 수 없었는데, 나중에 생각해 보니, 하나의 라인 Park가 이미 block에 의해 실행되었기 때문에, 구출된 라인을 호출할 수 없습니다.
 
AbstractOwnableSynchronizer, AbstractQueuedSynchronizer, AbstractQueuedLongSynchronizer
그 후 양자는 첫 번째 종류의 자류다.
마지막 클래스는 JDK6에서부터 출현한 것으로 아직 구체적으로 실현되지 않은 하위 클래스입니다.
중간 클래스의 하위 클래스는 자물쇠에 다시 들어갈 수 있습니다
Abstract Ownable Synchronizer는 스레드에 의해 이 기능을 독점하는 Synchronizer를 실현했을 뿐 여러 스레드의 동기화를 어떻게 관리하는지 포함하지 않습니다.exclusive OwnerThread, set/get 방법이 포함되어 있습니다.
 
AbstractQueuedSynchronizer는Queue의 방식을 이용하여 자물쇠의 사용과 동기화를 관리하는데 자물쇠의 관리자에 해당한다.
우선 네 가지 가장 핵심적인 방법에 주목한다.
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
 
앞의 두 가지는 독점 자물쇠에 사용되고 뒤의 두 가지는 공유 자물쇠에 사용된다. 이 네 가지 방법은 하위 클래스에서 이루어진다. 즉, 자물쇠를 어떻게 가져오고 방출하는지AbstractQueuedSynchronizer는 참여하지 않는다. 기본적인 실현은 지원하지 않는다. 즉, UnsupportedOperationException을 던지는 것이다.
AbstractQueuedSynchronizer는 무엇을 합니까?
 
현재 스레드가 자물쇠를 가져오려고 시도할 때, AbstractQueuedSynchronizer는tryAcquire나tryAcquireshared를 호출해서 가져오려고 합니다.false를 얻으면 현재 스레드를 대기 대기열에 넣고 더 많은 작업을 합니다.우리는 다음과 같은 6가지 상황을 분석했다. 앞의 세 가지는 독점 자물쇠에 사용되고 뒤의 세 가지는 공유에 사용된다. 독점 자물쇠나 공유 자물쇠는 대기 방식에 따라 세 가지로 나뉜다. 즉, 중단 불가 라인 대기, 중단 라인 대기, 시간 제한 대기 시간 초과 포기를 시도한다.
이 여섯 가지 방법은 모두 int 형식의 매개 변수를 포함하는데, 이것은 위의tryAcquire와 같은 방법에 사용되는 것이다. 즉, 이것은 사용자 정의 매개 변수로 일반적으로 사용자 정의 상태를 나타내는 데 쓰인다.
1) 독점 잠금, 대기열에 넣은 후 잠금이 성공적으로 획득될 때까지 라인의 중단을 무시합니다
public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

 
주의: 대기열의 헤더 노드는dummy 노드입니다. 진정으로 기다리는 첫 번째 노드는 헤더 뒤의 노드입니다.
그러나 acquire가 성공한 후 첫 번째 대기 노드가 발견되면dummy의 헤더는 이 노드로 설정됩니다.
prev와thread는null로 원래의dummy의header를 옮기고 새로운dummy의Node로 바꾸는 것과 같다.
addWaiter () 방법은 현재 스레드를 대기 대기열에 넣고 Node 대상을 되돌려줍니다
acquireQueued () 방법은 Node 대상이 대기열에서 변화하는 것을 감시합니다. 라인이 끊기면true로 돌아가고 그렇지 않으면false로 돌아갑니다.
대기 중 인터럽트 신호가 검출되면 acquireQueued가true로 되돌아와selfInterrupt로 현재 라인을 인터럽트합니다.
 
acquireQueued 소스 코드
    final boolean acquireQueued(final Node node, int arg) {

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

               //              

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    return interrupted;

                }

               //shouldParkAfterFailedAcquire               ,     ,

               //   parkAndCheckInterrupt    ,                

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

    }

 
shouldParkAfterFailedAcquire의 소스 코드
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)

            //SIGNAL                       ,       

            return true;

        if (ws > 0) {

            /*

             * ws  0       Node      ,    ,          

             *                Node

             */

         do {

          node.prev = pred = pred.prev;

         } while (pred.waitStatus > 0);

         pred.next = node;

        } else {

            /*

             *   ws 0  PROPAGATE,          ,    SIGNAL  

             * 0    ,PROPAGATE    ,compareAndSetWaitStatus       SIGNAL

             *   ,(      ,    false)      SIGNAL    

             */

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

        }

        return false;

    }

 
 
2) 잠금을 독점하여 대기열에 넣은 후 잠금을 성공적으로 가져오거나 중단될 때까지
    public final void acquireInterruptibly(long arg) throws InterruptedException {

        if (Thread.interrupted())

            throw new InterruptedException();

        if (!tryAcquire(arg))

            doAcquireInterruptibly(arg);

    }

 
 
 
여기와 1)의 차이점은 DoAcquire Interruptibly를 호출하는 데 있다. 실제로 DoAcquire Interruptibly와 acquire Queueued는 차이가 매우 작다. 전자는 끊기지 않는다. 단지 빨간색 부분을 참고하여 끊긴 것을 발견하고 직접break를 가져와 자물쇠를 얻으려는 계획을 취소한다.
doAcquireInterruptibly의 소스 코드
    private void doAcquireInterruptibly(long arg)

        throws InterruptedException {

        final Node node = addWaiter(Node.EXCLUSIVE);

        try {

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    return;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    break;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

        // Arrive here only if interrupted

        cancelAcquire(node);

        throw new InterruptedException();

    }

 
3) 기한
doAcquireNanos의 소스 코드
    private boolean doAcquireNanos(long arg, long nanosTimeout)

        throws InterruptedException {

        long lastTime = System.nanoTime();

        final Node node = addWaiter(Node.EXCLUSIVE);

        try {

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    return true;

                }

                if (nanosTimeout <= 0) {

                    cancelAcquire(node);

                    return false;

                }



               //   parkNanos,                

                if (nanosTimeout > spinForTimeoutThreshold &&

                    shouldParkAfterFailedAcquire(p, node))

                    LockSupport.parkNanos(this, nanosTimeout);

                long now = System.nanoTime();

                nanosTimeout -= now - lastTime;

                lastTime = now;



               //        ,    ,      

                if (Thread.interrupted())

                    break;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

        // Arrive here only if interrupted

        cancelAcquire(node);

        throw new InterruptedException();

    }

 
 
주의: 안전하게 계산하는 방법은 한 번의 기다림이 아니라 즉시 시간을 초과하는 것이다. 왜냐하면 한 번의 기다림 시간은 반드시 미리 설정한 값과 같지 않고 여러 번의 기다림이 있기 때문에 누적 계산은 비교적 안전하다.
 
4), 5), 6)의 공유 자물쇠는 독점 자물쇠와 거의 일치한다.
첫 번째 차이점은 줄을 서서 자신의 차례가 될 때 사용하는 setHead And Propagate 방법은 setHead에 비해 좀 복잡하다. 이것은 독점 자물쇠와 공유 자물쇠의 차이로 결정된다.
 
setHeadAndPropagate의 소스 코드???
    private void setHeadAndPropagate(Node node, long propagate) {

        Node h = head; // Record old head for check below

        setHead(node);

        /*

         * Try to signal next queued node if:

         * Propagation was indicated by caller,

         * or was recorded (as h.waitStatus) by a previous operation

         * (note: this uses sign-check of waitStatus because

         * PROPAGATE status may transition to SIGNAL.)

         * and

         * The next node is waiting in shared mode,

         * or we don't know, because it appears null

         *

         * The conservatism in both of these checks may cause

         * unnecessary wake-ups, but only when there are multiple

         * racing acquires/releases, so most need signals now or soon

         * anyway.

         */

        if (propagate > 0 || h == null || h.waitStatus < 0) {

            Node s = node.next;

            if (s == null || s.isShared())

                doReleaseShared();

        }

    }

 
 
4) DoAcquireShared는selfInterrupt()를 자신의 방법으로 옮긴다
 
2)release 섹션
작용, 자물쇠 방출, 다음 대기 라인 깨우기
 
AbstractQueuedLongSynchronizer
AbstractQueuedLongSynchronizer와 AbstractQueuedSynchronizer의 차이점은 acquire와release의arg 파라미터가 int 형식이 아닌 롱이라는 데 있다.
 
ReentrantLock
다시 들어갈 수 있는 자물쇠란,thread가lock을 얻었을 때, 다시 이 자물쇠를 요청하면 바로 되돌아오는 것이다.
AbstractQueuedSynchronizer의 하위 클래스(Sync,NonfairSync,FairSync)를 사용하여 자물쇠를 잠그고 해제하는 관리를 합니다.
 
state는 0과 같이 현재 라인이 자물쇠를 차지하지 않았음을 나타낸다. 다음 두 개의 자물쇠를 가져오는 과정은 기본적으로 유사하다. 공통된 과정은
우선 이 자물쇠를 사용했는지 확인하고, 없으면 이 자물쇠를 사용하고 set State를 사용합니다. 그렇지 않으면 그 자물쇠를 사용한 라인이 현재 라인인지 확인하고, 만약 그렇다면,
setState만 아니면 false로 돌아갑니다.
Sync#nonfairTryAcquire
        final boolean nonfairTryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0) // overflow

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

 
FairSync#tryAcquire
        protected final boolean tryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (isFirst(current) &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

    }

 
FairSync의 유일한 차이점은 isFirst 호출이고 UnfairSync는 누가 빼앗았는지 전혀 검사하지 않는다.
    final boolean isFirst(Thread current) {

        Node h, s;

        return ((h = head) == null ||

                ((s = h.next) != null && s.thread == current) ||

                fullIsFirst(current));

    }

 
isFirst는 라인이 줄을 서 있는지 확인합니다. 라인이 없으면 현재 라인은 자물쇠를 얻을 수 있습니다. 대기열이 있으면 현재 라인이 첫 번째인지 확인하십시오.
 
Sync#tryRelease
        protected final boolean tryRelease(int releases) {

            int c = getState() - releases;

            if (Thread.currentThread() != getExclusiveOwnerThread())

                throw new IllegalMonitorStateException();

            boolean free = false;

            if (c == 0) {

                free = true;

                setExclusiveOwnerThread(null);

            }

            setState(c);

            return free;

        }

 
try Release는 현재 스레드가 잠겨 있는지 확인합니다. 만약 리셋 이상이 아니라면.다음에state에서 계수를 빼서 새state를 얻을 것입니다. 만약state가 0이면 모든 자물쇠가 풀렸음을 나타냅니다.
 
ReentrantReadWriteLock
Reentrant ReadWrite Lock은 동료가 공유 자물쇠(읽기 자물쇠)와 단독 자물쇠(쓰기 자물쇠)를 관리하기 때문에 비교적 복잡하다.
또한 AbstractQueuedSynchronizer의 하위 클래스(Sync,NonfairSync,FairSync)를 사용하여 자물쇠를 잠그고 방출하는 관리를 해야 한다.(이름은 같지만 실현은 다르다).
Sync 클래스
1) Sync의 state는 32비트, 높은 16비트는 공유 잠금 상태, 낮은 16비트는 단독 잠금 상태.
 
        /*

         * Note that tryRelease and tryAcquire can be called by

         * Conditions. So it is possible that their arguments contain

         * both read and write holds that are all released during a

         * condition wait and re-established in tryAcquire.

         */



        protected final boolean tryRelease(int releases) {

            int nextc = getState() - releases;

            if (Thread.currentThread() != getExclusiveOwnerThread())

                throw new IllegalMonitorStateException();

            //          ,   0            ,        

            //     

            if (exclusiveCount(nextc) == 0) {

                setExclusiveOwnerThread(null);

                setState(nextc);

                return true;

            } else {

                setState(nextc);

                return false;

            }

        }



        protected final boolean tryAcquire(int acquires) {

            /*

             * Walkthrough:

             * 1. if read count nonzero or write count nonzero

             *     and owner is a different thread, fail.

             * 2. If count would saturate, fail. (This can only

             *    happen if count is already nonzero.)

             * 3. Otherwise, this thread is eligible for lock if

             *    it is either a reentrant acquire or

             *    queue policy allows it. If so, update state

             *    and set owner.

             */

            Thread current = Thread.currentThread();

            int c = getState();

            int w = exclusiveCount(c);

            if (c != 0) {

                // c != 0             ,w == 0       

                ////// (Note: if c != 0 and w == 0 then shared count != 0)

                if (w == 0 || current != getExclusiveOwnerThread())

                    return false;

                if (w + exclusiveCount(acquires) > MAX_COUNT)

                    throw new Error("Maximum lock count exceeded");

            }

            //      ,         :

            // 1) c == 0       (     w == 0    )

            // 2)          ,            



            // w == 0          ,     

            // writerShouldBlock     ,  FairSync UnfairSync     

            //                      

            //   FairSync,writerShouldBlock  isFirst  ,

            //   isFirst,        ,          ,  fullIsFirst   true

            //   fullIsFirst,     

            //   UnfairSync,writerShouldBlock    false,         (  Unfair)

            if ((w == 0 && writerShouldBlock(current)) ||

                !compareAndSetState(c, c + acquires))

                return false;



            //

            setExclusiveOwnerThread(current);

            return true;

        }



        //     HoldCounter   ThreadLocal                 

        // cachedHoldCounter                 ThreadLocal  

        protected final boolean tryReleaseShared(int unused) {

            HoldCounter rh = cachedHoldCounter;

            Thread current = Thread.currentThread();

            if (rh == null || rh.tid != current.getId())

                rh = readHolds.get();

            // tryDecrement()           ,  0       ( 1)。

            if (rh.tryDecrement() <= 0)

                throw new IllegalMonitorStateException();



            //         

            for (;;) {

                int c = getState();

                int nextc = c - SHARED_UNIT; //           

                if (compareAndSetState(c, nextc))

                    return nextc == 0;

            }

        }



        protected final int tryAcquireShared(int unused) {

            /*

             * Walkthrough:

             * 1. If write lock held by another thread, fail

             * 2. If count saturated, throw error

             * 3. Otherwise, this thread is eligible for

             *    lock wrt state, so ask if it should block

             *    because of queue policy. If not, try

             *    to grant by CASing state and updating count.

             *    Note that step does not check for reentrant

             *    acquires, which is postponed to full version

             *    to avoid having to check hold count in

             *    the more typical non-reentrant case.

             * 4. If step 3 fails either because thread

             *    apparently not eligible or CAS fails,

             *    chain to version with full retry loop.

             */

            Thread current = Thread.currentThread();

            int c = getState();

            //            

            if (exclusiveCount(c) != 0 &&

                getExclusiveOwnerThread() != current)

                return -1;

            //            

            if (sharedCount(c) == MAX_COUNT)

                throw new Error("Maximum lock count exceeded");



            //    writerShouldBlock,readerShouldBlock     ,     ,

            //                  

            //   UnfairSync,            ,                   ,

            //   block        

            //   FairSync,    isFirst      

            if (!readerShouldBlock(current) &&

                compareAndSetState(c, c + SHARED_UNIT)) {

                HoldCounter rh = cachedHoldCounter;

                if (rh == null || rh.tid != current.getId())

                    cachedHoldCounter = rh = readHolds.get();

                rh.count++;

                return 1;

            }

 

            //   CAS                

            //   :           (      ),                 ,     

            return fullTryAcquireShared(current);

        }

 
fullTryAcquireShared
적색 섹션과 함께 카운트 캐시 증가
        /**

         * Full version of acquire for reads, that handles CAS misses

         * and reentrant reads not dealt with in tryAcquireShared.

         */

        final int fullTryAcquireShared(Thread current) {

            /*

             * This code is in part redundant with that in

             * tryAcquireShared but is simpler overall by not

             * complicating tryAcquireShared with interactions between

             * retries and lazily reading hold counts.

             */

            HoldCounter rh = cachedHoldCounter;

            if (rh == null || rh.tid != current.getId())

                rh = readHolds.get();

            for (;;) {

                int c = getState();

                int w = exclusiveCount(c);

                //

                if ((w != 0 && getExclusiveOwnerThread() != current) ||

                    ((rh.count | w) == 0 && readerShouldBlock(current)))

                    return -1;

                if (sharedCount(c) == MAX_COUNT)

                    throw new Error("Maximum lock count exceeded");

                if (compareAndSetState(c, c + SHARED_UNIT)) {

                    cachedHoldCounter = rh; // cache for release

                    rh.count++;

                    return 1;

                }

            }

        }

 
 
ReadLock 및 WriteLock 정보
AbstractQueuedSynchronizer가 공유 자물쇠/독점 자물쇠에 대한 acquire와release를 호출하는 방법으로 실현
 
Lock은 독점 자물쇠만 있는 자물쇠가 없다는 걸 알 수 있어요.
 
Node#nextWaiter
null, 단독 잠금 Node
SHARED, 공유 잠금 Node
기타, 어떤 조건하의 다음 대기자

좋은 웹페이지 즐겨찾기