009 Reentrantlock 소스 | AQS

9785 단어
AQS는 전체적으로AbstractQueuedSynchronizer라고 불리며 intstate와 양단FIFO의 노드 대기열에 기대어 추상적인 대기열식 동기화기를 실현한다.AQS는 공유 자원에 다중 스레드로 접근하는 동기화 프레임워크를 정의했는데, 많은 동기화 클래스가 그것에 의존한다. 예를 들어 자주 사용하는 Reentrant Lock/Semaphore/Count Down Latch 등이다.
Node 클래스는 다음과 같습니다.
static final class Node {

    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;

        //    
    volatile Node prev;
    volatile Node next;

    //       
    volatile Thread thread;

    
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

대외적으로 다시 쓰는 방법이 필요합니다. 다시 쓰지 않으면 이상을 던질 수 있습니다.
//   
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

//   
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}


protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}


protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

//         
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

공개적으로 사용하는 방법은 아래와 같으며, 여기에 우리가 중점적으로 설명한 것을 열거한다
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

상기 보호된 방법과public의 방법에서 알 수 있듯이 acquire 방법은 사용자가 다시 쓰는 tryAcquire 방법을 호출하고 tryAcquire 반환 값에 따라 다음 동작을 결정하는데 release 방법은 같은 이치이다.
우리는 Reentrantlock의 실현과 결합하여 설명한다.
Reentrantlock | Sync
코드는 아래와 같습니다. 삭제는 중요하지 않습니다.
abstract static class Sync extends AbstractQueuedSynchronizer {

    abstract void lock(); //       lock  ,    
        
       //         
    final boolean nonfairTryAcquire(int acquires) {
          //    
        final Thread current = Thread.currentThread();
        int c = getState(); 
        if (c == 0) { //     , 0       
            if (compareAndSetState(0, acquires)) { //CAS     
                setExclusiveOwnerThread(current); //           
                return true;//  
            }
        }
        else if (current == getExclusiveOwnerThread()) { //       
            int nextc = c + acquires;
            if (nextc < 0) // 
                throw new Error("Maximum lock count exceeded");
            setState(nextc); //      
            return true;
        }
        return false;
    }
        //   
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;  //      
        if (Thread.currentThread() != getExclusiveOwnerThread()) //         
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { //         0,   ,  true,       。
            free = true;
            setExclusiveOwnerThread(null); //      
        }
        setState(c); 
        return free; //  ,  false,     
    }

    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
}

코드에서 알 수 있듯이sync는 aqs의state를 가지고 놀았을 뿐fifo 대기열과 관련이 없고 상술한 코드는 우리가 정의한 리셋 가능한 자물쇠와 대동소이하다.
Reentrantlock | NonfairSync
코드는 다음과 같습니다.
static final class NonfairSync extends Sync {
    //  Sync lock  
    final void lock() {
        if (compareAndSetState(0, 1)) //  cas  ,      ,       ,    
            setExclusiveOwnerThread(Thread.currentThread()); //      
        else
            acquire(1); //  aqs acquire,       tryAcquire
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires); //  nonfairTryAcquire
    }
}

Sync 생성 개체sync
Reentrantlock는sync의 acquire와realse 방법(aqs에서 온 것)을 사용하여 자물쇠를 가져오고 방출합니다.
public void lock() {
        sync.lock(); //       Sync,      Sync
 }

 public void unlock() {
        sync.release(1);
 }

그래서 중점 지향aqs acquire release 방법.코드 좀 더 붙여주세요.
acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) && //  tryAcquire  true,      ,  if,        
       // ,      ,  if   &&     
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter 메서드
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); //  Node
        Node pred = tail;
        if (pred != null) { //    
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node); //      ,              ,   cas  ,  enq  
      //      cas        
        return node;
    }

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued 메서드
 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { //  ,  
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { //        ,
                  // ,  node       head        ,       ,      ,        。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted; // head   ,         ,  
                }
                //      for  ,   ?            ,
                //            signal   ,        ?    !!!
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

//             signal,          
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //        signal,   
        return true;
    if (ws > 0) { //         
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//    ,      
  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

release
코드는 다음과 같습니다.
public final boolean release(int arg) {
        if (tryRelease(arg)) { 
        //     tryRelease    true,      
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
               //  successor, ,  head successor   ,
            return true;
        }
       //     ,       ,  true
        return false;
}

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);//    Node   0

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
          //     Node                 ,
         //              ,            
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
      //                     LockSupport.unpark()    
    if (s != null)
        LockSupport.unpark(s.thread);
}

Reentrantlock | FairSync
공평한 자물쇠는 막힌 라인이 깨어나면 새로운 라인에 뺏기지 않고 자물쇠를 얻을 수 있도록 보장한다.불공평한 잠금과 유일하게 다른 부분은ReentrantLock에서 상황에 따라 현재의 라인을 막는지 여부를 판단합니다
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
             //hasQueuedPredecessors   ,      ,       ,
            //      ,              
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//        ,             (   )
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t && 
        ((s = h.next) == null || s.thread != Thread.currentThread()); //
}

좋은 웹페이지 즐겨찾기