자바 고 병발 (4) - 자바 원자 류 상세 설명

15343 단어
자바 높 은 병발 (1) - 병발 프로 그래 밍 의 몇 가지 기본 개념 자바 높 은 병발 (2) - 자바 메모리 모델 과 스 레 드 자바 높 은 병발 (3) - CountDownlatch, Cyclic Barrier 와 Semaphore 자바 높 은 병발 (4) - 자바 원자 류 상세 해석 자바 높 은 병발 (5) - 스 레 드 안전 책 략 자바 높 은 병발 (6) - 자물쇠 의 최적화 및 JVM 이 잠 금 최적화 에 대한 노력
Countdown Latch, Semaphore 기반 이전 글 보기 Countdown Latch, CyclicBarrier, Semaphore
스 레 드 가 안전 하지 않 은 높 은 병행 실현
클 라 이언 트 는 5000 개의 작업 을 모 의 수행 합 니 다. 스 레 드 수량 은 200 이 고 스 레 드 마다 한 번 실행 하면 count 수 를 1 로 추가 합 니 다. 실행 이 끝 난 후에 count 의 값 을 인쇄 합 니 다.
package atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import annotation.NotThreadSafe;

@NotThreadSafe
public class NotThreadSafeConcurrency {

    private static int CLIENT_COUNT = 5000;
    private static int THREAD_COUNT = 200;
    private static int count = 0;
    private static int[] values = new int[11];

    private static ExecutorService executorService = Executors.newCachedThreadPool();
    private final static Semaphore semaphore = new Semaphore(THREAD_COUNT);
    private final static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT);

    public static void main(String[] args) throws Exception {
        testAtomicInt();
    }

    private static void testAtomicInt() throws Exception {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // count   1,    1   
                countDownLatch.countDown();
            });
        }
        //              
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("ConcurrencyDemo:" + count);
    }

    private static void add() {
        count++;
    }
}
//----------------------------------    -------------------------------------------
         ,         <= 5000
ConcurrencyDemo:5000
ConcurrencyDemo:4999
ConcurrencyDemo:4991
ConcurrencyDemo:4997

스 레 드 안전 의 높 은 병행 실현 AtomicInteger
package concurrency;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import annotation.NotThreadSafe;
import annotation.ThreadSafe;

@ThreadSafe
public class ThreadSafeConcurrency {

    private static int CLIENT_COUNT = 5000;
    private static int THREAD_COUNT = 200;
    private static AtomicInteger count = new AtomicInteger(0);

    private static ExecutorService executorService = Executors.newCachedThreadPool();
    private final static Semaphore semaphore = new Semaphore(THREAD_COUNT);
    private final static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT);

    public static void main(String[] args) throws Exception {
        testAtomicInteger();
    }

    private static void testAtomicInteger() throws Exception {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // count   1,    1   
                countDownLatch.countDown();
            });
        }
        //              
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("ConcurrencyDemo:" + count);
    }

    private static void add() {
        count.incrementAndGet();
    }
}

//----------------------------------    -------------------------------------------
        ,         == 5000
ConcurrencyDemo:5000
ConcurrencyDemo:5000
ConcurrencyDemo:5000

AtomicInteger 원자 성 확보
JDK 1.5 에 자바 util. concurrent (J. U. C) 패 키 지 를 추가 하여 CAS 위 에 세 웠 습 니 다.CAS 는 비 차단 알고리즘 의 흔 한 구현 으로 synchronized 와 같은 차단 알고리즘 에 비해 성능 이 좋 습 니 다.
CAS
CAS 란 Compare and Swap 이라는 뜻 으로 비교 하고 조작 한 다 는 뜻 이다.많은 CPU 가 CAS 명령 을 직접 지원 합 니 다.CAS 는 낙관적 인 잠 금 기술 로 여러 스 레 드 가 CAS 를 사용 하여 같은 변 수 를 동시에 업데이트 하려 고 시도 할 때 그 중의 한 스 레 드 만 변수의 값 을 업데이트 할 수 있 고 다른 스 레 드 는 모두 실 패 했 습 니 다. 실패 한 스 레 드 는 걸 리 지 않 고 이번 경쟁 에서 실 패 했 음 을 알 리 며 다시 시도 할 수 있 습 니 다.
CAS 작업 은 메모리 위치 (V), 예상 원 치 (A), 새 값 (B) 세 개 를 포함 합 니 다.예상 치 A 와 메모리 V 만 동시에 메모리 V 를 B 로 변경 하지 않 으 면 아무것도 하지 않 습 니 다.
JDK 1.5 에서 바 텀 지원 을 도입 하여 int, long 과 대상 의 인용 등 유형 에서 CAS 의 조작 을 공 개 했 고 JVM 은 이 를 바 텀 하드웨어 로 컴 파일 하여 제공 하 는 가장 효과 적 인 방법 으로 CAS 를 실행 하 는 플랫폼 에서 실행 할 때 이 를 해당 하 는 기계 명령 으로 컴 파일 했다.java. util. concurrent. atomic 패키지 아래 의 모든 원자 변수 유형, 예 를 들 어 AtomicInteger 는 이러한 바 텀 JVM 을 사용 하여 디지털 형식의 인용 유형 에 효율 적 인 CAS 작업 을 지원 합 니 다.
CAS 조작 에 서 는 ABA 문제 가 발생 할 수 있다.V 값 이 A 에서 B 로, B 에서 A 로 바 뀌 었 다 면 여전히 변화 가 있 었 다 고 생각 하고 알고리즘 에서 의 절 차 를 다시 수행 해 야 한 다 는 것 이다.
간단 한 해결 방안 이 있 습 니 다. 특정한 인용 값 을 업데이트 하 는 것 이 아니 라 두 개의 값 을 업데이트 하 는 것 입 니 다. 하나의 인용 과 하나의 버 전 번 호 를 포함 합 니 다. 이 값 이 A 에서 B 로 변 한 후에 B 가 A 로 변 하 더 라 도 버 전 번 호 는 다 릅 니 다.
Atomic Stamped Reference 와 Atomic Markable Reference 는 두 변수 에서 원자의 조건 업 데 이 트 를 수행 하 는 것 을 지원 합 니 다.AtomicStamped Reference 는 '대상 - 인용' 이원 그룹 을 업데이트 하고 인용 에 '버 전 번호' 를 추가 하여 ABA 문 제 를 피 합 니 다. AtomicMarkableReference 는 '대상 인용 - 불 값' 의 이원 그룹 을 업데이트 합 니 다.
AtomicInteger 구현
AtomicInteger 는 원자 조작 을 지원 하 는 Integer 류 로 AtomicInteger 유형 변수 에 대한 증가 와 감소 작업 이 원자 적 이 며 여러 스 레 드 에서 의 데이터 불일치 문제 가 발생 하지 않도록 하 는 것 입 니 다.AtomicInteger 를 사용 하지 않 으 면 순서대로 가 져 오 는 ID 를 실현 하려 면 가 져 올 때마다 잠 금 동작 을 해서 같은 ID 를 가 져 오 는 현상 을 피해 야 합 니 다.
package java.util.concurrent.atomic;

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    //compareAndSet()      compareAndSwapInt()       native   。compareAndSet               value    ,update    1    , compareAndSet        Sun   UnSafe   compareAndSwapInt      ,     native   。
    //compareAndSwapInt      CPU   CAS       。     CAS            ,                        。     CAS     CPU   ,       。
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    //        value    ,    value   1,         next   ,  ,           ,          ,     compareAndSet   ,      ,          compareAndSet     。
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    public final int getAndDecrement() {
        for (;;) {
            int current = get();
            int next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }
}

AtomicInteger 에는 Increment AndGet () 과 Decrement AndGet () 방법 도 있다. 그들의 실현 원 리 는 위의 두 가지 방법 과 완전히 같 고, 반환 값 이 다르다 는 차이 가 있다. getAndIncrement () 와 getAndDecrement () 두 가지 방법 은 이전의 값, 즉 current 를 바 꾸 는 것 이다.Incrediment AndGet () 과 Decrement AndGet () 두 가지 방법 은 변 경 된 값, 즉 next 를 되 돌려 줍 니 다.
Atomic 확장 기타 클래스
원자 업데이트 기본 형식
원자 방식 으로 기본 유형 을 업데이트 하고 Atomic 패 키 지 는 다음 과 같은 세 가지 종 류 를 제공 합 니 다.
Atomic Boolean: 원자 업데이트 불 형식.
AtomicInteger: 원자 업데이트 정형.
AtomicLong: 원자 가 긴 정형 을 업데이트 합 니 다.
원자 업데이트 배열
원자 방식 으로 배열 의 특정한 요 소 를 업데이트 하고 Atomic 가방 은 다음 과 같은 세 가지 종 류 를 제공 합 니 다.Atomic IntegerArray: 원자 가 전체 배열 의 요 소 를 업데이트 합 니 다.Atomic LongArray: 원자 가 긴 정형 배열 의 요 소 를 업데이트 합 니 다.AtomicReferenceArray: 원자 업데이트 참조 형식 배열 의 요소 입 니 다.
int [] 테스트
package concurrency;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import annotation.NotThreadSafe;
import annotation.ThreadSafe;

@NotThreadSafe
public class ThreadSafeConcurrency {

    private static int CLIENT_COUNT = 5000;
    private static int THREAD_COUNT = 200;
    private static int[] values = new int[11];

    private static ExecutorService executorService = Executors.newCachedThreadPool();
    private final static Semaphore semaphore = new Semaphore(THREAD_COUNT);
    private final static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT);

    public static void main(String[] args) throws Exception {
        testAtomicIntArray();
    }

    private static void testAtomicIntArray() throws Exception {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    for (int j = 0; j < 10; j++) {//     +1
                        values[j]++;
                    }
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // count   1,    1   
                countDownLatch.countDown();
            });
        }
        //              
        countDownLatch.await();
        executorService.shutdown();
        for (int i = 0; i < 10; i++) {
            System.out.print(values[i] + " ");
        }

    }
}
//----------------------------------    -------------------------------------------
4999 4998 4999 4997 4997 4998 4999 4998 4997 4997 

AtomicIntegerArray 테스트
package concurrency;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.logging.Logger;

import annotation.NotThreadSafe;
import annotation.ThreadSafe;

@ThreadSafe
public class ThreadSafeConcurrency {

    private static int CLIENT_COUNT = 5000;
    private static int THREAD_COUNT = 200;
    private static AtomicInteger count = new AtomicInteger(0);
    private static int[] values = new int[10];
    private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(values);

    private static ExecutorService executorService = Executors.newCachedThreadPool();
    private final static Semaphore semaphore = new Semaphore(THREAD_COUNT);
    private final static CountDownLatch countDownLatch = new CountDownLatch(CLIENT_COUNT);

    public static void main(String[] args) throws Exception {
        testAtomicIntegerArray();
    }

    private static void testAtomicIntegerArray() throws Exception {
        for (int i = 0; i < CLIENT_COUNT; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    for (int j = 0; j < 10; j++) {//     +1
                        atomicIntegerArray.incrementAndGet(j);
                    }
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // count   1,    1   
                countDownLatch.countDown();
            });
        }
        //              
        countDownLatch.await();
        executorService.shutdown();
        for (int i = 0; i < 10; i++) {
            System.out.print(atomicIntegerArray.get(i) + " ");
        }

    }
}
//----------------------------------    -------------------------------------------
5000 5000 5000 5000 5000 5000 5000 5000 5000 5000 

원자 업데이트 참조
원자 업데이트 기본 형식의 AtomicInteger 는 하나의 변수 만 업데이트 할 수 있 습 니 다. 원자 가 여러 변 수 를 업데이트 하려 면 이 원자 업데이트 참조 형식 이 제공 하 는 클래스 를 사용 해 야 합 니 다.Atomic 가방 은 다음 과 같은 세 가지 종 류 를 제공 합 니 다.AtomicReference: 원자 업데이트 참조 형식 입 니 다.AtomicReference FieldUpdater: 원자 업데이트 참조 형식의 필드 입 니 다.Atomic Markable Reference: 원자 업데이트 에 표 시 된 인용 형식 입 니 다.
package concurrency;
import java.util.concurrent.atomic.AtomicReference;

@ThreadSafe
public class ThreadSafeConcurrency {
    private static AtomicReference atomicUserRef = new AtomicReference(0);
    public static void main(String[] args) throws Exception {
        testAtomicReference();
    }
    private static void testAtomicReference() throws Exception {
        atomicUserRef.compareAndSet(0, 2);
        atomicUserRef.compareAndSet(0, 1);
        atomicUserRef.compareAndSet(1, 3);
        atomicUserRef.compareAndSet(2, 4);
        atomicUserRef.compareAndSet(3, 5);
        System.out.println("ConcurrencyDemo:" + atomicUserRef.get().toString());
    }
}
//----------------------------------    -------------------------------------------
ConcurrencyDemo:4

원자 업데이트 필드
특정한 종류의 필드 를 원자 적 으로 업데이트 하려 면 원자 업데이트 필드 류 를 사용 해 야 하 며, Atomic 패 키 지 는 다음 과 같은 3 가지 종 류 를 제공 하여 원자 필드 를 업데이트 합 니 다.AtomicInteger FieldUpdater: 원자 가 전체 필드 를 업데이트 하 는 업데이트 기 입 니 다.Atomic LongFieldUpdater: 원자 가 긴 필드 의 업데이트 기 를 업데이트 합 니 다.AtomicStamped Reference: 원자 업데이트 에 버 전 번호 가 있 는 참조 형식 입 니 다.이 종 류 는 전체 수 치 를 참조 와 연결 하여 원자의 업데이트 데이터 와 데 이 터 를 사용 할 수 있 는 버 전 번호 로 CAS 를 사용 하여 원자 업 데 이 트 를 할 때 발생 할 수 있 는 ABA 문 제 를 해결 할 수 있다.
필드 클래스 를 원자 적 으로 업데이트 하려 면 두 단계 가 필요 합 니 다.
첫 번 째 단 계 는 원자 업데이트 필드 클래스 가 추상 클래스 이기 때문에 사용 할 때마다 정적 방법 인 new Updater () 를 사용 하여 업데이트 기 를 만 들 고 업데이트 할 클래스 와 속성 을 설정 해 야 합 니 다.
두 번 째 단 계 는 클래스 의 필드 (속성) 를 업데이트 하려 면 Public volatile 수식 자 를 사용 해 야 합 니 다.
package concurrency;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

@ThreadSafe
public class ThreadSafeConcurrency {
    private static AtomicIntegerFieldUpdater atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
    private static User user;

    public static void main(String[] args) throws Exception {
        testAtomicIntegerFieldUpdater();
    }

    private static void testAtomicIntegerFieldUpdater() throws Exception {
        user = new User("user ", 100);
        atomicIntegerFieldUpdater.incrementAndGet(user);
        //              
        System.out.println("ConcurrencyDemo:" + atomicIntegerFieldUpdater.get(user));
    }
}

class User {
    private String name;
    public volatile int old;//     volatile   ,      static
    public User(String name, int old) {
        this.name = name;
        this.old = old;
    }
    public String getName() {
        return name;
    }
    public int getOld() {
        return old;
    }
}
//----------------------------------    -------------------------------------------
ConcurrencyDemo:101

좋은 웹페이지 즐겨찾기