JDK7에서 TransferQueue의 사용 및 TransferQueue와 SynchronousQueue의 차이

JDK7은 JDK5의 J.U.C 병행 도구를 강화했는데 그 중 하나는 TransferQueue를 추가한 것이다.자바와 관련된 JSR 규범은 Doug Lea가 관리하는 블로그를 볼 수 있습니다.이제 이런 종류의 사용 방식을 간단하게 소개합니다.
public interface TransferQueue extends BlockingQueue
{
    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * 

More precisely, transfers the specified element immediately * if there exists a consumer already waiting to receive it (in * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), * otherwise returning {@code false} without enqueuing the element. * * @param e the element to transfer * @return {@code true} if the element was transferred, else * {@code false} * @throws ClassCastException if the class of the specified element * prevents it from being added to this queue * @throws NullPointerException if the specified element is null * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ boolean tryTransfer(E e); /** * Transfers the element to a consumer, waiting if necessary to do so. * *

More precisely, transfers the specified element immediately * if there exists a consumer already waiting to receive it (in * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), * else waits until the element is received by a consumer. * * @param e the element to transfer * @throws InterruptedException if interrupted while waiting, * in which case the element is not left enqueued * @throws ClassCastException if the class of the specified element * prevents it from being added to this queue * @throws NullPointerException if the specified element is null * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ void transfer(E e) throws InterruptedException; /** * Transfers the element to a consumer if it is possible to do so * before the timeout elapses. * *

More precisely, transfers the specified element immediately * if there exists a consumer already waiting to receive it (in * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), * else waits until the element is received by a consumer, * returning {@code false} if the specified wait time elapses * before the element can be transferred. * * @param e the element to transfer * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the * {@code timeout} parameter * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before completion, * in which case the element is not left enqueued * @throws InterruptedException if interrupted while waiting, * in which case the element is not left enqueued * @throws ClassCastException if the class of the specified element * prevents it from being added to this queue * @throws NullPointerException if the specified element is null * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * Returns {@code true} if there is at least one consumer waiting * to receive an element via {@link #take} or * timed {@link #poll(long,TimeUnit) poll}. * The return value represents a momentary state of affairs. * * @return {@code true} if there is at least one waiting consumer */ boolean hasWaitingConsumer(); /** * Returns an estimate of the number of consumers waiting to * receive elements via {@link #take} or timed * {@link #poll(long,TimeUnit) poll}. The return value is an * approximation of a momentary state of affairs, that may be * inaccurate if consumers have completed or given up waiting. * The value may be useful for monitoring and heuristics, but * not for synchronization control. Implementations of this * method are likely to be noticeably slower than those for * {@link #hasWaitingConsumer}. * * @return the number of consumers waiting to receive elements */ int getWaitingConsumerCount(); }


TransferQueue는 또한 막힌 대기열로 막힌 대기열의 모든 특성을 갖추고 있으며 주로 상기 5개의 새로운 API의 역할을 소개한다.
1.transfer(Ee) 현재 획득을 기다리는 소비자 라인이 존재하면 즉시 e를 넘겨준다.그렇지 않으면 원소 e를 대기열 끝에 삽입하고 현재 라인이 막힌 상태로 들어가 소비자 라인이 이 원소를 가져갈 때까지 합니다.
public class TransferQueueDemo {

	private static TransferQueue queue = new LinkedTransferQueue();

	public static void main(String[] args) throws Exception {

		new Productor(1).start();

		Thread.sleep(100);

		System.out.println("over.size=" + queue.size());
	}

	static class Productor extends Thread {
		private int id;

		public Productor(int id) {
			this.id = id;
		}

		@Override
		public void run() {
			try {
				String result = "id=" + this.id;
				System.out.println("begin to produce." + result);
				queue.transfer(result);
				System.out.println("success to produce." + result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

트랜스퍼 () 를 호출할 때 데이터를 얻기를 기다리는 소비자가 없기 때문에 생산자의 라인이 막히는 것을 볼 수 있다.대기열의 길이가 1이 된 것은 원소 e가 성공적으로 이관되지 않았을 때 막힌 대기열의 끝에 삽입된다는 것을 의미한다.
2.tryTransfer(Ee) 현재 가져오기를 기다리는 소비자 라인이 존재하면 이 방법은 즉시 e를 이동하고true로 돌아갑니다.존재하지 않으면false로 되돌아오지만, e를 대기열에 삽입하지 않습니다.이 방법은 현재 라인을 막지 않습니다.true로 되돌아오거나false로 되돌아오십시오.
3. hasWaitingConsumer()와 getWaitingConsumerCount()는 현재 소비를 기다리고 있는 소비자의 라인 개수를 판단하는 데 사용된다.
4.tryTransfer(E, long timeout, Time Unit unit)는 현재 가져오기를 기다리는 소비자 라인이 존재하면 즉시 전송한다.그렇지 않으면 원소 e를 대기열 끝에 삽입하고 소비자 라인에서 소비를 얻기를 기다립니다.지정한 시간 안에 원소 e를 소비자 라인에서 가져올 수 없으면false를 되돌려주고 이 원소는 대기열에서 제거합니다.
public class TransferQueueDemo {

	private static TransferQueue queue = new LinkedTransferQueue();

	public static void main(String[] args) throws Exception {

		new Productor(1).start();

		Thread.sleep(100);

		System.out.println("over.size=" + queue.size());//1
		
		Thread.sleep(1500);

		System.out.println("over.size=" + queue.size());//0
	}

	static class Productor extends Thread {
		private int id;

		public Productor(int id) {
			this.id = id;
		}

		@Override
		public void run() {
			try {
				String result = "id=" + this.id;
				System.out.println("begin to produce." + result);
				queue.tryTransfer(result, 1, TimeUnit.SECONDS);
				System.out.println("success to produce." + result);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

첫 번째로 지정된 시간이 되지 않아 요소가 대기열에 삽입되었고 모든 대기열의 길이는 1이다.두 번째로 지정한 시간 필름이 다 소모되어 원소가 대기열에서 제거되었기 때문에 대기열 길이는 0입니다.
이 글은 SynchronousQueue의 사용 방식을 설명하고 있는데, TransferQueue도 SynchronousQueue의 모든 기능을 가지고 있지만 TransferQueue의 기능은 더욱 강하다는 것을 알 수 있다.이 문서에서는 두 API의 차이점을 다룹니다.
TransferQueue is more generic and useful than SynchronousQueue however as it allows you to flexibly decide whether to use normal BlockingQueue 
semantics or a guaranteed hand-off. In the case where items are already in the queue, calling transfer will guarantee that all existing queue 
items will be processed before the transferred item.


SynchronousQueue implementation uses dual queues (for waiting producers and waiting consumers) and protects both queues with a single lock. The
 LinkedTransferQueue implementation uses CAS operations to form a nonblocking implementation and that is at the heart of avoiding serialization
 bottlenecks.

좋은 웹페이지 즐겨찾기