java 및 패키지 중 Semaphore 사용 및 원본 분석

6641 단어 다중 스레드
Semaphore는 하나의 스레드 컨트롤러로 초기화할 때 하나의 스레드 수량을 정하여 같은 시각을 제어할 수 있다. 지정된 수량의 스레드만 실행할 수 있고 다른 스레드가 있으면 그 중의 스레드가 풀린 후에만 계속 실행할 수 있다.
간단한 사용 예를 제시하다
public class SemaphoreDemo {
    public final static int SEM_SIZE = 10;

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(SEM_SIZE);
        MyThread t1 = new MyThread("t1", semaphore);
        MyThread t2 = new MyThread("t2", semaphore);
        t1.start();
        t2.start();
        int permits = 5;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(permits);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }


    }
}
class MyThread extends Thread {
    private Semaphore semaphore;

    public MyThread(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }

    public void run() {
        int count = 3;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(count);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(count);
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
    }
}

실행 결과
main trying to acquire
main acquire successfully
t2 trying to acquire
t2 acquire successfully
t1 trying to acquire
main release successfully
t1 acquire successfully
t2 release successfully
t1 release successfully

main 루틴은 먼저 가져오려고 시도합니다. 들어오는 매개 변수는 5입니다. 이때state의 값은 처음에 설정한 10에서 5로 변경됩니다. 다음에 t2 루틴은 가져오려고 시도하고state의 값을 2로 낮추려고 시도합니다. t1 루틴이 가져오려고 시도할 때state의 값은 acquire보다 작고 가져오는 데 실패하여 마운트합니다.main 루틴이 풀리면 t1 루틴은 계속 실행할 수 있습니다.
Semaphore의 원본 분석
Semaphore의 내부 구조는
Sync는 각각 NonfairSync와FairSync로 이루어지는데 전체적으로 Reentrant Lock과 유사하다. 다른 것은sync 방법에서 이루어진 것은 AQS의tryAcquire Shared이고 공유 자물쇠에 대한 획득 방식과tryRelease Shared이다.
라인에서 acquire 방법을 호출하는 실행 코드 논리
 public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

AQS 클래스
  public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

기본적으로 불공평한 경우 현재state값과 acquire를 판단하고 나머지state값을 되돌려줍니다
 final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

0보다 작으면 DoAcquireSharedInterruptibly 메서드
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //            
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor(); //           
                if (p == head) {
                    int r = tryAcquireShared(arg); //       ,  state      0
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); //  state   0,       head  
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //  state    0,       SIGNAL      
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

release 방법
public void release() {
        sync.releaseShared(1);
    }
  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

방출 시state의 값을 원래의 기초 위에서 1을 추가합니다. 업데이트가true로 되돌아오면 방출 작업을 실행합니다
private void doReleaseShared() {
      
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus; //         
                if (ws == Node.SIGNAL) {  
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //         0
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);   //          
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
 private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

머리 결점의 다음 노드를 깨우다
이상의 분석을 통해 Semaphore와 CountDownLatch의 실현 방식이 매우 유사하다는 것을 알 수 있다.둘 다 동기화 대기열을 사용하고 CyclicBarrier는 AQS의 조건 대기열을 사용합니다.
질문 있으면 바로잡아 주세요~

좋은 웹페이지 즐겨찾기