Java 다 중 스 레 드 시리즈--CyclicBarrier 상세 설명

9141 단어 자바
CyclicBarrier 안내
Cyclic Barrier 는 하나의 공공 장벽 점(comon barrier point)에 도달 할 때 까지 동기 화 보조 클래스 입 니 다.이 barrier 는 대기 라인 을 방출 한 후 다시 사용 할 수 있 기 때문에 순환 의 barrier 라 고 부른다.
CyclicBarrier 함수 목록
 
//        CyclicBarrier,                   ,        barrier          。 
 public CyclicBarrier(int parties) {
        this(parties, null);
   }

//        CyclicBarrier,                   ,     barrier           ,           barrier      
 public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
//            barrier     await     ,     。
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
//             barrier     await     ,     ,      
 public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

//    barrier       
 public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

//    barrier     
 public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
//   barrier
 public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
//         barrier      
 public int getParties() {
        return parties;
    }

CyclicBarrier 데이터 구조
CyclicBarrier 는'ReentrantLock 대상 lock'과'Condition 대상 trip'을 포함 하고 있 으 며 독점 잠 금 을 통 해 이 루어 집 니 다.

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();

CyclicBarrier 소스 코드 분석
1.구조 함수
Cyclicbarrier 의 구조 함 수 는 모두 2 개 입 니 다.Cyclicbarrier 와 Cyclicbarrier(int parties,Runnable barrier Action).첫 번 째 구조 함 수 는 두 번 째 구조 함 수 를 호출 하여 이 루어 진 것 이다.
public CyclicBarrier(int parties) {
        this(parties, null);
    }
 public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // parties        barrier     
        this.parties = parties;
         // count             
        this.count = parties;
        // barrierCommand  parties     barrier ,      
        this.barrierCommand = barrierAction;
    }

2.대기 함수
await()는 dowait()를 통 해 이 루어 집 니 다.
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

dowait()소스 코드:
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //      lock
        lock.lock();
        try {
            //    generation
            final Generation g = generation;

            //     generation false      
            if (g.broken)
                throw new BrokenBarrierException();
             //          ,   breakBarrier()  CyclicBarrier,   
             CyclicBarrier       
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //  count      
            int index = --count;
            //   index = 0  parties    barrier
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //    barrierCommand  null,      
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //          ,   generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //         ,    barrier  ,    ,     
            for (;;) {
                try {
                    //         ,   await  ,    awaitNanos  
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //     
            lock.unlock();
        }
    }

설명:dowait()의 역할 은 현재 스 레 드 를 막 는 것 입 니 다."parties 스 레 드 가 barrier 에 도착 할 때 까지"또는"현재 스 레 드 가 중단 되 었 습 니 다"또는"시간 초과"라 는 3 자 중 하나 가 발생 할 때 까지 현재 스 레 드 는 계속 실 행 됩 니 다.(01)generation 은 Cyclicbarrier 의 한 구성원 이 옮 겨 다 니 는 것 입 니 다.그 정 의 는 다음 과 같 습 니 다.
private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

Cyclic Barrier 에서 같은 라인 은 같은 세대,즉 같은 Generation 에 속한다.Cyclic Barrier 에 서 는 generation 대상 을 통 해 어느 세대 에 속 하 는 지 기록 합 니 다.파티 스 레 드 가 barrier 에 도착 하면 generation 은 업 데 이 트 됩 니 다.
(02)현재 스 레 드 가 중단 되면 Thread.interrupted()는 true 입 니 다.breakBarrier()를 통 해 CyclicBarrier 를 종료 합 니 다.breakBarrier()의 원본 코드 는 다음 과 같 습 니 다.
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier()는 현재 중단 표 시 를 true 로 설정 합 니 다."이 Generation 을 중단 합 니 다"를 의미 합 니 다.동시에 count=parties 를 설정 하면 count 를 다시 초기 화 합 니 다.마지막 으로 signal All()을 통 해 Cyclic Barrier 의 모든 대기 스 레 드 를 깨 웁 니 다.
(03)"count 카운터"-1,즉--count;그리고'파티 스 레 드 가 barrier 에 도 착 했 습 니 다',즉 index 가 0 인지 아 닌 지 판단 합 니 다.index=0 일 때 barrierCommand 가 null 이 아니라면 이 barrierCommand 를 실행 합 니 다.barrierCommand 는 우리 가 CyclicBarrier 를 만 들 때 들 어 오 는 Runnable 대상 입 니 다.그 다음 에 nextGeneration()을 호출 하여 세대교체 작업 을 합 니 다.nextGeneration()의 소스 코드 는 다음 과 같 습 니 다.
private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

우선,signal All()을 호출 하여 Cyclic Barrier 의 모든 대기 스 레 드 를 깨 웁 니 다.이어서 count 를 다시 초기 화 합 니 다.마지막 으로 generation 의 값 을 업데이트 합 니 다.
(04)재 for(;)순환 중.timed 는 현재'시간 초과 대기'스 레 드 를 표시 하 는 데 사 용 됩 니 다.그렇지 않 으 면 trip.awat()를 통 해 대기 합 니 다.그렇지 않 으 면 awaitNanos()를 호출 하여 시간 초과 대기 합 니 다.
 
Cyclic Barrier 사용
public class Test {

    static class TaskThread extends Thread {

        CyclicBarrier barrier;

        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(getName() + "      A");
                barrier.await();
                System.out.println(getName() + "      A");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int threadNum = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "       ");
            }
        });

        for(int i = 0; i < threadNum; i++) {
            new TaskThread(barrier).start();
        }
    }
}
 

좋은 웹페이지 즐겨찾기