AQS 소결

10730 단어
0x 01 개술
자바 의 잠 금 메커니즘 을 언급 하면 보통 두 가지 가 있 는데 하 나 는 jvm 의 synchronized 키워드 로 이 루어 지고 하 나 는 ReentranLock 입 니 다.전 자 는 jvm 이 실현 되 고 후 자 는 자바 코드 가 실현 되 며 AbstractQueued Synchronizer 는 자바 와 가방 에 있 는 여러 가지 실현 기반 입 니 다. 예 를 들 어 ReentranLock, Count Downlatch, Cyclic Barrier 등 입 니 다.
자물쇠 의 통 제 를 실현 하려 면 몇 군데 가 있 는 지 생각해 보 세 요.
  • 왜 자 물 쇠 를 가지 고 있 습 니까?왜 자 물 쇠 를 못 받 았 지?자물쇠 가 뭐 예요?
  • 어떻게 선점 을 실현 합 니까?자 물 쇠 를 빼 앗 지 못 한 스 레 드 는 자물쇠 가 풀 린 후에 어떤 논리 로 자 물 쇠 를 얻 을 수 있 도록 어떤 데이터 구조 로 저장 해 야 합 니까?
  • 스 레 드 제어 와 강 한 잠 금 사 이 는 어떻게 조 화 롭 습 니까?성능 이 가장 좋다
  • 스 레 드 여러 상태 간 에 스 레 드 선점 관 계 를 어떻게 통제 합 니까
  • 자 물 쇠 를 풀 때 스 레 드 는 어떻게 경쟁 합 니까?

  • 면접 을 볼 때 이 원 리 를 어떻게 설명 합 니까?먼저 몇 가지 기초 데이터 구 조 를 주목 하 다.
  • FIFO 의 CLH 대기 열 로 스 레 드 node
  • 를 저장 합 니 다.
  • 노드 대상 은 대열 의 노드 대상 이 고 그 안에 thread, node statu 등
  • 이 봉인 되 어 있다.
  • volatile 의 int state 로 잠 금 을 동시에 가 져 올 수 있 는 수량 을 제어 합 니 다
  • 몇 가지 중요 한 과정 을 닫 고 있 습 니 다.
  • 대기 열 초기 화
  • 노드 추가
  • 노드 상태 변경
  • 하나의 스 레 드 는 하위 클래스 를 통 해 이 루어 진 acquire 방법 으로 자물쇠 에 대한 획득 을 실현 합 니 다. 예 를 들 어 흔히 볼 수 있 는 ReentranLock 이 실현 하 는 state 값 은 1 입 니 다. 즉, 하나의 스 레 드 만 자 물 쇠 를 가 져 올 수 있 습 니 다.
    자 물 쇠 를 얻 지 못 한 스 레 드 에 대해 경쟁 을 시작 합 니 다. 여기 서 밤 을 들 고 가입 할 때 T2 T3 두 스 레 드 가 자 물 쇠 를 얻 지 못 하면 그들 은 대열 에 들 어가 대열 을 초기 화하 고 머리 노드 를 설정 하 며 꼬리 노드 를 하나씩 추가 합 니 다.
    그러나 이 문제 가 또 왔 습 니 다. 어떻게 여러 스 레 드 가 대기 열 읽 기와 쓰기 의 안전성 을 보장 합 니까?지금 은 자물쇠 가 없다 는 개념 을 알 아야 한다. 자물쇠 의 기본 틀 이 실현 되 고 있 기 때문이다.그래서 자바 여 기 는 낙관 자 물 쇠 를 사 용 했 습 니 다.낙관적 인 자 물 쇠 를 언급 하면 카 스 조작 을 언급 하지 않 을 수 없다. 카 스 를 언급 하면 volatile 이 스 레 드 의 가시 성 을 확보 해 야 한다.
    다른 하 나 는 낙관적 인 자 물 쇠 는 보통 '재 시도' 작업 과 밀접 한 관 계 를 가진다. 카 스 는 한 번 의 작업 의 성공 만 보장 할 수 있다. 만약 에 한 스 레 드 작업 이 성공 하면 다른 스 레 드 를 무시 하지 않 고 그들 에 게 갈 곳 을 주어 야 한다. 논리, 예 를 들 어 재 시도 논 리 는 당신 의 코드 가 반복 적 으로 실행 되 어야 한 다 는 것 을 의미한다. 예 를 들 어 while (true), 예 를 들 어 for (;) 등 이다.무릇 사 순환 은 반드시 중지 조건 을 가지 고 있 을 것 이다.
    AQS 에 서 는 카 스, volatile, 재 시도 로 대기 열 작업 의 안전성 을 확보 합 니 다.
    T2, T3 는 현재 (순서대로 선점 순 서 를 본다) 대기 열 에 추가 되 었 습 니 다. T3 는 팀 의 끝 이 고 T2 는 두 번 째 노드 입 니 다. 머리 노드 는 초기 화 노드 가 빈 노드 입 니 다. (모 르 겠 습 니 다. 이렇게 디자인 되 었 습 니 다. 하나의 제어 노드 로 만 존재 합 니 다) T2 는 팀 의 끝 에 있 습 니 다. 만약 에 뒤에 새로 온 스 레 드 가 있 으 면 팀 의 끝 에 추 가 됩 니 다.
    대기 열 에 만 넣 으 면 부족 할 것 입 니 다. 입대 후 AQS 는 하나의 for (;) 로 노드 를 조작 합 니 다. 즉, 속칭 스 레 드 자전 입 니 다. 자전 은 스 레 드 가 잠시 걸 리 지 않 고 공전 (빈 순환) 입 니 다. 순환 할 때마다 자 물 쇠 를 가 져 오 려 고 시도 합 니 다. 자물쇠 가 풀 리 지 않 았 는 지 (하위 클래스 acquire 방법 을 호출) 없 으 면 Node 상 태 를 변경 합 니 다.초기 화 에서 차단 가능 한 상태 로 바 뀌 었 습 니 다. 다음 자전 후 상태 에 따라 스 레 드 를 걸 고 자전 을 중지 합 니 다.
    이상 은 개술 일 뿐 입 니 다. 다음은 관건 적 인 소스 코드 와 그림 을 보 겠 습 니 다.
    0x 02 키 코드 와 그림
    2.1 노드 코드
    static final class Node {
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
            //             
            static final int CANCELLED = 1;
            //                   
            static final int SIGNAL = -1;
            //            
            static final int CONDITION = -2;
            //                      
            static final int PROPAGATE = -3;
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
    }
    

    2.2AQS 구조
    [사진 업로드 실패... (image - 38499 c - 1536736127795)]
    2.3 노드 노드 상태 도
    2.4AQS 키 코드
    /**
     *     ReentrantLock  ,  lock     CAS               
     */
    public final void acquire(int arg) {
        logger.info("  {}     ", Thread.currentThread().getId());
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { //  tryAcquire  cas      ,   
            //     &&   node       
            selfInterrupt();
        }
    }
    

    try Acquire 의 하위 클래스 구현, Sync 클래스 ReentranLock 의 클래스:
    /**
     *      
     */
    final boolean nonfairTryAcquire(int acquires) {
        logger.info("nonfairTryAcquire {}     begin", Thread.currentThread().getId());
        final Thread current = Thread.currentThread();
        int c = getState();
        //            logger.info("nonfairTryAcquire {}   stat:{}", Thread.currentThread().getId(), c);
        if (c == 0) {//    state0,         ,          cas   
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                logger.info("nonfairTryAcquire  {}     success state = 0 ", Thread.currentThread().getId());
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            logger.info("nonfairTryAcquire  nextc is {}", nextc);
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            logger.info("nonfairTryAcquire  {}     success state != 0 ", Thread.currentThread().getId());
            return true;
        }
        logger.info("nonfairTryAcquire {}     fail", Thread.currentThread().getId());
        return false;
    }
    

    줄 끝 에 추가 하거나 대기 열 초기 화
    /**
    * Creates and enqueues node for current thread and given mode.
    *       ,        
    *     ,mode   null
    * mode node  nextWaiter
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    */
    private Node addWaiter(Node mode) {
      logger.info("addWaiter -{}", Thread.currentThread().getId());
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      //      ,        ,        ,
      Node pred = tail;
      if (pred != null) {
          node.prev = pred;
          if (compareAndSetTail(pred, node)) {
              pred.next = node;
              logger.info("addWaiter        ,         ");
              try {
                  logger.info("       {},      {}", node.thread.getId(), node.prev.thread.getId());
    
              } catch (Exception e) {
              }
              return node;
          }
      }
      //  ,  ,   
      enq(node);
      return node;
    }
    

    대기 열 을 초기 화하 고 헤드 노드 를 생 성하 여 팀 끝 에 추가 합 니 다.
    private Node enq(final Node node) {
        logger.info("enq");
        int i = 0;
        for (; ; ) {
            Node t = tail;
            if (t == null) { //  //tail    ,     ,       new Node(       )
                logger.info("tail    ,     ,       new Node(       )");
                if (compareAndSetHead(new Node()))//CAS     ,CAS         ,  thread  ,       head,        ,  else  
                    tail = head; //         ,      
            } else {
                logger.info("  {},   {}      ", Thread.currentThread().getId(), ++i);
                node.prev = t;
                if (compareAndSetTail(t, node)) {//CAS     ,      ,       ,    t,cas      ,     CAS      
                    t.next = node;
                    return t;//       
                }
            }
        }
    }
    

    자전 대기, 상태 변경, 스 레 드 걸 기
    /**
     *  node       ,             ,
     *           ,             ,
     *            acquireQueued(node, arg),
     *            !                     ,
     *
     * @param node the node
     * @param arg  the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        logger.info("acquireQueued start... {}", Thread.currentThread().getId());
        boolean failed = true;
        try {
            boolean interrupted = false;
            int i = 0;
            for (; ; ) {
                logger.info("acquireQueue-{}-  {} ", Thread.currentThread().getId(), ++i);
                final Node p = node.predecessor();//       
                if (p == head && tryAcquire(arg)) {//         ,     tryAcquire   
                    setHead(node);//              ,       
                    logger.info("acquireQueued setHead {}", Thread.currentThread().getId());
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //       ,         
                //     shouldParkAfterFailedAcquire  ,
                //         waitStatus   
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    상태 변경:
    /**
     *    node          ,    node status  。
     *   true      (  LockSupport)
     * //              
     * > 0,         ,  false
     * < 0,    false,        waitStatus   SIGNAL,       ,       .
     *
     * @param pred      
     * @param node     
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        logger.info("shouldParkAfterFailedAcquire-{}begin..", Thread.currentThread().getId());
        int ws = pred.waitStatus;
        logger.info("shouldParkAfterFailedAcquire-{} wait status:{}", Thread.currentThread().getId(), ws);
        if (ws == Node.SIGNAL)
            //SIGNAL,   true          ,     ,      ,
            //          ,           ,    InterruptedException     .
            return true;
        if (ws > 0) { //           ,   CANCELLED     ,        
            //>0,         ,  false
            do { //     
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // <=0, SINNAL, CANCELLED    false,
            //         (acquireQueue              )waitStatus CAS  SIGNAL,       ,       .
            logger.info("shouldParkAfterFailedAcquire-{}      waitStatus CAS  SIGNAL,       ,       ", Thread.currentThread().getId());
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    마지막 으로 깨 우기 동작 을 붙 입 니 다.
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        logger.info("unparkSuccessor-{} wait status :{}", Thread.currentThread().getId(), ws);
        if (ws < 0)//  0   ,         
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
        debug("unparkSuccessor      :", s);
        logger.info("unparkSuccessor       node :{}", tail.thread.getId());
    
        if (s == null || s.waitStatus > 0) {
            //node     ==null,              ,    ,   
            //     tail     ,    node.next   ?
            //     node.next       null     ,
            //     tail             。
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)//                node
                    s = t;
        }
        //          ,           ,              
        if (s != null) {
            debug("unparkSuccessor", s);
            logger.info("unparkSuccessor-{}       {}", Thread.currentThread().getId(), s.thread.getId());
            LockSupport.unpark(s.thread);
        }
    }
    

    2.5 아래 그림 은 잠 금 되 지 않 은 스 레 드 호출 프로 세 스 입 니 다.

    좋은 웹페이지 즐겨찾기