자바 소비자 생산자 모델 및 JDK 차단 대기 열 LinkedBlockingQueue 구현

7436 단어 병발 하 다
생산자 소비자 문제
   (영어:Producer-consumer problem)는 유한 버퍼 문제(영어:Bounded-buffer problem)라 고도 부 르 며 다 중 스 레 드 동기 화 문제 의 전형 적 인 사례 입 니 다.이 문 제 는 고정 크기 버퍼 를 공유 하 는 두 개의 스 레 드,즉'생산자'와'소비자'가 실제 운행 할 때 발생 하 는 문 제 를 묘사 했다.생산자 의 주요 역할 은 일 정량의 데 이 터 를 생 성하 여 버퍼 에 넣 고 이 과정 을 반복 하 는 것 이다.이와 함께 소비자 들 도 버퍼 에서 이 데 이 터 를 소모 하고 있다.이 문 제 는 생산자 가 버퍼 가 가득 찼 을 때 데 이 터 를 넣 지 않 고 소비자 도 버퍼 가 비어 있 을 때 데 이 터 를 소모 하지 않도록 하 는 것 이다.
    이 문 제 를 해결 하려 면 생산자 가 버퍼 가 가득 찼 을 때 휴면(아니면 아예 데 이 터 를 포기)하고 다음 소비자 가 버퍼 에 있 는 데 이 터 를 소모 할 때 까지 기 다 려 야 생산자 가 깨 어 나 버퍼 에 데 이 터 를 추가 할 수 있다.마찬가지 로 소비자 들 이 버퍼 가 비 었 을 때 휴면 에 들 어가 생산자 가 버퍼 에 데 이 터 를 추가 한 후에 소비 자 를 깨 울 수도 있다.일반적으로 프로 세 스 간 통신 방법 으로 이 문 제 를 해결 하 는데 자주 사용 하 는 방법 은 신호등 법[1]등 이 있다.해결 방법 이 완선 되 지 않 으 면,자물쇠 가 잠 겨 있 는 상황 이 발생 하기 쉽다.자물쇠 가 생기 면 두 라인 모두 휴면 에 빠 져 상대방 이 자신 을 깨 우 기 를 기다린다.이 문 제 는 여러 생산자 와 소비자 의 상황 에 도 보 급 될 수 있다.
현실 에서 의 응용
     예 를 들 어 식당 에는 요리사 와 종업원 이 있 습 니 다.이 종업원 은 요리사 가 음식 을 준비 할 때 까지 기 다 려 야 한다.요리사 가 준비 가 다 되 었 을 때,그 는 종업원 에 게 통지 한 후에 종업원 이 요 리 를 내 고 돌아 와 계속 기다 릴 것 이다.이것 은 임무 협력의 실례 이다.요리 사 는 생산 자 를 대표 하고 종업원 은 소비 자 를 대표 한다.두 가지 임 무 는 음식 이 생산 되 고 소 비 될 때 악 수 를 해 야 하고 시스템 은 질서 있 게 닫 아야 한다.
      다음 그림 으로 이런 관 계 를 나 타 낼 수 있다.
     
생산자 소비자 의 실현
여 기 는 차단 대기 열 LinkedBlockingQueue 로 이 루어 집 니 다.차단 대기 열 입 니 다.
package com.a.consumer;

import java.util.concurrent.*;

public class consumer3 {
    //         
    private LinkedBlockingQueue queue = new LinkedBlockingQueue(10);

    public consumer3() {
    }

    public void start() {
        new Producer().start();
        new Consumer().start();
    }

    public static void main(String[] args) throws Exception {
    	consumer3 s3 = new consumer3();
        s3.start();
    }

    class Producer extends Thread {
        public void run() {
            while (true) {
                try {
                    Object o = new Object();
                    //       
                    queue.put(o); //         
                    System.out.println("Producer: " + o);
                } catch (InterruptedException e) {
                	e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        public void run() {
            while (true) {
                try {
                    //       
                    Object o = queue.take();
                    System.out.println("Consumer: " + o);
                } catch (InterruptedException e) {
                	e.printStackTrace();
                }
            }
        }
    }

}

BlockingQueue 는 인터페이스 입 니 다.
BlockingQueue 가 비어 있 으 면 BlockingQueue 에서 물건 을 가 져 오 는 작업 이 차단 되 어 대기 상태 로 들 어 갑 니 다.BlockingQueue 에 물건 이 들 어 갈 때 까지 깨 어 납 니 다.마찬가지 로 BlockingQueue 가 가득 차 면 안에 물건 을 저장 하려 는 모든 작업 도 차단 되 어 대기 상태 로 들 어 갑 니 다.BlockingQueue 에 공간 이 있어 야 깨 어 나 계속 작업 할 수 있 습 니 다.      BlockingQueue 를 사용 하 는 핵심 기술 포 인 트 는 다음 과 같 습 니 다.      1.BlockingQueue 가 정의 하 는 일반적인 방법 은 다음 과 같다.          1)add(anObject):anObject 를 BlockingQueue 에 추가 합 니 다.즉,BlockingQueue 가 수용 할 수 있다 면 true 로 돌아 갑 니 다.그렇지 않 으 면 이상 을 보고 합 니 다.          2)offer(anObject):가능 하 다 면 anObject 를 BlockingQueue 에 추가 합 니 다.즉,BlockingQueue 가 수용 할 수 있다 면 true 로 돌아 갑 니 다.그렇지 않 으 면 false 로 돌아 갑 니 다.        3)put(anObject):anObject 를 BlockingQueue 에 추가 합 니 다.BlockingQueue 에 공간 이 없 으 면 이 방법 을 사용 하 는 스 레 드 가 차단 되 어 BlockingQueue 에 공간 이 있 을 때 까지 계속 합 니 다.          4)poll(time):BlockingQueue 에서 1 위 를 차지 하 는 대상 을 가 져 가 고 즉시 꺼 내지 못 하면 time 매개 변수 가 정 한 시간 을 기 다 려 서 null 로 되 돌아 갈 수 있 습 니 다.          5)take():BlockingQueue 에서 1 위 대상 을 가 져 가 BlockingQueue 가 비어 있 으 면 Blocking 에 새로운 대상 이 가입 할 때 까지 대기 상태 로 들 어 가 는 것 을 차단 합 니 다.      2.BlockingQueue 는 네 가지 구체 적 인 실현 유형 이 있 는데 서로 다른 수요 에 따라 서로 다른 실현 유형 을 선택한다.          1)Array BlockingQueue:크기 를 정 한 BlockingQueue 의 구조 함 수 는 int 매개 변 수 를 가지 고 크기 를 가 리 켜 야 합 니 다.그 대상 은 FIFO(선 입 선 출)순서 로 정렬 되 어 있 습 니 다.          2)LinkedBlockingQueue:크기 가 정 해 지지 않 은 BlockingQueue.구조 함수 가 정 해진 크기 의 매개 변 수 를 가지 고 있 으 면 생 성 된 BlockingQueue 는 크기 제한 이 있 습 니 다.크기 매개 변 수 를 가지 고 있 지 않 으 면 생 성 된 BlockingQueue 의 크기 는 Integer.MAX 입 니 다.VALUE 가 결정 합 니 다.그 대상 은 FIFO(선 입 선 출)순서 로 정렬 됩 니 다.          3)Priority BlockingQueue:링크 드 BlockQueue 와 유사 하지만 그 대상 의 순 서 는 FIFO 가 아니 라 대상 의 자연 정렬 순서 나 구조 함수 의 Comparator 에 따라 결정 되 는 순서 입 니 다.          4)SynchronousQueue:특수 한 BlockingQueue,그 조작 은 반드시 놓 고 가 져 와 교체 해 야 합 니 다.      3.LinkedBlockingQueue 와 Array BlockingQueue 를 비교 해 보면 뒤에 사용 하 는 데이터 구조 가 다 르 기 때문에 LinkedBlockingQueue 의 데이터 스루풋 은 Array BlockingQueue 보다 크 지만 스 레 드 수량 이 많 을 때 그 성능 의 예견 성 은 Array BlockingQueue 보다 낮다. 
링크 드 BlockingQueue 의 소스 코드 를 연구 해 보 겠 습 니 다.
LinkedBlockingQueue 실현 은 라인 이 안전 하고 선진 적 인 선 출 등 특성 을 실 현 했 기 때문에 생산자 소비자 로 서 의 첫 번 째 선택 입 니 다.LinkedBlockingQueue 는 용량 을 지정 할 수도 있 고 지정 하지 않 을 수도 있 습 니 다.지정 하지 않 으 면 기본적으로 최대 Integer.MAX 입 니 다.VALUE 는 주로 put 와 take 방법 을 사용 합 니 다.put 방법 은 대기 열 이 가득 찼 을 때 대기 열 구성원 이 소 비 될 때 까지 막 히 고 take 방법 은 대기 열 이 비 었 을 때 막 힙 니 다.대기 열 구성원 이 들 어 올 때 까지 막 힙 니 다.
offer 와 pool 방법 은 두 가지 가 있어 요.
offer 실패 시 false 로 돌아 갑 니 다.
poll 실패 하면 null 로 돌아 갑 니 다.
일단 puut 방법 을 볼 게 요.
이 puutLock.lockInterruptibly()방법 을 사용 하여 이 puutLock 자 물 쇠 를 가 져 오 려 고 합 니 다.
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();  
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            while (count.get() == capacity) { 
                    notFull.await();
            }
            enqueue(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

보다
lockInterruptibly()방법의 원본 코드 입 니 다.실제 적 으로 호출 된 sync 라 는 동기 화기 의 acquire Interruptibly 방법 입 니 다.
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

acquireInterruptibly 를 보 는 방법 은 현재 스 레 드 인 터 럽 트 표지 위치 가 true 인지 확인 하 는 것 입 니 다.true 일 때 인 터 럽 트 이상 을 던 집 니 다.그렇지 않 으 면 자 물 쇠 를 가 져 오 려 고 합 니 다.자 물 쇠 를 얻 지 못 했 을 때 doAcquire Interruptibly 라 는 방법 을 실행 합 니 다.
4.567913.다음은 doAcquire Interruptibly 라 는 방법 을 살 펴 보 겠 습 니 다.shouldPark After Failed Acquire 라 는 방법 에 주의해 야 합 니 다.바로 true 일 때 파 크 앤 드 체크 인 터 럽 트()라 는 방법 을 계속 실행 합 니 다.이것 도 사실 일 때 현재 순환 에서 벗 어 나 자 물 쇠 를 가 져 오 는 것 을 취소 하고 이상 을 던 집 니 다.
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

좋은 웹페이지 즐겨찾기