동시 프로 그래 밍 (10) 생산자 / 소비자

9463 단어 스 레 드
생산자, 소비자 디자인 모델 은 다 중 스 레 드 에서 전형 적 인 디자인 모델 이자 면접 에서 자주 묻 는 질문 이다. 본 장 에서 우 리 는 다 중 스 레 드 에서 생산자, 소비자 의 문 제 를 토론 할 것 이다.
1) 생산자, 소비자 문제 에 대한 설명
      생산자, 소비자 디자인 모델 은 사실은 '생산 - 소비 - 창고 - 제품' 모델 로 다음 과 같은 몇 가지 특징 을 가 져 야 한다.
    (1) 생산 자 는 창고 가 채 워 지지 않 을 때 만 제품 을 생산 하고 창고 가 채 워 지면 생산 을 중단한다.
    (2) 소비 자 는 창고 에 제품 의 여분 이 있 을 때 만 소비 하고 창고 가 비어 있 으 면 기다린다.
    (3) 소비자 가 창고 에 제품 의 여분 이 없다 는 것 을 발견 하면 생산자 에 게 생산 을 알 린 다.
    (4) 생산자 가 제품 을 생산 한 후에 기다 리 는 소비자 에 게 소 비 를 통지 합 니 다.
2) 해결 방안
(a) wait / notify / synchronized 방법 으로 구현:
우선 제품 클래스 를 만 듭 니 다.
//   
public class Product {
	
	//    
	private int productId;
	//    
	private String productName;

	public int getProductId() {
		return productId;
	}

	public void setProductId(int productId) {
		this.productId = productId;
	}

	public String getProductName() {
		return productName;
	}

	public void setProductName(String productName) {
		this.productName = productName;
	}

	public Product(int productId, String productName) {
		super();
		this.productId = productId;
		this.productName = productName;
	}
}
이런 종 류 는 비교적 간단 하 다. 주로 제품 류 (제품 번호, 제품 이름) 를 만 들 고 우 리 는 창 고 를 하나 더 만들어 제품 관 리 를 한다.
//   
public class WareHouse {

	//     
	private int size;

	LinkedList<Product> queue = new LinkedList<Product>();

	public WareHouse(int size) {
		this.size = size;
	}

	public void createProduct() {
		try {
			synchronized (queue) {
				while (queue.size() == size) {
					System.out.println("    ");
					System.out.println("Create_Thread[_"
							+ Thread.currentThread().getId() + "_]" + "  ");
					queue.wait();
				}
				int i = (int) Thread.currentThread().getId();
				Product p = new Product(i, "Product__" + i);
				queue.addFirst(p);
				System.out.println("Create_Thread[_" + Thread.currentThread().getId()
						+ "_]:   Product[" + i + "," + p.getProductName() + "],       :"+queue.size());
				queue.notifyAll();

			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

	public void consumeProduct() {
		try {
			synchronized (queue) {
				while (queue.size() == 0) {
					System.out.println("Consume_Thread[_"
							+ Thread.currentThread().getId() + "_]" + "  ,       :"+queue.size());
					queue.wait();
				}
				System.out.println("Consume_Thread[_" + Thread.currentThread().getId()
						+ "_]:   [" + queue.getFirst().getProductId() + ","
						+ queue.getFirst().getProductName() + "],       :"+(queue.size()-1));
				queue.removeFirst();
				queue.notifyAll();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

	public int getSize() {
		return size;
	}

	public void setSize(int size) {
		this.size = size;
	}

	public LinkedList<Product> getQueue() {
		return queue;
	}

	public void setQueue(LinkedList<Product> queue) {
		this.queue = queue;
	}

}
창 고 는 용량 속성 을 가지 고 LinkedList 를 만들어 produt 의 보존 을 실시 하여 두 가지 방법 을 실현 하여 제품 의 생 성과 소 비 를 하고 우리 의 생산자, 소비자 류 를 살 펴 보 았 다.
public class Producer extends Thread {

	private WareHouse house;

	public Producer(WareHouse house) {
		this.house = house;
	}

	@Override
	public void run() {
		house.createProduct();
	}
}
소비자:
public class Customer extends Thread {
	private WareHouse house;

	public Customer(WareHouse house) {
		this.house = house;
	}

	@Override
	public void run() {
		house.consumeProduct();
	}
}
테스트 클래스:
public class Test {     public static void main(String[] args) {         WareHouse house = new WareHouse(5);         for (int i = 0; i < 10; i++) {             Thread pro1 = new Producer(house);             Thread cus1 = new Customer(house);             cus1.start();             pro1.start();         }     } }
테스트 결 과 를 살 펴 보 자.
Consume_Thread[_9_]  ,       :0
Consume_Thread[_11_]  ,       :0
Create_Thread[_10_]:   Product[10,Product__10],       :1
Create_Thread[_8_]:   Product[8,Product__8],       :2
Consume_Thread[_17_]:   [8,Product__8],       :1
Consume_Thread[_11_]:   [10,Product__10],       :0
Consume_Thread[_19_]  ,       :0
Consume_Thread[_9_]  ,       :0
Create_Thread[_14_]:   Product[14,Product__14],       :1
Consume_Thread[_15_]:   [14,Product__14],       :0
Create_Thread[_12_]:   Product[12,Product__12],       :1
Consume_Thread[_13_]:   [12,Product__12],       :0
Create_Thread[_20_]:   Product[20,Product__20],       :1
Consume_Thread[_23_]:   [20,Product__20],       :0
Consume_Thread[_9_]  ,       :0
Consume_Thread[_19_]  ,       :0
Create_Thread[_18_]:   Product[18,Product__18],       :1
Consume_Thread[_21_]:   [18,Product__18],       :0
Create_Thread[_16_]:   Product[16,Product__16],       :1
Create_Thread[_24_]:   Product[24,Product__24],       :2
Consume_Thread[_19_]:   [24,Product__24],       :1
Create_Thread[_26_]:   Product[26,Product__26],       :2
Consume_Thread[_9_]:   [26,Product__26],       :1
Consume_Thread[_25_]:   [16,Product__16],       :0
Create_Thread[_22_]:   Product[22,Product__22],       :1
Consume_Thread[_27_]:   [22,Product__22],       :0
위 와 같이 간단 한 생산자, 소비자 모델 이 구축 되 었 습 니 다. 이런 방식 은 synchronized, wait, notify 등 방법 으로 병행 을 통제 하고 스 레 드 의 기본 적 인 실현 방법 에 속 합 니 다. 사실은 자바 에는 이미 기 존의 실현 유형 이 생산자, 소비자 모델 을 실현 하고 있 습 니 다. 다음은 우리 가 살 펴 보 겠 습 니 다.
(b) BlockingQueue 차단 대기 열 방식
창고 류 를 수정 해 보 겠 습 니 다.
public class WareHouse2 {
	//     
	private int size;

	private LinkedBlockingQueue<Product> queue;

	public WareHouse2(int size) {
		this.size = size;
		this.queue=new LinkedBlockingQueue<Product>(size);
	}

	public void createProduct() {
		try {
			int i = (int) Thread.currentThread().getId();
			Product p = new Product(i, "Product__" + i);
			queue.put(p);
			System.out.println("Thread_create>:["+Thread.currentThread().getId()+"],queue_size:"+queue.size());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public void consumeProduct() {
		try {
			Product pt=queue.take();
			System.out.println("Thread_consume:["+Thread.currentThread().getId()+"],Product:["+pt.getProductId()+"]");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}
실행 결과:
Thread_create>:[8],queue_size:1
Thread_create>:[12],queue_size:2
Thread_create>:[16],queue_size:3
Thread_consume:[9],Product:[8]
Thread_consume:[11],Product:[12]
Thread_consume:[13],Product:[16]
Thread_create>:[10],queue_size:1
Thread_consume:[15],Product:[10]
Thread_consume:[19],Product:[14]
Thread_create>:[18],queue_size:1
Thread_create>:[20],queue_size:2
Thread_consume:[25],Product:[18]
Thread_create>:[24],queue_size:2
Thread_create>:[14],queue_size:2
Thread_consume:[17],Product:[20]
Thread_consume:[21],Product:[24]
Thread_consume:[23],Product:[22]
Thread_create>:[22],queue_size:0
Thread_consume:[27],Product:[26]
Thread_create>:[26],queue_size:0
소스 코드 를 통 해 이런 방식 의 코드 가 매우 간결 해 야 한 다 는 것 을 알 수 있 습 니 다. 그리고 창고 류 에서 우리 가 사용 하 는 put (), take () 방법 은 용량 이 최대 에 이 르 렀 을 때 put 는 자동 으로 스 레 드 를 막 습 니 다. take 방법 은 용량 이 0 일 때 도 자동 으로 스 레 드 를 막 습 니 다. 소스 코드 를 살 펴 보 겠 습 니 다.
   public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
우 리 는 방법 에 wait (), signal () 등 동기 화 방법 을 자동 으로 밀봉 한 것 을 발견 할 수 있 기 때문에 우리 프로그램 이 스스로 처리 할 필요 가 없다. 난이도 와 안전성 의 정확성 에 있어 서 모두 가 차단 대열 을 사용 하여 생산자, 소비자 모델 을 실현 하 는 것 을 권장 한다.

좋은 웹페이지 즐겨찾기