멀티스레드-CountDownLatch, CyclicBarrier, Semaphore, Exchanger, Phaser

9164 단어
CountDownLatch는 다른 라인에서 실행 중인 작업을 완성하기 전에 한 개 이상의 라인이 계속 기다릴 수 있도록 하는 동기화 보조 클래스입니다.주어진 계수로 CountDownLatch를 초기화합니다.countDown () 계수를 1로 호출합니다. 계수가 0에 도달하기 전에 await () 방법이 계속 막혀서 계수를 리셋할 수 없습니다.
public class CountDownLatch {
    private final Sync sync;
    public CountDownLatch(int count);
    public void countDown() {
        sync.releaseShared(1);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
}

CountDownLatch에는 주로countDown()과 await() 방법이 있다.countDown () 체감 계수, 계수가 0에 도달하면 모든 대기 라인이 있는지 여부입니다. 1. 현재 카운트가 0보다 크면 카운트가 1로 줄어듭니다. 2. 만약 1을 줄인 후 계수가 0이 되면 이 계수가 0이 되기를 기다리는 모든 라인을 다시 조정한다. 3. 만약 계수가 이미 0이 되면 어떠한 조작도 일어나지 않는다.await () 는 현재 라인을 계수가 0이 되기 전까지 계속 막습니다. 라인이 중단되거나 지정한 대기 시간을 초과하지 않는 한.계수가 0이면,true가 이 방법에 들어갔을 때, 현재 스레드가 중단 상태를 설정했거나, 대기할 때 중단되었을 때, InterruptedException 이상을 던지고, 현재 스레드의 중단 상태를 지웁니다.지정한 대기 시간을 초과하면false를 되돌려줍니다. 이 시간이 0보다 작으면 이 방법은 기다리지 않습니다.
package org.github.lujiango;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test16 {

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch begin = new CountDownLatch(1);
        final CountDownLatch end = new CountDownLatch(10);
        final ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            final int no = i + 1;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        begin.await();
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + " arrived");
                    } catch (Exception e) {

                    } finally {
                        end.countDown();
                    }
                }
            };
            exec.submit(run);
        }

        System.out.println("Game start");
        begin.countDown();
        end.await();
        System.out.println("Game over");
        exec.shutdown();
    }

}

CyclicBarrier는 공통 장벽 지점에 도달할 때까지 스레드 세트가 서로 기다릴 수 있는 동기식 보조 클래스입니다.고정된 크기의 루틴과 관련된 프로그램에서, 이 루틴들은 항상 서로 기다려야 한다.
package org.github.lujiango;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test16 {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final CyclicBarrier end = new CyclicBarrier(10);
        final ExecutorService exec = Executors.newFixedThreadPool(10);
        System.out.println("Game start");
        for (int i = 0; i < 10; i++) {
            final int no = i + 1;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        end.await();
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + " arrived");
                    } catch (Exception e) {

                    } finally {
                    }
                }
            };
            exec.submit(run);
        }
        System.out.println("Game over");
        exec.shutdown();

    }

}

주 작업을 수행하려면 모든 하위 작업이 완료되어야 합니다. 이때 CyclicBarrier를 사용할 수 있습니다.
Semaphore는 하나의 계수 신호량입니다. 신호량은 허가 집합을 유지합니다. 허가가 사용되기 전에 모든 acquire () 를 막고 허가를 받습니다.모든release () 는 허가를 방출합니다. 차단 중인 수령자를 방출할 수 있습니다.Semaphore는 허가된 번호만 계산하고 해당하는 행동을 취한다. 신호를 받은 라인은 코드에 들어갈 수 있고 그렇지 않으면 기다릴 수 있다.
package org.github.lujiango;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Test17 {

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semp = new Semaphore(5);
        for (int i = 0; i < 20; i++) {
            final int no = i;
            Runnable run = new Runnable() {

                @Override
                public void run() {
                    try {
                        semp.acquire();
                        System.out.println("Accessing: " + no);
                        TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                    } catch (Exception e) {

                    } finally {
                        semp.release();
                    }
                }
            };
            exec.submit(run);
        }
        exec.shutdown();
    }

}

Exchanger Exchanger는 두 스레드 간에 데이터를 교환할 수 있으며, 두 스레드만 사용할 수 있으며, 더 많은 스레드 간 데이터 교환은 지원되지 않습니다.루틴 A가 Exchanger 대상의 exchage () 방법을 호출하면 막힙니다.B 라인에서도 exchange () 방법을 호출한 다음, 라인이 안전한 방식으로 데이터를 교환한 후에 A와 B 라인이 계속 실행됩니다.
package org.github.lujiango;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Exchanger;

public class Test18 {

    public static void main(String[] args) {
        Exchanger> ex = new Exchanger>();
        new A(ex).start();
        new B(ex).start();
    }

}

class A extends Thread {
    List list = new ArrayList();
    Exchanger> ex;

    public A(Exchanger> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            list.clear();
            list.add(random.nextInt(10));
            list.add(random.nextInt(10));
            list.add(random.nextInt(10));
            try {
                list = ex.exchange(list);
            } catch (Exception e) {

            }
        }
    }
}

class B extends Thread {
    List list = new ArrayList();
    Exchanger> ex;

    public B(Exchanger> ex) {
        this.ex = ex;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                list = ex.exchange(list);
            } catch (Exception e) {

            }
            System.out.println(list);
        }
    }
}

Phaser Phaser는 CountDownLatch 및 CyclicBarrier와 관련된 기능을 포함하는 유연한 스레드 동기화 도구입니다.CountDownLatch의countDown()과 await()는 Phaser의arrive()와 awaitAdvance(int n)를 대체할 수 있고 CyclicBarrier의await는Phaser의arriveAndAwaitAdvance() 방법으로 CountDownLatch 대신 Phaser를 사용할 수 있다.
package org.github.lujiango;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Test19 {

    public static void main(String[] args) throws InterruptedException {
        final Phaser latch = new Phaser(10);
        for (int i = 1; i <= 10; i++) {
            final int id = i;
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
                        System.out.println("thread: " + id + " is running");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.arrive();
                    }
                }
            });
            t.start();
        }
        latch.awaitAdvance(latch.getPhase());
        System.out.println("all thread has run");

    }

}
package org.github.lujiango;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class Test19 {

    public static void main(String[] args) throws InterruptedException {
        final Phaser latch = new Phaser(10);
        for (int i = 1; i <= 10; i++) {
            final int id = i;
            Thread t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
                        latch.arriveAndAwaitAdvance(); //           ,      ,      
                        System.out.println("thread: " + id + " is running");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        latch.arrive();
                    }
                }
            });
            t.start();
        }
    }

}

  
 
전재 대상:https://www.cnblogs.com/lujiango/p/7581039.html

좋은 웹페이지 즐겨찾기